diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index b7916c86..ecf4973a 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -1124,7 +1124,7 @@ public class PeerGroup extends AbstractIdleService { public void run() { // We now have enough connected peers to send the transaction. // This can be called immediately if we already have enough. Otherwise it'll be called from a peer - // thread. TODO: Fix the race that exists here. + // thread. // Pick a peer to be the lucky recipient of our tx. final Peer somePeer = peers.get(0); @@ -1135,47 +1135,47 @@ public class PeerGroup extends AbstractIdleService { // Only bother with this if we might actually hear back: if (minConnections > 1) tx.getConfidence().addEventListener(new TransactionConfidence.Listener() { public void onConfidenceChanged(Transaction tx) { - // The number of peers that announced this tx has gone up. This will run in a peer thread. - final int numSeenPeers = tx.getConfidence().numBroadcastPeers(); - boolean done = false; - log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(), - numSeenPeers); - lock.lock(); - try { - if (numSeenPeers >= minConnections) { - // We've seen the min required number of peers announce the transaction. Note that we - // can't wait for the current number of connected peers right now because we could have - // added more peers after the broadcast took place, which means they won't have seen - // the transaction. In future when peers sync up their memory pools after they connect - // we could come back and change this. + // The number of peers that announced this tx has gone up. + // Thread safe - this can run in parallel. + final TransactionConfidence conf = tx.getConfidence(); + int numSeenPeers = conf.numBroadcastPeers(); + boolean mined = conf.getConfidenceType() != TransactionConfidence.ConfidenceType.NOT_SEEN_IN_CHAIN; + log.info("broadcastTransaction: TX {} seen by {} peers{}", + new Object[]{pinnedTx.getHashAsString(), numSeenPeers, mined ? " and mined" : ""}); + if (!(numSeenPeers >= minConnections || mined)) + return; + // We've seen the min required number of peers announce the transaction, or it was included + // in a block. Normally we'd expect to see it fully propagate before it gets mined, but + // it can be that a block is solved very soon after broadcast, and it's also possible that + // due to version skew and changes in the relay rules our transaction is not going to + // fully propagate yet can get mined anyway. + // + // Note that we can't wait for the current number of connected peers right now because we + // could have added more peers after the broadcast took place, which means they won't + // have seen the transaction. In future when peers sync up their memory pools after they + // connect we could come back and change this. + // + // OK, now tell the wallet about the transaction. If the wallet created the transaction then + // it already knows and will ignore this. If it's a transaction we received from + // somebody else via a side channel and are now broadcasting, this will put it into the + // wallet now we know it's valid. + for (Wallet wallet : wallets) { + try { + // Assumption here is there are no dependencies of the created transaction. // - // Now tell the wallet about the transaction. If the wallet created the transaction then - // it already knows and will ignore this. If it's a transaction we received from - // somebody else via a side channel and are now broadcasting, this will put it into the - // wallet now we know it's valid. - for (Wallet wallet : wallets) { - try { - if (wallet.isPendingTransactionRelevant(pinnedTx)) { - // Assumption here is there are no dependencies of the created transaction. - wallet.receivePending(pinnedTx, null); - } - } catch (Throwable t) { - future.setException(t); - return; - } - } - done = true; + // We may end up with two threads trying to do this in parallel - the wallet will + // ignore whichever one loses the race. + wallet.receivePending(pinnedTx, null); + } catch (Throwable t) { + future.setException(t); // RE-ENTRANCY POINT + return; } - } finally { - lock.unlock(); - } - if (done) { - // We're done! Run this outside of the peer group lock as setting the future may immediately - // invoke any listeners associated with it and it's simpler if the PeerGroup isn't locked. - log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString()); - tx.getConfidence().removeEventListener(this); - future.set(pinnedTx); } + // We're done! It's important that the PeerGroup lock is not held (by this thread) at this + // point to avoid triggering inversions when the Future completes. + log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString()); + tx.getConfidence().removeEventListener(this); + future.set(pinnedTx); // RE-ENTRANCY POINT } }); @@ -1187,6 +1187,8 @@ public class PeerGroup extends AbstractIdleService { // transaction or not. However, we are not a fully validating node and this is advertised in // our version message, as SPV nodes cannot relay it doesn't give away any additional information // to skip the inv here - we wouldn't send invs anyway. + // + // TODO: The peer we picked might be dead by now. If we can't write the message, pick again and retry. ChannelFuture sendComplete = somePeer.sendMessage(pinnedTx); // If we've been limited to talk to only one peer, we can't wait to hear back because the // remote peer won't tell us about transactions we just announced to it for obvious reasons. @@ -1195,22 +1197,14 @@ public class PeerGroup extends AbstractIdleService { if (minConnections == 1) { sendComplete.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture _) throws Exception { - lock.lock(); - try { - for (Wallet wallet : wallets) { - try { - if (wallet.isPendingTransactionRelevant(pinnedTx)) { - // Assumption here is there are no dependencies of the created - // transaction. - wallet.receivePending(pinnedTx, null); - } - } catch (Throwable t) { - future.setException(t); - return; - } + for (Wallet wallet : wallets) { + try { + // Assumption here is there are no dependencies of the created transaction. + wallet.receivePending(pinnedTx, null); + } catch (Throwable t) { + future.setException(t); + return; } - } finally { - lock.unlock(); } future.set(pinnedTx); } diff --git a/core/src/main/java/com/google/bitcoin/core/Wallet.java b/core/src/main/java/com/google/bitcoin/core/Wallet.java index 9f3aefa0..66642a2c 100644 --- a/core/src/main/java/com/google/bitcoin/core/Wallet.java +++ b/core/src/main/java/com/google/bitcoin/core/Wallet.java @@ -885,6 +885,10 @@ public class Wallet implements Serializable, BlockChainListener { // Do a brief risk analysis of the transaction and its dependencies to check for any possible attacks. lock.lock(); try { + // Repeat the check of relevancy here, even though the caller may have already done so - this is to avoid + // race conditions where receivePending may be being called in parallel. + if (!isPendingTransactionRelevant(tx)) + return; AnalysisResult analysis = analyzeTransactionAndDependencies(tx, dependencies); if (analysis.timeLocked != null && !doesAcceptTimeLockedTransactions()) { log.warn("Transaction {}, dependency of {} has a time lock value of {}", new Object[]{