3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-15 03:35:52 +00:00

Only do network writes on the handler thread in nio wrappers

This commit is contained in:
Matt Corallo 2013-11-27 18:09:59 -05:00 committed by Mike Hearn
parent abe9513dde
commit ba543a3b10
3 changed files with 51 additions and 12 deletions

View File

@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.Arrays; import java.util.Arrays;
@ -96,6 +97,14 @@ class ConnectionHandler implements MessageWriteTarget {
checkState(connectedHandlers.add(this)); checkState(connectedHandlers.add(this));
} }
@GuardedBy("lock")
private void setWriteOps() {
// Make sure we are registered to get updated when writing is available again
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
// Refresh the selector to make sure it gets the new interestOps
key.selector().wakeup();
}
// Tries to write any outstanding write bytes, runs in any thread (possibly unlocked) // Tries to write any outstanding write bytes, runs in any thread (possibly unlocked)
private void tryWriteBytes() throws IOException { private void tryWriteBytes() throws IOException {
lock.lock(); lock.lock();
@ -108,10 +117,7 @@ class ConnectionHandler implements MessageWriteTarget {
if (!buff.hasRemaining()) if (!buff.hasRemaining())
bytesIterator.remove(); bytesIterator.remove();
else { else {
// Make sure we are registered to get updated when writing is available again setWriteOps();
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
// Refresh the selector to make sure it gets the new interestOps
key.selector().wakeup();
break; break;
} }
} }
@ -139,12 +145,17 @@ class ConnectionHandler implements MessageWriteTarget {
// TODO: Kill the needless message duplication when the write completes right away // TODO: Kill the needless message duplication when the write completes right away
bytesToWrite.offer(ByteBuffer.wrap(Arrays.copyOf(message, message.length))); bytesToWrite.offer(ByteBuffer.wrap(Arrays.copyOf(message, message.length)));
bytesToWriteRemaining += message.length; bytesToWriteRemaining += message.length;
tryWriteBytes(); setWriteOps();
} catch (IOException e) { } catch (IOException e) {
lock.unlock(); lock.unlock();
log.error("Error writing message to connection, closing connection", e); log.error("Error writing message to connection, closing connection", e);
closeConnection(); closeConnection();
throw e; throw e;
} catch (CancelledKeyException e) {
lock.unlock();
log.error("Error writing message to connection, closing connection", e);
closeConnection();
throw new IOException(e);
} }
lock.unlock(); lock.unlock();
} }
@ -211,7 +222,7 @@ class ConnectionHandler implements MessageWriteTarget {
} catch (Exception e) { } catch (Exception e) {
// This can happen eg if the channel closes while the thread is about to get killed // This can happen eg if the channel closes while the thread is about to get killed
// (ClosedByInterruptException), or if handler.parser.receiveBytes throws something // (ClosedByInterruptException), or if handler.parser.receiveBytes throws something
log.error("Error handling SelectionKey: " + e.getMessage()); log.error("Error handling SelectionKey: ", e);
if (handler != null) if (handler != null)
handler.closeConnection(); handler.closeConnection();
} }

View File

@ -34,6 +34,7 @@ import java.io.OutputStream;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
@ -803,8 +804,8 @@ public class PeerTest extends TestWithNetworkConnections {
peer.writeTarget.writeBytes(new byte[1]); peer.writeTarget.writeBytes(new byte[1]);
fail(); fail();
} catch (IOException e) { } catch (IOException e) {
assertTrue(e instanceof ClosedChannelException || assertTrue((e.getCause() != null && e.getCause() instanceof CancelledKeyException)
(e instanceof SocketException && e.getMessage().equals("Socket is closed"))); || (e instanceof SocketException && e.getMessage().equals("Socket is closed")));
} }
} }
@ -881,6 +882,13 @@ public class PeerTest extends TestWithNetworkConnections {
} }
}; };
connect(); // Writes out a verack+version. connect(); // Writes out a verack+version.
final SettableFuture<Void> peerDisconnected = SettableFuture.create();
writeTarget.peer.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerDisconnected(Peer p, int peerCount) {
peerDisconnected.set(null);
}
});
final NetworkParameters params = TestNet3Params.testNet(); final NetworkParameters params = TestNet3Params.testNet();
BitcoinSerializer serializer = new BitcoinSerializer(params); BitcoinSerializer serializer = new BitcoinSerializer(params);
// Now write some bogus truncated message. // Now write some bogus truncated message.
@ -908,12 +916,13 @@ public class PeerTest extends TestWithNetworkConnections {
} catch (ExecutionException e) { } catch (ExecutionException e) {
assertTrue(e.getCause() instanceof ProtocolException); assertTrue(e.getCause() instanceof ProtocolException);
} }
peerDisconnected.get();
try { try {
peer.writeTarget.writeBytes(new byte[1]); peer.writeTarget.writeBytes(new byte[1]);
fail(); fail();
} catch (IOException e) { } catch (IOException e) {
assertTrue(e instanceof ClosedChannelException || assertTrue((e.getCause() != null && e.getCause() instanceof CancelledKeyException)
(e instanceof SocketException && e.getMessage().equals("Socket is closed"))); || (e instanceof SocketException && e.getMessage().equals("Socket is closed")));
} }
} }
} }

View File

@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
@ -111,6 +112,17 @@ public class TestWithNetworkConnections {
protected InboundMessageQueuer connect(Peer peer, VersionMessage versionMessage) throws Exception { protected InboundMessageQueuer connect(Peer peer, VersionMessage versionMessage) throws Exception {
checkArgument(versionMessage.hasBlockChain()); checkArgument(versionMessage.hasBlockChain());
final AtomicBoolean doneConnecting = new AtomicBoolean(false);
final Thread thisThread = Thread.currentThread();
peer.addEventListener(new AbstractPeerEventListener() {
@Override
public void onPeerDisconnected(Peer p, int peerCount) {
synchronized (doneConnecting) {
if (!doneConnecting.get())
thisThread.interrupt();
}
}
});
if (clientType == ClientType.NIO_CLIENT_MANAGER || clientType == ClientType.BLOCKING_CLIENT_MANAGER) if (clientType == ClientType.NIO_CLIENT_MANAGER || clientType == ClientType.BLOCKING_CLIENT_MANAGER)
channels.openConnection(new InetSocketAddress("127.0.0.1", 2000), peer); channels.openConnection(new InetSocketAddress("127.0.0.1", 2000), peer);
else if (clientType == ClientType.NIO_CLIENT) else if (clientType == ClientType.NIO_CLIENT)
@ -125,8 +137,15 @@ public class TestWithNetworkConnections {
// Complete handshake with the peer - send/receive version(ack)s, receive bloom filter // Complete handshake with the peer - send/receive version(ack)s, receive bloom filter
writeTarget.sendMessage(versionMessage); writeTarget.sendMessage(versionMessage);
writeTarget.sendMessage(new VersionAck()); writeTarget.sendMessage(new VersionAck());
assertTrue(writeTarget.nextMessageBlocking() instanceof VersionMessage); try {
assertTrue(writeTarget.nextMessageBlocking() instanceof VersionAck); assertTrue(writeTarget.nextMessageBlocking() instanceof VersionMessage);
assertTrue(writeTarget.nextMessageBlocking() instanceof VersionAck);
synchronized (doneConnecting) {
doneConnecting.set(true);
}
} catch (InterruptedException e) {
// We were disconnected before we got back version/verack
}
return writeTarget; return writeTarget;
} }