diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 3e8018ee..28ab4d1d 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -19,6 +19,7 @@ package com.google.bitcoin.core; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStoreException; import com.google.bitcoin.utils.EventListenerInvoker; +import com.google.bitcoin.utils.Locks; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -38,6 +39,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import static com.google.common.base.Preconditions.checkState; + /** * A Peer handles the high level communication with a Bitcoin node. * @@ -52,6 +55,7 @@ public class Peer { } private static final Logger log = LoggerFactory.getLogger(Peer.class); + protected final ReentrantLock lock = Locks.lock("peer"); private final NetworkParameters params; private final AbstractBlockChain blockChain; @@ -171,13 +175,18 @@ public class Peer { } @Override - public synchronized String toString() { - PeerAddress addr = address.get(); - if (addr == null) { - // User-provided NetworkConnection object. - return "Peer()"; - } else { - return "Peer(" + addr.getAddr() + ":" + addr.getPort() + ")"; + public String toString() { + lock.lock(); + try { + PeerAddress addr = address.get(); + if (addr == null) { + // User-provided NetworkConnection object. + return "Peer()"; + } else { + return "Peer(" + addr.getAddr() + ":" + addr.getPort() + ")"; + } + } finally { + lock.unlock(); } } @@ -226,22 +235,22 @@ public class Peer { // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by // returning null. - synchronized (Peer.this) { + lock.lock(); + try { for (PeerEventListener listener : eventListeners) { synchronized (listener) { m = listener.onPreMessageReceived(Peer.this, m); if (m == null) break; } } - } + if (m == null) return; - if (m == null) return; - - synchronized (Peer.this) { if (currentFilteredBlock != null && !(m instanceof Transaction)) { processFilteredBlock(currentFilteredBlock); currentFilteredBlock = null; } + } finally { + lock.unlock(); } if (m instanceof NotFoundMessage) { @@ -257,8 +266,11 @@ public class Peer { // messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another // FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after // a getblocks, to force the non-tx message path. - synchronized (Peer.this) { - currentFilteredBlock = (FilteredBlock)m; + lock.lock(); + try { + currentFilteredBlock = (FilteredBlock) m; + } finally { + lock.unlock(); } } else if (m instanceof Transaction) { processTransaction((Transaction) m); @@ -309,6 +321,8 @@ public class Peer { } private void processNotFoundMessage(NotFoundMessage m) { + // This does not need to be locked. + // This is received when we previously did a getdata but the peer couldn't find what we requested in it's // memory pool. 
Typically, because we are downloading dependencies of a relevant transaction and reached // the bottom of the dependency tree (where the unconfirmed transactions connect to transactions that are @@ -327,7 +341,8 @@ public class Peer { } } - private synchronized void processAlert(AlertMessage m) { + private void processAlert(AlertMessage m) { + // This does not need to be locked. try { if (m.isSignatureValid()) { log.info("Received alert from peer {}: {}", toString(), m.getStatusBar()); @@ -347,16 +362,16 @@ public class Peer { return handler; } - private synchronized void processHeaders(HeadersMessage m) throws IOException, ProtocolException { + private void processHeaders(HeadersMessage m) throws IOException, ProtocolException { // Runs in network loop thread for this peer. // // This method can run if a peer just randomly sends us a "headers" message (should never happen), or more // likely when we've requested them as part of chain download using fast catchup. We need to add each block to // the chain if it pre-dates the fast catchup time. If we go past it, we can stop processing the headers and // request the full blocks from that point on instead. - Preconditions.checkState(!downloadBlockBodies, toString()); - + lock.lock(); try { + checkState(!downloadBlockBodies, toString()); for (int i = 0; i < m.getBlockHeaders().size(); i++) { Block header = m.getBlockHeaders().get(i); if (header.getTimeSeconds() < fastCatchupTimeSecs) { @@ -392,10 +407,13 @@ public class Peer { } catch (PrunedException e) { // Unreachable when in SPV mode. throw new RuntimeException(e); + } finally { + lock.unlock(); } } - private synchronized void processGetData(GetDataMessage getdata) throws IOException { + private void processGetData(GetDataMessage getdata) throws IOException { + // This does not need to be locked. log.info("{}: Received getdata message: {}", address.get(), getdata.toString()); ArrayList items = new ArrayList(); for (PeerEventListener listener : eventListeners) { @@ -414,77 +432,82 @@ public class Peer { } } - private synchronized void processTransaction(Transaction tx) throws VerificationException, IOException { - log.debug("{}: Received tx {}", address.get(), tx.getHashAsString()); - if (memoryPool != null) { - // We may get back a different transaction object. - tx = memoryPool.seen(tx, getAddress()); - } - final Transaction fTx = tx; - // Label the transaction as coming in from the P2P network (as opposed to being created by us, direct import, - // etc). This helps the wallet decide how to risk analyze it later. - fTx.getConfidence().setSource(TransactionConfidence.Source.NETWORK); - if (maybeHandleRequestedData(fTx)) { - return; - } - if (currentFilteredBlock != null) { - if (!currentFilteredBlock.provideTransaction(tx)) { - // Got a tx that didn't fit into the filtered block, so we must have received everything. - processFilteredBlock(currentFilteredBlock); - currentFilteredBlock = null; + private void processTransaction(Transaction tx) throws VerificationException, IOException { + lock.lock(); + try { + log.debug("{}: Received tx {}", address.get(), tx.getHashAsString()); + if (memoryPool != null) { + // We may get back a different transaction object. + tx = memoryPool.seen(tx, getAddress()); } - // Don't tell wallets or listeners about this tx as they'll learn about it when the filtered block is - // fully downloaded instead. - return; - } - // It's a broadcast transaction. Tell all wallets about this tx so they can check if it's relevant or not. 
- for (ListIterator it = wallets.listIterator(); it.hasNext();) { - final Wallet wallet = it.next(); - try { - if (wallet.isPendingTransactionRelevant(fTx)) { - // This transaction seems interesting to us, so let's download its dependencies. This has several - // purposes: we can check that the sender isn't attacking us by engaging in protocol abuse games, - // like depending on a time-locked transaction that will never confirm, or building huge chains - // of unconfirmed transactions (again - so they don't confirm and the money can be taken - // back with a Finney attack). Knowing the dependencies also lets us store them in a serialized - // wallet so we always have enough data to re-announce to the network and get the payment into - // the chain, in case the sender goes away and the network starts to forget. - // TODO: Not all the above things are implemented. + final Transaction fTx = tx; + // Label the transaction as coming in from the P2P network (as opposed to being created by us, direct import, + // etc). This helps the wallet decide how to risk analyze it later. + fTx.getConfidence().setSource(TransactionConfidence.Source.NETWORK); + if (maybeHandleRequestedData(fTx)) { + return; + } + if (currentFilteredBlock != null) { + if (!currentFilteredBlock.provideTransaction(tx)) { + // Got a tx that didn't fit into the filtered block, so we must have received everything. + processFilteredBlock(currentFilteredBlock); + currentFilteredBlock = null; + } + // Don't tell wallets or listeners about this tx as they'll learn about it when the filtered block is + // fully downloaded instead. + return; + } + // It's a broadcast transaction. Tell all wallets about this tx so they can check if it's relevant or not. + for (ListIterator it = wallets.listIterator(); it.hasNext(); ) { + final Wallet wallet = it.next(); + try { + if (wallet.isPendingTransactionRelevant(fTx)) { + // This transaction seems interesting to us, so let's download its dependencies. This has several + // purposes: we can check that the sender isn't attacking us by engaging in protocol abuse games, + // like depending on a time-locked transaction that will never confirm, or building huge chains + // of unconfirmed transactions (again - so they don't confirm and the money can be taken + // back with a Finney attack). Knowing the dependencies also lets us store them in a serialized + // wallet so we always have enough data to re-announce to the network and get the payment into + // the chain, in case the sender goes away and the network starts to forget. + // TODO: Not all the above things are implemented. - Futures.addCallback(downloadDependencies(fTx), new FutureCallback>() { - public void onSuccess(List dependencies) { - try { - log.info("{}: Dependency download complete!", address.get()); - wallet.receivePending(fTx, dependencies); - } catch (VerificationException e) { - log.error("{}: Wallet failed to process pending transaction {}", - address.get(), fTx.getHashAsString()); - log.error("Error was: ", e); + Futures.addCallback(downloadDependencies(fTx), new FutureCallback>() { + public void onSuccess(List dependencies) { + try { + log.info("{}: Dependency download complete!", address.get()); + wallet.receivePending(fTx, dependencies); + } catch (VerificationException e) { + log.error("{}: Wallet failed to process pending transaction {}", + address.get(), fTx.getHashAsString()); + log.error("Error was: ", e); + // Not much more we can do at this point. 
+ } + } + + public void onFailure(Throwable throwable) { + log.error("Could not download dependencies of tx {}", fTx.getHashAsString()); + log.error("Error was: ", throwable); // Not much more we can do at this point. } - } - - public void onFailure(Throwable throwable) { - log.error("Could not download dependencies of tx {}", fTx.getHashAsString()); - log.error("Error was: ", throwable); - // Not much more we can do at this point. - } - }); + }); + } + } catch (VerificationException e) { + log.error("Wallet failed to verify tx", e); + // Carry on, listeners may still want to know. } - } catch (VerificationException e) { - log.error("Wallet failed to verify tx", e); - // Carry on, listeners may still want to know. } + // Tell all listeners about this tx so they can decide whether to keep it or not. If no listener keeps a + // reference around then the memory pool will forget about it after a while too because it uses weak references. + EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker() { + @Override + public void invoke(PeerEventListener listener) { + listener.onTransaction(Peer.this, fTx); + } + }); + } finally { + lock.unlock(); } - // Tell all listeners about this tx so they can decide whether to keep it or not. If no listener keeps a - // reference around then the memory pool will forget about it after a while too because it uses weak references. - EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker() { - @Override - public void invoke(PeerEventListener listener) { - listener.onTransaction(Peer.this, fTx); - } - }); } /** @@ -541,16 +564,15 @@ public class Peer { for (TransactionInput input : tx.getInputs()) { // There may be multiple inputs that connect to the same transaction. Sha256Hash hash = input.getOutpoint().getHash(); - synchronized (this) { - Transaction dep = memoryPool.get(hash); - if (dep == null) { - needToRequest.add(hash); - } else { - dependencies.add(dep); - } + Transaction dep = memoryPool.get(hash); + if (dep == null) { + needToRequest.add(hash); + } else { + dependencies.add(dep); } } results.addAll(dependencies); + lock.lock(); try { // Build the request for the missing dependencies. List> futures = Lists.newArrayList(); @@ -631,13 +653,16 @@ public class Peer { log.error("Couldn't send getdata in downloadDependencies({})", tx.getHash()); resultFuture.setException(e); return resultFuture; + } finally { + lock.unlock(); } return resultFuture; } - private synchronized void processBlock(Block m) throws IOException { + private void processBlock(Block m) throws IOException { if (log.isDebugEnabled()) log.debug("{}: Received broadcast block {}", address.get(), m.getHashAsString()); + lock.lock(); try { // Was this block requested by getBlock()? if (maybeHandleRequestedData(m)) return; @@ -683,13 +708,16 @@ public class Peer { } catch (PrunedException e) { // Unreachable when in SPV mode. throw new RuntimeException(e); + } finally { + lock.unlock(); } } // TODO: Fix this duplication. - private synchronized void processFilteredBlock(FilteredBlock m) throws IOException { + private void processFilteredBlock(FilteredBlock m) throws IOException { if (log.isDebugEnabled()) log.debug("{}: Received broadcast filtered block {}", address.get(), m.getHash().toString()); + lock.lock(); try { if (!downloadData.get()) { log.debug("{}: Received block we did not ask for: {}", address.get(), m.getHash().toString()); @@ -731,10 +759,13 @@ public class Peer { // data from the remote peer and fix things. Or just give up. 
// TODO: Request e.getHash() and submit it to the block store before any other blocks throw new RuntimeException(e); + } finally { + lock.unlock(); } } private boolean maybeHandleRequestedData(Message m) { + checkState(lock.isLocked()); boolean found = false; Sha256Hash hash = m.getHash(); for (ListIterator it = getDataFutures.listIterator(); it.hasNext();) { @@ -749,7 +780,8 @@ public class Peer { return found; } - private synchronized void invokeOnBlocksDownloaded(final Block m) { + private void invokeOnBlocksDownloaded(final Block m) { + checkState(lock.isLocked()); // It is possible for the peer block height difference to be negative when blocks have been solved and broadcast // since the time we first connected to the peer. However, it's weird and unexpected to receive a callback // with negative "blocks left" in this case, so we clamp to zero so the API user doesn't have to think about it. @@ -762,117 +794,127 @@ public class Peer { }); } - private synchronized void processInv(InventoryMessage inv) throws IOException { - // This should be called in the network loop thread for this peer. - List items = inv.getItems(); + private void processInv(InventoryMessage inv) throws IOException { + lock.lock(); + try { + // This should be called in the network loop thread for this peer. + List items = inv.getItems(); - // Separate out the blocks and transactions, we'll handle them differently - List transactions = new LinkedList(); - List blocks = new LinkedList(); + // Separate out the blocks and transactions, we'll handle them differently + List transactions = new LinkedList(); + List blocks = new LinkedList(); - for (InventoryItem item : items) { - switch (item.type) { - case Transaction: transactions.add(item); break; - case Block: blocks.add(item); break; - default: throw new IllegalStateException("Not implemented: " + item.type); + for (InventoryItem item : items) { + switch (item.type) { + case Transaction: + transactions.add(item); + break; + case Block: + blocks.add(item); + break; + default: + throw new IllegalStateException("Not implemented: " + item.type); + } } - } - final boolean downloadData = this.downloadData.get(); + final boolean downloadData = this.downloadData.get(); - if (transactions.size() == 0 && blocks.size() == 1) { - // Single block announcement. If we're downloading the chain this is just a tickle to make us continue - // (the block chain download protocol is very implicit and not well thought out). If we're not downloading - // the chain then this probably means a new block was solved and the peer believes it connects to the best - // chain, so count it. This way getBestChainHeight() can be accurate. - if (downloadData) { - if (!blockChain.isOrphan(blocks.get(0).hash)) { + if (transactions.size() == 0 && blocks.size() == 1) { + // Single block announcement. If we're downloading the chain this is just a tickle to make us continue + // (the block chain download protocol is very implicit and not well thought out). If we're not downloading + // the chain then this probably means a new block was solved and the peer believes it connects to the best + // chain, so count it. This way getBestChainHeight() can be accurate. 
+ if (downloadData) { + if (!blockChain.isOrphan(blocks.get(0).hash)) { + blocksAnnounced.incrementAndGet(); + } + } else { blocksAnnounced.incrementAndGet(); } - } else { - blocksAnnounced.incrementAndGet(); } - } - GetDataMessage getdata = new GetDataMessage(params); + GetDataMessage getdata = new GetDataMessage(params); - Iterator it = transactions.iterator(); - while (it.hasNext()) { - InventoryItem item = it.next(); - if (memoryPool == null) { - if (downloadData) { - // If there's no memory pool only download transactions if we're configured to. - getdata.addItem(item); - } - } else { - // Only download the transaction if we are the first peer that saw it be advertised. Other peers will also - // see it be advertised in inv packets asynchronously, they co-ordinate via the memory pool. We could - // potentially download transactions faster by always asking every peer for a tx when advertised, as remote - // peers run at different speeds. However to conserve bandwidth on mobile devices we try to only download a - // transaction once. This means we can miss broadcasts if the peer disconnects between sending us an inv and - // sending us the transaction: currently we'll never try to re-fetch after a timeout. - if (memoryPool.maybeWasSeen(item.hash)) { - // Some other peer already announced this so don't download. - it.remove(); + Iterator it = transactions.iterator(); + while (it.hasNext()) { + InventoryItem item = it.next(); + if (memoryPool == null) { + if (downloadData) { + // If there's no memory pool only download transactions if we're configured to. + getdata.addItem(item); + } } else { - log.debug("{}: getdata on tx {}", address.get(), item.hash); - getdata.addItem(item); + // Only download the transaction if we are the first peer that saw it be advertised. Other peers will also + // see it be advertised in inv packets asynchronously, they co-ordinate via the memory pool. We could + // potentially download transactions faster by always asking every peer for a tx when advertised, as remote + // peers run at different speeds. However to conserve bandwidth on mobile devices we try to only download a + // transaction once. This means we can miss broadcasts if the peer disconnects between sending us an inv and + // sending us the transaction: currently we'll never try to re-fetch after a timeout. + if (memoryPool.maybeWasSeen(item.hash)) { + // Some other peer already announced this so don't download. + it.remove(); + } else { + log.debug("{}: getdata on tx {}", address.get(), item.hash); + getdata.addItem(item); + } + memoryPool.seen(item.hash, this.getAddress()); } - memoryPool.seen(item.hash, this.getAddress()); } - } - - // If we are requesting filteredblocks we have to send a ping after the getdata so that we have a clear - // end to the final FilteredBlock's transactions (in the form of a pong) sent to us - boolean pingAfterGetData = false; - if (blocks.size() > 0 && downloadData && blockChain != null) { - // Ideally, we'd only ask for the data here if we actually needed it. However that can imply a lot of - // disk IO to figure out what we've got. Normally peers will not send us inv for things we already have - // so we just re-request it here, and if we get duplicates the block chain / wallet will filter them out. - for (InventoryItem item : blocks) { - if (blockChain.isOrphan(item.hash) && downloadBlockBodies) { - // If an orphan was re-advertised, ask for more blocks unless we are not currently downloading - // full block data because we have a getheaders outstanding. 
- blockChainDownload(blockChain.getOrphanRoot(item.hash).getHash()); - } else { - // Don't re-request blocks we already requested. Normally this should not happen. However there is - // an edge case: if a block is solved and we complete the inv<->getdata<->block<->getblocks cycle - // whilst other parts of the chain are streaming in, then the new getblocks request won't match the - // previous one: whilst the stopHash is the same (because we use the orphan root), the start hash - // will be different and so the getblocks req won't be dropped as a duplicate. We'll end up - // requesting a subset of what we already requested, which can lead to parallel chain downloads - // and other nastyness. So we just do a quick removal of redundant getdatas here too. - // - // Note that as of June 2012 the Satoshi client won't actually ever interleave blocks pushed as - // part of chain download with newly announced blocks, so it should always be taken care of by - // the duplicate check in blockChainDownload(). But the satoshi client may change in future so - // it's better to be safe here. - if (!pendingBlockDownloads.contains(item.hash)) { - if (getPeerVersionMessage().isBloomFilteringSupported() && useFilteredBlocks) { - getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash)); - pingAfterGetData = true; - } else { - getdata.addItem(item); + // If we are requesting filteredblocks we have to send a ping after the getdata so that we have a clear + // end to the final FilteredBlock's transactions (in the form of a pong) sent to us + boolean pingAfterGetData = false; + + if (blocks.size() > 0 && downloadData && blockChain != null) { + // Ideally, we'd only ask for the data here if we actually needed it. However that can imply a lot of + // disk IO to figure out what we've got. Normally peers will not send us inv for things we already have + // so we just re-request it here, and if we get duplicates the block chain / wallet will filter them out. + for (InventoryItem item : blocks) { + if (blockChain.isOrphan(item.hash) && downloadBlockBodies) { + // If an orphan was re-advertised, ask for more blocks unless we are not currently downloading + // full block data because we have a getheaders outstanding. + blockChainDownload(blockChain.getOrphanRoot(item.hash).getHash()); + } else { + // Don't re-request blocks we already requested. Normally this should not happen. However there is + // an edge case: if a block is solved and we complete the inv<->getdata<->block<->getblocks cycle + // whilst other parts of the chain are streaming in, then the new getblocks request won't match the + // previous one: whilst the stopHash is the same (because we use the orphan root), the start hash + // will be different and so the getblocks req won't be dropped as a duplicate. We'll end up + // requesting a subset of what we already requested, which can lead to parallel chain downloads + // and other nastyness. So we just do a quick removal of redundant getdatas here too. + // + // Note that as of June 2012 the Satoshi client won't actually ever interleave blocks pushed as + // part of chain download with newly announced blocks, so it should always be taken care of by + // the duplicate check in blockChainDownload(). But the satoshi client may change in future so + // it's better to be safe here. 
+ if (!pendingBlockDownloads.contains(item.hash)) { + if (getPeerVersionMessage().isBloomFilteringSupported() && useFilteredBlocks) { + getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash)); + pingAfterGetData = true; + } else { + getdata.addItem(item); + } + pendingBlockDownloads.add(item.hash); } - pendingBlockDownloads.add(item.hash); } } + // If we're downloading the chain, doing a getdata on the last block we were told about will cause the + // peer to advertize the head block to us in a single-item inv. When we download THAT, it will be an + // orphan block, meaning we'll re-enter blockChainDownload() to trigger another getblocks between the + // current best block we have and the orphan block. If more blocks arrive in the meantime they'll also + // become orphan. } - // If we're downloading the chain, doing a getdata on the last block we were told about will cause the - // peer to advertize the head block to us in a single-item inv. When we download THAT, it will be an - // orphan block, meaning we'll re-enter blockChainDownload() to trigger another getblocks between the - // current best block we have and the orphan block. If more blocks arrive in the meantime they'll also - // become orphan. - } - if (!getdata.getItems().isEmpty()) { - // This will cause us to receive a bunch of block or tx messages. - sendMessage(getdata); + if (!getdata.getItems().isEmpty()) { + // This will cause us to receive a bunch of block or tx messages. + sendMessage(getdata); + } + + if (pingAfterGetData) + sendMessage(new Ping((long) Math.random() * Long.MAX_VALUE)); + } finally { + lock.unlock(); } - - if (pingAfterGetData) - sendMessage(new Ping((long) Math.random() * Long.MAX_VALUE)); } /** @@ -881,6 +923,7 @@ public class Peer { * will block until the peer answers. */ public ListenableFuture getBlock(Sha256Hash blockHash) throws IOException { + // This does not need to be locked. log.info("Request to fetch block {}", blockHash); GetDataMessage getdata = new GetDataMessage(params); getdata.addBlock(blockHash); @@ -893,6 +936,7 @@ public class Peer { * in future many peers will delete old transaction data they don't need. */ public ListenableFuture getPeerMempoolTransaction(Sha256Hash hash) throws IOException { + // This does not need to be locked. // TODO: Unit test this method. log.info("Request to fetch peer mempool tx {}", hash); GetDataMessage getdata = new GetDataMessage(params); @@ -902,6 +946,7 @@ public class Peer { /** Sends a getdata with a single item in it. */ private ListenableFuture sendSingleGetData(GetDataMessage getdata) throws IOException { + // This does not need to be locked. Preconditions.checkArgument(getdata.getItems().size() == 1); GetDataRequest req = new GetDataRequest(); req.future = SettableFuture.create(); @@ -920,20 +965,25 @@ public class Peer { * * @param secondsSinceEpoch Time in seconds since the epoch or 0 to reset to always downloading block bodies. */ - public synchronized void setDownloadParameters(long secondsSinceEpoch, boolean useFilteredBlocks) { - Preconditions.checkNotNull(blockChain); - if (secondsSinceEpoch == 0) { - fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); - downloadBlockBodies = true; - } else { - fastCatchupTimeSecs = secondsSinceEpoch; - // If the given time is before the current chains head block time, then this has no effect (we already - // downloaded everything we need). 
- if (fastCatchupTimeSecs > blockChain.getChainHead().getHeader().getTimeSeconds()) { - downloadBlockBodies = false; + public void setDownloadParameters(long secondsSinceEpoch, boolean useFilteredBlocks) { + lock.lock(); + try { + Preconditions.checkNotNull(blockChain); + if (secondsSinceEpoch == 0) { + fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); + downloadBlockBodies = true; + } else { + fastCatchupTimeSecs = secondsSinceEpoch; + // If the given time is before the current chains head block time, then this has no effect (we already + // downloaded everything we need). + if (fastCatchupTimeSecs > blockChain.getChainHead().getHeader().getTimeSeconds()) { + downloadBlockBodies = false; + } } + this.useFilteredBlocks = useFilteredBlocks; + } finally { + lock.unlock(); } - this.useFilteredBlocks = useFilteredBlocks; } /** @@ -942,11 +992,13 @@ public class Peer { * independently, otherwise the wallet will receive duplicate notifications. */ public void addWallet(Wallet wallet) { + // This does not need to be locked. wallets.add(wallet); } /** Unlinks the given wallet from peer. See {@link Peer#addWallet(Wallet)}. */ public void removeWallet(Wallet wallet) { + // This does not need to be locked. wallets.remove(wallet); } @@ -954,6 +1006,7 @@ public class Peer { * Sends the given message on the peers Channel. */ public ChannelFuture sendMessage(Message m) throws IOException { + // This does not need to be locked. return Channels.write(channel, m); } @@ -962,7 +1015,7 @@ public class Peer { // multiple threads simultaneously. private Sha256Hash lastGetBlocksBegin, lastGetBlocksEnd; - private synchronized void blockChainDownload(Sha256Hash toHash) throws IOException { + private void blockChainDownload(Sha256Hash toHash) throws IOException { // The block chain download process is a bit complicated. Basically, we start with one or more blocks in a // chain that we have from a previous session. We want to catch up to the head of the chain BUT we don't know // where that chain is up to or even if the top block we have is even still in the chain - we @@ -996,52 +1049,57 @@ public class Peer { // headers and then request the blocks from that point onwards. "getheaders" does not send us an inv, it just // sends us the data we requested in a "headers" message. - // TODO: Block locators should be abstracted out rather than special cased here. - List blockLocator = new ArrayList(51); - // For now we don't do the exponential thinning as suggested here: - // - // https://en.bitcoin.it/wiki/Protocol_specification#getblocks - // - // This is because it requires scanning all the block chain headers, which is very slow. Instead we add the top - // 50 block headers. If there is a re-org deeper than that, we'll end up downloading the entire chain. We - // must always put the genesis block as the first entry. - BlockStore store = blockChain.getBlockStore(); - StoredBlock chainHead = blockChain.getChainHead(); - Sha256Hash chainHeadHash = chainHead.getHeader().getHash(); - // Did we already make this request? If so, don't do it again. 
- if (Objects.equal(lastGetBlocksBegin, chainHeadHash) && Objects.equal(lastGetBlocksEnd, toHash)) { - log.info("blockChainDownload({}): ignoring duplicated request", toHash.toString()); - return; - } - log.debug("{}: blockChainDownload({}) current head = {}", new Object[] { toString(), - toHash.toString(), chainHead.getHeader().getHashAsString() }); - StoredBlock cursor = chainHead; - for (int i = 100; cursor != null && i > 0; i--) { - blockLocator.add(cursor.getHeader().getHash()); - try { - cursor = cursor.getPrev(store); - } catch (BlockStoreException e) { - log.error("Failed to walk the block chain whilst constructing a locator"); - throw new RuntimeException(e); + lock.lock(); + try { + // TODO: Block locators should be abstracted out rather than special cased here. + List blockLocator = new ArrayList(51); + // For now we don't do the exponential thinning as suggested here: + // + // https://en.bitcoin.it/wiki/Protocol_specification#getblocks + // + // This is because it requires scanning all the block chain headers, which is very slow. Instead we add the top + // 50 block headers. If there is a re-org deeper than that, we'll end up downloading the entire chain. We + // must always put the genesis block as the first entry. + BlockStore store = blockChain.getBlockStore(); + StoredBlock chainHead = blockChain.getChainHead(); + Sha256Hash chainHeadHash = chainHead.getHeader().getHash(); + // Did we already make this request? If so, don't do it again. + if (Objects.equal(lastGetBlocksBegin, chainHeadHash) && Objects.equal(lastGetBlocksEnd, toHash)) { + log.info("blockChainDownload({}): ignoring duplicated request", toHash.toString()); + return; + } + log.debug("{}: blockChainDownload({}) current head = {}", new Object[]{toString(), + toHash.toString(), chainHead.getHeader().getHashAsString()}); + StoredBlock cursor = chainHead; + for (int i = 100; cursor != null && i > 0; i--) { + blockLocator.add(cursor.getHeader().getHash()); + try { + cursor = cursor.getPrev(store); + } catch (BlockStoreException e) { + log.error("Failed to walk the block chain whilst constructing a locator"); + throw new RuntimeException(e); + } + } + // Only add the locator if we didn't already do so. If the chain is < 50 blocks we already reached it. + if (cursor != null) { + blockLocator.add(params.genesisBlock.getHash()); } - } - // Only add the locator if we didn't already do so. If the chain is < 50 blocks we already reached it. - if (cursor != null) { - blockLocator.add(params.genesisBlock.getHash()); - } - // Record that we requested this range of blocks so we can filter out duplicate requests in the event of a - // block being solved during chain download. - lastGetBlocksBegin = chainHeadHash; - lastGetBlocksEnd = toHash; + // Record that we requested this range of blocks so we can filter out duplicate requests in the event of a + // block being solved during chain download. + lastGetBlocksBegin = chainHeadHash; + lastGetBlocksEnd = toHash; - if (downloadBlockBodies) { - GetBlocksMessage message = new GetBlocksMessage(params, blockLocator, toHash); - sendMessage(message); - } else { - // Downloading headers for a while instead of full blocks. - GetHeadersMessage message = new GetHeadersMessage(params, blockLocator, toHash); - sendMessage(message); + if (downloadBlockBodies) { + GetBlocksMessage message = new GetBlocksMessage(params, blockLocator, toHash); + sendMessage(message); + } else { + // Downloading headers for a while instead of full blocks. 
+ GetHeadersMessage message = new GetHeadersMessage(params, blockLocator, toHash); + sendMessage(message); + } + } finally { + lock.unlock(); } } @@ -1049,15 +1107,17 @@ public class Peer { * Starts an asynchronous download of the block chain. The chain download is deemed to be complete once we've * downloaded the same number of blocks that the peer advertised having in its version handshake message. */ - public synchronized void startBlockChainDownload() throws IOException { + public void startBlockChainDownload() throws IOException { + // This does not need to be locked. setDownloadData(true); // TODO: peer might still have blocks that we don't have, and even have a heavier // chain even if the chain block count is lower. - if (getPeerBlockHeightDifference() >= 0) { + final int peerBlockHeightDifference = getPeerBlockHeightDifference(); + if (peerBlockHeightDifference >= 0) { EventListenerInvoker.invoke(eventListeners, new EventListenerInvoker() { @Override public void invoke(PeerEventListener listener) { - listener.onChainDownloadStarted(Peer.this, getPeerBlockHeightDifference()); + listener.onChainDownloadStarted(Peer.this, peerBlockHeightDifference); } }); @@ -1116,11 +1176,12 @@ public class Peer { * updated. * @throws ProtocolException if the peer version is too low to support measurable pings. */ - public synchronized ListenableFuture ping() throws IOException, ProtocolException { + public ListenableFuture ping() throws IOException, ProtocolException { return ping((long) Math.random() * Long.MAX_VALUE); } - protected synchronized ListenableFuture ping(long nonce) throws IOException, ProtocolException { + protected ListenableFuture ping(long nonce) throws IOException, ProtocolException { + // This does not need to be locked. if (!getPeerVersionMessage().isPingPongSupported()) throw new ProtocolException("Peer version is too low for measurable pings: " + getPeerVersionMessage()); PendingPing pendingPing = new PendingPing(nonce); @@ -1163,6 +1224,7 @@ public class Peer { } private void processPong(Pong m) { + // This does not need to be locked. PendingPing ping = null; // Iterates over a snapshot of the list, so we can run unlocked here. ListIterator it = pendingPings.listIterator(); @@ -1182,14 +1244,19 @@ public class Peer { * Returns the difference between our best chain height and the peers, which can either be positive if we are * behind the peer, or negative if the peer is ahead of us. */ - public synchronized int getPeerBlockHeightDifference() { - // Chain will overflow signed int blocks in ~41,000 years. - int chainHeight = (int) getBestHeight(); - // chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another - // client-mode node, nor should it be unconnected. If that happens it means the user overrode us somewhere or - // there is a bug in the peer management code. - Preconditions.checkState(params.allowEmptyPeerChains || chainHeight > 0, "Connected to peer with zero/negative chain height", chainHeight); - return chainHeight - blockChain.getBestChainHeight(); + public int getPeerBlockHeightDifference() { + lock.lock(); + try { + // Chain will overflow signed int blocks in ~41,000 years. + int chainHeight = (int) getBestHeight(); + // chainHeight should not be zero/negative because we shouldn't have given the user a Peer that is to another + // client-mode node, nor should it be unconnected. If that happens it means the user overrode us somewhere or + // there is a bug in the peer management code. 
+ checkState(params.allowEmptyPeerChains || chainHeight > 0, "Connected to peer with zero/negative chain height", chainHeight); + return chainHeight - blockChain.getBestChainHeight(); + } finally { + lock.unlock(); + } } private boolean isNotFoundMessageSupported() { @@ -1243,8 +1310,11 @@ public class Peer { * @return if not-null then this is the future for the Peer disconnection event. */ public ChannelFuture setMinProtocolVersion(int minProtocolVersion) { - synchronized (this) { + lock.lock(); + try { this.minProtocolVersion = minProtocolVersion; + } finally { + lock.unlock(); } if (getVersionMessage().clientVersion < minProtocolVersion) { log.warn("{}: Disconnecting due to new min protocol version {}", this, minProtocolVersion); @@ -1272,8 +1342,11 @@ public class Peer { if (!getPeerVersionMessage().isBloomFilteringSupported()) return; boolean shouldQueryMemPool; - synchronized (this) { + lock.lock(); + try { shouldQueryMemPool = memoryPool != null || downloadData.get(); + } finally { + lock.unlock(); } log.info("{}: Sending Bloom filter{}", this, shouldQueryMemPool ? " and querying mempool" : ""); ChannelFuture future = sendMessage(filter); diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 37611642..407d721a 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -1,5 +1,5 @@ /** - * Copyright 2011 Google Inc. + * Copyright 2013 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import com.google.bitcoin.core.Peer.PeerHandler; import com.google.bitcoin.discovery.PeerDiscovery; import com.google.bitcoin.discovery.PeerDiscoveryException; import com.google.bitcoin.utils.EventListenerInvoker; +import com.google.bitcoin.utils.Locks; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.util.concurrent.*; @@ -39,9 +40,11 @@ import java.net.SocketAddress; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; /** *

Runs a set of connections to the P2P network, brings up connections to replace disconnected nodes and manages @@ -69,12 +72,12 @@ public class PeerGroup extends AbstractIdleService { private static final int DEFAULT_CONNECTIONS = 4; private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); + protected final ReentrantLock lock = Locks.lock("peergroup"); // These lists are all thread-safe so do not have to be accessed under the PeerGroup lock. // Addresses to try to connect to, excluding active peers. private List inactives; - // Currently active peers. This is an ordered list rather than a set to make unit tests predictable. This is a - // synchronized list. Locking order is: PeerGroup < Peer < peers. Same for pendingPeers. + // Currently active peers. This is an ordered list rather than a set to make unit tests predictable. private List peers; // Currently connecting peers. private List pendingPeers; @@ -85,9 +88,9 @@ public class PeerGroup extends AbstractIdleService { // Callback for events related to chain download private PeerEventListener downloadListener; // Callbacks for events related to peer connection/disconnection - private List peerEventListeners; + private final CopyOnWriteArrayList peerEventListeners; // Peer discovery sources, will be polled occasionally if there aren't enough inactives. - private Set peerDiscoverers; + private CopyOnWriteArraySet peerDiscoverers; // The version message to use for new connections. private VersionMessage versionMessage; // A class that tracks recent transactions that have been broadcast across the network, counts how many @@ -99,7 +102,7 @@ public class PeerGroup extends AbstractIdleService { // Runs a background thread that we use for scheduling pings to our peers, so we can measure their performance // and network latency. We ping peers every pingIntervalMsec milliseconds. - private Timer pingTimer; + private volatile Timer pingTimer; /** How many milliseconds to wait after receiving a pong before sending another ping. */ public static final long DEFAULT_PING_INTERVAL_MSEC = 2000; private long pingIntervalMsec = DEFAULT_PING_INTERVAL_MSEC; @@ -107,7 +110,7 @@ public class PeerGroup extends AbstractIdleService { private final NetworkParameters params; private final AbstractBlockChain chain; private long fastCatchupTimeSecs; - private ArrayList wallets; + private final CopyOnWriteArrayList wallets; private AbstractPeerEventListener getDataListener; private ClientBootstrap bootstrap; @@ -190,7 +193,7 @@ public class PeerGroup extends AbstractIdleService { this.params = params; this.chain = chain; // Can be null. this.fastCatchupTimeSecs = params.genesisBlock.getTimeSeconds(); - this.wallets = new ArrayList(1); + this.wallets = new CopyOnWriteArrayList(); // This default sentinel value will be overridden by one of two actions: // - adding a peer discovery source sets it to the default @@ -217,7 +220,7 @@ public class PeerGroup extends AbstractIdleService { pendingPeers = Collections.synchronizedList(new ArrayList()); channels = new DefaultChannelGroup(); peerDiscoverers = new CopyOnWriteArraySet(); - peerEventListeners = new ArrayList(); + peerEventListeners = new CopyOnWriteArrayList(); // This event listener is added to every peer. It's here so when we announce transactions via an "inv", every // peer can fetch them. 
getDataListener = new AbstractPeerEventListener() { @@ -250,6 +253,7 @@ public class PeerGroup extends AbstractIdleService { private ChannelPipelineFactory makePipelineFactory(final NetworkParameters params, final AbstractBlockChain chain) { return new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { + // This runs unlocked. VersionMessage ver = getVersionMessage().duplicate(); ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight(); ver.time = Utils.now().getTime() / 1000; @@ -274,9 +278,12 @@ public class PeerGroup extends AbstractIdleService { */ public void setMaxConnections(int maxConnections) { int adjustment; - synchronized (this) { + lock.lock(); + try { this.maxConnections = maxConnections; if (!isRunning()) return; + } finally { + lock.unlock(); } // We may now have too many or too few open connections. Add more or drop some to get to the right amount. adjustment = maxConnections - channels.size(); @@ -295,37 +302,47 @@ public class PeerGroup extends AbstractIdleService { } /** The maximum number of connections that we will create to peers. */ - public synchronized int getMaxConnections() { - return maxConnections; + public int getMaxConnections() { + lock.lock(); + try { + return maxConnections; + } finally { + lock.unlock(); + } } - private synchronized List handleGetData(GetDataMessage m) { + private List handleGetData(GetDataMessage m) { // Scans the wallets and memory pool for transactions in the getdata message and returns them. // Runs on peer threads. - LinkedList transactions = new LinkedList(); - LinkedList items = new LinkedList(m.getItems()); - Iterator it = items.iterator(); - while (it.hasNext()) { - InventoryItem item = it.next(); - // Check the mempool first. - Transaction tx = memoryPool.get(item.hash); - if (tx != null) { - transactions.add(tx); - it.remove(); - } else { - // Check the wallets. - for (Wallet w : wallets) { - synchronized (w) { - tx = w.getTransaction(item.hash); - if (tx == null) continue; - transactions.add(tx); - it.remove(); - break; + lock.lock(); + try { + LinkedList transactions = new LinkedList(); + LinkedList items = new LinkedList(m.getItems()); + Iterator it = items.iterator(); + while (it.hasNext()) { + InventoryItem item = it.next(); + // Check the mempool first. + Transaction tx = memoryPool.get(item.hash); + if (tx != null) { + transactions.add(tx); + it.remove(); + } else { + // Check the wallets. + for (Wallet w : wallets) { + synchronized (w) { + tx = w.getTransaction(item.hash); + if (tx == null) continue; + transactions.add(tx); + it.remove(); + break; + } } } } + return transactions; + } finally { + lock.unlock(); } - return transactions; } /** @@ -337,15 +354,25 @@ public class PeerGroup extends AbstractIdleService { * The VersionMessage you provide is copied and the best chain height/time filled in for each new connection, * therefore you don't have to worry about setting that. The provided object is really more of a template. */ - public synchronized void setVersionMessage(VersionMessage ver) { - versionMessage = ver; + public void setVersionMessage(VersionMessage ver) { + lock.lock(); + try { + versionMessage = ver; + } finally { + lock.unlock(); + } } /** * Returns the version message provided by setVersionMessage or a default if none was given. 
*/ - public synchronized VersionMessage getVersionMessage() { - return versionMessage; + public VersionMessage getVersionMessage() { + lock.lock(); + try { + return versionMessage; + } finally { + lock.unlock(); + } } /** @@ -367,11 +394,16 @@ public class PeerGroup extends AbstractIdleService { } // Updates the relayTxesBeforeFilter flag of ver - private synchronized void updateVersionMessageRelayTxesBeforeFilter(VersionMessage ver) { + private void updateVersionMessageRelayTxesBeforeFilter(VersionMessage ver) { // We will provide the remote node with a bloom filter (ie they shouldn't relay yet) // iff chain == null || !chain.shouldVerifyTransactions() and a wallet is added // Note that the default here means that no tx invs will be received if no wallet is ever added - ver.relayTxesBeforeFilter = chain != null && chain.shouldVerifyTransactions() && wallets.size() > 0; + lock.lock(); + try { + ver.relayTxesBeforeFilter = chain != null && chain.shouldVerifyTransactions() && wallets.size() > 0; + } finally { + lock.unlock(); + } } /** @@ -401,12 +433,12 @@ public class PeerGroup extends AbstractIdleService { *

* The listener will be locked during callback execution, which in turn will cause network message processing
* to stop until the listener returns.

*/ - public synchronized void addEventListener(PeerEventListener listener) { + public void addEventListener(PeerEventListener listener) { peerEventListeners.add(checkNotNull(listener)); } /** The given event listener will no longer be called with events. */ - public synchronized boolean removeEventListener(PeerEventListener listener) { + public boolean removeEventListener(PeerEventListener listener) { return peerEventListeners.remove(checkNotNull(listener)); } @@ -437,9 +469,12 @@ public class PeerGroup extends AbstractIdleService { */ public void addAddress(PeerAddress peerAddress) { int newMax; - synchronized (this) { + lock.lock(); + try { inactives.add(peerAddress); newMax = getMaxConnections() + 1; + } finally { + lock.unlock(); } setMaxConnections(newMax); } @@ -453,13 +488,19 @@ public class PeerGroup extends AbstractIdleService { * Add addresses from a discovery source to the list of potential peers to connect to. If max connections has not * been configured, or set to zero, then it's set to the default at this point. */ - public synchronized void addPeerDiscovery(PeerDiscovery peerDiscovery) { - if (getMaxConnections() == 0) - setMaxConnections(DEFAULT_CONNECTIONS); - peerDiscoverers.add(peerDiscovery); + public void addPeerDiscovery(PeerDiscovery peerDiscovery) { + lock.lock(); + try { + if (getMaxConnections() == 0) + setMaxConnections(DEFAULT_CONNECTIONS); + peerDiscoverers.add(peerDiscovery); + } finally { + lock.unlock(); + } } protected void discoverPeers() throws PeerDiscoveryException { + // This does not need to be locked. long start = System.currentTimeMillis(); Set addressSet = Sets.newHashSet(); for (PeerDiscovery peerDiscovery : peerDiscoverers) { @@ -477,9 +518,11 @@ public class PeerGroup extends AbstractIdleService { /** Picks a peer from discovery and connects to it. If connection fails, picks another and tries again. */ protected void connectToAnyPeer() throws PeerDiscoveryException { - // Do not call this method whilst synchronized on the PeerGroup lock. + checkState(!lock.isLocked()); + final State state = state(); + if (!(state == State.STARTING || state == State.RUNNING)) return; + final PeerAddress addr; - if (!(state() == State.STARTING || state() == State.RUNNING)) return; synchronized (inactives) { if (inactives.size() == 0) { discoverPeers(); @@ -503,9 +546,7 @@ public class PeerGroup extends AbstractIdleService { @Override protected void startUp() throws Exception { // This is run in a background thread by the AbstractIdleService implementation. - synchronized (this) { - pingTimer = new Timer("Peer pinging thread", true); - } + pingTimer = new Timer("Peer pinging thread", true); // Bring up the requested number of connections. If a connect attempt fails, // new peers will be tried until there is a success, so just calling connectToAnyPeer for the wanted number // of peers is sufficient. @@ -530,9 +571,7 @@ public class PeerGroup extends AbstractIdleService { for (PeerDiscovery peerDiscovery : peerDiscoverers) { peerDiscovery.shutdown(); } - synchronized (this) { - pingTimer.cancel(); - } + pingTimer.cancel(); } /** @@ -546,27 +585,38 @@ public class PeerGroup extends AbstractIdleService { *

* Note that this should be done before chain download commences because if you add a wallet with keys earlier
* than the current chain head, the relevant parts of the chain won't be redownloaded for you.

*/ - public synchronized void addWallet(Wallet wallet) { - Preconditions.checkNotNull(wallet); - Preconditions.checkState(!wallets.contains(wallet)); - wallets.add(wallet); - announcePendingWalletTransactions(Collections.singletonList(wallet), peers); + public void addWallet(Wallet wallet) { + lock.lock(); + try { + Preconditions.checkNotNull(wallet); + Preconditions.checkState(!wallets.contains(wallet)); + wallets.add(wallet); + announcePendingWalletTransactions(Collections.singletonList(wallet), peers); - // Don't bother downloading block bodies before the oldest keys in all our wallets. Make sure we recalculate - // if a key is added. Of course, by then we may have downloaded the chain already. Ideally adding keys would - // automatically rewind the block chain and redownload the blocks to find transactions relevant to those keys, - // all transparently and in the background. But we are a long way from that yet. - wallet.addEventListener(new AbstractWalletEventListener() { - @Override - public void onKeyAdded(ECKey key) { - recalculateFastCatchupAndFilter(); - } - }); - recalculateFastCatchupAndFilter(); - updateVersionMessageRelayTxesBeforeFilter(getVersionMessage()); + // Don't bother downloading block bodies before the oldest keys in all our wallets. Make sure we recalculate + // if a key is added. Of course, by then we may have downloaded the chain already. Ideally adding keys would + // automatically rewind the block chain and redownload the blocks to find transactions relevant to those keys, + // all transparently and in the background. But we are a long way from that yet. + wallet.addEventListener(new AbstractWalletEventListener() { + @Override + public void onKeyAdded(ECKey key) { + lock.lock(); + try { + recalculateFastCatchupAndFilter(); + } finally { + lock.unlock(); + } + } + }); + recalculateFastCatchupAndFilter(); + updateVersionMessageRelayTxesBeforeFilter(getVersionMessage()); + } finally { + lock.unlock(); + } } - private synchronized void recalculateFastCatchupAndFilter() { + private void recalculateFastCatchupAndFilter() { + checkState(lock.isLocked()); // Fully verifying mode doesn't use this optimization (it can't as it needs to see all transactions). if (chain != null && chain.shouldVerifyTransactions()) return; @@ -606,18 +656,21 @@ public class PeerGroup extends AbstractIdleService { * See the docs for {@link BloomFilter#BloomFilter(int, double, long)} for a brief explanation of anonymity when * using bloom filters. */ - public synchronized void setBloomFilterFalsePositiveRate(double bloomFilterFPRate) { - this.bloomFilterFPRate = bloomFilterFPRate; - recalculateFastCatchupAndFilter(); + public void setBloomFilterFalsePositiveRate(double bloomFilterFPRate) { + lock.lock(); + try { + this.bloomFilterFPRate = bloomFilterFPRate; + recalculateFastCatchupAndFilter(); + } finally { + lock.unlock(); + } } /** * Unlinks the given wallet so it no longer receives broadcast transactions or has its transactions announced. */ public void removeWallet(Wallet wallet) { - if (wallet == null) - throw new IllegalArgumentException("wallet is null"); - wallets.remove(wallet); + wallets.remove(checkNotNull(wallet)); } /** @@ -639,8 +692,9 @@ public class PeerGroup extends AbstractIdleService { return connectTo(address, true); } - // Internal version. Do not call whilst holding the PeerGroup lock. + // Internal version. 
protected ChannelFuture connectTo(SocketAddress address, boolean incrementMaxConnections) { + checkState(!lock.isLocked()); ChannelFuture future = bootstrap.connect(address); // Make sure that the channel group gets access to the channel only if it connects successfully (otherwise // it cannot be closed and trying to do so will cause problems). @@ -661,11 +715,14 @@ public class PeerGroup extends AbstractIdleService { // This can be null in unit tests or apps that don't use TCP connections. networkHandler.getOwnerObject().setRemoteAddress(address); } - synchronized (this) { - if (incrementMaxConnections) { - // We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new - // outbound connection. + if (incrementMaxConnections) { + // We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new + // outbound connection. + lock.lock(); + try { maxConnections++; + } finally { + lock.unlock(); } } return future; @@ -687,15 +744,20 @@ public class PeerGroup extends AbstractIdleService { * * @param listener a listener for chain download events, may not be null */ - public synchronized void startBlockChainDownload(PeerEventListener listener) { - this.downloadListener = listener; - // TODO: be more nuanced about which peer to download from. We can also try - // downloading from multiple peers and handle the case when a new peer comes along - // with a longer chain after we thought we were done. - synchronized (peers) { - if (!peers.isEmpty()) { - startBlockChainDownloadFromPeer(peers.iterator().next()); + public void startBlockChainDownload(PeerEventListener listener) { + lock.lock(); + try { + this.downloadListener = listener; + // TODO: be more nuanced about which peer to download from. We can also try + // downloading from multiple peers and handle the case when a new peer comes along + // with a longer chain after we thought we were done. + synchronized (peers) { + if (!peers.isEmpty()) { + startBlockChainDownloadFromPeer(peers.iterator().next()); + } } + } finally { + lock.unlock(); } } @@ -715,77 +777,85 @@ public class PeerGroup extends AbstractIdleService { } } - protected synchronized void handleNewPeer(final Peer peer) { - // Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point. - // Sets up the newly connected peer so it can do everything it needs to. - log.info("{}: New peer", peer); - // Give the peer a filter that can be used to probabilistically drop transactions that - // aren't relevant to our wallet. We may still receive some false positives, which is - // OK because it helps improve wallet privacy. Old nodes will just ignore the message. + protected void handleNewPeer(final Peer peer) { + lock.lock(); try { - if (bloomFilter != null) peer.setBloomFilter(bloomFilter); - } catch (IOException e) { } // That was quick...already disconnected - // Link the peer to the memory pool so broadcast transactions have their confidence levels updated. - peer.setDownloadData(false); - // TODO: The peer should calculate the fast catchup time from the added wallets here. - for (Wallet wallet : wallets) - peer.addWallet(wallet); - // Re-evaluate download peers. 
- Peer newDownloadPeer = selectDownloadPeer(peers); - if (downloadPeer != newDownloadPeer) { - setDownloadPeer(newDownloadPeer); - boolean shouldDownloadChain = downloadListener != null && chain != null; - if (shouldDownloadChain) { - startBlockChainDownloadFromPeer(downloadPeer); + // Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point. + // Sets up the newly connected peer so it can do everything it needs to. + log.info("{}: New peer", peer); + // Give the peer a filter that can be used to probabilistically drop transactions that + // aren't relevant to our wallet. We may still receive some false positives, which is + // OK because it helps improve wallet privacy. Old nodes will just ignore the message. + try { + if (bloomFilter != null) peer.setBloomFilter(bloomFilter); + } catch (IOException e) { + } // That was quick...already disconnected + // Link the peer to the memory pool so broadcast transactions have their confidence levels updated. + peer.setDownloadData(false); + // TODO: The peer should calculate the fast catchup time from the added wallets here. + for (Wallet wallet : wallets) + peer.addWallet(wallet); + // Re-evaluate download peers. + Peer newDownloadPeer = selectDownloadPeer(peers); + if (downloadPeer != newDownloadPeer) { + setDownloadPeer(newDownloadPeer); + boolean shouldDownloadChain = downloadListener != null && chain != null; + if (shouldDownloadChain) { + startBlockChainDownloadFromPeer(downloadPeer); + } } - } - // Make sure the peer knows how to upload transactions that are requested from us. - peer.addEventListener(getDataListener); - // Now tell the peers about any transactions we have which didn't appear in the chain yet. These are not - // necessarily spends we created. They may also be transactions broadcast across the network that we saw, - // which are relevant to us, and which we therefore wish to help propagate (ie they send us coins). - // - // Note that this can cause a DoS attack against us if a malicious remote peer knows what keys we own, and - // then sends us fake relevant transactions. We'll attempt to relay the bad transactions, our badness score - // in the Satoshi client will increase and we'll get disconnected. - // - // TODO: Find a way to balance the desire to propagate useful transactions against DoS attacks. - announcePendingWalletTransactions(wallets, Collections.singletonList(peer)); - // And set up event listeners for clients. This will allow them to find out about new transactions and blocks. - for (PeerEventListener listener : peerEventListeners) { - peer.addEventListener(listener); - } - setupPingingForNewPeer(peer); - EventListenerInvoker.invoke(peerEventListeners, new EventListenerInvoker() { - @Override - public void invoke(PeerEventListener listener) { - listener.onPeerConnected(peer, peers.size()); + // Make sure the peer knows how to upload transactions that are requested from us. + peer.addEventListener(getDataListener); + // Now tell the peers about any transactions we have which didn't appear in the chain yet. These are not + // necessarily spends we created. They may also be transactions broadcast across the network that we saw, + // which are relevant to us, and which we therefore wish to help propagate (ie they send us coins). + // + // Note that this can cause a DoS attack against us if a malicious remote peer knows what keys we own, and + // then sends us fake relevant transactions. 
We'll attempt to relay the bad transactions, our badness score + // in the Satoshi client will increase and we'll get disconnected. + // + // TODO: Find a way to balance the desire to propagate useful transactions against DoS attacks. + announcePendingWalletTransactions(wallets, Collections.singletonList(peer)); + // And set up event listeners for clients. This will allow them to find out about new transactions and blocks. + for (PeerEventListener listener : peerEventListeners) { + peer.addEventListener(listener); } - }); - final PeerGroup thisGroup = this; - // TODO: Move this into the Peer object itself. - peer.addEventListener(new AbstractPeerEventListener() { - int filteredBlocksReceivedFromPeer = 0; - @Override - public Message onPreMessageReceived(Peer peer, Message m) { - if (m instanceof FilteredBlock) { - filteredBlocksReceivedFromPeer++; - if (filteredBlocksReceivedFromPeer % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT-1) { - try { - synchronized(thisGroup) { + setupPingingForNewPeer(peer); + EventListenerInvoker.invoke(peerEventListeners, new EventListenerInvoker() { + @Override + public void invoke(PeerEventListener listener) { + listener.onPeerConnected(peer, peers.size()); + } + }); + // TODO: Move this into the Peer object itself. + peer.addEventListener(new AbstractPeerEventListener() { + int filteredBlocksReceivedFromPeer = 0; + + @Override + public Message onPreMessageReceived(Peer peer, Message m) { + if (m instanceof FilteredBlock) { + filteredBlocksReceivedFromPeer++; + if (filteredBlocksReceivedFromPeer % RESEND_BLOOM_FILTER_BLOCK_COUNT == RESEND_BLOOM_FILTER_BLOCK_COUNT - 1) { + lock.lock(); + try { peer.sendMessage(bloomFilter); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + lock.unlock(); } - } catch (IOException e) { - throw new RuntimeException(e); } } + return m; } - return m; - } - }); + }); + } finally { + lock.unlock(); + } } private void setupPingingForNewPeer(final Peer peer) { + checkState(lock.isLocked()); if (peer.getPeerVersionMessage().clientVersion < Pong.MIN_PROTOCOL_VERSION) return; if (getPingIntervalMsec() <= 0) @@ -831,6 +901,7 @@ public class PeerGroup extends AbstractIdleService { /** Returns true if at least one peer received an inv. */ private synchronized boolean announcePendingWalletTransactions(List announceWallets, List announceToPeers) { + checkState(lock.isLocked()); // Build up an inv announcing the hashes of all pending transactions in all our wallets. InventoryMessage inv = new InventoryMessage(params); for (Wallet w : announceWallets) { @@ -855,25 +926,30 @@ public class PeerGroup extends AbstractIdleService { return success; } - private synchronized void setDownloadPeer(Peer peer) { - if (downloadPeer == peer) { - return; - } - if (chain == null) { - // PeerGroup creator did not want us to download any data. We still track the download peer for - // informational purposes. + private void setDownloadPeer(Peer peer) { + lock.lock(); + try { + if (downloadPeer == peer) { + return; + } + if (chain == null) { + // PeerGroup creator did not want us to download any data. We still track the download peer for + // informational purposes. 
+ downloadPeer = peer; + return; + } + if (downloadPeer != null) { + log.info("Unsetting download peer: {}", downloadPeer); + downloadPeer.setDownloadData(false); + } downloadPeer = peer; - return; - } - if (downloadPeer != null) { - log.info("Unsetting download peer: {}", downloadPeer); - downloadPeer.setDownloadData(false); - } - downloadPeer = peer; - if (downloadPeer != null) { - log.info("Setting download peer: {}", downloadPeer); - downloadPeer.setDownloadData(true); - downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilter != null); + if (downloadPeer != null) { + log.info("Setting download peer: {}", downloadPeer); + downloadPeer.setDownloadData(true); + downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilter != null); + } + } finally { + lock.unlock(); } } @@ -884,19 +960,23 @@ public class PeerGroup extends AbstractIdleService { * have that it's really valid. */ public MemoryPool getMemoryPool() { - // Locking unneeded as memoryPool is final. return memoryPool; } /** - * Tells the PeerGroup to download only block headers before a certain time and bodies after that. See - * {@link Peer#setFastCatchupTime(long)} for further explanation. Call this before starting block chain download. + * Tells the PeerGroup to download only block headers before a certain time and bodies after that. Call this + * before starting block chain download. */ - public synchronized void setFastCatchupTimeSecs(long secondsSinceEpoch) { - Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying"); - fastCatchupTimeSecs = secondsSinceEpoch; - if (downloadPeer != null) { - downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilter != null); + public void setFastCatchupTimeSecs(long secondsSinceEpoch) { + lock.lock(); + try { + Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying"); + fastCatchupTimeSecs = secondsSinceEpoch; + if (downloadPeer != null) { + downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilter != null); + } + } finally { + lock.unlock(); } } @@ -906,8 +986,13 @@ public class PeerGroup extends AbstractIdleService { * the min of the wallets earliest key times. * @return a time in seconds since the epoch */ - public synchronized long getFastCatchupTimeSecs() { - return fastCatchupTimeSecs; + public long getFastCatchupTimeSecs() { + lock.lock(); + try { + return fastCatchupTimeSecs; + } finally { + lock.unlock(); + } } protected void handlePeerDeath(final Peer peer) { @@ -921,9 +1006,12 @@ public class PeerGroup extends AbstractIdleService { checkArgument(!peers.contains(peer)); final Peer downloadPeer; final PeerEventListener downloadListener; - synchronized (this) { + lock.lock(); + try { downloadPeer = this.downloadPeer; downloadListener = this.downloadListener; + } finally { + lock.unlock(); } if (peer == downloadPeer) { log.info("Download peer died. 
Picking a new one."); @@ -961,7 +1049,8 @@ public class PeerGroup extends AbstractIdleService { }); } - private synchronized void startBlockChainDownloadFromPeer(Peer peer) { + private void startBlockChainDownloadFromPeer(Peer peer) { + lock.lock(); try { peer.addEventListener(downloadListener); setDownloadPeer(peer); @@ -969,7 +1058,8 @@ public class PeerGroup extends AbstractIdleService { peer.startBlockChainDownload(); } catch (IOException e) { log.error("failed to start block chain download from " + peer, e); - return; + } finally { + lock.unlock(); } } @@ -1006,21 +1096,31 @@ public class PeerGroup extends AbstractIdleService { * @return */ public int getMinBroadcastConnections() { - if (minBroadcastConnections == 0) { - int max = getMaxConnections(); - if (max <= 1) - return max; - else - return (int)Math.round(getMaxConnections() / 2.0); + lock.lock(); + try { + if (minBroadcastConnections == 0) { + int max = getMaxConnections(); + if (max <= 1) + return max; + else + return (int) Math.round(getMaxConnections() / 2.0); + } + return minBroadcastConnections; + } finally { + lock.unlock(); } - return minBroadcastConnections; } /** * See {@link com.google.bitcoin.core.PeerGroup#getMinBroadcastConnections()}. */ public void setMinBroadcastConnections(int value) { - minBroadcastConnections = value; + lock.lock(); + try { + minBroadcastConnections = value; + } finally { + lock.unlock(); + } } /** @@ -1076,7 +1176,8 @@ public class PeerGroup extends AbstractIdleService { boolean done = false; log.info("broadcastTransaction: TX {} seen by {} peers", pinnedTx.getHashAsString(), numSeenPeers); - synchronized (PeerGroup.this) { + lock.lock(); + try { if (numSeenPeers >= minConnections) { // We've seen the min required number of peers announce the transaction. Note that we // can't wait for the current number of connected peers right now because we could have @@ -1101,6 +1202,8 @@ public class PeerGroup extends AbstractIdleService { } done = true; } + } finally { + lock.unlock(); } if (done) { // We're done! Run this outside of the peer group lock as setting the future may immediately @@ -1128,7 +1231,8 @@ public class PeerGroup extends AbstractIdleService { if (minConnections == 1) { sendComplete.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture _) throws Exception { - synchronized (PeerGroup.this) { + lock.lock(); + try { for (Wallet wallet : wallets) { try { if (wallet.isPendingTransactionRelevant(pinnedTx)) { @@ -1141,6 +1245,8 @@ public class PeerGroup extends AbstractIdleService { return; } } + } finally { + lock.unlock(); } future.set(pinnedTx); return; @@ -1161,8 +1267,13 @@ public class PeerGroup extends AbstractIdleService { * times are available via {@link com.google.bitcoin.core.Peer#getLastPingTime()} but it increases load on the * remote node. It defaults to 5000. */ - public synchronized long getPingIntervalMsec() { - return pingIntervalMsec; + public long getPingIntervalMsec() { + lock.lock(); + try { + return pingIntervalMsec; + } finally { + lock.unlock(); + } } /** @@ -1172,8 +1283,13 @@ public class PeerGroup extends AbstractIdleService { * Setting the value to be <= 0 disables pinging entirely, although you can still request one yourself * using {@link com.google.bitcoin.core.Peer#ping()}. 
*/ - public synchronized void setPingIntervalMsec(long pingIntervalMsec) { - this.pingIntervalMsec = pingIntervalMsec; + public void setPingIntervalMsec(long pingIntervalMsec) { + lock.lock(); + try { + this.pingIntervalMsec = pingIntervalMsec; + } finally { + lock.unlock(); + } } /** @@ -1181,7 +1297,7 @@ public class PeerGroup extends AbstractIdleService { * If no peers are connected, returns zero. */ public int getMostCommonChainHeight() { - // Copy the peers list so we can calculate on it without violating lock ordering: Peer < peers + // Copy the peers list so we can calculate on it without violating lock ordering. ArrayList peers; synchronized (this.peers) { peers = new ArrayList(this.peers); @@ -1307,6 +1423,11 @@ public class PeerGroup extends AbstractIdleService { * returns. Can return null if no peer was selected. */ public Peer getDownloadPeer() { - return downloadPeer; + lock.lock(); + try { + return downloadPeer; + } finally { + lock.unlock(); + } } } diff --git a/core/src/main/java/com/google/bitcoin/core/Utils.java b/core/src/main/java/com/google/bitcoin/core/Utils.java index a0ad6bbf..7f8d68a9 100644 --- a/core/src/main/java/com/google/bitcoin/core/Utils.java +++ b/core/src/main/java/com/google/bitcoin/core/Utils.java @@ -37,27 +37,13 @@ import static com.google.common.base.Preconditions.checkArgument; * To enable debug logging from the library, run with -Dbitcoinj.logging=true on your command line. */ public class Utils { - public static final CycleDetectingLockFactory cycleDetectingLockFactory; private static final MessageDigest digest; - static { try { digest = MessageDigest.getInstance("SHA-256"); } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); // Can't happen. } - - cycleDetectingLockFactory = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW); - } - - private static final boolean detectLockCycles = true; - - /** Returns a re-entrant lock that may be cycle detecting, depending on {@link Utils#detectLockCycles}. */ - public static ReentrantLock lock(String name) { - if (detectLockCycles) - return cycleDetectingLockFactory.newReentrantLock(name); - else - return new ReentrantLock(); } /** The string that prefixes all text messages signed using Bitcoin keys. */ diff --git a/core/src/main/java/com/google/bitcoin/core/Wallet.java b/core/src/main/java/com/google/bitcoin/core/Wallet.java index 04df7dad..2719da3a 100644 --- a/core/src/main/java/com/google/bitcoin/core/Wallet.java +++ b/core/src/main/java/com/google/bitcoin/core/Wallet.java @@ -20,6 +20,7 @@ import com.google.bitcoin.core.TransactionConfidence.ConfidenceType; import com.google.bitcoin.core.WalletTransaction.Pool; import com.google.bitcoin.store.WalletProtobufSerializer; import com.google.bitcoin.utils.EventListenerInvoker; +import com.google.bitcoin.utils.Locks; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; @@ -74,7 +75,7 @@ public class Wallet implements Serializable, BlockChainListener { private static final Logger log = LoggerFactory.getLogger(Wallet.class); private static final long serialVersionUID = 2L; - protected final ReentrantLock lock = Utils.lock("wallet"); + protected final ReentrantLock lock = Locks.lock("wallet"); // Algorithm for movement of transactions between pools. Outbound tx = us spending coins. Inbound tx = us // receiving coins. 
If a tx is both inbound and outbound (spend with change) it is considered outbound for the @@ -1388,44 +1389,50 @@ public class Wallet implements Serializable, BlockChainListener { * @param includeInactive If true, transactions that are on side chains (are unspendable) are included. */ public Set getTransactions(boolean includeDead, boolean includeInactive) { - Set all = new HashSet(); lock.lock(); - all.addAll(unspent.values()); - all.addAll(spent.values()); - all.addAll(pending.values()); - if (includeDead) - all.addAll(dead.values()); - if (includeInactive) - all.addAll(inactive.values()); - lock.unlock(); - return all; + try { + Set all = new HashSet(); + all.addAll(unspent.values()); + all.addAll(spent.values()); + all.addAll(pending.values()); + if (includeDead) + all.addAll(dead.values()); + if (includeInactive) + all.addAll(inactive.values()); + return all; + } finally { + lock.unlock(); + } } /** * Returns a set of all WalletTransactions in the wallet. */ public Iterable getWalletTransactions() { - HashSet pendingInactive = new HashSet(); lock.lock(); - pendingInactive.addAll(pending.values()); - pendingInactive.retainAll(inactive.values()); - HashSet onlyPending = new HashSet(); - HashSet onlyInactive = new HashSet(); - onlyPending.addAll(pending.values()); - onlyPending.removeAll(pendingInactive); - onlyInactive.addAll(inactive.values()); - onlyInactive.removeAll(pendingInactive); - - Set all = new HashSet(); + try { + HashSet pendingInactive = new HashSet(); + pendingInactive.addAll(pending.values()); + pendingInactive.retainAll(inactive.values()); + HashSet onlyPending = new HashSet(); + HashSet onlyInactive = new HashSet(); + onlyPending.addAll(pending.values()); + onlyPending.removeAll(pendingInactive); + onlyInactive.addAll(inactive.values()); + onlyInactive.removeAll(pendingInactive); - addWalletTransactionsToSet(all, Pool.UNSPENT, unspent.values()); - addWalletTransactionsToSet(all, Pool.SPENT, spent.values()); - addWalletTransactionsToSet(all, Pool.DEAD, dead.values()); - addWalletTransactionsToSet(all, Pool.PENDING, onlyPending); - addWalletTransactionsToSet(all, Pool.INACTIVE, onlyInactive); - addWalletTransactionsToSet(all, Pool.PENDING_INACTIVE, pendingInactive); - lock.unlock(); - return all; + Set all = new HashSet(); + + addWalletTransactionsToSet(all, Pool.UNSPENT, unspent.values()); + addWalletTransactionsToSet(all, Pool.SPENT, spent.values()); + addWalletTransactionsToSet(all, Pool.DEAD, dead.values()); + addWalletTransactionsToSet(all, Pool.PENDING, onlyPending); + addWalletTransactionsToSet(all, Pool.INACTIVE, onlyInactive); + addWalletTransactionsToSet(all, Pool.PENDING_INACTIVE, pendingInactive); + return all; + } finally { + lock.unlock(); + } } private static void addWalletTransactionsToSet(Set txs, @@ -1452,7 +1459,7 @@ public class Wallet implements Serializable, BlockChainListener { /** * Adds the given transaction to the given pools and registers a confidence change listener on it. 
*/ - private synchronized void addWalletTransaction(Pool pool, Transaction tx) { + private void addWalletTransaction(Pool pool, Transaction tx) { checkState(lock.isLocked()); switch (pool) { case UNSPENT: diff --git a/core/src/main/java/com/google/bitcoin/store/SPVBlockStore.java b/core/src/main/java/com/google/bitcoin/store/SPVBlockStore.java index 376c8200..3703e16b 100644 --- a/core/src/main/java/com/google/bitcoin/store/SPVBlockStore.java +++ b/core/src/main/java/com/google/bitcoin/store/SPVBlockStore.java @@ -17,6 +17,7 @@ package com.google.bitcoin.store; import com.google.bitcoin.core.*; +import com.google.bitcoin.utils.Locks; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class SPVBlockStore implements BlockStore { protected int numHeaders; protected NetworkParameters params; - protected ReentrantLock lock = Utils.lock("SPVBlockStore"); + protected ReentrantLock lock = Locks.lock("SPVBlockStore"); // The entire ring-buffer is mmapped and accessing it should be as fast as accessing regular memory once it's // faulted in. Unfortunately, in theory practice and theory are the same. In practice they aren't. diff --git a/core/src/main/java/com/google/bitcoin/utils/Locks.java b/core/src/main/java/com/google/bitcoin/utils/Locks.java new file mode 100644 index 00000000..3c684b20 --- /dev/null +++ b/core/src/main/java/com/google/bitcoin/utils/Locks.java @@ -0,0 +1,55 @@ +/** + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.utils; + +import com.google.common.util.concurrent.CycleDetectingLockFactory; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * A wrapper around explicit lock creation that lets you control whether bitcoinj performs cycle detection or not. + */ +public class Locks { + + static { + // Default policy goes here. If you want to change this, use one of the static methods before + // instantiating any bitcoinj objects. The policy change will take effect only on new objects + // from that point onwards. 
+ warnOnLockCycles(); + } + + public static CycleDetectingLockFactory factory; + + public static ReentrantLock lock(String name) { + if (factory != null) + return factory.newReentrantLock(name); + else + return new ReentrantLock(); + } + + public static void warnOnLockCycles() { + factory = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.WARN); + } + + public static void throwOnLockCycles() { + factory = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW); + } + + public static void ignoreLockCycles() { + factory = null; + } +} diff --git a/core/src/test/java/com/google/bitcoin/core/WalletTest.java b/core/src/test/java/com/google/bitcoin/core/WalletTest.java index e9ca34da..83c4f3fa 100644 --- a/core/src/test/java/com/google/bitcoin/core/WalletTest.java +++ b/core/src/test/java/com/google/bitcoin/core/WalletTest.java @@ -21,6 +21,7 @@ import com.google.bitcoin.core.WalletTransaction.Pool; import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.MemoryBlockStore; import com.google.bitcoin.utils.BriefLogFormatter; +import com.google.bitcoin.utils.Locks; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CycleDetectingLockFactory; import org.junit.Before; @@ -917,7 +918,9 @@ public class WalletTest { @Test public void lockCycles() { - final ReentrantLock lock = Utils.cycleDetectingLockFactory.newReentrantLock("test"); + Locks.throwOnLockCycles(); + final ReentrantLock lock = Locks.lock("test"); + wallet = new Wallet(params); lock.lock(); int foo = wallet.getKeychainSize(); lock.unlock();
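
The Locks helper introduced above is consulted only at lock-creation time, so an application has to pick the cycle-detection policy before it constructs any bitcoinj objects, exactly as the amended lockCycles test does with Locks.throwOnLockCycles(). Below is a minimal sketch of that usage; the class name CycleDetectionExample and the lock names are illustrative only, not part of the library.

    import com.google.bitcoin.utils.Locks;

    import java.util.concurrent.locks.ReentrantLock;

    public class CycleDetectionExample {
        public static void main(String[] args) {
            // Choose the policy first: every lock handed out by Locks.lock() after this call
            // comes from Guava's CycleDetectingLockFactory with the THROW policy, so an
            // inconsistent acquisition order fails fast instead of deadlocking silently.
            Locks.throwOnLockCycles();

            ReentrantLock peerLock = Locks.lock("peer");
            ReentrantLock walletLock = Locks.lock("wallet");

            // Consistent order: peerLock before walletLock. If another code path later took
            // walletLock and then peerLock, the factory would throw Guava's
            // PotentialDeadlockException at the point the cycle is closed.
            peerLock.lock();
            try {
                walletLock.lock();
                try {
                    System.out.println("holding both locks in the agreed order");
                } finally {
                    walletLock.unlock();
                }
            } finally {
                peerLock.unlock();
            }
        }
    }

Locks.warnOnLockCycles(), the default chosen in the static initializer, logs the same cycles instead of throwing, and Locks.ignoreLockCycles() falls back to plain ReentrantLocks for code that cannot tolerate the extra bookkeeping.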
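
Across the patch, synchronized methods are replaced by three recurring conventions: public entry points take the object's ReentrantLock in a try/finally, internal helpers assert that the lock is already held, and methods that may block or call back out assert that it is not (as PeerGroup.connectTo now does). The compact sketch below shows the pattern in isolation; LockingConventions and its members are hypothetical names used purely for illustration.

    import static com.google.common.base.Preconditions.checkState;

    import com.google.bitcoin.utils.Locks;

    import java.util.concurrent.locks.ReentrantLock;

    class LockingConventions {
        private final ReentrantLock lock = Locks.lock("example");
        private long value;

        // Public entry point: take the lock explicitly instead of declaring the method
        // synchronized, and always release it in a finally block.
        public void setValue(long value) {
            lock.lock();
            try {
                this.value = value;
                recalculate();
            } finally {
                lock.unlock();
            }
        }

        // Internal helper: callers must already hold the lock. The patch asserts lock.isLocked();
        // ReentrantLock.isHeldByCurrentThread() would be the stricter form of the same check.
        private void recalculate() {
            checkState(lock.isLocked());
            // ... recompute derived state while the lock is held ...
        }

        // Work that may block or fire callbacks is kept outside the lock entirely.
        public void connect() {
            checkState(!lock.isLocked());
            // ... potentially slow work, e.g. network I/O ...
        }
    }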