
HD Wallets: final part needed before release.

If a key is seen in a filtered block that is too far inside our lookahead zone, discard that block and any further blocks being sent to us by a remote peer and recalculate the Bloom filter after more keys are pre-calculated. Then restart the chain download process. This ensures that we can catch up/replay the block chain and keep up with the deterministic key sequence.
Mike Hearn 2014-09-21 18:04:53 +02:00
parent c24ede14a8
commit e9204fd196
6 changed files with 263 additions and 16 deletions
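In outline, the recovery protocol described in the commit message works as follows. This is a simplified sketch for orientation only, not code from this commit; the names mirror the real implementation in Peer.java below, but the wiring is illustrative:

    // Sketch only: the actual logic lives in Peer.endFilteredBlock() and
    // Peer.maybeRestartChainDownload() in the diff below.
    void onFilteredBlock(FilteredBlock block) {
        if (awaitingFreshFilter != null) {
            // Already waiting for a recalculated filter: discard the block but
            // remember its hash so the same range can be re-requested later.
            awaitingFreshFilter.add(block.getHash());
            return;
        }
        if (wallet.checkForFilterExhaustion(block)) {
            // A key too deep in the lookahead zone was seen, so this block may be
            // incomplete. Stop processing, let the PeerGroup pre-calculate more keys
            // and resend the Bloom filter, then restart the chain download.
            awaitingFreshFilter = new LinkedList<Sha256Hash>();
            awaitingFreshFilter.add(block.getHash());
            return;
        }
        blockChain.add(block); // Filter still valid: process normally.
    }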

AbstractBlockChain.java

@@ -430,6 +430,22 @@ public abstract class AbstractBlockChain {
}
}
/**
* Returns the hashes of the currently stored orphan blocks and then deletes them from this object's storage.
* Used by Peer when a filter exhaustion event has occurred and thus any orphan blocks that have been downloaded
* might be inaccurate/incomplete.
*/
public Set<Sha256Hash> drainOrphanBlocks() {
lock.lock();
try {
Set<Sha256Hash> hashes = new HashSet<Sha256Hash>(orphanBlocks.keySet());
orphanBlocks.clear();
return hashes;
} finally {
lock.unlock();
}
}
// expensiveChecks enables checks that require looking at blocks further back in the chain
// than the previous one when connecting (eg median timestamp check)
// It could be exposed, but for now we just set it to shouldVerifyTransactions()
@@ -796,7 +812,6 @@ public abstract class AbstractBlockChain {
Iterator<OrphanBlock> iter = orphanBlocks.values().iterator();
while (iter.hasNext()) {
OrphanBlock orphanBlock = iter.next();
log.debug("Trying to connect {}", orphanBlock.block.getHash());
// Look up the block's predecessor.
StoredBlock prev = getStoredBlockInCurrentScope(orphanBlock.block.getPrevBlockHash());
if (prev == null) {
@@ -806,6 +821,7 @@ public abstract class AbstractBlockChain {
}
// Otherwise we can connect it now.
// False here ensures we don't recurse infinitely downwards when connecting huge chains.
log.info("Connected orphan {}", orphanBlock.block.getHash());
add(orphanBlock.block, false, orphanBlock.filteredTxHashes, orphanBlock.filteredTxn);
iter.remove();
blocksConnectedThisRound++;

View File

@@ -107,7 +107,12 @@ public class Peer extends PeerSocketHandler {
// How many filtered blocks have been received during the lifetime of this connection. Used to decide when to
// refresh the server-side filter by sending a new one (it degrades over time as false positives are added
// on the remote side, see BIP 37 for a discussion of this).
// TODO: Is this still needed? It should not be since the auto FP tracking logic was added.
private int filteredBlocksReceived;
// If non-null, we should discard incoming filtered blocks because we ran out of keys and are awaiting a new filter
// to be calculated by the PeerGroup. The discarded block hashes should be added here so we can re-request them
// once we've recalculated and resent a new filter.
@GuardedBy("lock") @Nullable private List<Sha256Hash> awaitingFreshFilter;
// How frequently to refresh the filter. This should become dynamic in future and calculated depending on the
// actual false positive rate. For now a good value was determined empirically around January 2013.
private static final int RESEND_BLOOM_FILTER_BLOCK_COUNT = 25000;
@@ -888,9 +893,8 @@
// TODO: Fix this duplication.
private void endFilteredBlock(FilteredBlock m) {
-if (log.isDebugEnabled()) {
+if (log.isDebugEnabled())
    log.debug("{}: Received broadcast filtered block {}", getAddress(), m.getHash().toString());
-}
if (!vDownloadData) {
log.debug("{}: Received block we did not ask for: {}", getAddress(), m.getHash().toString());
return;
@@ -904,7 +908,7 @@
// by cross-checking peers against each other.
pendingBlockDownloads.remove(m.getBlockHeader().getHash());
try {
-// Otherwise it's a block sent to us because the peer thought we needed it, so add it to the block chain.
+// It's a block sent to us because the peer thought we needed it, so maybe add it to the block chain.
// The FilteredBlock m here contains a list of hashes, and may contain Transaction objects for a subset
// of the hashes (those that were sent to us by the remote peer). Any hashes that haven't had a tx
// provided in processTransaction are ones that were announced to us previously via an 'inv' so the
@@ -919,6 +923,35 @@
// confirmation and become stuck forever. The fix is to notice that there's a pending getdata for a tx
// that appeared in this block and delay processing until it arrived ... it's complicated by the fact that
// the data may be requested by a different peer to this one.
// Ask each wallet attached to the peer/blockchain if this block exhausts the list of data items
// (keys/addresses) that were used to calculate the previous filter. If so, then it's possible this block
// is only partial. Check for discarding first so we don't check for exhaustion on blocks we already know
// we're going to discard, otherwise redundant filters might end up being queued and calculated.
lock.lock();
try {
if (awaitingFreshFilter != null) {
log.info("Discarding block {} because we're still waiting for a fresh filter", m.getHash());
// We must record the hashes of blocks we discard because you cannot do getblocks twice on the same
// range of blocks and get an inv both times, due to the codepath in Bitcoin Core hitting
// CNode::PushInventory() which checks CNode::setInventoryKnown and thus deduplicates.
awaitingFreshFilter.add(m.getHash());
return; // Chain download process is restarted via a call to setBloomFilter.
} else if (checkForFilterExhaustion(m)) {
// Yes, so we must abandon the attempt to process this block and any further blocks we receive,
// then wait for the Bloom filter to be recalculated, sent to this peer and for the peer to acknowledge
// that the new filter is now in use (which we have to simulate with a ping/pong), and then we can
// safely restart the chain download with the new filter that contains a new set of lookahead keys.
log.info("Bloom filter exhausted whilst processing block {}, discarding", m.getHash());
awaitingFreshFilter = new LinkedList<Sha256Hash>();
awaitingFreshFilter.add(m.getHash());
awaitingFreshFilter.addAll(blockChain.drainOrphanBlocks());
return; // Chain download process is restarted via a call to setBloomFilter.
}
} finally {
lock.unlock();
}
if (blockChain.add(m)) {
// The block was successfully linked into the chain. Notify the user of our progress.
invokeOnBlocksDownloaded(m.getBlockHeader());
@@ -957,6 +990,14 @@
}
}
private boolean checkForFilterExhaustion(FilteredBlock m) {
boolean exhausted = false;
for (Wallet wallet : wallets) {
exhausted |= wallet.checkForFilterExhaustion(m);
}
return exhausted;
}
private boolean maybeHandleRequestedData(Message m) {
boolean found = false;
Sha256Hash hash = m.getHash();
@@ -1082,7 +1123,7 @@
// it's better to be safe here.
if (!pendingBlockDownloads.contains(item.hash)) {
if (vPeerVersionMessage.isBloomFilteringSupported() && useFilteredBlocks) {
-getdata.addItem(new InventoryItem(InventoryItem.Type.FilteredBlock, item.hash));
+getdata.addFilteredBlock(item.hash);
pingAfterGetData = true;
} else {
getdata.addItem(item);
@@ -1259,8 +1300,9 @@
log.info("blockChainDownloadLocked({}): ignoring duplicated request", toHash.toString());
return;
}
-log.debug("{}: blockChainDownloadLocked({}) current head = {}",
-        toString(), toHash.toString(), chainHead.getHeader().getHashAsString());
+if (log.isDebugEnabled())
+    log.debug("{}: blockChainDownloadLocked({}) current head = {}",
+            toString(), toHash.toString(), chainHead.getHeader().getHashAsString());
StoredBlock cursor = chainHead;
for (int i = 100; cursor != null && i > 0; i--) {
blockLocator.add(cursor.getHeader().getHash());
@@ -1272,9 +1314,8 @@
}
}
// Only add the locator if we didn't already do so. If the chain is < 50 blocks we already reached it.
-if (cursor != null) {
+if (cursor != null)
    blockLocator.add(params.getGenesisBlock().getHash());
-}
// Record that we requested this range of blocks so we can filter out duplicate requests in the event of a
// block being solved during chain download.
@@ -1539,6 +1580,43 @@
sendMessage(filter);
if (andQueryMemPool)
sendMessage(new MemoryPoolMessage());
maybeRestartChainDownload();
}
private void maybeRestartChainDownload() {
lock.lock();
try {
if (awaitingFreshFilter == null)
return;
if (!vDownloadData) {
// This branch should be harmless but I want to know how often it happens in reality.
log.warn("Lost download peer status whilst awaiting fresh filter.");
return;
}
// Ping/pong to wait for blocks that are still being streamed to us to finish being downloaded and
// discarded.
ping().addListener(new Runnable() {
@Override
public void run() {
final GetDataMessage getdata = new GetDataMessage(params);
lock.lock();
try {
    checkNotNull(awaitingFreshFilter);
    for (Sha256Hash hash : awaitingFreshFilter)
        getdata.addFilteredBlock(hash);
    awaitingFreshFilter = null;
} finally {
    // Release the lock even if checkNotNull throws, so the peer isn't left locked.
    lock.unlock();
}
log.info("Restarting chain download");
sendMessage(getdata);
// TODO: This bizarre ping-after-getdata hack probably isn't necessary.
// It's to ensure we know when the end of a filtered block stream of txns is, but we should just be
// able to match txns with the merkleblock. Ask Matt why it's written this way.
sendMessage(new Ping((long) (Math.random() * Long.MAX_VALUE)));
}
}, Threading.SAME_THREAD);
} finally {
lock.unlock();
}
}
/**

Wallet.java

@@ -3887,6 +3887,29 @@ public class Wallet extends BaseTaggableObject implements Serializable, BlockCha
out.isWatched(this);
}
/**
* Used by {@link Peer} to decide whether or not to discard this block and any blocks building upon it, in case
* the Bloom filter used to request them may be exhausted, that is, not have sufficient keys in the deterministic
* sequence within it to reliably find relevant transactions.
*/
public boolean checkForFilterExhaustion(FilteredBlock block) {
lock.lock();
try {
int epoch = keychain.getCombinedKeyLookaheadEpochs();
for (Transaction tx : block.getAssociatedTransactions().values()) {
markKeysAsUsed(tx);
}
int newEpoch = keychain.getCombinedKeyLookaheadEpochs();
checkState(newEpoch >= epoch);
// If the key lookahead epoch has advanced, there was a call to addKeys and the PeerGroup already has a
* pending request to recalculate the filter queued up on another thread. The calling Peer should abandon the
// block at this point and await a new filter before restarting the download.
return newEpoch > epoch;
} finally {
lock.unlock();
}
}
//endregion
/******************************************************************************************************************/

DeterministicKeyChain.java

@@ -130,6 +130,12 @@ public class DeterministicKeyChain implements EncryptableKeyChain {
// How many keys on each path have actually been used. This may be fewer than the number that have been deserialized
// or held in memory, because of the lookahead zone.
private int issuedExternalKeys, issuedInternalKeys;
// A counter that is incremented each time a key in the lookahead threshold zone is marked as used and lookahead
// is triggered. The Wallet/KeyChainGroup reads these counters and combines them so it can tell the Peer whether to throw
// away the current block (and any future blocks in the same download batch) and restart chain sync once a new
// filter has been calculated. This field isn't persisted to the wallet as it's only relevant within a network
// session.
private int keyLookaheadEpoch;
// We simplify by wrapping a basic key chain and that way we get some functionality like key lookup and event
// listeners "for free". All keys in the key tree appear here, even if they aren't meant to be used for receiving
@@ -936,8 +942,9 @@ public class DeterministicKeyChain implements EncryptableKeyChain {
}
/**
-* Gets the threshold for the key pre-generation.
-* See {@link #setLookaheadThreshold(int)} for details on what this is.
+* Gets the threshold for the key pre-generation. See {@link #setLookaheadThreshold(int)} for details on what this
+* is. The default is a third of the lookahead size (100 / 3 == 33). If you don't modify it explicitly then this
+* value will always be one third of the lookahead size.
*/
public int getLookaheadThreshold() {
lock.lock();
@@ -959,6 +966,9 @@
try {
List<DeterministicKey> keys = maybeLookAhead(externalKey, issuedExternalKeys);
keys.addAll(maybeLookAhead(internalKey, issuedInternalKeys));
if (keys.isEmpty())
return;
keyLookaheadEpoch++;
// Batch add all keys at once so there's only one event listener invocation, as this will be listened to
// by the wallet and used to rebuild/broadcast the Bloom filter. That's expensive so we don't want to do
// it more often than necessary.
@@ -1093,4 +1103,17 @@
proto.setDeterministicSeed(ByteString.copyFrom(secret));
}
}
/**
* Returns a counter that is incremented each time new keys are generated due to lookahead. Used by the network
* code to learn whether to discard the current block and await calculation of a new filter.
*/
public int getKeyLookaheadEpoch() {
lock.lock();
try {
return keyLookaheadEpoch;
} finally {
lock.unlock();
}
}
}
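For illustration, the epoch counter added above can be consumed like this. This is a hypothetical sketch, not code from this commit: it assumes a DeterministicKeyChain instance named chain, and that maybeLookAhead() is the entry point that pre-generates lookahead keys; recalculateBloomFilter() stands in for whatever filter-rebuild hook the caller has (e.g. the PeerGroup's recalculation path):

    // Hypothetical usage: detect that lookahead pre-generated new keys, which means
    // any Bloom filter computed from the old key set is now stale.
    int epochBefore = chain.getKeyLookaheadEpoch();
    chain.maybeLookAhead(); // assumed to generate lookahead keys if the threshold was crossed
    if (chain.getKeyLookaheadEpoch() > epochBefore) {
        // New keys exist that are not in the current filter: recalculate it and
        // resend to peers before resuming the chain download.
        recalculateBloomFilter(); // hypothetical callback
    }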

KeyChainGroup.java

@@ -26,7 +26,6 @@ import com.google.bitcoin.script.ScriptBuilder;
import com.google.bitcoin.store.UnreadableWalletException;
import com.google.bitcoin.utils.ListenerRegistration;
import com.google.bitcoin.utils.Threading;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
@@ -482,14 +481,14 @@ public class KeyChainGroup implements KeyBag {
for (DeterministicKeyChain chain : chains) {
DeterministicKey key;
if ((key = chain.markPubHashAsUsed(pubkeyHash)) != null) {
-markKeyAsUsed(key);
+maybeMarkCurrentKeyAsUsed(key);
return;
}
}
}
/** If the given key is "current", advance the current key to a new one. */
-private void markKeyAsUsed(DeterministicKey key) {
+private void maybeMarkCurrentKeyAsUsed(DeterministicKey key) {
for (Map.Entry<KeyChain.KeyPurpose, DeterministicKey> entry : currentKeys.entrySet()) {
if (entry.getValue() != null && entry.getValue().equals(key)) {
log.info("Marking key as used: {}", key);
@@ -499,7 +498,6 @@
}
}
public boolean hasKey(ECKey key) {
if (basic.hasKey(key))
return true;
@@ -530,7 +528,7 @@
for (DeterministicKeyChain chain : chains) {
DeterministicKey key;
if ((key = chain.markPubKeyAsUsed(pubkey)) != null) {
-markKeyAsUsed(key);
+maybeMarkCurrentKeyAsUsed(key);
return;
}
}
@@ -955,4 +953,15 @@
public int getSigsRequiredToSpend() {
return sigsRequiredToSpend;
}
/**
* Returns a counter that increases (by an arbitrary amount) each time new keys have been calculated due to
* lookahead and thus the Bloom filter that was previously calculated has become stale.
*/
public int getCombinedKeyLookaheadEpochs() {
int epoch = 0;
for (DeterministicKeyChain chain : chains)
epoch += chain.getKeyLookaheadEpoch();
return epoch;
}
}

PeerGroupTest.java

@@ -662,4 +662,102 @@ public class PeerGroupTest extends TestWithPeerGroup {
local.close();
}
}
@SuppressWarnings("unchecked")
private <T extends Message> T assertNextMessageIs(InboundMessageQueuer q, Class<T> klass) throws Exception {
Message outbound = waitForOutbound(q);
assertEquals(klass, outbound.getClass());
return (T) outbound;
}
@Test
public void autoRescanOnKeyExhaustion() throws Exception {
// Check that if the last key that was inserted into the bloom filter is seen in some requested blocks,
// the exhausting block is discarded, a new filter is calculated and sent, and then the download resumes.
final int NUM_KEYS = 9;
// First, grab a load of keys from the wallet, and then recreate it so it forgets that those keys were issued.
Wallet shadow = Wallet.fromSeed(wallet.getParams(), wallet.getKeyChainSeed());
List<ECKey> keys = new ArrayList<ECKey>(NUM_KEYS);
for (int i = 0; i < NUM_KEYS; i++) {
keys.add(shadow.freshReceiveKey());
}
// Reduce the number of keys we need to work with to speed up this test.
wallet.setKeychainLookaheadSize(4);
wallet.setKeychainLookaheadThreshold(2);
peerGroup.startAsync();
peerGroup.awaitRunning();
InboundMessageQueuer p1 = connectPeer(1);
assertTrue(p1.lastReceivedFilter.contains(keys.get(0).getPubKey()));
assertTrue(p1.lastReceivedFilter.contains(keys.get(5).getPubKeyHash()));
assertFalse(p1.lastReceivedFilter.contains(keys.get(keys.size() - 1).getPubKey()));
peerGroup.startBlockChainDownload(null);
assertNextMessageIs(p1, GetBlocksMessage.class);
// Make some transactions and blocks that send money to the wallet thus using up all the keys.
List<Block> blocks = Lists.newArrayList();
Coin expectedBalance = Coin.ZERO;
Block prev = blockStore.getChainHead().getHeader();
for (ECKey key1 : keys) {
Address addr = key1.toAddress(params);
Block next = FakeTxBuilder.makeSolvedTestBlock(prev, FakeTxBuilder.createFakeTx(params, Coin.FIFTY_COINS, addr));
expectedBalance = expectedBalance.add(next.getTransactions().get(2).getOutput(0).getValue());
blocks.add(next);
prev = next;
}
// Send the chain that doesn't have all the transactions in it. The blocks after the exhaustion point should all
// be ignored.
int epoch = wallet.keychain.getCombinedKeyLookaheadEpochs();
BloomFilter filter = new BloomFilter(params, p1.lastReceivedFilter.bitcoinSerialize());
filterAndSend(p1, blocks, filter);
Block exhaustionPoint = blocks.get(3);
pingAndWait(p1);
assertNotEquals(epoch, wallet.keychain.getCombinedKeyLookaheadEpochs());
// 4th block was end of the lookahead zone and thus was discarded, so we got 3 blocks worth of money (50 each).
assertEquals(Coin.FIFTY_COINS.multiply(3), wallet.getBalance());
assertEquals(exhaustionPoint.getPrevBlockHash(), blockChain.getChainHead().getHeader().getHash());
// Await the new filter.
peerGroup.waitForJobQueue();
BloomFilter newFilter = assertNextMessageIs(p1, BloomFilter.class);
assertNotEquals(filter, newFilter);
assertNextMessageIs(p1, MemoryPoolMessage.class);
Ping ping = assertNextMessageIs(p1, Ping.class);
inbound(p1, new Pong(ping.getNonce()));
// Await restart of the chain download.
GetDataMessage getdata = assertNextMessageIs(p1, GetDataMessage.class);
assertEquals(exhaustionPoint.getHash(), getdata.getHashOf(0));
assertEquals(InventoryItem.Type.FilteredBlock, getdata.getItems().get(0).type);
List<Block> newBlocks = blocks.subList(3, blocks.size());
filterAndSend(p1, newBlocks, newFilter);
assertNextMessageIs(p1, Ping.class);
// It happened again.
peerGroup.waitForJobQueue();
newFilter = assertNextMessageIs(p1, BloomFilter.class);
assertNextMessageIs(p1, MemoryPoolMessage.class);
inbound(p1, new Pong(assertNextMessageIs(p1, Ping.class).getNonce()));
assertNextMessageIs(p1, GetDataMessage.class);
newBlocks = blocks.subList(6, blocks.size());
filterAndSend(p1, newBlocks, newFilter);
// Send a non-tx message so the peer knows the filtered block is over and force processing.
inbound(p1, new Ping());
pingAndWait(p1);
assertEquals(expectedBalance, wallet.getBalance());
assertEquals(blocks.get(blocks.size() - 1).getHash(), blockChain.getChainHead().getHeader().getHash());
}
private void filterAndSend(InboundMessageQueuer p1, List<Block> blocks, BloomFilter filter) {
for (Block block : blocks) {
FilteredBlock fb = filter.applyAndUpdate(block);
inbound(p1, fb);
for (Transaction tx : fb.getAssociatedTransactions().values())
inbound(p1, tx);
}
}
}