diff --git a/core/src/main/java/com/google/bitcoin/networkabstraction/ConnectionHandler.java b/core/src/main/java/com/google/bitcoin/networkabstraction/ConnectionHandler.java index eb1fa522..75847df0 100644 --- a/core/src/main/java/com/google/bitcoin/networkabstraction/ConnectionHandler.java +++ b/core/src/main/java/com/google/bitcoin/networkabstraction/ConnectionHandler.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Arrays; @@ -96,6 +97,14 @@ class ConnectionHandler implements MessageWriteTarget { checkState(connectedHandlers.add(this)); } + @GuardedBy("lock") + private void setWriteOps() { + // Make sure we are registered to get updated when writing is available again + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + // Refresh the selector to make sure it gets the new interestOps + key.selector().wakeup(); + } + // Tries to write any outstanding write bytes, runs in any thread (possibly unlocked) private void tryWriteBytes() throws IOException { lock.lock(); @@ -108,10 +117,7 @@ class ConnectionHandler implements MessageWriteTarget { if (!buff.hasRemaining()) bytesIterator.remove(); else { - // Make sure we are registered to get updated when writing is available again - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - // Refresh the selector to make sure it gets the new interestOps - key.selector().wakeup(); + setWriteOps(); break; } } @@ -139,12 +145,17 @@ class ConnectionHandler implements MessageWriteTarget { // TODO: Kill the needless message duplication when the write completes right away bytesToWrite.offer(ByteBuffer.wrap(Arrays.copyOf(message, message.length))); bytesToWriteRemaining += message.length; - tryWriteBytes(); + setWriteOps(); } catch (IOException e) { lock.unlock(); log.error("Error writing message to connection, closing connection", e); closeConnection(); throw e; + } catch (CancelledKeyException e) { + lock.unlock(); + log.error("Error writing message to connection, closing connection", e); + closeConnection(); + throw new IOException(e); } lock.unlock(); } @@ -211,7 +222,7 @@ class ConnectionHandler implements MessageWriteTarget { } catch (Exception e) { // This can happen eg if the channel closes while the thread is about to get killed // (ClosedByInterruptException), or if handler.parser.receiveBytes throws something - log.error("Error handling SelectionKey: " + e.getMessage()); + log.error("Error handling SelectionKey: ", e); if (handler != null) handler.closeConnection(); } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index 5a8f619e..d17683f3 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -34,6 +34,7 @@ import java.io.OutputStream; import java.math.BigInteger; import java.net.InetSocketAddress; import java.net.SocketException; +import java.nio.channels.CancelledKeyException; import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; import java.util.ArrayList; @@ -803,8 +804,8 @@ public class PeerTest extends TestWithNetworkConnections { peer.writeTarget.writeBytes(new byte[1]); fail(); } catch (IOException e) { - assertTrue(e instanceof ClosedChannelException || - (e instanceof SocketException && e.getMessage().equals("Socket is closed"))); + assertTrue((e.getCause() != null && e.getCause() instanceof CancelledKeyException) + || (e instanceof SocketException && e.getMessage().equals("Socket is closed"))); } } @@ -881,6 +882,13 @@ public class PeerTest extends TestWithNetworkConnections { } }; connect(); // Writes out a verack+version. + final SettableFuture peerDisconnected = SettableFuture.create(); + writeTarget.peer.addEventListener(new AbstractPeerEventListener() { + @Override + public void onPeerDisconnected(Peer p, int peerCount) { + peerDisconnected.set(null); + } + }); final NetworkParameters params = TestNet3Params.testNet(); BitcoinSerializer serializer = new BitcoinSerializer(params); // Now write some bogus truncated message. @@ -908,12 +916,13 @@ public class PeerTest extends TestWithNetworkConnections { } catch (ExecutionException e) { assertTrue(e.getCause() instanceof ProtocolException); } + peerDisconnected.get(); try { peer.writeTarget.writeBytes(new byte[1]); fail(); } catch (IOException e) { - assertTrue(e instanceof ClosedChannelException || - (e instanceof SocketException && e.getMessage().equals("Socket is closed"))); + assertTrue((e.getCause() != null && e.getCause() instanceof CancelledKeyException) + || (e instanceof SocketException && e.getMessage().equals("Socket is closed"))); } } } diff --git a/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java b/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java index 287f4e24..1788a10b 100644 --- a/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java +++ b/core/src/test/java/com/google/bitcoin/core/TestWithNetworkConnections.java @@ -30,6 +30,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import static com.google.common.base.Preconditions.checkArgument; @@ -111,6 +112,17 @@ public class TestWithNetworkConnections { protected InboundMessageQueuer connect(Peer peer, VersionMessage versionMessage) throws Exception { checkArgument(versionMessage.hasBlockChain()); + final AtomicBoolean doneConnecting = new AtomicBoolean(false); + final Thread thisThread = Thread.currentThread(); + peer.addEventListener(new AbstractPeerEventListener() { + @Override + public void onPeerDisconnected(Peer p, int peerCount) { + synchronized (doneConnecting) { + if (!doneConnecting.get()) + thisThread.interrupt(); + } + } + }); if (clientType == ClientType.NIO_CLIENT_MANAGER || clientType == ClientType.BLOCKING_CLIENT_MANAGER) channels.openConnection(new InetSocketAddress("127.0.0.1", 2000), peer); else if (clientType == ClientType.NIO_CLIENT) @@ -125,8 +137,15 @@ public class TestWithNetworkConnections { // Complete handshake with the peer - send/receive version(ack)s, receive bloom filter writeTarget.sendMessage(versionMessage); writeTarget.sendMessage(new VersionAck()); - assertTrue(writeTarget.nextMessageBlocking() instanceof VersionMessage); - assertTrue(writeTarget.nextMessageBlocking() instanceof VersionAck); + try { + assertTrue(writeTarget.nextMessageBlocking() instanceof VersionMessage); + assertTrue(writeTarget.nextMessageBlocking() instanceof VersionAck); + synchronized (doneConnecting) { + doneConnecting.set(true); + } + } catch (InterruptedException e) { + // We were disconnected before we got back version/verack + } return writeTarget; }