3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-14 11:15:51 +00:00

Priority queue and exponential backoff for PeerGroup

Connect to peers in a service loop thread.

Resolves issue #159
Resolves issue #503
This commit is contained in:
Devrandom 2013-12-15 18:18:46 -08:00 committed by Mike Hearn
parent b0ce535aa9
commit 3d99be48bc
12 changed files with 566 additions and 109 deletions

View File

@ -155,7 +155,7 @@ public class Peer extends PeerSocketHandler {
* <p>The remoteAddress provided should match the remote address of the peer which is being connected to, and is
* used to keep track of which peers relayed transactions and offer more descriptive logging.</p>
*/
public Peer(NetworkParameters params, VersionMessage ver, @Nullable AbstractBlockChain chain, InetSocketAddress remoteAddress) {
public Peer(NetworkParameters params, VersionMessage ver, @Nullable AbstractBlockChain chain, PeerAddress remoteAddress) {
this(params, ver, remoteAddress, chain, null);
}
@ -173,7 +173,7 @@ public class Peer extends PeerSocketHandler {
* <p>The remoteAddress provided should match the remote address of the peer which is being connected to, and is
* used to keep track of which peers relayed transactions and offer more descriptive logging.</p>
*/
public Peer(NetworkParameters params, VersionMessage ver, InetSocketAddress remoteAddress,
public Peer(NetworkParameters params, VersionMessage ver, PeerAddress remoteAddress,
@Nullable AbstractBlockChain chain, @Nullable MemoryPool mempool) {
super(params, remoteAddress);
this.params = Preconditions.checkNotNull(params);
@ -203,8 +203,8 @@ public class Peer extends PeerSocketHandler {
* <p>The remoteAddress provided should match the remote address of the peer which is being connected to, and is
* used to keep track of which peers relayed transactions and offer more descriptive logging.</p>
*/
public Peer(NetworkParameters params, AbstractBlockChain blockChain, InetSocketAddress remoteAddress, String thisSoftwareName, String thisSoftwareVersion) {
this(params, new VersionMessage(params, blockChain.getBestChainHeight(), true), blockChain, remoteAddress);
public Peer(NetworkParameters params, AbstractBlockChain blockChain, PeerAddress peerAddress, String thisSoftwareName, String thisSoftwareVersion) {
this(params, new VersionMessage(params, blockChain.getBestChainHeight(), true), blockChain, peerAddress);
this.versionMessage.appendToSubVer(thisSoftwareName, thisSoftwareVersion, null);
}

View File

@ -252,6 +252,7 @@ public class PeerAddress extends ChildMessage {
other.port == port &&
other.services.equals(services) &&
other.time == time;
//FIXME including services and time could cause same peer to be added multiple times in collections
}
@Override

View File

@ -22,6 +22,7 @@ import com.google.bitcoin.net.NioClientManager;
import com.google.bitcoin.net.discovery.PeerDiscovery;
import com.google.bitcoin.net.discovery.PeerDiscoveryException;
import com.google.bitcoin.script.Script;
import com.google.bitcoin.utils.ExponentialBackoff;
import com.google.bitcoin.utils.ListenerRegistration;
import com.google.bitcoin.utils.Threading;
import com.google.common.base.Preconditions;
@ -36,10 +37,7 @@ import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.base.Preconditions.checkNotNull;
@ -67,14 +65,16 @@ import static com.google.common.base.Preconditions.checkState;
* when finished. Note that not all methods of PeerGroup are safe to call from a UI thread as some may do
* network IO, but starting and stopping the service should be fine.</p>
*/
public class PeerGroup extends AbstractIdleService implements TransactionBroadcaster {
public class PeerGroup extends AbstractExecutionThreadService implements TransactionBroadcaster {
private static final int DEFAULT_CONNECTIONS = 4;
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
protected final ReentrantLock lock = Threading.lock("peergroup");
// Addresses to try to connect to, excluding active peers.
@GuardedBy("lock") private final List<PeerAddress> inactives;
@GuardedBy("lock") private final PriorityQueue<PeerAddress> inactives;
@GuardedBy("lock") private final Map<PeerAddress, ExponentialBackoff> backoffMap;
// Currently active peers. This is an ordered list rather than a set to make unit tests predictable.
private final CopyOnWriteArrayList<Peer> peers;
// Currently connecting peers.
@ -142,6 +142,21 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
@Override public void onCoinsSent(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { onChanged(); }
};
// Exponential backoff for peers starts at 1 second and maxes at 10 minutes.
private ExponentialBackoff.Params peerBackoffParams = new ExponentialBackoff.Params(1000, 1.5f, 10 * 60 * 1000);
// Tracks failures globally in case of a network failure
private ExponentialBackoff groupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(100, 1.1f, 30 * 1000));
private LinkedBlockingQueue<Object> morePeersMailbox = new LinkedBlockingQueue<Object>();
private void handleBlocksDownloaded() {
double rate = chain.getFalsePositiveRate();
if (rate > bloomFilterFPRate * MAX_FP_RATE_INCREASE) {
log.info("Force update Bloom filter due to high false positive rate");
recalculateFastCatchupAndFilter(true);
}
}
private class PeerStartupListener extends AbstractPeerEventListener {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
@ -222,7 +237,17 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
memoryPool = new MemoryPool();
inactives = new ArrayList<PeerAddress>();
inactives = new PriorityQueue<PeerAddress>(1, new Comparator<PeerAddress>() {
@Override
public int compare(PeerAddress a, PeerAddress b) {
int result = backoffMap.get(a).compareTo(backoffMap.get(b));
// Sort by port if otherwise equals - for testing
if (result == 0)
result = Integer.valueOf(a.getPort()).compareTo(b.getPort());
return result;
}
});
backoffMap = new HashMap<PeerAddress, ExponentialBackoff>();
peers = new CopyOnWriteArrayList<Peer>();
pendingPeers = new CopyOnWriteArrayList<Peer>();
channels = connectionManager;
@ -246,18 +271,17 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
// We may now have too many or too few open connections. Add more or drop some to get to the right amount.
adjustment = maxConnections - channels.getConnectedClientCount();
while (adjustment > 0) {
try {
connectToAnyPeer();
} catch (PeerDiscoveryException e) {
throw new RuntimeException(e);
}
adjustment--;
}
if (adjustment > 0)
notifyServiceThread();
if (adjustment < 0)
channels.closeConnections(-adjustment);
}
private void notifyServiceThread() {
morePeersMailbox.offer(this); // Any non-null object will do.
}
/** The maximum number of connections that we will create to peers. */
public int getMaxConnections() {
lock.lock();
@ -442,7 +466,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
int newMax;
lock.lock();
try {
inactives.add(peerAddress);
addInactive(peerAddress);
newMax = getMaxConnections() + 1;
} finally {
lock.unlock();
@ -450,6 +474,14 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
setMaxConnections(newMax);
}
private void addInactive(PeerAddress peerAddress) {
// Deduplicate
if (backoffMap.containsKey(peerAddress))
return;
backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams));
inactives.offer(peerAddress);
}
/** Convenience method for addAddress(new PeerAddress(address, params.port)); */
public void addAddress(InetAddress address) {
addAddress(new PeerAddress(address, params.getPort()));
@ -481,11 +513,37 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
lock.lock();
try {
inactives.addAll(addressSet);
for (PeerAddress address : addressSet) {
addInactive(address);
}
} finally {
lock.unlock();
}
log.info("Peer discovery took {}msec", System.currentTimeMillis() - start);
log.info("Peer discovery took {}msec and returned {} items",
System.currentTimeMillis() - start, addressSet.size());
}
@Override
protected void run() throws Exception {
while (isRunning()) {
int numPeers;
lock.lock();
try {
numPeers = peers.size() + pendingPeers.size();
} finally {
lock.unlock();
}
if (numPeers < getMaxConnections()) {
try {
connectToAnyPeer();
} catch(PeerDiscoveryException e) {
groupBackoff.trackFailure();
}
}
else
morePeersMailbox.take();
}
}
/** Picks a peer from discovery and connects to it. If connection fails, picks another and tries again. */
@ -494,46 +552,56 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
if (!(state == State.STARTING || state == State.RUNNING)) return;
final PeerAddress addr;
long nowMillis = Utils.currentTimeMillis();
lock.lock();
try {
if (inactives.size() == 0) {
if (!haveReadyInactivePeer(nowMillis)) {
discoverPeers();
groupBackoff.trackSuccess();
nowMillis = Utils.currentTimeMillis();
}
if (inactives.size() == 0) {
log.debug("Peer discovery didn't provide us any more peers, not trying to build new connection.");
return;
}
addr = inactives.remove(inactives.size() - 1);
addr = inactives.poll();
} finally {
lock.unlock();
}
// This method eventually constructs a Peer and puts it into pendingPeers. If the connection fails to establish,
// handlePeerDeath will be called, which will potentially call this method again to replace the dead or failed
// connection.
connectTo(addr.toSocketAddress(), false);
// Delay if any backoff is required
long retryTime = Math.max(backoffMap.get(addr).getRetryTime(), groupBackoff.getRetryTime());
if (retryTime > nowMillis) {
// Sleep until retry time
Utils.sleep(retryTime - nowMillis);
}
// This method constructs a Peer and puts it into pendingPeers.
connectTo(addr, false);
}
private boolean haveReadyInactivePeer(long nowMillis) {
// No inactive peers to try?
if (inactives.size() == 0)
return false;
// All peers have not reached backoff retry time?
if (backoffMap.get(inactives.peek()).getRetryTime() > nowMillis)
return false;
return true;
}
@Override
protected void startUp() throws Exception {
// This is run in a background thread by the AbstractIdleService implementation.
// This is run in a background thread by the Service implementation.
vPingTimer = new Timer("Peer pinging thread", true);
channels.startAndWait();
// Bring up the requested number of connections. If a connect attempt fails,
// new peers will be tried until there is a success, so just calling connectToAnyPeer for the wanted number
// of peers is sufficient.
for (int i = 0; i < getMaxConnections(); i++) {
try {
connectToAnyPeer();
} catch (PeerDiscoveryException e) {
if (e.getCause() instanceof InterruptedException) return;
log.error(e.getMessage());
}
}
}
@Override
protected void shutDown() throws Exception {
// This is run on a separate thread by the AbstractIdleService implementation.
// This is run on a separate thread by the Service implementation.
vPingTimer.cancel();
// Blocking close of all sockets.
channels.stopAndWait();
@ -542,6 +610,11 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
}
@Override
protected void triggerShutdown() {
notifyServiceThread();
}
/**
* <p>Link the given wallet to this PeerGroup. This is used for three purposes:</p>
*
@ -690,20 +763,24 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
/**
* Connect to a peer by creating a channel to the destination address.
* Connect to a peer by creating a channel to the destination address. This should not be
* used normally - let the PeerGroup manage connections through {@link #start()}
*
* @param address destination IP and port.
* @return The newly created Peer object. Use {@link com.google.bitcoin.core.Peer#getConnectionOpenFuture()} if you
* want a future which completes when the connection is open, or null if the peer could not be connected.
* @return The newly created Peer object or null if the peer could not be connected.
* Use {@link com.google.bitcoin.core.Peer#getConnectionOpenFuture()} if you
* want a future which completes when the connection is open.
*/
@Nullable
public Peer connectTo(InetSocketAddress address) {
return connectTo(address, true);
PeerAddress peerAddress = new PeerAddress(address);
backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams));
return connectTo(peerAddress, true);
}
// Internal version.
@Nullable
protected Peer connectTo(InetSocketAddress address, boolean incrementMaxConnections) {
protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections) {
VersionMessage ver = getVersionMessage().duplicate();
ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight();
ver.time = Utils.now().getTime() / 1000;
@ -714,7 +791,7 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
pendingPeers.add(peer);
try {
channels.openConnection(address, peer);
channels.openConnection(address.toSocketAddress(), peer);
} catch (Exception e) {
log.warn("Failed to connect to " + address + ": " + e.getMessage());
handlePeerDeath(peer);
@ -790,6 +867,9 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
int newSize = -1;
lock.lock();
try {
groupBackoff.trackSuccess();
backoffMap.get(peer.getAddress()).trackSuccess();
// Sets up the newly connected peer so it can do everything it needs to.
log.info("{}: New peer", peer);
pendingPeers.remove(peer);
@ -972,7 +1052,10 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
try {
pendingPeers.remove(peer);
peers.remove(peer);
log.info("{}: Peer died", peer.getAddress());
PeerAddress address = peer.getAddress();
log.info("{}: Peer died", address);
if (peer == downloadPeer) {
log.info("Download peer died. Picking a new one.");
setDownloadPeer(null);
@ -987,17 +1070,21 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca
}
numPeers = peers.size() + pendingPeers.size();
numConnectedPeers = peers.size();
groupBackoff.trackFailure();
//TODO: if network failure is suspected, do not backoff peer
backoffMap.get(address).trackFailure();
// Put back on inactive list
inactives.offer(address);
if (numPeers < getMaxConnections()) {
notifyServiceThread();
}
} finally {
lock.unlock();
}
// Replace this peer with a new one to keep our connection count up, if necessary.
if (numPeers < getMaxConnections()) {
try {
connectToAnyPeer();
} catch (PeerDiscoveryException e) {
log.error(e.getMessage());
}
}
peer.removeEventListener(peerListener);
for (Wallet wallet : wallets) {
peer.removeWallet(wallet);

View File

@ -42,13 +42,9 @@ import static com.google.common.base.Preconditions.*;
public abstract class PeerSocketHandler extends AbstractTimeoutHandler implements StreamParser {
private static final Logger log = LoggerFactory.getLogger(PeerSocketHandler.class);
// The IP address to which we are connecting.
@VisibleForTesting
InetSocketAddress remoteIp;
private final BitcoinSerializer serializer;
/** If we close() before we know our writeTarget, set this to true to call writeTarget.closeConnection() right away */
protected PeerAddress peerAddress;
// If we close() before we know our writeTarget, set this to true to call writeTarget.closeConnection() right away.
private boolean closePending = false;
// writeTarget will be thread-safe, and may call into PeerGroup, which calls us, so we should call it unlocked
@VisibleForTesting MessageWriteTarget writeTarget = null;
@ -62,9 +58,14 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement
private Lock lock = Threading.lock("PeerSocketHandler");
public PeerSocketHandler(NetworkParameters params, InetSocketAddress peerAddress) {
public PeerSocketHandler(NetworkParameters params, InetSocketAddress remoteIp) {
serializer = new BitcoinSerializer(checkNotNull(params));
this.remoteIp = checkNotNull(peerAddress);
this.peerAddress = new PeerAddress(remoteIp);
}
public PeerSocketHandler(NetworkParameters params, PeerAddress peerAddress) {
serializer = new BitcoinSerializer(checkNotNull(params));
this.peerAddress = checkNotNull(peerAddress);
}
/**
@ -212,7 +213,7 @@ public abstract class PeerSocketHandler extends AbstractTimeoutHandler implement
* @return the IP address and port of peer.
*/
public PeerAddress getAddress() {
return new PeerAddress(remoteIp);
return peerAddress;
}
/** Catch any exceptions, logging them and then closing the channel. */

View File

@ -30,8 +30,12 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
/**
* A collection of various utility methods that are helpful for working with the Bitcoin protocol.
@ -71,6 +75,7 @@ public class Utils {
* of them in a coin (whereas one would expect 1 billion).
*/
public static final BigInteger CENT = new BigInteger("1000000", 10);
private static BlockingQueue<Boolean> mockSleepQueue;
/**
* Convert an amount expressed in the way humans are used to into nanocoins.
@ -447,15 +452,21 @@ public class Utils {
* Advances (or rewinds) the mock clock by the given number of seconds.
*/
public static Date rollMockClock(int seconds) {
return rollMockClockMillis(seconds * 1000);
}
/**
* Advances (or rewinds) the mock clock by the given number of milliseconds.
*/
public static Date rollMockClockMillis(long millis) {
if (mockTime == null)
mockTime = new Date();
mockTime = new Date(mockTime.getTime() + (seconds * 1000));
mockTime = new Date(mockTime.getTime() + millis);
return mockTime;
}
/**
* Sets the mock clock to the given time (in seconds)
* @param mockClock
*/
public static void setMockClock(long mockClock) {
mockTime = new Date(mockClock * 1000);
@ -470,6 +481,14 @@ public class Utils {
else
return new Date();
}
/** Returns the current time in seconds since the epoch, or a mocked out equivalent. */
public static long currentTimeMillis() {
if (mockTime != null)
return mockTime.getTime();
else
return System.currentTimeMillis();
}
public static byte[] copyOf(byte[] in, int length) {
byte[] out = new byte[length];
@ -539,4 +558,42 @@ public class Utils {
public static void setBitLE(byte[] data, int index) {
data[index >>> 3] |= bitMask[7 & index];
}
/** Sleep for a span of time, or mock sleep if enabled */
public static void sleep(long millis) {
if (mockSleepQueue == null) {
sleepUninterruptibly(millis, TimeUnit.MILLISECONDS);
} else {
try {
boolean isMultiPass = mockSleepQueue.take();
rollMockClockMillis(millis);
if (isMultiPass)
mockSleepQueue.offer(true);
} catch (InterruptedException e) {
// Ignored.
}
}
}
/** Enable or disable mock sleep. If enabled, set mock time to current time. */
public static void setMockSleep(boolean isEnable) {
if (isEnable) {
mockSleepQueue = new ArrayBlockingQueue<Boolean>(1);
mockTime = new Date(System.currentTimeMillis());
} else {
mockSleepQueue = null;
}
}
/** Let sleeping thread pass the synchronization point. */
public static void passMockSleep() {
mockSleepQueue.offer(false);
}
/** Let the sleeping thread pass the synchronization point any number of times. */
public static void finishMockSleep() {
if (mockSleepQueue != null) {
mockSleepQueue.offer(true);
}
}
}

View File

@ -0,0 +1,104 @@
/**
* Copyright 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.utils;
import com.google.bitcoin.core.Utils;
import static com.google.common.base.Preconditions.checkArgument;
/**
* <p>Tracks successes and failures and calculates a time to retry the operation.</p>
*
* <p>The retries are exponentially backed off, up to a maximum interval. On success the back off interval is reset.</p>
*/
public class ExponentialBackoff implements Comparable<ExponentialBackoff> {
public static final int DEFAULT_INITIAL_MILLIS = 100;
public static final float DEFAULT_MULTIPLIER = 1.1f;
public static final int DEFAULT_MAXIMUM_MILLIS = 30 * 1000;
private float backoff;
private long retryTime;
private final Params params;
/**
* Parameters to configure a particular kind of exponential backoff.
*/
public static class Params {
private final float initial;
private final float multiplier;
private final float maximum;
/**
* @param initialMillis the initial interval to wait, in milliseconds
* @param multiplier the multiplier to apply on each failure
* @param maximumMillis the maximum interval to wait, in milliseconds
*/
public Params(long initialMillis, float multiplier, long maximumMillis) {
checkArgument(multiplier > 1.0f, "multiplier must be greater than 1.0");
checkArgument(maximumMillis >= initialMillis, "maximum must not be less than initial");
this.initial = initialMillis;
this.multiplier = multiplier;
this.maximum = maximumMillis;
}
/**
* Construct params with default values.
*/
public Params() {
initial = DEFAULT_INITIAL_MILLIS;
multiplier = DEFAULT_MULTIPLIER;
maximum = DEFAULT_MAXIMUM_MILLIS;
}
}
public ExponentialBackoff(Params params) {
this.params = params;
trackSuccess();
}
/** Track a success - reset back off interval to the initial value */
public void trackSuccess() {
backoff = params.initial;
retryTime = Utils.currentTimeMillis();
}
/** Track a failure - multiply the back off interval by the multiplier */
public void trackFailure() {
retryTime = Utils.currentTimeMillis() + (long)backoff;
backoff = Math.min(backoff * params.multiplier, params.maximum);
}
/** Get the next time to retry, in milliseconds since the epoch */
public long getRetryTime() {
return retryTime;
}
@Override
public int compareTo(ExponentialBackoff other) {
if (retryTime < other.retryTime)
return -1;
if (retryTime > other.retryTime)
return 1;
return 0;
}
@Override
public String toString() {
return "ExponentialBackoff retry=" + retryTime + " backoff=" + backoff;
}
}

View File

@ -22,6 +22,7 @@ import com.google.bitcoin.params.UnitTestParams;
import com.google.bitcoin.store.MemoryBlockStore;
import com.google.bitcoin.utils.TestUtils;
import com.google.bitcoin.utils.Threading;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.After;
import org.junit.Before;
@ -44,6 +45,10 @@ import static org.junit.Assert.*;
@RunWith(value = Parameterized.class)
public class PeerGroupTest extends TestWithPeerGroup {
static final NetworkParameters params = UnitTestParams.get();
private BlockingQueue<Peer> connectedPeers;
private BlockingQueue<Peer> disconnectedPeers;
private PeerEventListener listener;
private Map<Peer, AtomicInteger> peerToMessageCount;
@Parameterized.Parameters
public static Collection<ClientType[]> parameters() {
@ -58,24 +63,10 @@ public class PeerGroupTest extends TestWithPeerGroup {
@Override
@Before
public void setUp() throws Exception {
super.setUp(new MemoryBlockStore(UnitTestParams.get()));
peerGroup.addWallet(wallet);
}
@After
public void tearDown() throws Exception {
super.tearDown();
peerGroup.stopAndWait();
}
@Test
public void listener() throws Exception {
final BlockingQueue<Peer> connectedPeers = new LinkedBlockingQueue<Peer>();
final BlockingQueue<Peer> disconnectedPeers = new LinkedBlockingQueue<Peer>();
final SettableFuture<Void> firstDisconnectFuture = SettableFuture.create();
final SettableFuture<Void> secondDisconnectFuture = SettableFuture.create();
final Map<Peer, AtomicInteger> peerToMessageCount = new HashMap<Peer, AtomicInteger>();
AbstractPeerEventListener listener = new AbstractPeerEventListener() {
peerToMessageCount = new HashMap<Peer, AtomicInteger>();
connectedPeers = new LinkedBlockingQueue<Peer>();
disconnectedPeers = new LinkedBlockingQueue<Peer>();
listener = new AbstractPeerEventListener() {
@Override
public void onPeerConnected(Peer peer, int peerCount) {
connectedPeers.add(peer);
@ -98,6 +89,21 @@ public class PeerGroupTest extends TestWithPeerGroup {
return m;
}
};
super.setUp(new MemoryBlockStore(UnitTestParams.get()));
peerGroup.addWallet(wallet);
}
@After
public void tearDown() throws Exception {
super.tearDown();
Utils.finishMockSleep();
peerGroup.stopAndWait();
}
@Test
public void listener() throws Exception {
final SettableFuture<Void> firstDisconnectFuture = SettableFuture.create();
final SettableFuture<Void> secondDisconnectFuture = SettableFuture.create();
peerGroup.startAndWait();
peerGroup.addEventListener(listener);
@ -430,6 +436,75 @@ public class PeerGroupTest extends TestWithPeerGroup {
Thread.sleep(50);
assertFalse(peerConnectedFuture.isDone() || peerDisconnectedFuture.isDone());
Thread.sleep(60);
assertTrue(!peerConnectedFuture.isDone());
assertTrue(!peerConnectedFuture.isDone() && peerDisconnectedFuture.isDone());
}
@Test
public void peerPriority() throws Exception {
final List<InetSocketAddress> addresses = Lists.newArrayList(
new InetSocketAddress("localhost", 2000),
new InetSocketAddress("localhost", 2001),
new InetSocketAddress("localhost", 2002)
);
peerGroup.addEventListener(listener);
peerGroup.addPeerDiscovery(new PeerDiscovery() {
public InetSocketAddress[] getPeers(long unused, TimeUnit unused2) throws PeerDiscoveryException {
return addresses.toArray(new InetSocketAddress[0]);
}
public void shutdown() {
}
});
peerGroup.setMaxConnections(3);
Utils.setMockSleep(true);
peerGroup.startAndWait();
handleConnectToPeer(0);
handleConnectToPeer(1);
handleConnectToPeer(2);
connectedPeers.take();
connectedPeers.take();
connectedPeers.take();
addresses.clear();
addresses.addAll(Lists.newArrayList(new InetSocketAddress("localhost", 2003)));
stopPeerServer(2);
assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); // peer died
// discovers, connects to new peer
handleConnectToPeer(3);
assertEquals(2003, connectedPeers.take().getAddress().getPort());
stopPeerServer(1);
assertEquals(2001, disconnectedPeers.take().getAddress().getPort()); // peer died
// Alternates trying two offline peers
Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
// Peer 2 comes online
startPeerServer(2);
Utils.passMockSleep();
handleConnectToPeer(2);
assertEquals(2002, connectedPeers.take().getAddress().getPort());
stopPeerServer(2);
assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); // peer died
// Peer 2 is tried twice before peer 1, since it has a lower backoff due to recent success
Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
}
}

View File

@ -78,7 +78,8 @@ public class PeerTest extends TestWithNetworkConnections {
memoryPool = new MemoryPool();
VersionMessage ver = new VersionMessage(unitTestParams, 100);
peer = new Peer(unitTestParams, ver, new InetSocketAddress("127.0.0.1", 4000), blockChain, memoryPool);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 4000);
peer = new Peer(unitTestParams, ver, new PeerAddress(address), blockChain, memoryPool);
peer.addWallet(wallet);
}
@ -265,7 +266,8 @@ public class PeerTest extends TestWithNetworkConnections {
public void invDownloadTxMultiPeer() throws Exception {
// Check co-ordination of which peer to download via the memory pool.
VersionMessage ver = new VersionMessage(unitTestParams, 100);
Peer peer2 = new Peer(unitTestParams, ver, new InetSocketAddress("127.0.0.1", 4242), blockChain, memoryPool);
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 4242);
Peer peer2 = new Peer(unitTestParams, ver, new PeerAddress(address), blockChain, memoryPool);
peer2.addWallet(wallet);
VersionMessage peerVersion = new VersionMessage(unitTestParams, OTHER_PEER_CHAIN_HEIGHT);
peerVersion.clientVersion = 70001;
@ -276,7 +278,7 @@ public class PeerTest extends TestWithNetworkConnections {
// Make a tx and advertise it to one of the peers.
BigInteger value = Utils.toNanoCoins(1, 0);
Transaction tx = createFakeTx(unitTestParams, value, address);
Transaction tx = createFakeTx(unitTestParams, value, this.address);
InventoryMessage inv = new InventoryMessage(unitTestParams);
InventoryItem item = new InventoryItem(InventoryItem.Type.Transaction, tx.getHash());
inv.addItem(item);

View File

@ -24,6 +24,7 @@ import com.google.bitcoin.utils.BriefLogFormatter;
import com.google.bitcoin.utils.Threading;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -40,6 +41,7 @@ import static org.junit.Assert.assertTrue;
* Utility class that makes it easy to work with mock NetworkConnections.
*/
public class TestWithNetworkConnections {
public static final int PEER_SERVERS = 5;
protected NetworkParameters unitTestParams;
protected BlockStore blockStore;
protected BlockChain blockChain;
@ -48,7 +50,7 @@ public class TestWithNetworkConnections {
protected Address address;
protected SocketAddress socketAddress;
private NioServer peerServer;
private NioServer peerServers[] = new NioServer[PEER_SERVERS];
private final ClientConnectionManager channels;
protected final BlockingQueue<InboundMessageQueuer> newPeerWriteTargetQueue = new LinkedBlockingQueue<InboundMessageQueuer>();
@ -85,29 +87,51 @@ public class TestWithNetworkConnections {
wallet.addKey(key);
blockChain = new BlockChain(unitTestParams, wallet, blockStore);
peerServer = new NioServer(new StreamParserFactory() {
@Nullable
@Override
public StreamParser getNewParser(InetAddress inetAddress, int port) {
return new InboundMessageQueuer(unitTestParams) {
@Override public void connectionClosed() { }
@Override
public void connectionOpened() {
newPeerWriteTargetQueue.offer(this);
}
};
}
}, new InetSocketAddress("127.0.0.1", 2000));
peerServer.startAndWait();
startPeerServers();
if (clientType == ClientType.NIO_CLIENT_MANAGER || clientType == ClientType.BLOCKING_CLIENT_MANAGER)
channels.startAndWait();
socketAddress = new InetSocketAddress("127.0.0.1", 1111);
}
protected void startPeerServers() throws IOException {
for (int i = 0 ; i < PEER_SERVERS ; i++) {
startPeerServer(i);
}
}
protected void startPeerServer(int i) throws IOException {
peerServers[i] = new NioServer(new StreamParserFactory() {
@Nullable
@Override
public StreamParser getNewParser(InetAddress inetAddress, int port) {
return new InboundMessageQueuer(unitTestParams) {
@Override
public void connectionClosed() {
}
@Override
public void connectionOpened() {
newPeerWriteTargetQueue.offer(this);
}
};
}
}, new InetSocketAddress("127.0.0.1", 2000 + i));
peerServers[i].startAndWait();
}
public void tearDown() throws Exception {
Wallet.SendRequest.DEFAULT_FEE_PER_KB = Transaction.REFERENCE_DEFAULT_MIN_TX_FEE;
peerServer.stopAndWait();
stopPeerServers();
}
protected void stopPeerServers() {
for (int i = 0 ; i < PEER_SERVERS ; i++)
stopPeerServer(i);
}
protected void stopPeerServer(int i) {
peerServers[i].stopAndWait();
}
protected InboundMessageQueuer connect(Peer peer, VersionMessage versionMessage) throws Exception {

View File

@ -20,6 +20,8 @@ import com.google.bitcoin.params.UnitTestParams;
import com.google.bitcoin.net.BlockingClientManager;
import com.google.bitcoin.net.NioClientManager;
import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.utils.ExponentialBackoff;
import com.google.common.base.Preconditions;
import java.net.InetSocketAddress;
@ -61,10 +63,9 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
}
protected InboundMessageQueuer connectPeerWithoutVersionExchange(int id) throws Exception {
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000);
Preconditions.checkArgument(id < PEER_SERVERS);
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id);
Peer peer = peerGroup.connectTo(remoteAddress).getConnectionOpenFuture().get();
// Claim we are connected to a different IP that what we really are, so tx confidence broadcastBy sets work
peer.remoteIp = new InetSocketAddress("127.0.0.1", 2000 + id);
InboundMessageQueuer writeTarget = newPeerWriteTargetQueue.take();
writeTarget.peer = peer;
return writeTarget;
@ -88,4 +89,25 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
}
return writeTarget;
}
// handle peer discovered by PeerGroup
protected InboundMessageQueuer handleConnectToPeer(int id) throws Exception {
return handleConnectToPeer(id, remoteVersionMessage);
}
// handle peer discovered by PeerGroup
protected InboundMessageQueuer handleConnectToPeer(int id, VersionMessage versionMessage) throws Exception {
InboundMessageQueuer writeTarget = newPeerWriteTargetQueue.take();
checkArgument(versionMessage.hasBlockChain());
// Complete handshake with the peer - send/receive version(ack)s, receive bloom filter
writeTarget.sendMessage(versionMessage);
writeTarget.sendMessage(new VersionAck());
assertTrue(writeTarget.nextMessageBlocking() instanceof VersionMessage);
assertTrue(writeTarget.nextMessageBlocking() instanceof VersionAck);
if (versionMessage.isBloomFilteringSupported()) {
assertTrue(writeTarget.nextMessageBlocking() instanceof BloomFilter);
assertTrue(writeTarget.nextMessageBlocking() instanceof MemoryPoolMessage);
}
return writeTarget;
}
}

View File

@ -0,0 +1,82 @@
/**
* Copyright 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.core.utils;
import com.google.bitcoin.core.Utils;
import com.google.bitcoin.utils.ExponentialBackoff;
import org.junit.Before;
import org.junit.Test;
import java.util.PriorityQueue;
import static org.junit.Assert.*;
/**
*/
public class ExponentialBackoffTest {
private ExponentialBackoff.Params params;
private ExponentialBackoff backoff;
@Before
public void setUp() {
Utils.setMockClock(System.currentTimeMillis() / 1000);
params = new ExponentialBackoff.Params();
backoff = new ExponentialBackoff(params);
}
@Test
public void testSuccess() {
assertEquals(Utils.currentTimeMillis(), backoff.getRetryTime());
backoff.trackFailure();
backoff.trackFailure();
backoff.trackSuccess();
assertEquals(Utils.currentTimeMillis(), backoff.getRetryTime());
}
@Test
public void testFailure() {
assertEquals(Utils.currentTimeMillis(), backoff.getRetryTime());
backoff.trackFailure();
backoff.trackFailure();
backoff.trackFailure();
assertEquals(Utils.currentTimeMillis() + 121, backoff.getRetryTime());
}
@Test
public void testInQueue() {
PriorityQueue<ExponentialBackoff> queue = new PriorityQueue<ExponentialBackoff>();
ExponentialBackoff backoff1 = new ExponentialBackoff(params);
backoff.trackFailure();
backoff.trackFailure();
backoff1.trackFailure();
backoff1.trackFailure();
backoff1.trackFailure();
queue.offer(backoff);
queue.offer(backoff1);
assertEquals(queue.poll(), backoff); // The one with soonest retry time
assertEquals(queue.peek(), backoff1);
queue.offer(backoff);
assertEquals(queue.poll(), backoff); // Still the same one
}
}

View File

@ -19,6 +19,7 @@ package com.google.bitcoin.examples;
import com.google.bitcoin.core.AbstractPeerEventListener;
import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.Peer;
import com.google.bitcoin.core.PeerAddress;
import com.google.bitcoin.core.VersionMessage;
import com.google.bitcoin.net.discovery.DnsDiscovery;
import com.google.bitcoin.net.discovery.PeerDiscoveryException;
@ -79,7 +80,8 @@ public class PrintPeers {
List<ListenableFuture<Void>> futures = Lists.newArrayList();
NioClientManager clientManager = new NioClientManager();
for (final InetAddress addr : addrs) {
final Peer peer = new Peer(params, new VersionMessage(params, 0), null, new InetSocketAddress(addr, params.getPort()));
InetSocketAddress address = new InetSocketAddress(addr, params.getPort());
final Peer peer = new Peer(params, new VersionMessage(params, 0), null, new PeerAddress(address));
final SettableFuture future = SettableFuture.create();
// Once the connection has completed version handshaking ...
peer.addEventListener(new AbstractPeerEventListener() {
@ -110,7 +112,7 @@ public class PrintPeers {
future.set(null);
}
});
clientManager.openConnection(new InetSocketAddress(addr, params.getPort()), peer);
clientManager.openConnection(address, peer);
futures.add(future);
}
// Wait for every tried connection to finish.