diff --git a/core/pom.xml b/core/pom.xml index ed62bf34..653cd6e9 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -136,7 +136,6 @@ com.h2database:h2:1.3.167:jar:null:compile:d3867d586f087e53eb12fc65e5693d8ee9a5da17 com.lambdaworks:scrypt:1.3.3:jar:null:compile:06d6813de41e177189e1722717979b4fb5454b1d com.madgag:sc-light-jdk15on:1.47.0.2:jar:null:compile:d5c98671cc97fa0d928be1c7eb5edd3fb95d3234 - io.netty:netty:3.6.3.Final:jar:null:compile:1eebfd2f79dd72c44d09d9917c549c60322462b8 net.jcip:jcip-annotations:1.0:jar:null:compile:afba4942caaeaf46aab0b976afd57cc7c181467e net.sf.jopt-simple:jopt-simple:4.3:jar:null:compile:88ffca34311a6564a98f14820431e17b4382a069 org.slf4j:slf4j-api:1.7.5:jar:null:compile:6b262da268f8ad9eff941b25503a9198f0a0ac93 diff --git a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java index 5fc1b253..5cb0f0b4 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerGroup.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerGroup.java @@ -495,10 +495,6 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca } finally { lock.unlock(); } - // Don't do connectTo whilst holding the PeerGroup lock because this can trigger some amazingly deep stacks - // and potentially circular deadlock in the case of immediate failure (eg, attempt to access IPv6 node from - // a non-v6 capable machine). It doesn't relay control immediately to the netty boss thread as you may expect. - // // This method eventually constructs a Peer and puts it into pendingPeers. If the connection fails to establish, // handlePeerDeath will be called, which will potentially call this method again to replace the dead or failed // connection. @@ -764,7 +760,6 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca int newSize = -1; lock.lock(); try { - // Runs on a netty worker thread for every peer that is newly connected. Peer is not locked at this point. // Sets up the newly connected peer so it can do everything it needs to. log.info("{}: New peer", peer); pendingPeers.remove(peer); @@ -933,10 +928,6 @@ public class PeerGroup extends AbstractIdleService implements TransactionBroadca } protected void handlePeerDeath(final Peer peer) { - // This can run on any Netty worker thread. Because connectToAnyPeer() must run unlocked to avoid circular - // deadlock, this method must run largely unlocked too. Some members are thread-safe and others aren't, so - // we synchronize only the parts that need it. - // Peer deaths can occur during startup if a connect attempt after peer discovery aborts immediately. final State state = state(); if (state != State.RUNNING && state != State.STARTING) return; diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClient.java b/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClient.java index e41590dd..fe87cd24 100644 --- a/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClient.java +++ b/core/src/main/java/com/google/bitcoin/networkabstraction/BlockingClient.java @@ -16,16 +16,16 @@ package com.google.bitcoin.networkabstraction; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Set; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkState; @@ -95,7 +95,7 @@ public class BlockingClient implements MessageWriteTarget { } } catch (Exception e) { if (!vCloseRequested) - log.error("Error trying to open/read from connection", e); + log.error("Error trying to open/read from connection: " + serverAddress, e); } finally { try { socket.close(); diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/NioClientManager.java b/core/src/main/java/com/google/bitcoin/networkabstraction/NioClientManager.java index e9611046..1ade70df 100644 --- a/core/src/main/java/com/google/bitcoin/networkabstraction/NioClientManager.java +++ b/core/src/main/java/com/google/bitcoin/networkabstraction/NioClientManager.java @@ -16,18 +16,15 @@ package com.google.bitcoin.networkabstraction; +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.*; import java.nio.channels.spi.SelectorProvider; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.AbstractExecutionThreadService; -import org.slf4j.LoggerFactory; /** * A class which manages a set of client connections. Uses Java NIO to select network events and processes them in a @@ -149,6 +146,7 @@ public class NioClientManager extends AbstractExecutionThreadService implements newConnectionChannels.offer(new SocketChannelAndParser(sc, parser)); selector.wakeup(); } catch (IOException e) { + log.error("Could not connect to " + serverAddress); throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources } } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index 212eef0d..d03f1eb5 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -32,8 +32,8 @@ import org.junit.runners.Parameterized; import java.math.BigInteger; import java.net.InetSocketAddress; import java.util.*; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; @@ -70,23 +70,20 @@ public class PeerGroupTest extends TestWithPeerGroup { @Test public void listener() throws Exception { - final AtomicInteger connectedPeers = new AtomicInteger(0); - final AtomicInteger disconnectedPeers = new AtomicInteger(0); + final BlockingQueue connectedPeers = new LinkedBlockingQueue(); + final BlockingQueue disconnectedPeers = new LinkedBlockingQueue(); final SettableFuture firstDisconnectFuture = SettableFuture.create(); final SettableFuture secondDisconnectFuture = SettableFuture.create(); final Map peerToMessageCount = new HashMap(); AbstractPeerEventListener listener = new AbstractPeerEventListener() { @Override public void onPeerConnected(Peer peer, int peerCount) { - connectedPeers.incrementAndGet(); + connectedPeers.add(peer); } @Override public void onPeerDisconnected(Peer peer, int peerCount) { - if (disconnectedPeers.incrementAndGet() == 1) - firstDisconnectFuture.set(null); - else - secondDisconnectFuture.set(null); + disconnectedPeers.add(peer); } @Override @@ -106,54 +103,50 @@ public class PeerGroupTest extends TestWithPeerGroup { // Create a couple of peers. InboundMessageQueuer p1 = connectPeer(1); - Threading.waitForUserCode(); - assertEquals(1, connectedPeers.get()); InboundMessageQueuer p2 = connectPeer(2); - Threading.waitForUserCode(); - assertEquals(2, connectedPeers.get()); + connectedPeers.take(); + connectedPeers.take(); pingAndWait(p1); pingAndWait(p2); Threading.waitForUserCode(); - assertEquals(0, disconnectedPeers.get()); + assertEquals(0, disconnectedPeers.size()); p1.close(); - firstDisconnectFuture.get(); - assertEquals(1, disconnectedPeers.get()); + disconnectedPeers.take(); + assertEquals(0, disconnectedPeers.size()); p2.close(); - secondDisconnectFuture.get(); - assertEquals(2, disconnectedPeers.get()); + disconnectedPeers.take(); + assertEquals(0, disconnectedPeers.size()); assertTrue(peerGroup.removeEventListener(listener)); assertFalse(peerGroup.removeEventListener(listener)); } @Test - public void peerDiscoveryPolling() throws Exception { + public void peerDiscoveryPolling() throws InterruptedException { // Check that if peer discovery fails, we keep trying until we have some nodes to talk with. - final Semaphore sem = new Semaphore(0); - final boolean[] result = new boolean[1]; - result[0] = false; + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean result = new AtomicBoolean(); peerGroup.addPeerDiscovery(new PeerDiscovery() { public InetSocketAddress[] getPeers(long unused, TimeUnit unused2) throws PeerDiscoveryException { - if (result[0] == false) { + if (!result.getAndSet(true)) { // Pretend we are not connected to the internet. - result[0] = true; throw new PeerDiscoveryException("test failure"); } else { // Return a bogus address. - sem.release(); - return new InetSocketAddress[]{new InetSocketAddress("localhost", 0)}; + latch.countDown(); + return new InetSocketAddress[]{new InetSocketAddress("localhost", 1)}; } } public void shutdown() { } }); peerGroup.startAndWait(); - sem.acquire(); + latch.await(); // Check that we did indeed throw an exception. If we got here it means we threw and then PeerGroup tried // again a bit later. - assertTrue(result[0]); + assertTrue(result.get()); } @Test