3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-11 17:55:53 +00:00

Don't hold the peergroup lock whilst calculating bloom filters, and run on the executor thread. Move responsibility for deduplication into recalculateFastCatchupAndFilter().

This commit is contained in:
Mike Hearn 2014-11-13 23:07:02 +01:00
parent 27bc229fab
commit 10340b13a6
2 changed files with 76 additions and 83 deletions

View File

@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.net.InetAddresses;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
@ -166,48 +167,12 @@ public class PeerGroup implements TransactionBroadcaster {
private int minBroadcastConnections = 0;
private final AbstractWalletEventListener walletEventListener = new AbstractWalletEventListener() {
// Because calculation of the new filter takes place asynchronously, these flags deduplicate requests.
@GuardedBy("this") private boolean sendIfChangedQueued, dontSendQueued;
private Runnable bloomSendIfChanged = new Runnable() {
@Override public void run() {
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
synchronized (walletEventListener) {
sendIfChangedQueued = false;
}
}
};
private Runnable bloomDontSend = new Runnable() {
@Override public void run() {
recalculateFastCatchupAndFilter(FilterRecalculateMode.DONT_SEND);
synchronized (walletEventListener) {
dontSendQueued = false;
}
}
};
private synchronized void queueRecalc(boolean andTransmit) {
if (andTransmit) {
if (!sendIfChangedQueued) {
log.info("Queuing recalc of the Bloom filter due to new keys or scripts becoming available");
sendIfChangedQueued = true;
executor.execute(bloomSendIfChanged);
}
} else {
if (!dontSendQueued) {
log.info("Queuing recalc of the Bloom filter due to observing a pay to pubkey output on a relevant tx");
dontSendQueued = true;
executor.execute(bloomDontSend);
}
}
}
@Override public void onScriptsChanged(Wallet wallet, List<Script> scripts, boolean isAddingScripts) {
queueRecalc(true);
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
}
@Override public void onKeysAdded(List<ECKey> keys) {
queueRecalc(true);
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
}
@Override
@ -237,9 +202,9 @@ public class PeerGroup implements TransactionBroadcaster {
for (TransactionOutput output : tx.getOutputs()) {
if (output.getScriptPubKey().isSentToRawPubKey() && output.isMine(wallet)) {
if (tx.getConfidence().getConfidenceType() == TransactionConfidence.ConfidenceType.BUILDING)
queueRecalc(true);
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
else
queueRecalc(false);
recalculateFastCatchupAndFilter(FilterRecalculateMode.DONT_SEND);
return;
}
}
@ -284,7 +249,7 @@ public class PeerGroup implements TransactionBroadcaster {
public static final double MAX_FP_RATE_INCREASE = 2.0f;
// An object that calculates bloom filters given a list of filter providers, whilst tracking some state useful
// for privacy purposes.
private FilterMerger bloomFilterMerger;
private final FilterMerger bloomFilterMerger;
/** The default timeout between when a connection attempt begins and version message exchange completes */
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000;
@ -373,6 +338,7 @@ public class PeerGroup implements TransactionBroadcaster {
memoryPool = new MemoryPool();
inactives = new PriorityQueue<PeerAddress>(1, new Comparator<PeerAddress>() {
@SuppressWarnings("FieldAccessNotGuarded") // only called when inactives is accessed, and lock is held then.
@Override
public int compare(PeerAddress a, PeerAddress b) {
int result = backoffMap.get(a).compareTo(backoffMap.get(b));
@ -1030,43 +996,67 @@ public class PeerGroup implements TransactionBroadcaster {
DONT_SEND,
}
private final Map<FilterRecalculateMode, SettableFuture<BloomFilter>> inFlightRecalculations = Maps.newHashMap();
/**
* Recalculates the bloom filter given to peers as well as the timestamp after which full blocks are downloaded
* (instead of only headers).
* (instead of only headers). Note that calls made one after another may return the same future, if the request
* wasn't processed yet (i.e. calls are deduplicated).
*
* @param mode In what situations to send the filter to connected peers.
* @return a future that completes once the filter has been calculated (note: this does not mean acknowledged by remote peers).
*/
public void recalculateFastCatchupAndFilter(FilterRecalculateMode mode) {
lock.lock();
try {
// Fully verifying mode doesn't use this optimization (it can't as it needs to see all transactions).
if (chain != null && chain.shouldVerifyTransactions())
return;
FilterMerger.Result result = bloomFilterMerger.calculate(ImmutableList.copyOf(peerFilterProviders));
boolean send;
switch (mode) {
case SEND_IF_CHANGED: send = result.changed; break;
case DONT_SEND: send = false; break;
case FORCE_SEND_FOR_REFRESH: send = true; break;
default: throw new UnsupportedOperationException();
}
if (send) {
for (Peer peer : peers) {
// Only query the mempool if this recalculation request is not in order to lower the observed FP
// rate. There's no point querying the mempool when doing this because the FP rate can only go
// down, and we will have seen all the relevant txns before: it's pointless to ask for them again.
peer.setBloomFilter(result.filter, mode != FilterRecalculateMode.FORCE_SEND_FOR_REFRESH);
}
// Reset the false positive estimate so that we don't send a flood of filter updates
// if the estimate temporarily overshoots our threshold.
if (chain != null)
chain.resetFalsePositiveEstimate();
}
// Do this last so that bloomFilter is already set when it gets called.
setFastCatchupTimeSecs(result.earliestKeyTimeSecs);
} finally {
lock.unlock();
public ListenableFuture<BloomFilter> recalculateFastCatchupAndFilter(final FilterRecalculateMode mode) {
final SettableFuture<BloomFilter> future = SettableFuture.create();
synchronized (inFlightRecalculations) {
if (inFlightRecalculations.get(mode) != null)
return inFlightRecalculations.get(mode);
inFlightRecalculations.put(mode, future);
}
executor.execute(new Runnable() {
@Override
public void run() {
checkState(!lock.isHeldByCurrentThread());
// Fully verifying mode doesn't use this optimization (it can't as it needs to see all transactions).
if (chain != null && chain.shouldVerifyTransactions())
return;
// We only ever call bloomFilterMerger.calculate on jobQueue, so we cannot be calculating two filters at once.
FilterMerger.Result result = bloomFilterMerger.calculate(ImmutableList.copyOf(peerFilterProviders /* COW */));
boolean send;
switch (mode) {
case SEND_IF_CHANGED:
send = result.changed;
break;
case DONT_SEND:
send = false;
break;
case FORCE_SEND_FOR_REFRESH:
send = true;
break;
default:
throw new UnsupportedOperationException();
}
if (send) {
for (Peer peer : peers /* COW */) {
// Only query the mempool if this recalculation request is not in order to lower the observed FP
// rate. There's no point querying the mempool when doing this because the FP rate can only go
// down, and we will have seen all the relevant txns before: it's pointless to ask for them again.
peer.setBloomFilter(result.filter, mode != FilterRecalculateMode.FORCE_SEND_FOR_REFRESH);
}
// Reset the false positive estimate so that we don't send a flood of filter updates
// if the estimate temporarily overshoots our threshold.
if (chain != null)
chain.resetFalsePositiveEstimate();
}
// Do this last so that bloomFilter is already set when it gets called.
setFastCatchupTimeSecs(result.earliestKeyTimeSecs);
synchronized (inFlightRecalculations) {
inFlightRecalculations.put(mode, null);
}
future.set(result.filter);
}
});
return future;
}
/**

View File

@ -6,7 +6,6 @@ import org.bitcoinj.core.PeerFilterProvider;
import com.google.common.collect.ImmutableList;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
// This code is unit tested by the PeerGroup tests.
@ -15,20 +14,23 @@ import java.util.concurrent.locks.Lock;
* {@link org.bitcoinj.core.BloomFilter} and earliest key time for all of them.
* Used by the {@link org.bitcoinj.core.PeerGroup} class internally.</p>
*
* <p>Thread safety: this class tracks the element count of the last filter it calculated and so must be synchronised
* externally or used from only one thread. It will acquire a lock on each filter in turn before performing the
* calculation because the providers may be mutated in other threads in parallel, but global consistency is required
* to produce a merged filter.</p>
* <p>Thread safety: threading here can be complicated. Each filter provider is given a begin event, which may acquire
* a lock (and is guaranteed to receive an end event). This class is mostly thread unsafe and is meant to be used from a
* single thread only, PeerGroup ensures this by only accessing it from the dedicated PeerGroup thread. PeerGroup does
* not hold any locks whilst this object is used, relying on the single thread to prevent multiple filters being
* calculated in parallel, thus a filter provider can do things like make blocking calls into PeerGroup from a separate
* thread. However the bloomFilterFPRate property IS thread safe, for convenience.</p>
*/
public class FilterMerger {
// We use a constant tweak to avoid giving up privacy when we regenerate our filter with new keys
private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE);
private double bloomFilterFPRate;
private volatile double vBloomFilterFPRate;
private int lastBloomFilterElementCount;
private BloomFilter lastFilter;
public FilterMerger(double bloomFilterFPRate) {
this.bloomFilterFPRate = bloomFilterFPRate;
this.vBloomFilterFPRate = bloomFilterFPRate;
}
public static class Result {
@ -66,9 +68,10 @@ public class FilterMerger {
lastBloomFilterElementCount = elements > lastBloomFilterElementCount ? elements + 100 : lastBloomFilterElementCount;
BloomFilter.BloomUpdate bloomFlags =
requiresUpdateAll ? BloomFilter.BloomUpdate.UPDATE_ALL : BloomFilter.BloomUpdate.UPDATE_P2PUBKEY_ONLY;
BloomFilter filter = new BloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak, bloomFlags);
double fpRate = vBloomFilterFPRate;
BloomFilter filter = new BloomFilter(lastBloomFilterElementCount, fpRate, bloomFilterTweak, bloomFlags);
for (PeerFilterProvider p : providers)
filter.merge(p.getBloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak));
filter.merge(p.getBloomFilter(lastBloomFilterElementCount, fpRate, bloomFilterTweak));
result.changed = !filter.equals(lastFilter);
result.filter = lastFilter = filter;
@ -86,11 +89,11 @@ public class FilterMerger {
}
public void setBloomFilterFPRate(double bloomFilterFPRate) {
this.bloomFilterFPRate = bloomFilterFPRate;
this.vBloomFilterFPRate = bloomFilterFPRate;
}
public double getBloomFilterFPRate() {
return bloomFilterFPRate;
return vBloomFilterFPRate;
}
public BloomFilter getLastFilter() {