From 38bf9a8a80d42fbb15ac3fa9b77354ca3521e387 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Tue, 8 Mar 2011 13:18:33 +0000 Subject: [PATCH] Change how socket errors are handled in NetworkConnection and Peer. This allows for cleaner shutdown and simplifies the code a bit. Get rid of some unchecked conversion warnings to do with getblock futures. --- .../bitcoin/core/NetworkConnection.java | 50 +++++++++-------- src/com/google/bitcoin/core/Peer.java | 56 +++++++++---------- 2 files changed, 51 insertions(+), 55 deletions(-) diff --git a/src/com/google/bitcoin/core/NetworkConnection.java b/src/com/google/bitcoin/core/NetworkConnection.java index 0d528ec3..7c9fe6b4 100644 --- a/src/com/google/bitcoin/core/NetworkConnection.java +++ b/src/com/google/bitcoin/core/NetworkConnection.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.Socket; +import java.net.SocketException; import static com.google.bitcoin.core.Utils.*; @@ -49,7 +50,8 @@ public class NetworkConnection { private final Socket socket; private final OutputStream out; private final InputStream in; - + // The IP address to which we are connecting. + private InetAddress remoteIp; private boolean usesChecksumming; private final NetworkParameters params; static final private boolean PROTOCOL_LOG = false; @@ -64,6 +66,7 @@ public class NetworkConnection { */ public NetworkConnection(InetAddress remoteIp, NetworkParameters params) throws IOException, ProtocolException { this.params = params; + this.remoteIp = remoteIp; socket = new Socket(remoteIp, params.port); out = socket.getOutputStream(); in = socket.getInputStream(); @@ -101,13 +104,19 @@ public class NetworkConnection { socket.close(); } + @Override + public String toString() { + return "[" + remoteIp.getHostAddress() + "]:" + params.port + " (" + (socket.isConnected() ? "connected" : + "disconnected") + ")"; + } + /** * Reads a network message from the wire, blocking until the message is fully received. * - * @return An instance of a Message subclass. - * @throws ProtocolException if the message is badly formatted, failed checksum or there was a protocol failure. + * @return An instance of a Message subclass + * @throws ProtocolException if the message is badly formatted, failed checksum or there was a TCP failure. */ - public Message readMessage() throws ProtocolException { + public Message readMessage() throws IOException, ProtocolException { // A BitCoin protocol message has the following format. // // - 4 byte magic number: 0xfabfb5da for the testnet or @@ -120,17 +129,14 @@ public class NetworkConnection { // The checksum is the first 4 bytes of a SHA256 hash of the message payload. It isn't // present for all messages, notably, the first one on a connection. byte[] header = new byte[4 + COMMAND_LEN + 4 + (usesChecksumming ? 4 : 0)]; - try { - int readCursor = 0; - while (readCursor < header.length) { - int bytesRead = in.read(header, readCursor, header.length - readCursor); - if (bytesRead == -1) { - throw new ProtocolException("Socket disconnected half way through a message"); - } - readCursor += bytesRead; + int readCursor = 0; + while (readCursor < header.length) { + int bytesRead = in.read(header, readCursor, header.length - readCursor); + if (bytesRead == -1) { + // There's no more data to read. + throw new IOException("Socket is disconnected"); } - } catch (IOException e) { - throw new ProtocolException(e); + readCursor += bytesRead; } int cursor = 0; @@ -171,18 +177,14 @@ public class NetworkConnection { } // Now try to read the whole message. - int readCursor = 0; + readCursor = 0; byte[] payloadBytes = new byte[size]; - try { - while (readCursor < payloadBytes.length - 1) { - int bytesRead = in.read(payloadBytes, readCursor, size - readCursor); - if (bytesRead == -1) { - throw new ProtocolException("Socket disconnected half way through a message"); - } - readCursor += bytesRead; + while (readCursor < payloadBytes.length - 1) { + int bytesRead = in.read(payloadBytes, readCursor, size - readCursor); + if (bytesRead == -1) { + throw new ProtocolException("Socket disconnected half way through a message"); } - } catch (IOException e) { - throw new ProtocolException(e); + readCursor += bytesRead; } // Verify the checksum. diff --git a/src/com/google/bitcoin/core/Peer.java b/src/com/google/bitcoin/core/Peer.java index 00de2b29..546de5c3 100644 --- a/src/com/google/bitcoin/core/Peer.java +++ b/src/com/google/bitcoin/core/Peer.java @@ -35,6 +35,8 @@ public class Peer { private final NetworkConnection conn; private final NetworkParameters params; private Thread thread; + // Whether the peer thread is supposed to be running or not. Set to false during shutdown so the peer thread + // knows to quit when the socket goes away. private boolean running; private final BlockChain blockChain; @@ -42,7 +44,7 @@ public class Peer { private CountDownLatch chainCompletionLatch; // When we want to download a block or transaction from a peer, the InventoryItem is put here whilst waiting for // the response. Synchronized on itself. - private List pendingGetDataFutures; + private List> pendingGetBlockFutures; /** * Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that @@ -52,7 +54,7 @@ public class Peer { this.conn = conn; this.params = params; this.blockChain = blockChain; - this.pendingGetDataFutures = new ArrayList(); + this.pendingGetBlockFutures = new ArrayList>(); } /** Starts the background thread that processes messages. */ @@ -62,6 +64,10 @@ public class Peer { Peer.this.run(); } }); + synchronized (this) { + running = true; + } + this.thread.setName("Bitcoin peer thread: " + conn.toString()); this.thread.start(); } @@ -70,17 +76,8 @@ public class Peer { */ private void run() { assert Thread.currentThread() == thread; - // Synchronization here is unnecessary but makes FindBugs happy. - synchronized (this) { - this.running = true; - } - while (true) { - synchronized (this) { - if (!this.running) break; - } - // TODO: This isn't a good way to handle network I/O, we should probably switch to nio. - // TODO: Improve the exception handling here. - try { + try { + while (true) { Message m = conn.readMessage(); if (m instanceof InventoryMessage) { processInv((InventoryMessage) m); @@ -94,18 +91,15 @@ public class Peer { // TODO: Handle the other messages we can receive. LOG("Received unhandled message: " + m.toString()); } - } catch (ProtocolException e) { - if (!(e.getCause() instanceof SocketException)) { - LOG(e.toString()); - e.printStackTrace(); - } else { - // Time to die ... - break; - } - } catch (IOException e) { - LOG(e.toString()); + } + } catch (Exception e) { + if (e instanceof IOException && !running) { + // This exception was expected because we are tearing down the socket as part of quitting. + LOG("Shutting down peer thread"); + } else { + // We caught an unexpected exception. + System.err.println(e.toString()); e.printStackTrace(); - break; } } synchronized (this) { @@ -120,14 +114,14 @@ public class Peer { assert Thread.currentThread() == thread; try { // Was this block requested by getBlock()? - synchronized (pendingGetDataFutures) { - for (int i = 0; i < pendingGetDataFutures.size(); i++) { - GetDataFuture f = pendingGetDataFutures.get(i); + synchronized (pendingGetBlockFutures) { + for (int i = 0; i < pendingGetBlockFutures.size(); i++) { + GetDataFuture f = pendingGetBlockFutures.get(i); if (Arrays.equals(f.getItem().hash, m.getHash())) { // Yes, it was. So pass it through the future. f.setResult(m); // Blocks explicitly requested don't get sent to the block chain. - pendingGetDataFutures.remove(i); + pendingGetBlockFutures.remove(i); return; } } @@ -210,11 +204,11 @@ public class Peer { InventoryMessage getdata = new InventoryMessage(params); InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Block, blockHash); getdata.items.add(inventoryItem); - GetDataFuture future = new GetDataFuture(inventoryItem); + GetDataFuture future = new GetDataFuture(inventoryItem); // Add to the list of things we're waiting for. It's important this come before the network send to avoid // race conditions. - synchronized (pendingGetDataFutures) { - pendingGetDataFutures.add(future); + synchronized (pendingGetBlockFutures) { + pendingGetBlockFutures.add(future); } conn.writeMessage(NetworkConnection.MSG_GETDATA, getdata); return future;