diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index a3c1aff3..9b24fafb 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -23,10 +23,7 @@ import com.google.bitcoin.utils.EventListenerInvoker; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.*; import org.jboss.netty.channel.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,11 +90,15 @@ public class Peer { private static class GetDataRequest { Sha256Hash hash; SettableFuture future; + // If the peer does not support the notfound message, we'll use ping/pong messages to simulate it. This is + // a nasty hack that relies on the fact that bitcoin-qt is single threaded and processes messages in order. + // The nonce field records which pong should clear this request as "not found". + long nonce; } - private final List getDataFutures; + private final CopyOnWriteArrayList getDataFutures; // Outstanding pings against this peer and how long the last one took to complete. - private CopyOnWriteArrayList pendingPings; + private final CopyOnWriteArrayList pendingPings; private long[] lastPingTimes; private static final int PING_MOVING_AVERAGE_WINDOW = 20; @@ -316,6 +317,7 @@ public class Peer { if (item.hash.equals(req.hash)) { req.future.cancel(true); getDataFutures.remove(req); + break; } } } @@ -489,11 +491,15 @@ public class Peer { // Build the request for the missing dependencies. List> futures = Lists.newArrayList(); GetDataMessage getdata = new GetDataMessage(params); + final long nonce = (long)(Math.random()*Long.MAX_VALUE); for (Sha256Hash hash : needToRequest) { getdata.addTransaction(hash); GetDataRequest req = new GetDataRequest(); req.hash = hash; req.future = SettableFuture.create(); + if (!isNotFoundMessageSupported()) { + req.nonce = nonce; + } futures.add(req.future); getDataFutures.add(req); } @@ -538,7 +544,25 @@ public class Peer { }); // Start the operation. sendMessage(getdata); - } catch (IOException e) { + if (!isNotFoundMessageSupported()) { + // If the peer isn't new enough to support the notfound message, we use a nasty hack instead and + // assume if we send a ping message after the getdata message, it'll be processed after all answers + // from getdata are done, so we can watch for the pong message as a substitute. + ping(nonce).addListener(new Runnable() { + public void run() { + // The pong came back so clear out any transactions we requested but didn't get. + for (ListIterator it = getDataFutures.listIterator(); it.hasNext();) { + GetDataRequest req = it.next(); + if (req.nonce == nonce) { + req.future.cancel(true); + getDataFutures.remove(req); + break; + } + } + } + }, MoreExecutors.sameThreadExecutor()); + } + } catch (Exception e) { log.error("Couldn't send getdata in downloadDependencies({})", tx.getHash()); resultFuture.setException(e); return resultFuture; @@ -973,9 +997,9 @@ public class Peer { // Measurement of the time elapsed. public long startTimeMsec; - public PendingPing() { + public PendingPing(long nonce) { future = SettableFuture.create(); - nonce = (long) Math.random() * Long.MAX_VALUE; + this.nonce = nonce; startTimeMsec = Utils.now().getTime(); } @@ -1011,10 +1035,14 @@ public class Peer { * @throws ProtocolException if the peer version is too low to support measurable pings. */ public synchronized ListenableFuture ping() throws IOException, ProtocolException { + return ping((long) Math.random() * Long.MAX_VALUE); + } + + protected synchronized ListenableFuture ping(long nonce) throws IOException, ProtocolException { int peerVersion = getPeerVersionMessage().clientVersion; if (peerVersion < Pong.MIN_PROTOCOL_VERSION) throw new ProtocolException("Peer version is too low for measurable pings: " + peerVersion); - PendingPing pendingPing = new PendingPing(); + PendingPing pendingPing = new PendingPing(nonce); pendingPings.add(pendingPing); sendMessage(new Ping(pendingPing.nonce)); return pendingPing.future; @@ -1073,6 +1101,10 @@ public class Peer { return chainHeight - blockChain.getBestChainHeight(); } + private boolean isNotFoundMessageSupported() { + return getPeerVersionMessage().clientVersion >= 70001; + } + /** * Returns true if this peer will try and download things it is sent in "inv" messages. Normally you only need * one peer to be downloading data. Defaults to true. diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index f8faa557..812ecc02 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -64,6 +64,7 @@ public class PeerTest extends TestWithNetworkConnections { private void connect(PeerHandler handler, Channel channel, ChannelHandlerContext ctx) throws Exception { handler.connectRequested(ctx, new UpstreamChannelStateEvent(channel, ChannelState.CONNECTED, socketAddress)); VersionMessage peerVersion = new VersionMessage(unitTestParams, OTHER_PEER_CHAIN_HEIGHT); + peerVersion.clientVersion = 70001; DownstreamMessageEvent versionEvent = new DownstreamMessageEvent(channel, Channels.future(channel), peerVersion, null); handler.messageReceived(ctx, versionEvent);