From 709e6f75624d80cb23abac2fc0bedebdde525cc6 Mon Sep 17 00:00:00 2001 From: Andreas Schildbach Date: Fri, 26 Feb 2016 19:06:22 +0100 Subject: [PATCH] Peer: Both halves of the version handshake need to be complete before the protocol can continue. --- .../src/main/java/org/bitcoinj/core/Peer.java | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/bitcoinj/core/Peer.java b/core/src/main/java/org/bitcoinj/core/Peer.java index 22fd22c3..361ef627 100644 --- a/core/src/main/java/org/bitcoinj/core/Peer.java +++ b/core/src/main/java/org/bitcoinj/core/Peer.java @@ -153,11 +153,13 @@ public class Peer extends PeerSocketHandler { private static final int PING_MOVING_AVERAGE_WINDOW = 20; private volatile VersionMessage vPeerVersionMessage; - private boolean isAcked; // A settable future which completes (with this) when the connection is open private final SettableFuture connectionOpenFuture = SettableFuture.create(); - private final SettableFuture versionHandshakeFuture = SettableFuture.create(); + private final SettableFuture outgoingVersionHandshakeFuture = SettableFuture.create(); + private final SettableFuture incomingVersionHandshakeFuture = SettableFuture.create(); + private final ListenableFuture> versionHandshakeFuture = Futures + .allAsList(outgoingVersionHandshakeFuture, incomingVersionHandshakeFuture); /** *

Construct a peer that reads/writes from the given block chain.

@@ -219,11 +221,17 @@ public class Peer extends PeerSocketHandler { this.getDataFutures = new CopyOnWriteArrayList(); this.getAddrFutures = new LinkedList>(); this.fastCatchupTimeSecs = params.getGenesisBlock().getTimeSeconds(); - this.isAcked = false; this.pendingPings = new CopyOnWriteArrayList(); this.vMinProtocolVersion = params.getProtocolVersionNum(NetworkParameters.ProtocolVersion.PONG); this.wallets = new CopyOnWriteArrayList(); this.context = Context.get(); + + this.versionHandshakeFuture.addListener(new Runnable() { + @Override + public void run() { + versionHandshakeComplete(); + } + }, Threading.SAME_THREAD); } /** @@ -426,7 +434,7 @@ public class Peer extends PeerSocketHandler { return connectionOpenFuture; } - public ListenableFuture getVersionHandshakeFuture() { + public ListenableFuture> getVersionHandshakeFuture() { return versionHandshakeFuture; } @@ -527,9 +535,6 @@ public class Peer extends PeerSocketHandler { vPeerVersionMessage.localServices, String.format(Locale.US, "%tF %tT", peerTime, peerTime), vPeerVersionMessage.bestHeight); - // Now it's our turn ... - // Send an ACK message stating we accept the peers protocol version. - sendMessage(new VersionAck()); // bitcoinj is a client mode implementation. That means there's not much point in us talking to other client // mode nodes because we can't download the data from them we need to find/verify transactions. Some bogus // implementations claim to have a block chain in their services field but then report a height of zero, filter @@ -544,18 +549,27 @@ public class Peer extends PeerSocketHandler { if (vPeerVersionMessage.bestHeight < 0) // In this case, it's a protocol violation. throw new ProtocolException("Peer reports invalid best height: " + vPeerVersionMessage.bestHeight); - versionHandshakeFuture.set(this); + // Now it's our turn ... + // Send an ACK message stating we accept the peers protocol version. + sendMessage(new VersionAck()); + log.debug("{}: Incoming version handshake complete.", this); + incomingVersionHandshakeFuture.set(this); } private void processVersionAck(VersionAck m) throws ProtocolException { if (vPeerVersionMessage == null) { throw new ProtocolException("got a version ack before version"); } - if (isAcked) { + if (outgoingVersionHandshakeFuture.isDone()) { throw new ProtocolException("got more than one version ack"); } - isAcked = true; - this.setTimeoutEnabled(false); + log.debug("{}: Outgoing version handshake complete.", this); + outgoingVersionHandshakeFuture.set(this); + } + + private void versionHandshakeComplete() { + log.debug("{}: Handshake complete.", this); + setTimeoutEnabled(false); for (final ListenerRegistration registration : connectedEventListeners) { registration.executor.execute(new Runnable() { @Override