3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-11 17:55:53 +00:00

Change how socket errors are handled in NetworkConnection and Peer. This allows for cleaner shutdown and simplifies

the code a bit.
 
Get rid of some unchecked conversion warnings to do with getblock futures.
This commit is contained in:
Mike Hearn 2011-03-08 13:18:33 +00:00
parent 66321804e7
commit 38bf9a8a80
2 changed files with 51 additions and 55 deletions

View File

@ -22,6 +22,7 @@ import java.io.OutputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException;
import static com.google.bitcoin.core.Utils.*; import static com.google.bitcoin.core.Utils.*;
@ -49,7 +50,8 @@ public class NetworkConnection {
private final Socket socket; private final Socket socket;
private final OutputStream out; private final OutputStream out;
private final InputStream in; private final InputStream in;
// The IP address to which we are connecting.
private InetAddress remoteIp;
private boolean usesChecksumming; private boolean usesChecksumming;
private final NetworkParameters params; private final NetworkParameters params;
static final private boolean PROTOCOL_LOG = false; static final private boolean PROTOCOL_LOG = false;
@ -64,6 +66,7 @@ public class NetworkConnection {
*/ */
public NetworkConnection(InetAddress remoteIp, NetworkParameters params) throws IOException, ProtocolException { public NetworkConnection(InetAddress remoteIp, NetworkParameters params) throws IOException, ProtocolException {
this.params = params; this.params = params;
this.remoteIp = remoteIp;
socket = new Socket(remoteIp, params.port); socket = new Socket(remoteIp, params.port);
out = socket.getOutputStream(); out = socket.getOutputStream();
in = socket.getInputStream(); in = socket.getInputStream();
@ -101,13 +104,19 @@ public class NetworkConnection {
socket.close(); socket.close();
} }
@Override
public String toString() {
return "[" + remoteIp.getHostAddress() + "]:" + params.port + " (" + (socket.isConnected() ? "connected" :
"disconnected") + ")";
}
/** /**
* Reads a network message from the wire, blocking until the message is fully received. * Reads a network message from the wire, blocking until the message is fully received.
* *
* @return An instance of a Message subclass. * @return An instance of a Message subclass
* @throws ProtocolException if the message is badly formatted, failed checksum or there was a protocol failure. * @throws ProtocolException if the message is badly formatted, failed checksum or there was a TCP failure.
*/ */
public Message readMessage() throws ProtocolException { public Message readMessage() throws IOException, ProtocolException {
// A BitCoin protocol message has the following format. // A BitCoin protocol message has the following format.
// //
// - 4 byte magic number: 0xfabfb5da for the testnet or // - 4 byte magic number: 0xfabfb5da for the testnet or
@ -120,17 +129,14 @@ public class NetworkConnection {
// The checksum is the first 4 bytes of a SHA256 hash of the message payload. It isn't // The checksum is the first 4 bytes of a SHA256 hash of the message payload. It isn't
// present for all messages, notably, the first one on a connection. // present for all messages, notably, the first one on a connection.
byte[] header = new byte[4 + COMMAND_LEN + 4 + (usesChecksumming ? 4 : 0)]; byte[] header = new byte[4 + COMMAND_LEN + 4 + (usesChecksumming ? 4 : 0)];
try { int readCursor = 0;
int readCursor = 0; while (readCursor < header.length) {
while (readCursor < header.length) { int bytesRead = in.read(header, readCursor, header.length - readCursor);
int bytesRead = in.read(header, readCursor, header.length - readCursor); if (bytesRead == -1) {
if (bytesRead == -1) { // There's no more data to read.
throw new ProtocolException("Socket disconnected half way through a message"); throw new IOException("Socket is disconnected");
}
readCursor += bytesRead;
} }
} catch (IOException e) { readCursor += bytesRead;
throw new ProtocolException(e);
} }
int cursor = 0; int cursor = 0;
@ -171,18 +177,14 @@ public class NetworkConnection {
} }
// Now try to read the whole message. // Now try to read the whole message.
int readCursor = 0; readCursor = 0;
byte[] payloadBytes = new byte[size]; byte[] payloadBytes = new byte[size];
try { while (readCursor < payloadBytes.length - 1) {
while (readCursor < payloadBytes.length - 1) { int bytesRead = in.read(payloadBytes, readCursor, size - readCursor);
int bytesRead = in.read(payloadBytes, readCursor, size - readCursor); if (bytesRead == -1) {
if (bytesRead == -1) { throw new ProtocolException("Socket disconnected half way through a message");
throw new ProtocolException("Socket disconnected half way through a message");
}
readCursor += bytesRead;
} }
} catch (IOException e) { readCursor += bytesRead;
throw new ProtocolException(e);
} }
// Verify the checksum. // Verify the checksum.

View File

@ -35,6 +35,8 @@ public class Peer {
private final NetworkConnection conn; private final NetworkConnection conn;
private final NetworkParameters params; private final NetworkParameters params;
private Thread thread; private Thread thread;
// Whether the peer thread is supposed to be running or not. Set to false during shutdown so the peer thread
// knows to quit when the socket goes away.
private boolean running; private boolean running;
private final BlockChain blockChain; private final BlockChain blockChain;
@ -42,7 +44,7 @@ public class Peer {
private CountDownLatch chainCompletionLatch; private CountDownLatch chainCompletionLatch;
// When we want to download a block or transaction from a peer, the InventoryItem is put here whilst waiting for // When we want to download a block or transaction from a peer, the InventoryItem is put here whilst waiting for
// the response. Synchronized on itself. // the response. Synchronized on itself.
private List<GetDataFuture> pendingGetDataFutures; private List<GetDataFuture<Block>> pendingGetBlockFutures;
/** /**
* Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that * Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that
@ -52,7 +54,7 @@ public class Peer {
this.conn = conn; this.conn = conn;
this.params = params; this.params = params;
this.blockChain = blockChain; this.blockChain = blockChain;
this.pendingGetDataFutures = new ArrayList<GetDataFuture>(); this.pendingGetBlockFutures = new ArrayList<GetDataFuture<Block>>();
} }
/** Starts the background thread that processes messages. */ /** Starts the background thread that processes messages. */
@ -62,6 +64,10 @@ public class Peer {
Peer.this.run(); Peer.this.run();
} }
}); });
synchronized (this) {
running = true;
}
this.thread.setName("Bitcoin peer thread: " + conn.toString());
this.thread.start(); this.thread.start();
} }
@ -70,17 +76,8 @@ public class Peer {
*/ */
private void run() { private void run() {
assert Thread.currentThread() == thread; assert Thread.currentThread() == thread;
// Synchronization here is unnecessary but makes FindBugs happy. try {
synchronized (this) { while (true) {
this.running = true;
}
while (true) {
synchronized (this) {
if (!this.running) break;
}
// TODO: This isn't a good way to handle network I/O, we should probably switch to nio.
// TODO: Improve the exception handling here.
try {
Message m = conn.readMessage(); Message m = conn.readMessage();
if (m instanceof InventoryMessage) { if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m); processInv((InventoryMessage) m);
@ -94,18 +91,15 @@ public class Peer {
// TODO: Handle the other messages we can receive. // TODO: Handle the other messages we can receive.
LOG("Received unhandled message: " + m.toString()); LOG("Received unhandled message: " + m.toString());
} }
} catch (ProtocolException e) { }
if (!(e.getCause() instanceof SocketException)) { } catch (Exception e) {
LOG(e.toString()); if (e instanceof IOException && !running) {
e.printStackTrace(); // This exception was expected because we are tearing down the socket as part of quitting.
} else { LOG("Shutting down peer thread");
// Time to die ... } else {
break; // We caught an unexpected exception.
} System.err.println(e.toString());
} catch (IOException e) {
LOG(e.toString());
e.printStackTrace(); e.printStackTrace();
break;
} }
} }
synchronized (this) { synchronized (this) {
@ -120,14 +114,14 @@ public class Peer {
assert Thread.currentThread() == thread; assert Thread.currentThread() == thread;
try { try {
// Was this block requested by getBlock()? // Was this block requested by getBlock()?
synchronized (pendingGetDataFutures) { synchronized (pendingGetBlockFutures) {
for (int i = 0; i < pendingGetDataFutures.size(); i++) { for (int i = 0; i < pendingGetBlockFutures.size(); i++) {
GetDataFuture f = pendingGetDataFutures.get(i); GetDataFuture<Block> f = pendingGetBlockFutures.get(i);
if (Arrays.equals(f.getItem().hash, m.getHash())) { if (Arrays.equals(f.getItem().hash, m.getHash())) {
// Yes, it was. So pass it through the future. // Yes, it was. So pass it through the future.
f.setResult(m); f.setResult(m);
// Blocks explicitly requested don't get sent to the block chain. // Blocks explicitly requested don't get sent to the block chain.
pendingGetDataFutures.remove(i); pendingGetBlockFutures.remove(i);
return; return;
} }
} }
@ -210,11 +204,11 @@ public class Peer {
InventoryMessage getdata = new InventoryMessage(params); InventoryMessage getdata = new InventoryMessage(params);
InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Block, blockHash); InventoryItem inventoryItem = new InventoryItem(InventoryItem.Type.Block, blockHash);
getdata.items.add(inventoryItem); getdata.items.add(inventoryItem);
GetDataFuture future = new GetDataFuture(inventoryItem); GetDataFuture<Block> future = new GetDataFuture<Block>(inventoryItem);
// Add to the list of things we're waiting for. It's important this come before the network send to avoid // Add to the list of things we're waiting for. It's important this come before the network send to avoid
// race conditions. // race conditions.
synchronized (pendingGetDataFutures) { synchronized (pendingGetBlockFutures) {
pendingGetDataFutures.add(future); pendingGetBlockFutures.add(future);
} }
conn.writeMessage(NetworkConnection.MSG_GETDATA, getdata); conn.writeMessage(NetworkConnection.MSG_GETDATA, getdata);
return future; return future;