diff --git a/core/src/main/java/com/google/bitcoin/core/AbstractBlockChain.java b/core/src/main/java/com/google/bitcoin/core/AbstractBlockChain.java index 72e28df5..210f231e 100644 --- a/core/src/main/java/com/google/bitcoin/core/AbstractBlockChain.java +++ b/core/src/main/java/com/google/bitcoin/core/AbstractBlockChain.java @@ -430,6 +430,22 @@ public abstract class AbstractBlockChain { } } + /** + * Returns the hashes of the currently stored orphan blocks and then deletes them from this objects storage. + * Used by Peer when a filter exhaustion event has occurred and thus any orphan blocks that have been downloaded + * might be inaccurate/incomplete. + */ + public Set drainOrphanBlocks() { + lock.lock(); + try { + Set hashes = new HashSet(orphanBlocks.keySet()); + orphanBlocks.clear(); + return hashes; + } finally { + lock.unlock(); + } + } + // expensiveChecks enables checks that require looking at blocks further back in the chain // than the previous one when connecting (eg median timestamp check) // It could be exposed, but for now we just set it to shouldVerifyTransactions() @@ -796,7 +812,6 @@ public abstract class AbstractBlockChain { Iterator iter = orphanBlocks.values().iterator(); while (iter.hasNext()) { OrphanBlock orphanBlock = iter.next(); - log.debug("Trying to connect {}", orphanBlock.block.getHash()); // Look up the blocks previous. StoredBlock prev = getStoredBlockInCurrentScope(orphanBlock.block.getPrevBlockHash()); if (prev == null) { @@ -806,6 +821,7 @@ public abstract class AbstractBlockChain { } // Otherwise we can connect it now. // False here ensures we don't recurse infinitely downwards when connecting huge chains. + log.info("Connected orphan {}", orphanBlock.block.getHash()); add(orphanBlock.block, false, orphanBlock.filteredTxHashes, orphanBlock.filteredTxn); iter.remove(); blocksConnectedThisRound++; diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 215c8a0a..4c80ba33 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -107,7 +107,12 @@ public class Peer extends PeerSocketHandler { // How many filtered blocks have been received during the lifetime of this connection. Used to decide when to // refresh the server-side side filter by sending a new one (it degrades over time as false positives are added // on the remote side, see BIP 37 for a discussion of this). + // TODO: Is this still needed? It should not be since the auto FP tracking logic was added. private int filteredBlocksReceived; + // If non-null, we should discard incoming filtered blocks because we ran out of keys and are awaiting a new filter + // to be calculated by the PeerGroup. The discarded block hashes should be added here so we can re-request them + // once we've recalculated and resent a new filter. + @GuardedBy("lock") @Nullable private List awaitingFreshFilter; // How frequently to refresh the filter. This should become dynamic in future and calculated depending on the // actual false positive rate. For now a good value was determined empirically around January 2013. private static final int RESEND_BLOOM_FILTER_BLOCK_COUNT = 25000; @@ -888,9 +893,8 @@ public class Peer extends PeerSocketHandler { // TODO: Fix this duplication. private void endFilteredBlock(FilteredBlock m) { - if (log.isDebugEnabled()) { + if (log.isDebugEnabled()) log.debug("{}: Received broadcast filtered block {}", getAddress(), m.getHash().toString()); - } if (!vDownloadData) { log.debug("{}: Received block we did not ask for: {}", getAddress(), m.getHash().toString()); return; @@ -904,7 +908,7 @@ public class Peer extends PeerSocketHandler { // by cross-checking peers against each other. pendingBlockDownloads.remove(m.getBlockHeader().getHash()); try { - // Otherwise it's a block sent to us because the peer thought we needed it, so add it to the block chain. + // It's a block sent to us because the peer thought we needed it, so maybe add it to the block chain. // The FilteredBlock m here contains a list of hashes, and may contain Transaction objects for a subset // of the hashes (those that were sent to us by the remote peer). Any hashes that haven't had a tx // provided in processTransaction are ones that were announced to us previously via an 'inv' so the @@ -919,6 +923,35 @@ public class Peer extends PeerSocketHandler { // confirmation and become stuck forever. The fix is to notice that there's a pending getdata for a tx // that appeared in this block and delay processing until it arrived ... it's complicated by the fact that // the data may be requested by a different peer to this one. + + // Ask each wallet attached to the peer/blockchain if this block exhausts the list of data items + // (keys/addresses) that were used to calculate the previous filter. If so, then it's possible this block + // is only partial. Check for discarding first so we don't check for exhaustion on blocks we already know + // we're going to discard, otherwise redundant filters might end up being queued and calculated. + lock.lock(); + try { + if (awaitingFreshFilter != null) { + log.info("Discarding block {} because we're still waiting for a fresh filter", m.getHash()); + // We must record the hashes of blocks we discard because you cannot do getblocks twice on the same + // range of blocks and get an inv both times, due to the codepath in Bitcoin Core hitting + // CPeer::PushInventory() which checks CPeer::setInventoryKnown and thus deduplicates. + awaitingFreshFilter.add(m.getHash()); + return; // Chain download process is restarted via a call to setBloomFilter. + } else if (checkForFilterExhaustion(m)) { + // Yes, so we must abandon the attempt to process this block and any further blocks we receive, + // then wait for the Bloom filter to be recalculated, sent to this peer and for the peer to acknowledge + // that the new filter is now in use (which we have to simulate with a ping/pong), and then we can + // safely restart the chain download with the new filter that contains a new set of lookahead keys. + log.info("Bloom filter exhausted whilst processing block {}, discarding", m.getHash()); + awaitingFreshFilter = new LinkedList(); + awaitingFreshFilter.add(m.getHash()); + awaitingFreshFilter.addAll(blockChain.drainOrphanBlocks()); + return; // Chain download process is restarted via a call to setBloomFilter. + } + } finally { + lock.unlock(); + } + if (blockChain.add(m)) { // The block was successfully linked into the chain. Notify the user of our progress. invokeOnBlocksDownloaded(m.getBlockHeader()); @@ -957,6 +990,14 @@ public class Peer extends PeerSocketHandler { } } + private boolean checkForFilterExhaustion(FilteredBlock m) { + boolean exhausted = false; + for (Wallet wallet : wallets) { + exhausted |= wallet.checkForFilterExhaustion(m); + } + return exhausted; + } + private boolean maybeHandleRequestedData(Message m) { boolean found = false; Sha256Hash hash = m.getHash(); @@ -1082,7 +1123,7 @@ public class Peer extends PeerSocketHandler { // it's better to be safe here. if (!pendingBlockDownloads.contains(item.hash)) { if (vPeerVersionMessage.isBloomFilteringSupported() && useFilteredBlocks) { - getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash)); + getdata.addFilteredBlock(item.hash); pingAfterGetData = true; } else { getdata.addItem(item); @@ -1259,8 +1300,9 @@ public class Peer extends PeerSocketHandler { log.info("blockChainDownloadLocked({}): ignoring duplicated request", toHash.toString()); return; } - log.debug("{}: blockChainDownloadLocked({}) current head = {}", - toString(), toHash.toString(), chainHead.getHeader().getHashAsString()); + if (log.isDebugEnabled()) + log.debug("{}: blockChainDownloadLocked({}) current head = {}", + toString(), toHash.toString(), chainHead.getHeader().getHashAsString()); StoredBlock cursor = chainHead; for (int i = 100; cursor != null && i > 0; i--) { blockLocator.add(cursor.getHeader().getHash()); @@ -1272,9 +1314,8 @@ public class Peer extends PeerSocketHandler { } } // Only add the locator if we didn't already do so. If the chain is < 50 blocks we already reached it. - if (cursor != null) { + if (cursor != null) blockLocator.add(params.getGenesisBlock().getHash()); - } // Record that we requested this range of blocks so we can filter out duplicate requests in the event of a // block being solved during chain download. @@ -1539,6 +1580,43 @@ public class Peer extends PeerSocketHandler { sendMessage(filter); if (andQueryMemPool) sendMessage(new MemoryPoolMessage()); + maybeRestartChainDownload(); + } + + private void maybeRestartChainDownload() { + lock.lock(); + try { + if (awaitingFreshFilter == null) + return; + if (!vDownloadData) { + // This branch should be harmless but I want to know how often it happens in reality. + log.warn("Lost download peer status whilst awaiting fresh filter."); + return; + } + // Ping/pong to wait for blocks that are still being streamed to us to finish being downloaded and + // discarded. + ping().addListener(new Runnable() { + @Override + public void run() { + lock.lock(); + checkNotNull(awaitingFreshFilter); + GetDataMessage getdata = new GetDataMessage(params); + for (Sha256Hash hash : awaitingFreshFilter) + getdata.addFilteredBlock(hash); + awaitingFreshFilter = null; + lock.unlock(); + + log.info("Restarting chain download"); + sendMessage(getdata); + // TODO: This bizarre ping-after-getdata hack probably isn't necessary. + // It's to ensure we know when the end of a filtered block stream of txns is, but we should just be + // able to match txns with the merkleblock. Ask Matt why it's written this way. + sendMessage(new Ping((long) (Math.random() * Long.MAX_VALUE))); + } + }, Threading.SAME_THREAD); + } finally { + lock.unlock(); + } } /** diff --git a/core/src/main/java/com/google/bitcoin/core/Wallet.java b/core/src/main/java/com/google/bitcoin/core/Wallet.java index c0431e08..84eb5d07 100644 --- a/core/src/main/java/com/google/bitcoin/core/Wallet.java +++ b/core/src/main/java/com/google/bitcoin/core/Wallet.java @@ -3887,6 +3887,29 @@ public class Wallet extends BaseTaggableObject implements Serializable, BlockCha out.isWatched(this); } + /** + * Used by {@link Peer} to decide whether or not to discard this block and any blocks building upon it, in case + * the Bloom filter used to request them may be exhausted, that is, not have sufficient keys in the deterministic + * sequence within it to reliably find relevant transactions. + */ + public boolean checkForFilterExhaustion(FilteredBlock block) { + lock.lock(); + try { + int epoch = keychain.getCombinedKeyLookaheadEpochs(); + for (Transaction tx : block.getAssociatedTransactions().values()) { + markKeysAsUsed(tx); + } + int newEpoch = keychain.getCombinedKeyLookaheadEpochs(); + checkState(newEpoch >= epoch); + // If the key lookahead epoch has advanced, there was a call to addKeys and the PeerGroup already has a + // pending request to recalculate the filter queued up on another thread. The calling Peer should abandon + // block at this point and await a new filter before restarting the download. + return newEpoch > epoch; + } finally { + lock.unlock(); + } + } + //endregion /******************************************************************************************************************/ diff --git a/core/src/main/java/com/google/bitcoin/wallet/DeterministicKeyChain.java b/core/src/main/java/com/google/bitcoin/wallet/DeterministicKeyChain.java index 0b4e0034..87697db5 100644 --- a/core/src/main/java/com/google/bitcoin/wallet/DeterministicKeyChain.java +++ b/core/src/main/java/com/google/bitcoin/wallet/DeterministicKeyChain.java @@ -130,6 +130,12 @@ public class DeterministicKeyChain implements EncryptableKeyChain { // How many keys on each path have actually been used. This may be fewer than the number that have been deserialized // or held in memory, because of the lookahead zone. private int issuedExternalKeys, issuedInternalKeys; + // A counter that is incremented each time a key in the lookahead threshold zone is marked as used and lookahead + // is triggered. The Wallet/KCG reads these counters and combines them so it can tell the Peer whether to throw + // away the current block (and any future blocks in the same download batch) and restart chain sync once a new + // filter has been calculated. This field isn't persisted to the wallet as it's only relevant within a network + // session. + private int keyLookaheadEpoch; // We simplify by wrapping a basic key chain and that way we get some functionality like key lookup and event // listeners "for free". All keys in the key tree appear here, even if they aren't meant to be used for receiving @@ -936,8 +942,9 @@ public class DeterministicKeyChain implements EncryptableKeyChain { } /** - * Gets the threshold for the key pre-generation. - * See {@link #setLookaheadThreshold(int)} for details on what this is. + * Gets the threshold for the key pre-generation. See {@link #setLookaheadThreshold(int)} for details on what this + * is. The default is a third of the lookahead size (100 / 3 == 33). If you don't modify it explicitly then this + * value will always be one third of the lookahead size. */ public int getLookaheadThreshold() { lock.lock(); @@ -959,6 +966,9 @@ public class DeterministicKeyChain implements EncryptableKeyChain { try { List keys = maybeLookAhead(externalKey, issuedExternalKeys); keys.addAll(maybeLookAhead(internalKey, issuedInternalKeys)); + if (keys.isEmpty()) + return; + keyLookaheadEpoch++; // Batch add all keys at once so there's only one event listener invocation, as this will be listened to // by the wallet and used to rebuild/broadcast the Bloom filter. That's expensive so we don't want to do // it more often than necessary. @@ -1093,4 +1103,17 @@ public class DeterministicKeyChain implements EncryptableKeyChain { proto.setDeterministicSeed(ByteString.copyFrom(secret)); } } + + /** + * Returns a counter that is incremented each time new keys are generated due to lookahead. Used by the network + * code to learn whether to discard the current block and await calculation of a new filter. + */ + public int getKeyLookaheadEpoch() { + lock.lock(); + try { + return keyLookaheadEpoch; + } finally { + lock.unlock(); + } + } } diff --git a/core/src/main/java/com/google/bitcoin/wallet/KeyChainGroup.java b/core/src/main/java/com/google/bitcoin/wallet/KeyChainGroup.java index 58d8e19d..3d09a92b 100644 --- a/core/src/main/java/com/google/bitcoin/wallet/KeyChainGroup.java +++ b/core/src/main/java/com/google/bitcoin/wallet/KeyChainGroup.java @@ -26,7 +26,6 @@ import com.google.bitcoin.script.ScriptBuilder; import com.google.bitcoin.store.UnreadableWalletException; import com.google.bitcoin.utils.ListenerRegistration; import com.google.bitcoin.utils.Threading; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; @@ -482,14 +481,14 @@ public class KeyChainGroup implements KeyBag { for (DeterministicKeyChain chain : chains) { DeterministicKey key; if ((key = chain.markPubHashAsUsed(pubkeyHash)) != null) { - markKeyAsUsed(key); + maybeMarkCurrentKeyAsUsed(key); return; } } } /** If the given key is "current", advance the current key to a new one. */ - private void markKeyAsUsed(DeterministicKey key) { + private void maybeMarkCurrentKeyAsUsed(DeterministicKey key) { for (Map.Entry entry : currentKeys.entrySet()) { if (entry.getValue() != null && entry.getValue().equals(key)) { log.info("Marking key as used: {}", key); @@ -499,7 +498,6 @@ public class KeyChainGroup implements KeyBag { } } - public boolean hasKey(ECKey key) { if (basic.hasKey(key)) return true; @@ -530,7 +528,7 @@ public class KeyChainGroup implements KeyBag { for (DeterministicKeyChain chain : chains) { DeterministicKey key; if ((key = chain.markPubKeyAsUsed(pubkey)) != null) { - markKeyAsUsed(key); + maybeMarkCurrentKeyAsUsed(key); return; } } @@ -955,4 +953,15 @@ public class KeyChainGroup implements KeyBag { public int getSigsRequiredToSpend() { return sigsRequiredToSpend; } + + /** + * Returns a counter that increases (by an arbitrary amount) each time new keys have been calculated due to + * lookahead and thus the Bloom filter that was previously calculated has become stale. + */ + public int getCombinedKeyLookaheadEpochs() { + int epoch = 0; + for (DeterministicKeyChain chain : chains) + epoch += chain.getKeyLookaheadEpoch(); + return epoch; + } } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index 2d6d1eaf..f7ec9414 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -662,4 +662,102 @@ public class PeerGroupTest extends TestWithPeerGroup { local.close(); } } + + private T assertNextMessageIs(InboundMessageQueuer q, Class klass) throws Exception { + Message outbound = waitForOutbound(q); + assertEquals(klass, outbound.getClass()); + return (T) outbound; + } + + @Test + public void autoRescanOnKeyExhaustion() throws Exception { + // Check that if the last key that was inserted into the bloom filter is seen in some requested blocks, + // that the exhausting block is discarded, a new filter is calculated and sent, and then the download resumes. + + final int NUM_KEYS = 9; + + // First, grab a load of keys from the wallet, and then recreate it so it forgets that those keys were issued. + Wallet shadow = Wallet.fromSeed(wallet.getParams(), wallet.getKeyChainSeed()); + List keys = new ArrayList(NUM_KEYS); + for (int i = 0; i < NUM_KEYS; i++) { + keys.add(shadow.freshReceiveKey()); + } + // Reduce the number of keys we need to work with to speed up this test. + wallet.setKeychainLookaheadSize(4); + wallet.setKeychainLookaheadThreshold(2); + + peerGroup.startAsync(); + peerGroup.awaitRunning(); + InboundMessageQueuer p1 = connectPeer(1); + assertTrue(p1.lastReceivedFilter.contains(keys.get(0).getPubKey())); + assertTrue(p1.lastReceivedFilter.contains(keys.get(5).getPubKeyHash())); + assertFalse(p1.lastReceivedFilter.contains(keys.get(keys.size() - 1).getPubKey())); + peerGroup.startBlockChainDownload(null); + assertNextMessageIs(p1, GetBlocksMessage.class); + + // Make some transactions and blocks that send money to the wallet thus using up all the keys. + List blocks = Lists.newArrayList(); + Coin expectedBalance = Coin.ZERO; + Block prev = blockStore.getChainHead().getHeader(); + for (ECKey key1 : keys) { + Address addr = key1.toAddress(params); + Block next = FakeTxBuilder.makeSolvedTestBlock(prev, FakeTxBuilder.createFakeTx(params, Coin.FIFTY_COINS, addr)); + expectedBalance = expectedBalance.add(next.getTransactions().get(2).getOutput(0).getValue()); + blocks.add(next); + prev = next; + } + + // Send the chain that doesn't have all the transactions in it. The blocks after the exhaustion point should all + // be ignored. + int epoch = wallet.keychain.getCombinedKeyLookaheadEpochs(); + BloomFilter filter = new BloomFilter(params, p1.lastReceivedFilter.bitcoinSerialize()); + filterAndSend(p1, blocks, filter); + Block exhaustionPoint = blocks.get(3); + pingAndWait(p1); + + assertNotEquals(epoch, wallet.keychain.getCombinedKeyLookaheadEpochs()); + // 4th block was end of the lookahead zone and thus was discarded, so we got 3 blocks worth of money (50 each). + assertEquals(Coin.FIFTY_COINS.multiply(3), wallet.getBalance()); + assertEquals(exhaustionPoint.getPrevBlockHash(), blockChain.getChainHead().getHeader().getHash()); + + // Await the new filter. + peerGroup.waitForJobQueue(); + BloomFilter newFilter = assertNextMessageIs(p1, BloomFilter.class); + assertNotEquals(filter, newFilter); + assertNextMessageIs(p1, MemoryPoolMessage.class); + Ping ping = assertNextMessageIs(p1, Ping.class); + inbound(p1, new Pong(ping.getNonce())); + + // Await restart of the chain download. + GetDataMessage getdata = assertNextMessageIs(p1, GetDataMessage.class); + assertEquals(exhaustionPoint.getHash(), getdata.getHashOf(0)); + assertEquals(InventoryItem.Type.FilteredBlock, getdata.getItems().get(0).type); + List newBlocks = blocks.subList(3, blocks.size()); + filterAndSend(p1, newBlocks, newFilter); + assertNextMessageIs(p1, Ping.class); + + // It happened again. + peerGroup.waitForJobQueue(); + newFilter = assertNextMessageIs(p1, BloomFilter.class); + assertNextMessageIs(p1, MemoryPoolMessage.class); + inbound(p1, new Pong(assertNextMessageIs(p1, Ping.class).getNonce())); + assertNextMessageIs(p1, GetDataMessage.class); + newBlocks = blocks.subList(6, blocks.size()); + filterAndSend(p1, newBlocks, newFilter); + // Send a non-tx message so the peer knows the filtered block is over and force processing. + inbound(p1, new Ping()); + pingAndWait(p1); + + assertEquals(expectedBalance, wallet.getBalance()); + assertEquals(blocks.get(blocks.size() - 1).getHash(), blockChain.getChainHead().getHeader().getHash()); + } + + private void filterAndSend(InboundMessageQueuer p1, List blocks, BloomFilter filter) { + for (Block block : blocks) { + FilteredBlock fb = filter.applyAndUpdate(block); + inbound(p1, fb); + for (Transaction tx : fb.getAssociatedTransactions().values()) + inbound(p1, tx); + } + } }