diff --git a/core/src/main/java/com/google/bitcoin/core/MemoryPool.java b/core/src/main/java/com/google/bitcoin/core/MemoryPool.java index 6767e297..b17bf5b0 100644 --- a/core/src/main/java/com/google/bitcoin/core/MemoryPool.java +++ b/core/src/main/java/com/google/bitcoin/core/MemoryPool.java @@ -16,7 +16,7 @@ package com.google.bitcoin.core; -import com.google.common.base.Preconditions; +import com.google.bitcoin.utils.Locks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +27,10 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; /** *

Tracks transactions that are being announced across the network. Typically one is created for you by a @@ -41,6 +45,7 @@ import java.util.Set; */ public class MemoryPool { private static final Logger log = LoggerFactory.getLogger(MemoryPool.class); + protected ReentrantLock lock = Locks.lock("mempool"); // For each transaction we may have seen: // - only its hash in an inv packet @@ -108,41 +113,51 @@ public class MemoryPool { * which bothered to keep a reference. Typically, this is because the transaction does not involve any keys that * are relevant to any of our wallets. */ - private synchronized void cleanPool() { - Reference ref; - while ((ref = referenceQueue.poll()) != null) { - // Find which transaction got deleted by the GC. - WeakTransactionReference txRef = (WeakTransactionReference) ref; - // And remove the associated map entry so the other bits of memory can also be reclaimed. - memoryPool.remove(txRef.hash); + private void cleanPool() { + lock.lock(); + try { + Reference ref; + while ((ref = referenceQueue.poll()) != null) { + // Find which transaction got deleted by the GC. + WeakTransactionReference txRef = (WeakTransactionReference) ref; + // And remove the associated map entry so the other bits of memory can also be reclaimed. + memoryPool.remove(txRef.hash); + } + } finally { + lock.unlock(); } } /** * Returns the number of peers that have seen the given hash recently. */ - public synchronized int numBroadcastPeers(Sha256Hash txHash) { - cleanPool(); - Entry entry = memoryPool.get(txHash); - if (entry == null) { - // No such TX known. - return 0; - } else if (entry.tx == null) { - // We've seen at least one peer announce with an inv. - Preconditions.checkNotNull(entry.addresses); - return entry.addresses.size(); - } else { - final Transaction tx = entry.tx.get(); - if (tx == null) { - // We previously downloaded this transaction, but nothing cared about it so the garbage collector threw - // it away. We also deleted the set that tracked which peers had seen it. Treat this case as a zero and - // just delete it from the map. - memoryPool.remove(txHash); + public int numBroadcastPeers(Sha256Hash txHash) { + lock.lock(); + try { + cleanPool(); + Entry entry = memoryPool.get(txHash); + if (entry == null) { + // No such TX known. return 0; + } else if (entry.tx == null) { + // We've seen at least one peer announce with an inv. + checkNotNull(entry.addresses); + return entry.addresses.size(); } else { - Preconditions.checkState(entry.addresses == null); - return tx.getConfidence().numBroadcastPeers(); + final Transaction tx = entry.tx.get(); + if (tx == null) { + // We previously downloaded this transaction, but nothing cared about it so the garbage collector threw + // it away. We also deleted the set that tracked which peers had seen it. Treat this case as a zero and + // just delete it from the map. + memoryPool.remove(txHash); + return 0; + } else { + checkState(entry.addresses == null); + return tx.getConfidence().numBroadcastPeers(); + } } + } finally { + lock.unlock(); } } @@ -152,54 +167,65 @@ public class MemoryPool { * @param byPeer The Peer that received it. * @return An object that is semantically the same TX but may be a different object instance. */ - public synchronized Transaction seen(Transaction tx, PeerAddress byPeer) { - cleanPool(); - Entry entry = memoryPool.get(tx.getHash()); - if (entry != null) { - // This TX or its hash have been previously announced. - if (entry.tx != null) { - // We already downloaded it. - Preconditions.checkState(entry.addresses == null); - // We only want one canonical object instance for a transaction no matter how many times it is - // deserialized. - Transaction transaction = entry.tx.get(); - if (transaction == null) { - // We previously downloaded this transaction, but the garbage collector threw it away because - // no other part of the system cared enough to keep it around (it's not relevant to us). - // Given the lack of interest last time we probably don't need to track it this time either. - log.info("{}: Provided with a transaction that we previously threw away: {}", byPeer, tx.getHash()); + public Transaction seen(Transaction tx, PeerAddress byPeer) { + lock.lock(); + try { + cleanPool(); + Entry entry = memoryPool.get(tx.getHash()); + if (entry != null) { + // This TX or its hash have been previously announced. + if (entry.tx != null) { + // We already downloaded it. + checkState(entry.addresses == null); + // We only want one canonical object instance for a transaction no matter how many times it is + // deserialized. + Transaction transaction = entry.tx.get(); + if (transaction == null) { + // We previously downloaded this transaction, but the garbage collector threw it away because + // no other part of the system cared enough to keep it around (it's not relevant to us). + // Given the lack of interest last time we probably don't need to track it this time either. + log.info("{}: Provided with a transaction that we previously threw away: {}", byPeer, tx.getHash()); + } else { + // We saw it before and kept it around. Hand back the canonical copy. + tx = transaction; + log.info("{}: Provided with a transaction downloaded before: [{}] {}", + new Object[]{byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHash()}); + } + markBroadcast(byPeer, tx); + return tx; } else { - // We saw it before and kept it around. Hand back the canonical copy. - tx = transaction; - log.info("{}: Provided with a transaction downloaded before: [{}] {}", - new Object[] { byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHash() }); + // We received a transaction that we have previously seen announced but not downloaded until now. + checkNotNull(entry.addresses); + entry.tx = new WeakTransactionReference(tx, referenceQueue); + // Copy the previously announced peers into the confidence and then clear it out. Unlock here + // because markBroadcastBy can trigger event listeners and thus inversions. + lock.unlock(); + try { + TransactionConfidence confidence = tx.getConfidence(); + for (PeerAddress a : entry.addresses) { + confidence.markBroadcastBy(a); + } + entry.addresses = null; + log.debug("{}: Adding tx [{}] {} to the memory pool", + new Object[]{byPeer, confidence.numBroadcastPeers(), tx.getHashAsString()}); + } finally { + lock.lock(); + } + return tx; } - tx.getConfidence().markBroadcastBy(byPeer); - return tx; } else { - // We received a transaction that we have previously seen announced but not downloaded until now. - Preconditions.checkNotNull(entry.addresses); + // This often happens when we are downloading a Bloom filtered chain, or recursively downloading + // dependencies of a relevant transaction (see Peer.downloadDependencies). + log.debug("{}: Provided with a downloaded transaction we didn't see announced yet: {}", + byPeer, tx.getHashAsString()); + entry = new Entry(); entry.tx = new WeakTransactionReference(tx, referenceQueue); - // Copy the previously announced peers into the confidence and then clear it out. - TransactionConfidence confidence = tx.getConfidence(); - for (PeerAddress a : entry.addresses) { - confidence.markBroadcastBy(a); - } - entry.addresses = null; - log.debug("{}: Adding tx [{}] {} to the memory pool", - new Object[] { byPeer, confidence.numBroadcastPeers(), tx.getHashAsString() }); + memoryPool.put(tx.getHash(), entry); + markBroadcast(byPeer, tx); return tx; } - } else { - // This often happens when we are downloading a Bloom filtered chain, or recursively downloading - // dependencies of a relevant transaction (see Peer.downloadDependencies). - log.debug("{}: Provided with a downloaded transaction we didn't see announced yet: {}", - byPeer, tx.getHashAsString()); - entry = new Entry(); - entry.tx = new WeakTransactionReference(tx, referenceQueue); - memoryPool.put(tx.getHash(), entry); - tx.getConfidence().markBroadcastBy(byPeer); - return tx; + } finally { + lock.unlock(); } } @@ -207,36 +233,53 @@ public class MemoryPool { * Called by peers when they see a transaction advertised in an "inv" message. It either will increase the * confidence of the pre-existing transaction or will just keep a record of the address for future usage. */ - public synchronized void seen(Sha256Hash hash, PeerAddress byPeer) { - cleanPool(); - Entry entry = memoryPool.get(hash); - if (entry != null) { - // This TX or its hash have been previously announced. - if (entry.tx != null) { - Preconditions.checkState(entry.addresses == null); - Transaction tx = entry.tx.get(); - if (tx != null) { - tx.getConfidence().markBroadcastBy(byPeer); - log.debug("{}: Announced transaction we have seen before [{}] {}", - new Object[] { byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHashAsString() }); + public void seen(Sha256Hash hash, PeerAddress byPeer) { + lock.lock(); + try { + cleanPool(); + Entry entry = memoryPool.get(hash); + if (entry != null) { + // This TX or its hash have been previously announced. + if (entry.tx != null) { + checkState(entry.addresses == null); + Transaction tx = entry.tx.get(); + if (tx != null) { + markBroadcast(byPeer, tx); + log.debug("{}: Announced transaction we have seen before [{}] {}", + new Object[]{byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHashAsString()}); + } else { + // The inv is telling us about a transaction that we previously downloaded, and threw away because + // nothing found it interesting enough to keep around. So do nothing. + } } else { - // The inv is telling us about a transaction that we previously downloaded, and threw away because - // nothing found it interesting enough to keep around. So do nothing. + checkNotNull(entry.addresses); + entry.addresses.add(byPeer); + log.debug("{}: Announced transaction we have seen announced before [{}] {}", + new Object[]{byPeer, entry.addresses.size(), hash}); } } else { - Preconditions.checkNotNull(entry.addresses); + // This TX has never been seen before. + entry = new Entry(); + // TODO: Using hashsets here is inefficient compared to just having an array. + entry.addresses = new HashSet(); entry.addresses.add(byPeer); - log.debug("{}: Announced transaction we have seen announced before [{}] {}", - new Object[] { byPeer, entry.addresses.size(), hash }); + memoryPool.put(hash, entry); + log.info("{}: Announced new transaction [1] {}", byPeer, hash); } - } else { - // This TX has never been seen before. - entry = new Entry(); - // TODO: Using hashsets here is inefficient compared to just having an array. - entry.addresses = new HashSet(); - entry.addresses.add(byPeer); - memoryPool.put(hash, entry); - log.info("{}: Announced new transaction [1] {}", byPeer, hash); + } finally { + lock.unlock(); + } + } + + private void markBroadcast(PeerAddress byPeer, Transaction tx) { + // Marking a TX as broadcast by a peer can run event listeners that might call back into Peer or PeerGroup. + // Thus we unlock ourselves here to avoid potential inversions. + checkState(lock.isLocked()); + lock.unlock(); + try { + tx.getConfidence().markBroadcastBy(byPeer); + } finally { + lock.lock(); } } @@ -245,14 +288,19 @@ public class MemoryPool { * we only saw advertisements for it yet or it has been downloaded but garbage collected due to nowhere else * holding a reference to it. */ - public synchronized Transaction get(Sha256Hash hash) { - Entry entry = memoryPool.get(hash); - if (entry == null) return null; // Unknown. - if (entry.tx == null) return null; // Seen but only in advertisements. - if (entry.tx.get() == null) return null; // Was downloaded but garbage collected. - Transaction tx = entry.tx.get(); - Preconditions.checkNotNull(tx); - return tx; + public Transaction get(Sha256Hash hash) { + lock.lock(); + try { + Entry entry = memoryPool.get(hash); + if (entry == null) return null; // Unknown. + if (entry.tx == null) return null; // Seen but only in advertisements. + if (entry.tx.get() == null) return null; // Was downloaded but garbage collected. + Transaction tx = entry.tx.get(); + checkNotNull(tx); + return tx; + } finally { + lock.unlock(); + } } /** @@ -260,8 +308,13 @@ public class MemoryPool { * was broadcast, downloaded and nothing kept a reference to it will eventually be cleared out by the garbage * collector and wasSeen() will return false - it does not keep a permanent record of every hash ever broadcast. */ - public synchronized boolean maybeWasSeen(Sha256Hash hash) { - Entry entry = memoryPool.get(hash); - return entry != null; + public boolean maybeWasSeen(Sha256Hash hash) { + lock.lock(); + try { + Entry entry = memoryPool.get(hash); + return entry != null; + } finally { + lock.unlock(); + } } } diff --git a/core/src/main/java/com/google/bitcoin/core/TransactionConfidence.java b/core/src/main/java/com/google/bitcoin/core/TransactionConfidence.java index fee9427b..4b533c0a 100644 --- a/core/src/main/java/com/google/bitcoin/core/TransactionConfidence.java +++ b/core/src/main/java/com/google/bitcoin/core/TransactionConfidence.java @@ -65,7 +65,7 @@ public class TransactionConfidence implements Serializable { /** The Transaction that this confidence object is associated with. */ private Transaction transaction; // Lazily created listeners array. - private transient ArrayList listeners; + private transient CopyOnWriteArrayList listeners; // The depth of the transaction on the best chain in blocks. An unconfirmed block has depth 0. private int depth; @@ -152,6 +152,7 @@ public class TransactionConfidence implements Serializable { public TransactionConfidence(Transaction tx) { // Assume a default number of peers for our set. broadcastBy = new CopyOnWriteArrayList(); + listeners = new CopyOnWriteArrayList(); transaction = tx; } @@ -178,18 +179,13 @@ public class TransactionConfidence implements Serializable { * {@link BlockChainListener}, attach it to a {@link BlockChain} and then use the getters on the * confidence object to determine the new depth.

*/ - public synchronized void addEventListener(Listener listener) { + public void addEventListener(Listener listener) { Preconditions.checkNotNull(listener); - if (listeners == null) - listeners = new ArrayList(2); - // Dedupe registrations. This makes the wallet code simpler. - if (!listeners.contains(listener)) - listeners.add(listener); + listeners.addIfAbsent(listener); } - public synchronized void removeEventListener(Listener listener) { + public void removeEventListener(Listener listener) { Preconditions.checkNotNull(listener); - Preconditions.checkNotNull(listeners); listeners.remove(listener); } @@ -225,11 +221,13 @@ public class TransactionConfidence implements Serializable { * Called by other objects in the system, like a {@link Wallet}, when new information about the confidence of a * transaction becomes available. */ - public synchronized void setConfidenceType(ConfidenceType confidenceType) { + public void setConfidenceType(ConfidenceType confidenceType) { // Don't inform the event listeners if the confidence didn't really change. - if (confidenceType == this.confidenceType) - return; - this.confidenceType = confidenceType; + synchronized (this) { + if (confidenceType == this.confidenceType) + return; + this.confidenceType = confidenceType; + } runListeners(); } @@ -261,7 +259,7 @@ public class TransactionConfidence implements Serializable { } /** - * Returns a synchronized set of {@link PeerAddress}es that announced the transaction. + * Returns a snapshot of {@link PeerAddress}es that announced the transaction. */ public ListIterator getBroadcastBy() { return broadcastBy.listIterator(); @@ -310,12 +308,17 @@ public class TransactionConfidence implements Serializable { * Updates the internal counter that tracks how deeply buried the block is. * Work is the value of block.getWork(). */ - public synchronized void notifyWorkDone(Block block) throws VerificationException { - if (getConfidenceType() == ConfidenceType.BUILDING) { - this.depth++; - this.workDone = this.workDone.add(block.getWork()); - runListeners(); + public void notifyWorkDone(Block block) throws VerificationException { + boolean notify = false; + synchronized (this) { + if (getConfidenceType() == ConfidenceType.BUILDING) { + this.depth++; + this.workDone = this.workDone.add(block.getWork()); + notify = true; + } } + if (notify) + runListeners(); } /** @@ -405,12 +408,8 @@ public class TransactionConfidence implements Serializable { } private void runListeners() { - EventListenerInvoker.invoke(listeners, new EventListenerInvoker() { - @Override - public void invoke(Listener listener) { - listener.onConfidenceChanged(transaction); - } - }); + for (Listener listener : listeners) + listener.onConfidenceChanged(transaction); } /** diff --git a/examples/src/main/java/com/google/bitcoin/examples/PingService.java b/examples/src/main/java/com/google/bitcoin/examples/PingService.java index 0e9dff82..88cb5994 100644 --- a/examples/src/main/java/com/google/bitcoin/examples/PingService.java +++ b/examples/src/main/java/com/google/bitcoin/examples/PingService.java @@ -120,6 +120,7 @@ public class PingService { ": " + tx); tx.getConfidence().addEventListener(new TransactionConfidence.Listener() { public void onConfidenceChanged(Transaction tx2) { + // Must be thread safe. if (tx2.getConfidence().getConfidenceType() == TransactionConfidence.ConfidenceType.BUILDING) { // Coins were confirmed (appeared in a block). tx2.getConfidence().removeEventListener(this);