diff --git a/core/src/main/java/org/bitcoinj/core/Peer.java b/core/src/main/java/org/bitcoinj/core/Peer.java index 559805df..1a6baafb 100644 --- a/core/src/main/java/org/bitcoinj/core/Peer.java +++ b/core/src/main/java/org/bitcoinj/core/Peer.java @@ -18,6 +18,7 @@ package org.bitcoinj.core; import com.google.common.base.*; import com.google.common.base.Objects; +import org.bitcoinj.net.StreamConnection; import org.bitcoinj.core.listeners.PeerConnectionEventListener; import org.bitcoinj.core.listeners.PeerDataEventListener; import org.bitcoinj.store.BlockStore; @@ -162,9 +163,9 @@ public class Peer extends PeerSocketHandler { * *

Note that this does NOT make a connection to the given remoteAddress, it only creates a handler for a * connection. If you want to create a one-off connection, create a Peer and pass it to - * {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, org.bitcoinj.net.StreamParser)} + * {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, StreamConnection)} * or - * {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, org.bitcoinj.net.StreamParser, int)}.

+ * {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, StreamConnection, int)}.

* *

The remoteAddress provided should match the remote address of the peer which is being connected to, and is * used to keep track of which peers relayed transactions and offer more descriptive logging.

@@ -180,9 +181,9 @@ public class Peer extends PeerSocketHandler { * *

Note that this does NOT make a connection to the given remoteAddress, it only creates a handler for a * connection. If you want to create a one-off connection, create a Peer and pass it to - * {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, org.bitcoinj.net.StreamParser)} + * {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, StreamConnection)} * or - * {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, org.bitcoinj.net.StreamParser, int)}.

+ * {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, StreamConnection, int)}.

* *

The remoteAddress provided should match the remote address of the peer which is being connected to, and is * used to keep track of which peers relayed transactions and offer more descriptive logging.

@@ -199,9 +200,9 @@ public class Peer extends PeerSocketHandler { * *

Note that this does NOT make a connection to the given remoteAddress, it only creates a handler for a * connection. If you want to create a one-off connection, create a Peer and pass it to - * {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, org.bitcoinj.net.StreamParser)} + * {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, StreamConnection)} * or - * {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, org.bitcoinj.net.StreamParser, int)}.

+ * {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, StreamConnection, int)}.

* *

The remoteAddress provided should match the remote address of the peer which is being connected to, and is * used to keep track of which peers relayed transactions and offer more descriptive logging.

@@ -232,9 +233,9 @@ public class Peer extends PeerSocketHandler { * *

Note that this does NOT make a connection to the given remoteAddress, it only creates a handler for a * connection. If you want to create a one-off connection, create a Peer and pass it to - * {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, org.bitcoinj.net.StreamParser)} + * {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, StreamConnection)} * or - * {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, org.bitcoinj.net.StreamParser, int)}.

+ * {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, StreamConnection, int)}.

* *

The remoteAddress provided should match the remote address of the peer which is being connected to, and is * used to keep track of which peers relayed transactions and offer more descriptive logging.

diff --git a/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java b/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java index 8907d6a0..0420113f 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java +++ b/core/src/main/java/org/bitcoinj/core/PeerSocketHandler.java @@ -18,7 +18,7 @@ package org.bitcoinj.core; import org.bitcoinj.net.AbstractTimeoutHandler; import org.bitcoinj.net.MessageWriteTarget; -import org.bitcoinj.net.StreamParser; +import org.bitcoinj.net.StreamConnection; import org.bitcoinj.utils.Threading; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -39,7 +39,7 @@ import static com.google.common.base.Preconditions.*; * Handles high-level message (de)serialization for peers, acting as the bridge between the * {@link org.bitcoinj.net} classes and {@link Peer}. */ -public abstract class PeerSocketHandler extends AbstractTimeoutHandler implements StreamParser { +public abstract class PeerSocketHandler extends AbstractTimeoutHandler implements StreamConnection { private static final Logger log = LoggerFactory.getLogger(PeerSocketHandler.class); private final MessageSerializer serializer; diff --git a/core/src/main/java/org/bitcoinj/net/BlockingClient.java b/core/src/main/java/org/bitcoinj/net/BlockingClient.java index 2aa4d717..a6762b26 100644 --- a/core/src/main/java/org/bitcoinj/net/BlockingClient.java +++ b/core/src/main/java/org/bitcoinj/net/BlockingClient.java @@ -30,7 +30,7 @@ import java.util.*; import static com.google.common.base.Preconditions.*; /** - *

Creates a simple connection to a server using a {@link StreamParser} to process data.

+ *

Creates a simple connection to a server using a {@link StreamConnection} to process data.

* *

Generally, using {@link NioClient} and {@link NioClientManager} should be preferred over {@link BlockingClient} * and {@link BlockingClientManager}, unless you wish to connect over a proxy or use some other network settings that @@ -47,10 +47,10 @@ public class BlockingClient implements MessageWriteTarget { private SettableFuture connectFuture; /** - *

Creates a new client to the given server address using the given {@link StreamParser} to decode the data. - * The given parser MUST be unique to this object. This does not block while waiting for the connection to - * open, but will call either the {@link StreamParser#connectionOpened()} or - * {@link StreamParser#connectionClosed()} callback on the created network event processing thread.

+ *

Creates a new client to the given server address using the given {@link StreamConnection} to decode the data. + * The given connection MUST be unique to this object. This does not block while waiting for the connection to + * open, but will call either the {@link StreamConnection#connectionOpened()} or + * {@link StreamConnection#connectionClosed()} callback on the created network event processing thread.

* * @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no * timeout. @@ -58,13 +58,13 @@ public class BlockingClient implements MessageWriteTarget { * how this client connects to the internet. If not sure, use SocketFactory.getDefault() * @param clientSet A set which this object will add itself to after initialization, and then remove itself from */ - public BlockingClient(final SocketAddress serverAddress, final StreamParser parser, + public BlockingClient(final SocketAddress serverAddress, final StreamConnection connection, final int connectTimeoutMillis, final SocketFactory socketFactory, @Nullable final Set clientSet) throws IOException { connectFuture = SettableFuture.create(); // Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make // sure it doesnt get too large or have to call read too often. - parser.setWriteTarget(this); + connection.setWriteTarget(this); socket = socketFactory.createSocket(); final Context context = Context.get(); Thread t = new Thread() { @@ -75,10 +75,10 @@ public class BlockingClient implements MessageWriteTarget { clientSet.add(BlockingClient.this); try { socket.connect(serverAddress, connectTimeoutMillis); - parser.connectionOpened(); + connection.connectionOpened(); connectFuture.set(serverAddress); InputStream stream = socket.getInputStream(); - runReadLoop(stream, parser); + runReadLoop(stream, connection); } catch (Exception e) { if (!vCloseRequested) { log.error("Error trying to open/read from connection: {}: {}", serverAddress, e.getMessage()); @@ -92,7 +92,7 @@ public class BlockingClient implements MessageWriteTarget { } if (clientSet != null) clientSet.remove(BlockingClient.this); - parser.connectionClosed(); + connection.connectionClosed(); } } }; @@ -103,10 +103,10 @@ public class BlockingClient implements MessageWriteTarget { /** * A blocking call that never returns, except by throwing an exception. It reads bytes from the input stream - * and feeds them to the provided {@link StreamParser}, for example, a {@link Peer}. + * and feeds them to the provided {@link StreamConnection}, for example, a {@link Peer}. */ - public static void runReadLoop(InputStream stream, StreamParser parser) throws Exception { - ByteBuffer dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); + public static void runReadLoop(InputStream stream, StreamConnection connection) throws Exception { + ByteBuffer dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(connection.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); byte[] readBuff = new byte[dbuf.capacity()]; while (true) { // TODO Kill the message duplication here @@ -117,9 +117,9 @@ public class BlockingClient implements MessageWriteTarget { dbuf.put(readBuff, 0, read); // "flip" the buffer - setting the limit to the current position and setting position to 0 dbuf.flip(); - // Use parser.receiveBytes's return value as a double-check that it stopped reading at the right + // Use connection.receiveBytes's return value as a double-check that it stopped reading at the right // location - int bytesConsumed = parser.receiveBytes(dbuf); + int bytesConsumed = connection.receiveBytes(dbuf); checkState(dbuf.position() == bytesConsumed); // Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative // position) @@ -128,7 +128,7 @@ public class BlockingClient implements MessageWriteTarget { } /** - * Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()} + * Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()} * event on the network-handling thread where all callbacks occur. */ @Override diff --git a/core/src/main/java/org/bitcoinj/net/BlockingClientManager.java b/core/src/main/java/org/bitcoinj/net/BlockingClientManager.java index 57c38f22..afce2136 100644 --- a/core/src/main/java/org/bitcoinj/net/BlockingClientManager.java +++ b/core/src/main/java/org/bitcoinj/net/BlockingClientManager.java @@ -55,11 +55,11 @@ public class BlockingClientManager extends AbstractIdleService implements Client } @Override - public ListenableFuture openConnection(SocketAddress serverAddress, StreamParser parser) { + public ListenableFuture openConnection(SocketAddress serverAddress, StreamConnection connection) { try { if (!isRunning()) throw new IllegalStateException(); - return new BlockingClient(serverAddress, parser, connectTimeoutMillis, socketFactory, clients).getConnectFuture(); + return new BlockingClient(serverAddress, connection, connectTimeoutMillis, socketFactory, clients).getConnectFuture(); } catch (IOException e) { throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources } diff --git a/core/src/main/java/org/bitcoinj/net/ClientConnectionManager.java b/core/src/main/java/org/bitcoinj/net/ClientConnectionManager.java index 8027023d..23d49343 100644 --- a/core/src/main/java/org/bitcoinj/net/ClientConnectionManager.java +++ b/core/src/main/java/org/bitcoinj/net/ClientConnectionManager.java @@ -30,10 +30,10 @@ import java.net.SocketAddress; */ public interface ClientConnectionManager extends Service { /** - * Creates a new connection to the given address, with the given parser used to handle incoming data. Any errors + * Creates a new connection to the given address, with the given connection used to handle incoming data. Any errors * that occur during connection will be returned in the given future, including errors that can occur immediately. */ - ListenableFuture openConnection(SocketAddress serverAddress, StreamParser parser); + ListenableFuture openConnection(SocketAddress serverAddress, StreamConnection connection); /** Gets the number of connected peers */ int getConnectedClientCount(); diff --git a/core/src/main/java/org/bitcoinj/net/ConnectionHandler.java b/core/src/main/java/org/bitcoinj/net/ConnectionHandler.java index b2263640..8f2bfe6c 100644 --- a/core/src/main/java/org/bitcoinj/net/ConnectionHandler.java +++ b/core/src/main/java/org/bitcoinj/net/ConnectionHandler.java @@ -57,7 +57,7 @@ class ConnectionHandler implements MessageWriteTarget { @GuardedBy("lock") private final ByteBuffer readBuff; @GuardedBy("lock") private final SocketChannel channel; @GuardedBy("lock") private final SelectionKey key; - @GuardedBy("lock") StreamParser parser; + @GuardedBy("lock") StreamConnection connection; @GuardedBy("lock") private boolean closeCalled = false; @GuardedBy("lock") private long bytesToWriteRemaining = 0; @@ -65,30 +65,30 @@ class ConnectionHandler implements MessageWriteTarget { private Set connectedHandlers; - public ConnectionHandler(StreamParserFactory parserFactory, SelectionKey key) throws IOException { - this(parserFactory.getNewParser(((SocketChannel) key.channel()).socket().getInetAddress(), ((SocketChannel) key.channel()).socket().getPort()), key); - if (parser == null) - throw new IOException("Parser factory.getNewParser returned null"); + public ConnectionHandler(StreamConnectionFactory connectionFactory, SelectionKey key) throws IOException { + this(connectionFactory.getNewConnection(((SocketChannel) key.channel()).socket().getInetAddress(), ((SocketChannel) key.channel()).socket().getPort()), key); + if (connection == null) + throw new IOException("Parser factory.getNewConnection returned null"); } - private ConnectionHandler(@Nullable StreamParser parser, SelectionKey key) { + private ConnectionHandler(@Nullable StreamConnection connection, SelectionKey key) { this.key = key; this.channel = checkNotNull(((SocketChannel)key.channel())); - if (parser == null) { + if (connection == null) { readBuff = null; return; } - this.parser = parser; - readBuff = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); - parser.setWriteTarget(this); // May callback into us (eg closeConnection() now) + this.connection = connection; + readBuff = ByteBuffer.allocateDirect(Math.min(Math.max(connection.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND)); + connection.setWriteTarget(this); // May callback into us (eg closeConnection() now) connectedHandlers = null; } - public ConnectionHandler(StreamParser parser, SelectionKey key, Set connectedHandlers) { - this(checkNotNull(parser), key); + public ConnectionHandler(StreamConnection connection, SelectionKey key, Set connectedHandlers) { + this(checkNotNull(connection), key); // closeConnection() may have already happened because we invoked the other c'tor above, which called - // parser.setWriteTarget which might have re-entered already. In this case we shouldn't add ourselves + // connection.setWriteTarget which might have re-entered already. In this case we shouldn't add ourselves // to the connectedHandlers set. lock.lock(); try { @@ -191,7 +191,7 @@ class ConnectionHandler implements MessageWriteTarget { } if (callClosed) { checkState(connectedHandlers == null || connectedHandlers.remove(this)); - parser.connectionClosed(); + connection.connectionClosed(); } } @@ -208,7 +208,7 @@ class ConnectionHandler implements MessageWriteTarget { return; } if (key.isReadable()) { - // Do a socket read and invoke the parser's receiveBytes message + // Do a socket read and invoke the connection's receiveBytes message int read = handler.channel.read(handler.readBuff); if (read == 0) return; // Was probably waiting on a write @@ -219,8 +219,8 @@ class ConnectionHandler implements MessageWriteTarget { } // "flip" the buffer - setting the limit to the current position and setting position to 0 handler.readBuff.flip(); - // Use parser.receiveBytes's return value as a check that it stopped reading at the right location - int bytesConsumed = checkNotNull(handler.parser).receiveBytes(handler.readBuff); + // Use connection.receiveBytes's return value as a check that it stopped reading at the right location + int bytesConsumed = checkNotNull(handler.connection).receiveBytes(handler.readBuff); checkState(handler.readBuff.position() == bytesConsumed); // Now drop the bytes which were read by compacting readBuff (resetting limit and keeping relative // position) @@ -230,7 +230,7 @@ class ConnectionHandler implements MessageWriteTarget { handler.tryWriteBytes(); } catch (Exception e) { // This can happen eg if the channel closes while the thread is about to get killed - // (ClosedByInterruptException), or if handler.parser.receiveBytes throws something + // (ClosedByInterruptException), or if handler.connection.receiveBytes throws something Throwable t = Throwables.getRootCause(e); log.warn("Error handling SelectionKey: {}", t.getMessage() != null ? t.getMessage() : t.getClass().getName()); handler.closeConnection(); diff --git a/core/src/main/java/org/bitcoinj/net/MessageWriteTarget.java b/core/src/main/java/org/bitcoinj/net/MessageWriteTarget.java index 18166b47..7679b730 100644 --- a/core/src/main/java/org/bitcoinj/net/MessageWriteTarget.java +++ b/core/src/main/java/org/bitcoinj/net/MessageWriteTarget.java @@ -27,7 +27,7 @@ public interface MessageWriteTarget { */ void writeBytes(byte[] message) throws IOException; /** - * Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()} + * Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()} * event on the network-handling thread where all callbacks occur. */ void closeConnection(); diff --git a/core/src/main/java/org/bitcoinj/net/NioClient.java b/core/src/main/java/org/bitcoinj/net/NioClient.java index 8525d7b5..df8f9688 100644 --- a/core/src/main/java/org/bitcoinj/net/NioClient.java +++ b/core/src/main/java/org/bitcoinj/net/NioClient.java @@ -26,7 +26,7 @@ import java.net.*; import java.nio.*; /** - * Creates a simple connection to a server using a {@link StreamParser} to process data. + * Creates a simple connection to a server using a {@link StreamConnection} to process data. */ public class NioClient implements MessageWriteTarget { private static final Logger log = LoggerFactory.getLogger(NioClient.class); @@ -34,13 +34,13 @@ public class NioClient implements MessageWriteTarget { private final Handler handler; private final NioClientManager manager = new NioClientManager(); - class Handler extends AbstractTimeoutHandler implements StreamParser { - private final StreamParser upstreamParser; + class Handler extends AbstractTimeoutHandler implements StreamConnection { + private final StreamConnection upstreamConnection; private MessageWriteTarget writeTarget; private boolean closeOnOpen = false; private boolean closeCalled = false; - Handler(StreamParser upstreamParser, int connectTimeoutMillis) { - this.upstreamParser = upstreamParser; + Handler(StreamConnection upstreamConnection, int connectTimeoutMillis) { + this.upstreamConnection = upstreamConnection; setSocketTimeout(connectTimeoutMillis); setTimeoutEnabled(true); } @@ -56,19 +56,19 @@ public class NioClient implements MessageWriteTarget { manager.stopAsync(); if (!closeCalled) { closeCalled = true; - upstreamParser.connectionClosed(); + upstreamConnection.connectionClosed(); } } @Override public synchronized void connectionOpened() { if (!closeOnOpen) - upstreamParser.connectionOpened(); + upstreamConnection.connectionOpened(); } @Override public int receiveBytes(ByteBuffer buff) throws Exception { - return upstreamParser.receiveBytes(buff); + return upstreamConnection.receiveBytes(buff); } @Override @@ -78,26 +78,26 @@ public class NioClient implements MessageWriteTarget { else { setTimeoutEnabled(false); this.writeTarget = writeTarget; - upstreamParser.setWriteTarget(writeTarget); + upstreamConnection.setWriteTarget(writeTarget); } } @Override public int getMaxMessageSize() { - return upstreamParser.getMaxMessageSize(); + return upstreamConnection.getMaxMessageSize(); } } /** - *

Creates a new client to the given server address using the given {@link StreamParser} to decode the data. - * The given parser MUST be unique to this object. This does not block while waiting for the connection to - * open, but will call either the {@link StreamParser#connectionOpened()} or - * {@link StreamParser#connectionClosed()} callback on the created network event processing thread.

+ *

Creates a new client to the given server address using the given {@link StreamConnection} to decode the data. + * The given connection MUST be unique to this object. This does not block while waiting for the connection to + * open, but will call either the {@link StreamConnection#connectionOpened()} or + * {@link StreamConnection#connectionClosed()} callback on the created network event processing thread.

* * @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no * timeout. */ - public NioClient(final SocketAddress serverAddress, final StreamParser parser, + public NioClient(final SocketAddress serverAddress, final StreamConnection parser, final int connectTimeoutMillis) throws IOException { manager.startAsync(); manager.awaitRunning(); diff --git a/core/src/main/java/org/bitcoinj/net/NioClientManager.java b/core/src/main/java/org/bitcoinj/net/NioClientManager.java index 36c21219..bd9547f5 100644 --- a/core/src/main/java/org/bitcoinj/net/NioClientManager.java +++ b/core/src/main/java/org/bitcoinj/net/NioClientManager.java @@ -40,11 +40,11 @@ public class NioClientManager extends AbstractExecutionThreadService implements class PendingConnect { SocketChannel sc; - StreamParser parser; + StreamConnection connection; SocketAddress address; SettableFuture future = SettableFuture.create(); - PendingConnect(SocketChannel sc, StreamParser parser, SocketAddress address) { this.sc = sc; this.parser = parser; this.address = address; } + PendingConnect(SocketChannel sc, StreamConnection connection, SocketAddress address) { this.sc = sc; this.connection = connection; this.address = address; } } final Queue newConnectionChannels = new LinkedBlockingQueue(); @@ -57,14 +57,14 @@ public class NioClientManager extends AbstractExecutionThreadService implements if (key.isValid() && key.isConnectable()) { // ie a client connection which has finished the initial connect process // Create a ConnectionHandler and hook everything together PendingConnect data = (PendingConnect) key.attachment(); - StreamParser parser = data.parser; + StreamConnection connection = data.connection; SocketChannel sc = (SocketChannel) key.channel(); - ConnectionHandler handler = new ConnectionHandler(parser, key, connectedHandlers); + ConnectionHandler handler = new ConnectionHandler(connection, key, connectedHandlers); try { if (sc.finishConnect()) { log.info("Successfully connected to {}", sc.socket().getRemoteSocketAddress()); key.interestOps((key.interestOps() | SelectionKey.OP_READ) & ~SelectionKey.OP_CONNECT).attach(handler); - parser.connectionOpened(); + connection.connectionOpened(); data.future.set(data.address); } else { log.warn("Failed to connect to {}", sc.socket().getRemoteSocketAddress()); @@ -145,15 +145,15 @@ public class NioClientManager extends AbstractExecutionThreadService implements } @Override - public ListenableFuture openConnection(SocketAddress serverAddress, StreamParser parser) { + public ListenableFuture openConnection(SocketAddress serverAddress, StreamConnection connection) { if (!isRunning()) throw new IllegalStateException(); - // Create a new connection, give it a parser as an attachment + // Create a new connection, give it a connection as an attachment try { SocketChannel sc = SocketChannel.open(); sc.configureBlocking(false); sc.connect(serverAddress); - PendingConnect data = new PendingConnect(sc, parser, serverAddress); + PendingConnect data = new PendingConnect(sc, connection, serverAddress); newConnectionChannels.offer(data); selector.wakeup(); return data.future; diff --git a/core/src/main/java/org/bitcoinj/net/NioServer.java b/core/src/main/java/org/bitcoinj/net/NioServer.java index 800f80f0..cfe02518 100644 --- a/core/src/main/java/org/bitcoinj/net/NioServer.java +++ b/core/src/main/java/org/bitcoinj/net/NioServer.java @@ -28,13 +28,13 @@ import com.google.common.util.concurrent.AbstractExecutionThreadService; import org.slf4j.LoggerFactory; /** - * Creates a simple server listener which listens for incoming client connections and uses a {@link StreamParser} to + * Creates a simple server listener which listens for incoming client connections and uses a {@link StreamConnection} to * process data. */ public class NioServer extends AbstractExecutionThreadService { private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioServer.class); - private final StreamParserFactory parserFactory; + private final StreamConnectionFactory connectionFactory; private final ServerSocketChannel sc; @VisibleForTesting final Selector selector; @@ -42,14 +42,14 @@ public class NioServer extends AbstractExecutionThreadService { // Handle a SelectionKey which was selected private void handleKey(Selector selector, SelectionKey key) throws IOException { if (key.isValid() && key.isAcceptable()) { - // Accept a new connection, give it a parser as an attachment + // Accept a new connection, give it a stream connection as an attachment SocketChannel newChannel = sc.accept(); newChannel.configureBlocking(false); SelectionKey newKey = newChannel.register(selector, SelectionKey.OP_READ); try { - ConnectionHandler handler = new ConnectionHandler(parserFactory, newKey); + ConnectionHandler handler = new ConnectionHandler(connectionFactory, newKey); newKey.attach(handler); - handler.parser.connectionOpened(); + handler.connection.connectionOpened(); } catch (IOException e) { // This can happen if ConnectionHandler's call to get a new handler returned null log.error("Error handling new connection", Throwables.getRootCause(e).getMessage()); @@ -62,12 +62,12 @@ public class NioServer extends AbstractExecutionThreadService { /** * Creates a new server which is capable of listening for incoming connections and processing client provided data - * using {@link StreamParser}s created by the given {@link StreamParserFactory} + * using {@link StreamConnection}s created by the given {@link StreamConnectionFactory} * * @throws IOException If there is an issue opening the server socket or binding fails for some reason */ - public NioServer(final StreamParserFactory parserFactory, InetSocketAddress bindAddress) throws IOException { - this.parserFactory = parserFactory; + public NioServer(final StreamConnectionFactory connectionFactory, InetSocketAddress bindAddress) throws IOException { + this.connectionFactory = connectionFactory; sc = ServerSocketChannel.open(); sc.configureBlocking(false); diff --git a/core/src/main/java/org/bitcoinj/net/ProtobufParser.java b/core/src/main/java/org/bitcoinj/net/ProtobufConnection.java similarity index 92% rename from core/src/main/java/org/bitcoinj/net/ProtobufParser.java rename to core/src/main/java/org/bitcoinj/net/ProtobufConnection.java index 64a441a3..dec49870 100644 --- a/core/src/main/java/org/bitcoinj/net/ProtobufParser.java +++ b/core/src/main/java/org/bitcoinj/net/ProtobufConnection.java @@ -40,22 +40,24 @@ import static com.google.common.base.Preconditions.checkState; * *

Messages are encoded with a 4-byte signed integer (big endian) prefix to indicate their length followed by the * serialized protobuf

+ * + *

(Used to be called ProtobufParser)

*/ -public class ProtobufParser extends AbstractTimeoutHandler implements StreamParser { - private static final Logger log = LoggerFactory.getLogger(ProtobufParser.class); +public class ProtobufConnection extends AbstractTimeoutHandler implements StreamConnection { + private static final Logger log = LoggerFactory.getLogger(ProtobufConnection.class); /** * An interface which can be implemented to handle callbacks as new messages are generated and socket events occur. * @param The protobuf type which is used on this socket. - * This MUST match the MessageType used in the parent {@link ProtobufParser} + * This MUST match the MessageType used in the parent {@link ProtobufConnection} */ public interface Listener { /** Called when a new protobuf is received from the remote side. */ - void messageReceived(ProtobufParser handler, MessageType msg); + void messageReceived(ProtobufConnection handler, MessageType msg); /** Called when the connection is opened and available for writing data to. */ - void connectionOpen(ProtobufParser handler); + void connectionOpen(ProtobufConnection handler); /** Called when the connection is closed and no more data should be provided. */ - void connectionClosed(ProtobufParser handler); + void connectionClosed(ProtobufConnection handler); } // The callback listener @@ -73,7 +75,7 @@ public class ProtobufParser extends AbstractTim // attacking clients can be made to timeout/get blocked if they are sending crap to fill buffers. @GuardedBy("lock") private int messageBytesOffset = 0; @GuardedBy("lock") private byte[] messageBytes; - private final ReentrantLock lock = Threading.lock("ProtobufParser"); + private final ReentrantLock lock = Threading.lock("ProtobufConnection"); @VisibleForTesting final AtomicReference writeTarget = new AtomicReference(); @@ -88,7 +90,7 @@ public class ProtobufParser extends AbstractTim * @param timeoutMillis The timeout between messages before the connection is automatically closed. Only enabled * after the connection is established. */ - public ProtobufParser(Listener handler, MessageType prototype, int maxMessageSize, int timeoutMillis) { + public ProtobufConnection(Listener handler, MessageType prototype, int maxMessageSize, int timeoutMillis) { this.handler = handler; this.prototype = prototype; this.maxMessageSize = Math.min(maxMessageSize, Integer.MAX_VALUE - 4); @@ -108,7 +110,7 @@ public class ProtobufParser extends AbstractTim } /** - * Closes this connection, eventually triggering a {@link ProtobufParser.Listener#connectionClosed()} event. + * Closes this connection, eventually triggering a {@link ProtobufConnection.Listener#connectionClosed()} event. */ public void closeConnection() { this.writeTarget.get().closeConnection(); diff --git a/core/src/main/java/org/bitcoinj/net/StreamParser.java b/core/src/main/java/org/bitcoinj/net/StreamConnection.java similarity index 88% rename from core/src/main/java/org/bitcoinj/net/StreamParser.java rename to core/src/main/java/org/bitcoinj/net/StreamConnection.java index 51f6e435..40b4caf1 100644 --- a/core/src/main/java/org/bitcoinj/net/StreamParser.java +++ b/core/src/main/java/org/bitcoinj/net/StreamConnection.java @@ -21,8 +21,10 @@ import java.nio.ByteBuffer; /** * A generic handler which is used in {@link NioServer}, {@link NioClient} and {@link BlockingClient} to handle incoming * data streams. + * + * Used to be callet StreamParser. */ -public interface StreamParser { +public interface StreamConnection { /** Called when the connection socket is closed */ void connectionClosed(); @@ -31,7 +33,7 @@ public interface StreamParser { /** *

Called when new bytes are available from the remote end. This should only ever be called by the single - * writeTarget associated with any given StreamParser, multiple callers will likely confuse implementations.

+ * writeTarget associated with any given StreamConnection, multiple callers will likely confuse implementations.

* * Implementers/callers must follow the following conventions exactly: *
    @@ -51,7 +53,7 @@ public interface StreamParser { int receiveBytes(ByteBuffer buff) throws Exception; /** - * Called when this parser is attached to an upstream write target (ie a low-level connection handler). This + * Called when this connection is attached to an upstream write target (ie a low-level connection handler). This * writeTarget should be stored and used to close the connection or write data to the socket. */ void setWriteTarget(MessageWriteTarget writeTarget); diff --git a/core/src/main/java/org/bitcoinj/net/StreamParserFactory.java b/core/src/main/java/org/bitcoinj/net/StreamConnectionFactory.java similarity index 81% rename from core/src/main/java/org/bitcoinj/net/StreamParserFactory.java rename to core/src/main/java/org/bitcoinj/net/StreamConnectionFactory.java index 50f1625f..2a64bdca 100644 --- a/core/src/main/java/org/bitcoinj/net/StreamParserFactory.java +++ b/core/src/main/java/org/bitcoinj/net/StreamConnectionFactory.java @@ -20,14 +20,14 @@ import java.net.InetAddress; import javax.annotation.Nullable; /** - * A factory which generates new {@link StreamParser}s when a new connection is opened. + * A factory which generates new {@link StreamConnection}s when a new connection is opened. */ -public interface StreamParserFactory { +public interface StreamConnectionFactory { /** * Returns a new handler or null to have the connection close. * @param inetAddress The client's (IP) address * @param port The remote port on the client side */ @Nullable - StreamParser getNewParser(InetAddress inetAddress, int port); + StreamConnection getNewConnection(InetAddress inetAddress, int port); } diff --git a/core/src/main/java/org/bitcoinj/protocols/channels/PaymentChannelClientConnection.java b/core/src/main/java/org/bitcoinj/protocols/channels/PaymentChannelClientConnection.java index fff14d3c..fb598e39 100644 --- a/core/src/main/java/org/bitcoinj/protocols/channels/PaymentChannelClientConnection.java +++ b/core/src/main/java/org/bitcoinj/protocols/channels/PaymentChannelClientConnection.java @@ -23,7 +23,7 @@ import org.bitcoinj.core.Sha256Hash; import org.bitcoinj.core.Utils; import org.bitcoinj.core.Wallet; import org.bitcoinj.net.NioClient; -import org.bitcoinj.net.ProtobufParser; +import org.bitcoinj.net.ProtobufConnection; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; @@ -43,7 +43,7 @@ public class PaymentChannelClientConnection { private final SettableFuture channelOpenFuture = SettableFuture.create(); private final PaymentChannelClient channelClient; - private final ProtobufParser wireParser; + private final ProtobufConnection wireParser; /** * Attempts to open a new connection to and open a payment channel with the given host and port, blocking until the @@ -128,9 +128,9 @@ public class PaymentChannelClientConnection { }); // And glue back in the opposite direction - network to the channelClient. - wireParser = new ProtobufParser(new ProtobufParser.Listener() { + wireParser = new ProtobufConnection(new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { try { channelClient.receiveMessage(msg); } catch (InsufficientMoneyException e) { @@ -140,12 +140,12 @@ public class PaymentChannelClientConnection { } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { channelClient.connectionOpen(); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { channelClient.connectionClosed(); channelOpenFuture.setException(new PaymentChannelCloseException("The TCP socket died", PaymentChannelCloseException.CloseReason.CONNECTION_CLOSED)); @@ -238,7 +238,7 @@ public class PaymentChannelClientConnection { // we defined above will be called, which in turn will call wireParser.closeConnection(), which in turn will invoke // NioClient.closeConnection(), which will then close the socket triggering interruption of the network // thread it had created. That causes the background thread to die, which on its way out calls - // ProtobufParser.connectionClosed which invokes the connectionClosed method we defined above which in turn + // ProtobufConnection.connectionClosed which invokes the connectionClosed method we defined above which in turn // then configures the open-future correctly and closes the state object. Phew! try { channelClient.settle(); diff --git a/core/src/main/java/org/bitcoinj/protocols/channels/PaymentChannelServerListener.java b/core/src/main/java/org/bitcoinj/protocols/channels/PaymentChannelServerListener.java index a33ca0c1..903417f9 100644 --- a/core/src/main/java/org/bitcoinj/protocols/channels/PaymentChannelServerListener.java +++ b/core/src/main/java/org/bitcoinj/protocols/channels/PaymentChannelServerListener.java @@ -22,8 +22,8 @@ import org.bitcoinj.core.Sha256Hash; import org.bitcoinj.core.TransactionBroadcaster; import org.bitcoinj.core.Wallet; import org.bitcoinj.net.NioServer; -import org.bitcoinj.net.ProtobufParser; -import org.bitcoinj.net.StreamParserFactory; +import org.bitcoinj.net.ProtobufConnection; +import org.bitcoinj.net.StreamConnectionFactory; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; @@ -90,14 +90,14 @@ public class PaymentChannelServerListener { } }); - protobufHandlerListener = new ProtobufParser.Listener() { + protobufHandlerListener = new ProtobufConnection.Listener() { @Override - public synchronized void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public synchronized void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { paymentChannelManager.receiveMessage(msg); } @Override - public synchronized void connectionClosed(ProtobufParser handler) { + public synchronized void connectionClosed(ProtobufConnection handler) { paymentChannelManager.connectionClosed(); if (closeReason != null) eventHandler.channelClosed(closeReason); @@ -107,7 +107,7 @@ public class PaymentChannelServerListener { } @Override - public synchronized void connectionOpen(ProtobufParser handler) { + public synchronized void connectionOpen(ProtobufConnection handler) { ServerConnectionEventHandler eventHandler = eventHandlerFactory.onNewConnection(address); if (eventHandler == null) handler.closeConnection(); @@ -118,7 +118,7 @@ public class PaymentChannelServerListener { } }; - socketProtobufHandler = new ProtobufParser + socketProtobufHandler = new ProtobufConnection (protobufHandlerListener, Protos.TwoWayChannelMessage.getDefaultInstance(), Short.MAX_VALUE, timeoutSeconds*1000); } @@ -131,10 +131,10 @@ public class PaymentChannelServerListener { private final PaymentChannelServer paymentChannelManager; // The connection handler which puts/gets protobufs from the TCP socket - private final ProtobufParser socketProtobufHandler; + private final ProtobufConnection socketProtobufHandler; // The listener which connects to socketProtobufHandler - private final ProtobufParser.Listener protobufHandlerListener; + private final ProtobufConnection.Listener protobufHandlerListener; } /** @@ -142,9 +142,9 @@ public class PaymentChannelServerListener { * @throws Exception If binding to the given port fails (eg SocketException: Permission denied for privileged ports) */ public void bindAndStart(int port) throws Exception { - server = new NioServer(new StreamParserFactory() { + server = new NioServer(new StreamConnectionFactory() { @Override - public ProtobufParser getNewParser(InetAddress inetAddress, int port) { + public ProtobufConnection getNewConnection(InetAddress inetAddress, int port) { return new ServerHandler(new InetSocketAddress(inetAddress, port), timeoutSeconds).socketProtobufHandler; } }, new InetSocketAddress(port)); diff --git a/core/src/main/java/org/bitcoinj/protocols/channels/ServerConnectionEventHandler.java b/core/src/main/java/org/bitcoinj/protocols/channels/ServerConnectionEventHandler.java index 978c00b3..5751c2c6 100644 --- a/core/src/main/java/org/bitcoinj/protocols/channels/ServerConnectionEventHandler.java +++ b/core/src/main/java/org/bitcoinj/protocols/channels/ServerConnectionEventHandler.java @@ -18,7 +18,7 @@ package org.bitcoinj.protocols.channels; import org.bitcoinj.core.Coin; import org.bitcoinj.core.Sha256Hash; -import org.bitcoinj.net.ProtobufParser; +import org.bitcoinj.net.ProtobufConnection; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; @@ -31,10 +31,10 @@ import javax.annotation.Nullable; * {@link PaymentChannelServerListener} */ public abstract class ServerConnectionEventHandler { - private ProtobufParser connectionChannel; + private ProtobufConnection connectionChannel; // Called by ServerListener before channelOpen to set connectionChannel when it is ready to received application messages // Also called with null to clear connectionChannel after channelClosed() - synchronized void setConnectionChannel(@Nullable ProtobufParser connectionChannel) { this.connectionChannel = connectionChannel; } + synchronized void setConnectionChannel(@Nullable ProtobufConnection connectionChannel) { this.connectionChannel = connectionChannel; } /** *

    Closes the channel with the client (will generate a diff --git a/core/src/main/java/org/bitcoinj/testing/TestWithNetworkConnections.java b/core/src/main/java/org/bitcoinj/testing/TestWithNetworkConnections.java index 73a19e9d..73635ac0 100644 --- a/core/src/main/java/org/bitcoinj/testing/TestWithNetworkConnections.java +++ b/core/src/main/java/org/bitcoinj/testing/TestWithNetworkConnections.java @@ -111,10 +111,10 @@ public class TestWithNetworkConnections { } protected void startPeerServer(int i) throws IOException { - peerServers[i] = new NioServer(new StreamParserFactory() { + peerServers[i] = new NioServer(new StreamConnectionFactory() { @Nullable @Override - public StreamParser getNewParser(InetAddress inetAddress, int port) { + public StreamConnection getNewConnection(InetAddress inetAddress, int port) { return new InboundMessageQueuer(params) { @Override public void connectionClosed() { diff --git a/core/src/test/java/org/bitcoinj/net/NetworkAbstractionTests.java b/core/src/test/java/org/bitcoinj/net/NetworkAbstractionTests.java index 34e2dfd6..e0ea3809 100644 --- a/core/src/test/java/org/bitcoinj/net/NetworkAbstractionTests.java +++ b/core/src/test/java/org/bitcoinj/net/NetworkAbstractionTests.java @@ -64,7 +64,7 @@ public class NetworkAbstractionTests { channels = null; } - private MessageWriteTarget openConnection(SocketAddress addr, ProtobufParser parser) throws Exception { + private MessageWriteTarget openConnection(SocketAddress addr, ProtobufConnection parser) throws Exception { if (clientType == 0 || clientType == 1) { channels.openConnection(addr, parser); if (parser.writeTarget.get() == null) @@ -97,28 +97,28 @@ public class NetworkAbstractionTests { final SettableFuture client2ConnectionOpened = SettableFuture.create(); final SettableFuture serverConnectionClosed = SettableFuture.create(); final SettableFuture client2Disconnected = SettableFuture.create(); - NioServer server = new NioServer(new StreamParserFactory() { + NioServer server = new NioServer(new StreamConnectionFactory() { boolean finishedFirst = false; @Override - public ProtobufParser getNewParser(InetAddress inetAddress, int port) { + public ProtobufConnection getNewConnection(InetAddress inetAddress, int port) { if (!finishedFirst) { finishedFirst = true; return null; } - return new ProtobufParser(new ProtobufParser.Listener() { + return new ProtobufConnection(new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { handler.write(msg); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { serverConnectionOpen.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { serverConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); @@ -127,20 +127,20 @@ public class NetworkAbstractionTests { server.startAsync(); server.awaitRunning(); - ProtobufParser clientHandler = new ProtobufParser( - new ProtobufParser.Listener() { + ProtobufConnection clientHandler = new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public synchronized void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public synchronized void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { fail.set(true); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { client1ConnectionOpened.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { client1Disconnected.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); @@ -149,22 +149,22 @@ public class NetworkAbstractionTests { client1ConnectionOpened.get(); client1Disconnected.get(); - clientHandler = new ProtobufParser( - new ProtobufParser.Listener() { + clientHandler = new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public synchronized void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public synchronized void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { if (client2MessageReceived.isDone()) fail.set(true); client2MessageReceived.set(msg); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { client2ConnectionOpened.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { client2Disconnected.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); @@ -195,23 +195,23 @@ public class NetworkAbstractionTests { final SettableFuture clientConnectionClosed = SettableFuture.create(); final SettableFuture clientMessage1Received = SettableFuture.create(); final SettableFuture clientMessage2Received = SettableFuture.create(); - NioServer server = new NioServer(new StreamParserFactory() { + NioServer server = new NioServer(new StreamConnectionFactory() { @Override - public ProtobufParser getNewParser(InetAddress inetAddress, int port) { - return new ProtobufParser(new ProtobufParser.Listener() { + public ProtobufConnection getNewConnection(InetAddress inetAddress, int port) { + return new ProtobufConnection(new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { handler.write(msg); handler.write(msg); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { serverConnectionOpen.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { serverConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); @@ -220,10 +220,10 @@ public class NetworkAbstractionTests { server.startAsync(); server.awaitRunning(); - ProtobufParser clientHandler = new ProtobufParser( - new ProtobufParser.Listener() { + ProtobufConnection clientHandler = new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public synchronized void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public synchronized void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { if (clientMessage1Received.isDone()) clientMessage2Received.set(msg); else @@ -231,12 +231,12 @@ public class NetworkAbstractionTests { } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { clientConnectionOpen.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { clientConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); @@ -273,17 +273,17 @@ public class NetworkAbstractionTests { final SettableFuture clientConnection2Open = SettableFuture.create(); final SettableFuture serverConnection2Closed = SettableFuture.create(); final SettableFuture clientConnection2Closed = SettableFuture.create(); - NioServer server = new NioServer(new StreamParserFactory() { + NioServer server = new NioServer(new StreamConnectionFactory() { @Override - public ProtobufParser getNewParser(InetAddress inetAddress, int port) { - return new ProtobufParser(new ProtobufParser.Listener() { + public ProtobufConnection getNewConnection(InetAddress inetAddress, int port) { + return new ProtobufConnection(new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { fail.set(true); } @Override - public synchronized void connectionOpen(ProtobufParser handler) { + public synchronized void connectionOpen(ProtobufConnection handler) { if (serverConnection1Open.isDone()) { handler.setSocketTimeout(0); serverConnection2Open.set(null); @@ -292,7 +292,7 @@ public class NetworkAbstractionTests { } @Override - public synchronized void connectionClosed(ProtobufParser handler) { + public synchronized void connectionClosed(ProtobufConnection handler) { if (serverConnection1Closed.isDone()) { serverConnection2Closed.set(null); } else @@ -304,20 +304,20 @@ public class NetworkAbstractionTests { server.startAsync(); server.awaitRunning(); - openConnection(new InetSocketAddress("localhost", 4243), new ProtobufParser( - new ProtobufParser.Listener() { + openConnection(new InetSocketAddress("localhost", 4243), new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { fail.set(true); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { clientConnection1Open.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { clientConnection1Closed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0)); @@ -329,20 +329,20 @@ public class NetworkAbstractionTests { serverConnection1Closed.get(); long closeDelayFinish = System.currentTimeMillis(); - ProtobufParser client2Handler = new ProtobufParser( - new ProtobufParser.Listener() { + ProtobufConnection client2Handler = new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { fail.set(true); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { clientConnection2Open.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { clientConnection2Closed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); @@ -363,7 +363,7 @@ public class NetworkAbstractionTests { @Test public void largeDataTest() throws Exception { - /** Test various large-data handling, essentially testing {@link ProtobufParser#receiveBytes(java.nio.ByteBuffer)} */ + /** Test various large-data handling, essentially testing {@link ProtobufConnection#receiveBytes(java.nio.ByteBuffer)} */ final SettableFuture serverConnectionOpen = SettableFuture.create(); final SettableFuture clientConnectionOpen = SettableFuture.create(); final SettableFuture serverConnectionClosed = SettableFuture.create(); @@ -372,22 +372,22 @@ public class NetworkAbstractionTests { final SettableFuture clientMessage2Received = SettableFuture.create(); final SettableFuture clientMessage3Received = SettableFuture.create(); final SettableFuture clientMessage4Received = SettableFuture.create(); - NioServer server = new NioServer(new StreamParserFactory() { + NioServer server = new NioServer(new StreamConnectionFactory() { @Override - public ProtobufParser getNewParser(InetAddress inetAddress, int port) { - return new ProtobufParser(new ProtobufParser.Listener() { + public ProtobufConnection getNewConnection(InetAddress inetAddress, int port) { + return new ProtobufConnection(new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { handler.write(msg); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { serverConnectionOpen.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { serverConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 0x10000, 0); @@ -396,10 +396,10 @@ public class NetworkAbstractionTests { server.startAsync(); server.awaitRunning(); - ProtobufParser clientHandler = new ProtobufParser( - new ProtobufParser.Listener() { + ProtobufConnection clientHandler = new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public synchronized void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public synchronized void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { if (clientMessage1Received.isDone()) { if (clientMessage2Received.isDone()) { if (clientMessage3Received.isDone()) { @@ -415,12 +415,12 @@ public class NetworkAbstractionTests { } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { clientConnectionOpen.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { clientConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 0x10000, 0); @@ -523,17 +523,17 @@ public class NetworkAbstractionTests { final SettableFuture client1MessageReceived = SettableFuture.create(); final SettableFuture client2MessageReceived = SettableFuture.create(); final SettableFuture client3MessageReceived = SettableFuture.create(); - NioServer server = new NioServer(new StreamParserFactory() { + NioServer server = new NioServer(new StreamConnectionFactory() { @Override - public ProtobufParser getNewParser(InetAddress inetAddress, int port) { - return new ProtobufParser(new ProtobufParser.Listener() { + public ProtobufConnection getNewConnection(InetAddress inetAddress, int port) { + return new ProtobufConnection(new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { handler.write(msg); } @Override - public synchronized void connectionOpen(ProtobufParser handler) { + public synchronized void connectionOpen(ProtobufConnection handler) { if (serverConnection1Open.isDone()) { if (serverConnection2Open.isDone()) serverConnection3Open.set(null); @@ -544,7 +544,7 @@ public class NetworkAbstractionTests { } @Override - public synchronized void connectionClosed(ProtobufParser handler) { + public synchronized void connectionClosed(ProtobufConnection handler) { if (serverConnectionClosed1.isDone()) { if (serverConnectionClosed2.isDone()) { checkState(!serverConnectionClosed3.isDone()); @@ -560,20 +560,20 @@ public class NetworkAbstractionTests { server.startAsync(); server.awaitRunning(); - ProtobufParser client1Handler = new ProtobufParser( - new ProtobufParser.Listener() { + ProtobufConnection client1Handler = new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { client1MessageReceived.set(msg); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { client1ConnectionOpen.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { client1ConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); @@ -582,20 +582,20 @@ public class NetworkAbstractionTests { client1ConnectionOpen.get(); serverConnection1Open.get(); - ProtobufParser client2Handler = new ProtobufParser( - new ProtobufParser.Listener() { + ProtobufConnection client2Handler = new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { client2MessageReceived.set(msg); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { client2ConnectionOpen.set(null); } @Override - public void connectionClosed(ProtobufParser handler) { + public void connectionClosed(ProtobufConnection handler) { client2ConnectionClosed.set(null); } }, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0); @@ -604,20 +604,20 @@ public class NetworkAbstractionTests { client2ConnectionOpen.get(); serverConnection2Open.get(); - ProtobufParser client3Handler = new ProtobufParser( - new ProtobufParser.Listener() { + ProtobufConnection client3Handler = new ProtobufConnection( + new ProtobufConnection.Listener() { @Override - public void messageReceived(ProtobufParser handler, Protos.TwoWayChannelMessage msg) { + public void messageReceived(ProtobufConnection handler, Protos.TwoWayChannelMessage msg) { client3MessageReceived.set(msg); } @Override - public void connectionOpen(ProtobufParser handler) { + public void connectionOpen(ProtobufConnection handler) { client3ConnectionOpen.set(null); } @Override - public synchronized void connectionClosed(ProtobufParser handler) { + public synchronized void connectionClosed(ProtobufConnection handler) { checkState(!client3ConnectionClosed.isDone()); client3ConnectionClosed.set(null); }