3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-11 17:55:53 +00:00

Rewrite how peer group manages connections and its internal thread:

- No longer uses Guava services, the change is source compatible but the two-step API is no longer needed

- Now has a dedicated ScheduledExecutorService as its core service thread, so we can schedule jobs for the future instead of using sleeps.

- Connection code was rewritten to be easier to follow (at least I think so).

The goal here is to generalise the peer group thread so it can do more things.
This commit is contained in:
Mike Hearn 2014-11-13 22:45:52 +01:00
parent 573b487c2b
commit 27bc229fab
6 changed files with 293 additions and 193 deletions

View File

@ -37,6 +37,7 @@ import org.bitcoinj.net.discovery.PeerDiscovery;
import org.bitcoinj.net.discovery.PeerDiscoveryException;
import org.bitcoinj.net.discovery.TorDiscovery;
import org.bitcoinj.script.Script;
import org.bitcoinj.utils.DaemonThreadFactory;
import org.bitcoinj.utils.ExponentialBackoff;
import org.bitcoinj.utils.ListenerRegistration;
import org.bitcoinj.utils.Threading;
@ -76,14 +77,24 @@ import static com.google.common.base.Preconditions.checkState;
* when finished. Note that not all methods of PeerGroup are safe to call from a UI thread as some may do
* network IO, but starting and stopping the service should be fine.</p>
*/
public class PeerGroup extends AbstractExecutionThreadService implements TransactionBroadcaster {
public class PeerGroup implements TransactionBroadcaster {
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
private static final int DEFAULT_CONNECTIONS = 4;
private static final int TOR_TIMEOUT_SECONDS = 60;
private int maxPeersToDiscoverCount = 100;
private int vMaxPeersToDiscoverCount = 100;
protected final ReentrantLock lock = Threading.lock("peergroup");
// This executor is used to queue up jobs: it's used when we don't want to use locks for mutual exclusion,
// typically because the job might call in to user provided code that needs/wants the freedom to use the API
// however it wants, or because a job needs to be ordered relative to other jobs like that.
protected final ListeningScheduledExecutorService executor;
// Whether the peer group is currently running. Once shut down it cannot be restarted.
private volatile boolean vRunning;
// Whether the peer group has been started or not. An unstarted PG does not try to access the network.
private volatile boolean vUsedUp;
// Addresses to try to connect to, excluding active peers.
@GuardedBy("lock") private final PriorityQueue<PeerAddress> inactives;
@GuardedBy("lock") private final Map<PeerAddress, ExponentialBackoff> backoffMap;
@ -180,13 +191,13 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
if (!sendIfChangedQueued) {
log.info("Queuing recalc of the Bloom filter due to new keys or scripts becoming available");
sendIfChangedQueued = true;
Uninterruptibles.putUninterruptibly(jobQueue, bloomSendIfChanged);
executor.execute(bloomSendIfChanged);
}
} else {
if (!dontSendQueued) {
log.info("Queuing recalc of the Bloom filter due to observing a pay to pubkey output on a relevant tx");
dontSendQueued = true;
Uninterruptibles.putUninterruptibly(jobQueue, bloomDontSend);
executor.execute(bloomDontSend);
}
}
}
@ -238,10 +249,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
// Exponential backoff for peers starts at 1 second and maxes at 10 minutes.
private ExponentialBackoff.Params peerBackoffParams = new ExponentialBackoff.Params(1000, 1.5f, 10 * 60 * 1000);
// Tracks failures globally in case of a network failure.
private ExponentialBackoff groupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(1000, 1.5f, 10 * 1000));
// Things for the dedicated PeerGroup management thread to do.
private LinkedBlockingQueue<Runnable> jobQueue = new LinkedBlockingQueue<Runnable>();
@GuardedBy("lock") private ExponentialBackoff groupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(1000, 1.5f, 10 * 1000));
// This is a synchronized set, so it locks on itself. We use it to prevent TransactionBroadcast objects from
// being garbage collected if nothing in the apps code holds on to them transitively. See the discussion
@ -348,6 +356,8 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
peerFilterProviders = new CopyOnWriteArrayList<PeerFilterProvider>();
this.torClient = torClient;
executor = createPrivateExecutor();
// This default sentinel value will be overridden by one of two actions:
// - adding a peer discovery source sets it to the default
// - using connectTo() will increment it by one
@ -382,6 +392,23 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
bloomFilterMerger = new FilterMerger(DEFAULT_BLOOM_FILTER_FP_RATE);
}
private CountDownLatch executorStartupLatch = new CountDownLatch(1);
protected ListeningScheduledExecutorService createPrivateExecutor() {
ListeningScheduledExecutorService result = MoreExecutors.listeningDecorator(
new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("PeerGroup Thread"))
);
// Hack: jam the executor so jobs just queue up until the user calls start() on us. For example, adding a wallet
// results in a bloom filter recalc being queued, but we don't want to do that until we're actually started.
result.execute(new Runnable() {
@Override
public void run() {
Uninterruptibles.awaitUninterruptibly(executorStartupLatch);
}
});
return result;
}
/**
* Adjusts the desired number of connections that we will create to peers. Note that if there are already peers
* open and the new value is lower than the current number of peers, those connections will be terminated. Likewise
@ -419,23 +446,86 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
}
private Runnable triggerConnectionsJob = new Runnable() {
private boolean firstRun = true;
@Override
public void run() {
// We have to test the condition at the end, because during startup we need to run this at least once
// when isRunning() can return false.
do {
try {
connectToAnyPeer();
} catch (PeerDiscoveryException e) {
groupBackoff.trackFailure();
if (!vRunning) return;
boolean doDiscovery = false;
long now = Utils.currentTimeMillis();
lock.lock();
try {
// First run: try and use a local node if there is one, for the additional security it can provide.
// But, not on Android as there are none for this platform: it could only be a malicious app trying
// to hijack our traffic.
if (!Utils.isAndroidRuntime() && useLocalhostPeerWhenPossible && maybeCheckForLocalhostPeer() && firstRun) {
log.info("Localhost peer detected, trying to use it instead of P2P discovery");
maxConnections = 0;
connectToLocalHost();
return;
}
} while (isRunning() && countConnectedAndPendingPeers() < getMaxConnections());
boolean havePeerWeCanTry = !inactives.isEmpty() && backoffMap.get(inactives.peek()).getRetryTime() <= now;
doDiscovery = !havePeerWeCanTry;
} finally {
firstRun = false;
lock.unlock();
}
// Don't hold the lock across discovery as this process can be very slow.
boolean discoverySuccess = false;
if (doDiscovery) {
try {
discoverySuccess = discoverPeers() > 0;
} catch (PeerDiscoveryException e) {
log.error("Peer discovery failure", e);
}
}
long retryTime = 0;
PeerAddress addrToTry = null;
lock.lock();
try {
if (doDiscovery) {
if (discoverySuccess) {
groupBackoff.trackSuccess();
} else {
groupBackoff.trackFailure();
}
}
// Inactives is sorted by backoffMap time.
if (inactives.isEmpty()) {
log.info("Peer discovery didn't provide us any more peers, will try again later.");
executor.schedule(this, groupBackoff.getRetryTime() - now, TimeUnit.MILLISECONDS);
return;
} else {
do {
addrToTry = inactives.poll();
} while (ipv6Unreachable && addrToTry.getAddr() instanceof Inet6Address);
retryTime = backoffMap.get(addrToTry).getRetryTime();
}
retryTime = Math.max(retryTime, groupBackoff.getRetryTime());
if (retryTime > now) {
long delay = retryTime - now;
log.info("Waiting {} msec before next connect attempt {}", delay, addrToTry == null ? "" : "to " + addrToTry);
inactives.add(addrToTry);
executor.schedule(this, delay, TimeUnit.MILLISECONDS);
return;
}
} finally {
lock.unlock();
}
connectTo(addrToTry, false, vConnectTimeoutMillis);
if (countConnectedAndPendingPeers() < getMaxConnections()) {
executor.execute(this); // Try next peer immediately.
}
}
};
private void triggerConnections() {
// Run on a background thread due to the need to potentially retry and back off in the background.
Uninterruptibles.putUninterruptibly(jobQueue, triggerConnectionsJob);
executor.execute(triggerConnectionsJob);
}
/** The maximum number of connections that we will create to peers. */
@ -641,11 +731,16 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
}
private void addInactive(PeerAddress peerAddress) {
// Deduplicate
if (backoffMap.containsKey(peerAddress))
return;
backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams));
inactives.offer(peerAddress);
lock.lock();
try {
// Deduplicate
if (backoffMap.containsKey(peerAddress))
return;
backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams));
inactives.offer(peerAddress);
} finally {
lock.unlock();
}
}
/** Convenience method for addAddress(new PeerAddress(address, params.port)); */
@ -668,62 +763,41 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
}
}
protected void discoverPeers() throws PeerDiscoveryException {
checkState(lock.isHeldByCurrentThread());
if (peerDiscoverers.isEmpty())
throw new PeerDiscoveryException("No peer discoverers registered");
/** Returns number of discovered peers. */
protected int discoverPeers() throws PeerDiscoveryException {
// Don't hold the lock whilst doing peer discovery: it can take a long time and cause high API latency.
checkState(!lock.isHeldByCurrentThread());
int maxPeersToDiscoverCount = this.vMaxPeersToDiscoverCount;
long start = System.currentTimeMillis();
final List<PeerAddress> addressList = Lists.newLinkedList();
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
for (PeerDiscovery peerDiscovery : peerDiscoverers /* COW */) {
InetSocketAddress[] addresses;
// Don't hold the peergroup lock across peer discovery as it's likely to be very slow and would make the
// peergroup API high latency.
lock.unlock();
try {
addresses = peerDiscovery.getPeers(5, TimeUnit.SECONDS);
} finally {
lock.lock();
}
addresses = peerDiscovery.getPeers(5, TimeUnit.SECONDS);
for (InetSocketAddress address : addresses) addressList.add(new PeerAddress(address));
if (addressList.size() >= maxPeersToDiscoverCount) break;
}
for (PeerAddress address : addressList) {
addInactive(address);
if (!addressList.isEmpty()) {
for (PeerAddress address : addressList) {
addInactive(address);
}
final ImmutableSet<PeerAddress> peersDiscoveredSet = ImmutableSet.copyOf(addressList);
for (final ListenerRegistration<PeerEventListener> registration : peerEventListeners /* COW */) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onPeersDiscovered(peersDiscoveredSet);
}
});
}
}
final ImmutableSet<PeerAddress> peersDiscoveredSet = ImmutableSet.copyOf(addressList);
for (final ListenerRegistration<PeerEventListener> registration : peerEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onPeersDiscovered(peersDiscoveredSet);
}
});
}
log.info("Peer discovery took {}msec and returned {} items",
System.currentTimeMillis() - start, addressList.size());
}
@Override
protected void run() throws Exception {
// Runs in a background thread dedicated to the PeerGroup. Jobs are for handling peer connections with backoff,
// and also recalculating filters.
while (isRunning()) {
jobQueue.take().run();
}
return addressList.size();
}
@VisibleForTesting
void waitForJobQueue() {
final CountDownLatch latch = new CountDownLatch(1);
Uninterruptibles.putUninterruptibly(jobQueue, new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
Uninterruptibles.awaitUninterruptibly(latch);
Futures.getUnchecked(executor.submit(Runnables.doNothing()));
}
private int countConnectedAndPendingPeers() {
@ -765,107 +839,98 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
return false;
}
/** Picks a peer from discovery and connects to it. If connection fails, picks another and tries again. */
protected void connectToAnyPeer() throws PeerDiscoveryException {
final State state = state();
if (!(state == State.STARTING || state == State.RUNNING)) return;
PeerAddress addr = null;
long nowMillis = Utils.currentTimeMillis();
long retryTime = 0;
lock.lock();
try {
if (useLocalhostPeerWhenPossible && maybeCheckForLocalhostPeer()) {
log.info("Localhost peer detected, trying to use it instead of P2P discovery");
maxConnections = 0;
connectToLocalHost();
return;
}
if (!haveReadyInactivePeer(nowMillis)) {
// Release the lock here because we'll probably do slow things like DNS lookups below,
discoverPeers();
groupBackoff.trackSuccess();
nowMillis = Utils.currentTimeMillis();
}
if (inactives.size() == 0) {
log.debug("Peer discovery didn't provide us any more peers, not trying to build new connection.");
return;
}
while (addr == null || (ipv6Unreachable && addr.getAddr() instanceof Inet6Address))
addr = inactives.poll();
retryTime = backoffMap.get(addr).getRetryTime();
} finally {
// discoverPeers might throw an exception if something goes wrong: we then hit this path with addr == null.
retryTime = Math.max(retryTime, groupBackoff.getRetryTime());
lock.unlock();
if (retryTime > nowMillis) {
// Sleep until retry time
final long millis = retryTime - nowMillis;
log.info("Waiting {} msec before next connect attempt {}", millis, addr == null ? "" : "to " + addr);
Utils.sleep(millis);
}
}
// This method constructs a Peer and puts it into pendingPeers.
checkNotNull(addr); // Help static analysis which can't see that addr is always set if we didn't throw above.
connectTo(addr, false, vConnectTimeoutMillis);
}
private boolean haveReadyInactivePeer(long nowMillis) {
// No inactive peers to try?
if (inactives.size() == 0)
return false;
// All peers have not reached backoff retry time?
if (backoffMap.get(inactives.peek()).getRetryTime() > nowMillis)
return false;
return true;
}
@Override
protected void startUp() throws Exception {
/**
* Starts the PeerGroup and begins network activity.
* @return A future that completes when first connection activity has been triggered (note: not first connection made).
*/
public ListenableFuture startAsync() {
// This is run in a background thread by the Service implementation.
if (chain == null) {
// Just try to help catch what might be a programming error.
log.warn("Starting up with no attached block chain. Did you forget to pass one to the constructor?");
}
vPingTimer = new Timer("Peer pinging thread", true);
if (torClient != null) {
log.info("Starting Tor/Orchid ...");
torClient.start();
torClient.waitUntilReady(TOR_TIMEOUT_SECONDS * 1000);
log.info("Tor ready");
}
channels.startAsync();
channels.awaitRunning();
triggerConnections();
}
@Override
protected void shutDown() throws Exception {
// This is run on a separate thread by the Service implementation.
vPingTimer.cancel();
// Blocking close of all sockets.
channels.stopAsync();
channels.awaitTerminated();
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown();
}
if (torClient != null) {
torClient.stop();
}
}
@Override
protected void triggerShutdown() {
// Force the thread to wake up.
Uninterruptibles.putUninterruptibly(jobQueue, new Runnable() {
checkState(!vUsedUp, "Cannot start a peer group twice");
vRunning = true;
vUsedUp = true;
executorStartupLatch.countDown();
// We do blocking waits during startup, so run on the executor thread.
return executor.submit(new Runnable() {
@Override
public void run() {
log.info("Starting ...");
vPingTimer = new Timer("Peer pinging thread", true);
if (torClient != null) {
log.info("Starting Tor/Orchid ...");
torClient.start();
try {
torClient.waitUntilReady(TOR_TIMEOUT_SECONDS * 1000);
} catch (Exception e) {
throw new RuntimeException(e);
}
log.info("Tor ready");
}
channels.startAsync();
channels.awaitRunning();
triggerConnections();
}
});
}
/** Does a blocking startup. */
public void start() {
Futures.getUnchecked(startAsync());
}
/** Can just use start() for a blocking start here instead of startAsync/awaitRunning: PeerGroup is no longer a Guava service. */
@Deprecated
public void awaitRunning() {
waitForJobQueue();
}
public ListenableFuture stopAsync() {
checkState(vRunning);
vRunning = false;
ListenableFuture future = executor.submit(new Runnable() {
@Override
public void run() {
log.info("Stopping ...");
vPingTimer.cancel();
// Blocking close of all sockets.
channels.stopAsync();
channels.awaitTerminated();
for (PeerDiscovery peerDiscovery : peerDiscoverers) {
peerDiscovery.shutdown();
}
if (torClient != null) {
torClient.stop();
}
vRunning = false;
}
});
executor.shutdown();
return future;
}
/** Does a blocking stop */
public void stop() {
try {
stopAsync();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/** Can just use stop() here instead of stopAsync/awaitTerminated: PeerGroup is no longer a Guava service. */
@Deprecated
public void awaitTerminated() {
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
* <p>Link the given wallet to this PeerGroup. This is used for three purposes:</p>
*
@ -1332,8 +1397,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
protected void handlePeerDeath(final Peer peer, @Nullable Exception exception) {
// Peer deaths can occur during startup if a connect attempt after peer discovery aborts immediately.
final State state = state();
if (state != State.RUNNING && state != State.STARTING) return;
if (!isRunning()) return;
int numPeers;
int numConnectedPeers = 0;
@ -1715,7 +1779,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
* @return the maximum number of peers to discover
*/
public int getMaxPeersToDiscoverCount() {
return maxPeersToDiscoverCount;
return vMaxPeersToDiscoverCount;
}
/**
@ -1724,7 +1788,7 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
* @param maxPeersToDiscoverCount the maximum number of peers to discover
*/
public void setMaxPeersToDiscoverCount(int maxPeersToDiscoverCount) {
this.maxPeersToDiscoverCount = maxPeersToDiscoverCount;
this.vMaxPeersToDiscoverCount = maxPeersToDiscoverCount;
}
/** See {@link #setUseLocalhostPeerWhenPossible(boolean)} */
@ -1750,4 +1814,8 @@ public class PeerGroup extends AbstractExecutionThreadService implements Transac
lock.unlock();
}
}
public boolean isRunning() {
return vRunning;
}
}

View File

@ -18,9 +18,7 @@
package org.bitcoinj.kits;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.*;
import com.subgraph.orchid.TorClient;
import org.bitcoinj.core.*;
import org.bitcoinj.net.discovery.DnsDiscovery;
@ -293,8 +291,7 @@ public class WalletAppKit extends AbstractIdleService {
onSetupCompleted();
if (blockingStartup) {
vPeerGroup.startAsync();
vPeerGroup.awaitRunning();
vPeerGroup.start();
// Make sure we shut down cleanly.
installShutdownHook();
completeExtensionInitiations(vPeerGroup);
@ -304,20 +301,20 @@ public class WalletAppKit extends AbstractIdleService {
vPeerGroup.startBlockChainDownload(listener);
listener.await();
} else {
vPeerGroup.startAsync();
vPeerGroup.addListener(new Service.Listener() {
Futures.addCallback(vPeerGroup.startAsync(), new FutureCallback() {
@Override
public void running() {
public void onSuccess(@Nullable Object result) {
completeExtensionInitiations(vPeerGroup);
final PeerEventListener l = downloadListener == null ? new DownloadListener() : downloadListener;
vPeerGroup.startBlockChainDownload(l);
}
@Override
public void failed(State from, Throwable failure) {
throw new RuntimeException(failure);
public void onFailure(Throwable t) {
throw new RuntimeException(t);
}
}, MoreExecutors.sameThreadExecutor());
});
}
} catch (BlockStoreException e) {
throw new IOException(e);
@ -441,8 +438,7 @@ public class WalletAppKit extends AbstractIdleService {
protected void shutDown() throws Exception {
// Runs in a separate thread.
try {
vPeerGroup.stopAsync();
vPeerGroup.awaitTerminated();
vPeerGroup.stop();
vWallet.saveToFile(vWalletFile);
vStore.close();

View File

@ -16,15 +16,19 @@
package org.bitcoinj.testing;
import com.google.common.util.concurrent.*;
import org.bitcoinj.core.*;
import org.bitcoinj.net.BlockingClientManager;
import org.bitcoinj.net.ClientConnectionManager;
import org.bitcoinj.net.NioClientManager;
import org.bitcoinj.params.UnitTestParams;
import org.bitcoinj.store.BlockStore;
import org.bitcoinj.store.MemoryBlockStore;
import com.google.common.base.Preconditions;
import org.bitcoinj.utils.DaemonThreadFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@ -58,6 +62,7 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
remoteVersionMessage = new VersionMessage(unitTestParams, 1);
remoteVersionMessage.localServices = VersionMessage.NODE_NETWORK;
remoteVersionMessage.clientVersion = NotFoundMessage.MIN_PROTOCOL_VERSION;
blockJobs = false;
initPeerGroup();
}
@ -65,9 +70,9 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
public void tearDown() {
try {
super.tearDown();
blockJobs = false;
Utils.finishMockSleep();
peerGroup.stopAsync();
peerGroup.awaitTerminated();
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -75,14 +80,40 @@ public class TestWithPeerGroup extends TestWithNetworkConnections {
protected void initPeerGroup() {
if (clientType == ClientType.NIO_CLIENT_MANAGER)
peerGroup = new PeerGroup(unitTestParams, blockChain, new NioClientManager());
peerGroup = createPeerGroup(new NioClientManager());
else
peerGroup = new PeerGroup(unitTestParams, blockChain, new BlockingClientManager());
peerGroup = createPeerGroup(new BlockingClientManager());
peerGroup.setPingIntervalMsec(0); // Disable the pings as they just get in the way of most tests.
peerGroup.addWallet(wallet);
peerGroup.setUseLocalhostPeerWhenPossible(false); // Prevents from connecting to bitcoin nodes on localhost.
}
protected boolean blockJobs = false;
protected final Semaphore jobBlocks = new Semaphore(0);
private PeerGroup createPeerGroup(final ClientConnectionManager manager) {
return new PeerGroup(unitTestParams, blockChain, manager) {
@Override
protected ListeningScheduledExecutorService createPrivateExecutor() {
return MoreExecutors.listeningDecorator(new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("PeerGroup test thread")) {
@Override
public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
if (!blockJobs)
return super.schedule(command, delay, unit);
return super.schedule(new Runnable() {
@Override
public void run() {
Utils.rollMockClockMillis(unit.toMillis(delay));
command.run();
jobBlocks.acquireUninterruptibly();
}
}, 0 /* immediate */, unit);
}
});
}
};
}
protected InboundMessageQueuer connectPeerWithoutVersionExchange(int id) throws Exception {
Preconditions.checkArgument(id < PEER_SERVERS);
InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.1", 2000 + id);

View File

@ -1,16 +1,28 @@
package org.bitcoinj.utils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/** Thread factory whose threads are marked as daemon and won't prevent process exit. */
public class DaemonThreadFactory implements ThreadFactory {
@Nullable private final String name;
public DaemonThreadFactory(@Nullable String name) {
this.name = name;
}
public DaemonThreadFactory() {
this(null);
}
@Override
public Thread newThread(@Nonnull Runnable runnable) {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setDaemon(true);
if (name != null)
thread.setName(name);
return thread;
}
}

View File

@ -188,7 +188,6 @@ public class FilteredBlockAndPartialMerkleTreeTests extends TestWithPeerGroup {
// Peer 1 goes away.
closePeer(peerOf(p1));
peerGroup.stopAsync();
super.tearDown();
}
}

View File

@ -159,8 +159,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
public void shutdown() {
}
});
peerGroup.startAsync();
peerGroup.awaitRunning();
peerGroup.start();
latch.await();
// Check that we did indeed throw an exception. If we got here it means we threw and then PeerGroup tried
// again a bit later.
@ -238,8 +237,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
inbound(p2, new NotFoundMessage(unitTestParams, getdata.getItems()));
pingAndWait(p2);
assertEquals(value, wallet.getBalance(Wallet.BalanceType.ESTIMATED));
peerGroup.stopAsync();
peerGroup.awaitTerminated();
}
@ -276,9 +273,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
inbound(p1, new NotFoundMessage(unitTestParams, getdata.getItems()));
pingAndWait(p1);
assertEquals(value, wallet2.getBalance(Wallet.BalanceType.ESTIMATED));
peerGroup.stopAsync();
peerGroup.awaitTerminated();
}
}
@Test
public void singleDownloadPeer1() throws Exception {
@ -320,7 +315,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
// Peer 2 fetches it next time it hears an inv (should it fetch immediately?).
inbound(p2, inv);
assertTrue(outbound(p2) instanceof GetDataMessage);
peerGroup.stopAsync();
}
@Test
@ -358,7 +352,6 @@ public class PeerGroupTest extends TestWithPeerGroup {
InboundMessageQueuer p2 = connectPeer(2);
Message message = (Message)outbound(p2);
assertNull(message == null ? "" : message.toString(), message);
peerGroup.stopAsync();
}
@Test
@ -559,10 +552,15 @@ public class PeerGroupTest extends TestWithPeerGroup {
}
});
peerGroup.setMaxConnections(3);
Utils.setMockSleep(true);
blockJobs = true;
jobBlocks.release(2); // startup + first peer discovery
peerGroup.startAsync();
peerGroup.awaitRunning();
jobBlocks.release(3); // One for each peer.
handleConnectToPeer(0);
handleConnectToPeer(1);
handleConnectToPeer(2);
@ -575,6 +573,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); // peer died
// discovers, connects to new peer
jobBlocks.release(1);
handleConnectToPeer(3);
assertEquals(2003, connectedPeers.take().getAddress().getPort());
@ -582,30 +581,25 @@ public class PeerGroupTest extends TestWithPeerGroup {
assertEquals(2001, disconnectedPeers.take().getAddress().getPort()); // peer died
// Alternates trying two offline peers
Utils.passMockSleep();
jobBlocks.release(10);
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
// Peer 2 comes online
startPeerServer(2);
Utils.passMockSleep();
jobBlocks.release(1);
handleConnectToPeer(2);
assertEquals(2002, connectedPeers.take().getAddress().getPort());
jobBlocks.release(6);
stopPeerServer(2);
assertEquals(2002, disconnectedPeers.take().getAddress().getPort()); // peer died
// Peer 2 is tried before peer 1, since it has a lower backoff due to recent success
Utils.passMockSleep();
assertEquals(2002, disconnectedPeers.take().getAddress().getPort());
Utils.passMockSleep();
assertEquals(2001, disconnectedPeers.take().getAddress().getPort());
}