3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-14 19:25:51 +00:00

Add a PeerGroup.waitForPeersOfVersion API and add unit tests for the waitForPeers* methods.

This commit is contained in:
Mike Hearn 2014-05-20 14:01:55 +02:00
parent f378bb3a43
commit 9e5a06120a
2 changed files with 79 additions and 11 deletions

View File

@ -1275,26 +1275,35 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
}
/**
* Returns a future that is triggered when the number of connected peers is equal to the given number of connected
* Returns a future that is triggered when the number of connected peers is equal to the given number of
* peers. By using this with {@link com.google.bitcoin.core.PeerGroup#getMaxConnections()} you can wait until the
* network is fully online. To block immediately, just call get() on the result.
* network is fully online. To block immediately, just call get() on the result. Just calls
* {@link #waitForPeersOfVersion(int, long)} with zero as the protocol version.
*
* @param numPeers How many peers to wait for.
* @return a future that will be triggered when the number of connected peers >= numPeers
*/
public ListenableFuture<PeerGroup> waitForPeers(final int numPeers) {
lock.lock();
try {
if (peers.size() >= numPeers) {
return Futures.immediateFuture(this);
}
} finally {
lock.unlock();
return waitForPeersOfVersion(numPeers, 0);
}
/**
* Returns a future that is triggered when there are at least the requested number of connected peers that support
* the given protocol version or higher. To block immediately, just call get() on the result.
*
* @param numPeers How many peers to wait for.
* @param protocolVersion The protocol version the awaited peers must implement (or better).
* @return a future that will be triggered when the number of connected peers implementing protocolVersion or higher >= numPeers
*/
public ListenableFuture<PeerGroup> waitForPeersOfVersion(final int numPeers, final long protocolVersion) {
int foundPeers = countPeersOfAtLeastVersion(protocolVersion);
if (foundPeers >= numPeers) {
return Futures.immediateFuture(this);
}
final SettableFuture<PeerGroup> future = SettableFuture.create();
addEventListener(new AbstractPeerEventListener() {
@Override public void onPeerConnected(Peer peer, int peerCount) {
if (peerCount >= numPeers) {
if (countPeersOfAtLeastVersion(protocolVersion) >= numPeers) {
future.set(PeerGroup.this);
removeEventListener(this);
}
@ -1303,6 +1312,19 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
return future;
}
private int countPeersOfAtLeastVersion(long protocolVersion) {
lock.lock();
try {
int foundPeers = 0;
for (Peer peer : peers)
if (peer.getPeerVersionMessage().clientVersion >= protocolVersion)
foundPeers++;
return foundPeers;
} finally {
lock.unlock();
}
}
/**
* Returns the number of connections that are required before transactions will be broadcast. If there aren't
* enough, {@link PeerGroup#broadcastTransaction(Transaction)} will wait until the minimum number is reached so

View File

@ -26,6 +26,7 @@ import com.google.bitcoin.testing.InboundMessageQueuer;
import com.google.bitcoin.utils.Threading;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.After;
import org.junit.Before;
@ -36,7 +37,10 @@ import org.junit.runners.Parameterized;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -581,4 +585,46 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertFalse(f1.contains(key.getPubKey()));
assertFalse(f1.contains(key.getPubKeyHash()));
}
@Test
public void waitForNumPeers1() throws Exception {
ListenableFuture<PeerGroup> future = peerGroup.waitForPeers(3);
peerGroup.startAsync();
peerGroup.awaitRunning();
assertFalse(future.isDone());
connectPeer(1);
assertFalse(future.isDone());
connectPeer(2);
assertFalse(future.isDone());
assertTrue(peerGroup.waitForPeers(2).isDone()); // Immediate completion.
connectPeer(3);
future.get();
assertTrue(future.isDone());
}
@Test
public void waitForPeersOfVersion() throws Exception {
final int baseVer = peerGroup.getMinRequiredProtocolVersion() + 3000;
final int newVer = baseVer + 1000;
ListenableFuture<PeerGroup> future = peerGroup.waitForPeersOfVersion(2, newVer);
VersionMessage ver1 = new VersionMessage(params, 10);
ver1.clientVersion = baseVer;
ver1.localServices = VersionMessage.NODE_NETWORK;
VersionMessage ver2 = new VersionMessage(params, 10);
ver2.clientVersion = newVer;
ver2.localServices = VersionMessage.NODE_NETWORK;
peerGroup.startAsync();
peerGroup.awaitRunning();
assertFalse(future.isDone());
connectPeer(1, ver1);
assertFalse(future.isDone());
connectPeer(2, ver2);
assertFalse(future.isDone());
assertTrue(peerGroup.waitForPeersOfVersion(1, newVer).isDone()); // Immediate completion.
connectPeer(3, ver2);
future.get();
assertTrue(future.isDone());
}
}