3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-13 10:45:51 +00:00

Name change: ProtobufParser → ProtobufConnection, StreamParser → StreamConnection

This commit is contained in:
ollekullberg 2015-08-26 11:07:22 +02:00
parent 74af6a2c8a
commit ebc888d282
18 changed files with 200 additions and 195 deletions

View File

@ -18,6 +18,7 @@ package org.bitcoinj.core;
import com.google.common.base.*;
import com.google.common.base.Objects;
import org.bitcoinj.net.StreamConnection;
import org.bitcoinj.core.listeners.PeerConnectionEventListener;
import org.bitcoinj.core.listeners.PeerDataEventListener;
import org.bitcoinj.store.BlockStore;
@ -162,9 +163,9 @@ public class Peer extends PeerSocketHandler {
*
* <p>Note that this does <b>NOT</b> make a connection to the given remoteAddress, it only creates a handler for a
* connection. If you want to create a one-off connection, create a Peer and pass it to
* {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, org.bitcoinj.net.StreamParser)}
* {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, StreamConnection)}
* or
* {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, org.bitcoinj.net.StreamParser, int)}.</p>
* {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, StreamConnection, int)}.</p>
*
* <p>The remoteAddress provided should match the remote address of the peer which is being connected to, and is
* used to keep track of which peers relayed transactions and offer more descriptive logging.</p>
@ -180,9 +181,9 @@ public class Peer extends PeerSocketHandler {
*
* <p>Note that this does <b>NOT</b> make a connection to the given remoteAddress, it only creates a handler for a
* connection. If you want to create a one-off connection, create a Peer and pass it to
* {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, org.bitcoinj.net.StreamParser)}
* {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, StreamConnection)}
* or
* {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, org.bitcoinj.net.StreamParser, int)}.</p>
* {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, StreamConnection, int)}.</p>
*
* <p>The remoteAddress provided should match the remote address of the peer which is being connected to, and is
* used to keep track of which peers relayed transactions and offer more descriptive logging.</p>
@ -199,9 +200,9 @@ public class Peer extends PeerSocketHandler {
*
* <p>Note that this does <b>NOT</b> make a connection to the given remoteAddress, it only creates a handler for a
* connection. If you want to create a one-off connection, create a Peer and pass it to
* {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, org.bitcoinj.net.StreamParser)}
* {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, StreamConnection)}
* or
* {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, org.bitcoinj.net.StreamParser, int)}.</p>
* {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, StreamConnection, int)}.</p>
*
* <p>The remoteAddress provided should match the remote address of the peer which is being connected to, and is
* used to keep track of which peers relayed transactions and offer more descriptive logging.</p>
@ -232,9 +233,9 @@ public class Peer extends PeerSocketHandler {
*
* <p>Note that this does <b>NOT</b> make a connection to the given remoteAddress, it only creates a handler for a
* connection. If you want to create a one-off connection, create a Peer and pass it to
* {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, org.bitcoinj.net.StreamParser)}
* {@link org.bitcoinj.net.NioClientManager#openConnection(java.net.SocketAddress, StreamConnection)}
* or
* {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, org.bitcoinj.net.StreamParser, int)}.</p>
* {@link org.bitcoinj.net.NioClient#NioClient(java.net.SocketAddress, StreamConnection, int)}.</p>
*
* <p>The remoteAddress provided should match the remote address of the peer which is being connected to, and is
* used to keep track of which peers relayed transactions and offer more descriptive logging.</p>

View File

@ -18,7 +18,7 @@ package org.bitcoinj.core;
import org.bitcoinj.net.AbstractTimeoutHandler;
import org.bitcoinj.net.MessageWriteTarget;
import org.bitcoinj.net.StreamParser;
import org.bitcoinj.net.StreamConnection;
import org.bitcoinj.utils.Threading;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@ -39,7 +39,7 @@ import static com.google.common.base.Preconditions.*;
* Handles high-level message (de)serialization for peers, acting as the bridge between the
* {@link org.bitcoinj.net} classes and {@link Peer}.
*/
public abstract class PeerSocketHandler extends AbstractTimeoutHandler implements StreamParser {
public abstract class PeerSocketHandler extends AbstractTimeoutHandler implements StreamConnection {
private static final Logger log = LoggerFactory.getLogger(PeerSocketHandler.class);
private final MessageSerializer serializer;

View File

@ -30,7 +30,7 @@ import java.util.*;
import static com.google.common.base.Preconditions.*;
/**
* <p>Creates a simple connection to a server using a {@link StreamParser} to process data.</p>
* <p>Creates a simple connection to a server using a {@link StreamConnection} to process data.</p>
*
* <p>Generally, using {@link NioClient} and {@link NioClientManager} should be preferred over {@link BlockingClient}
* and {@link BlockingClientManager}, unless you wish to connect over a proxy or use some other network settings that
@ -47,10 +47,10 @@ public class BlockingClient implements MessageWriteTarget {
private SettableFuture<SocketAddress> connectFuture;
/**
* <p>Creates a new client to the given server address using the given {@link StreamParser} to decode the data.
* The given parser <b>MUST</b> be unique to this object. This does not block while waiting for the connection to
* open, but will call either the {@link StreamParser#connectionOpened()} or
* {@link StreamParser#connectionClosed()} callback on the created network event processing thread.</p>
* <p>Creates a new client to the given server address using the given {@link StreamConnection} to decode the data.
* The given connection <b>MUST</b> be unique to this object. This does not block while waiting for the connection to
* open, but will call either the {@link StreamConnection#connectionOpened()} or
* {@link StreamConnection#connectionClosed()} callback on the created network event processing thread.</p>
*
* @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no
* timeout.
@ -58,13 +58,13 @@ public class BlockingClient implements MessageWriteTarget {
* how this client connects to the internet. If not sure, use SocketFactory.getDefault()
* @param clientSet A set which this object will add itself to after initialization, and then remove itself from
*/
public BlockingClient(final SocketAddress serverAddress, final StreamParser parser,
public BlockingClient(final SocketAddress serverAddress, final StreamConnection connection,
final int connectTimeoutMillis, final SocketFactory socketFactory,
@Nullable final Set<BlockingClient> clientSet) throws IOException {
connectFuture = SettableFuture.create();
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
// sure it doesnt get too large or have to call read too often.
parser.setWriteTarget(this);
connection.setWriteTarget(this);
socket = socketFactory.createSocket();
final Context context = Context.get();
Thread t = new Thread() {
@ -75,10 +75,10 @@ public class BlockingClient implements MessageWriteTarget {
clientSet.add(BlockingClient.this);
try {
socket.connect(serverAddress, connectTimeoutMillis);
parser.connectionOpened();
connection.connectionOpened();
connectFuture.set(serverAddress);
InputStream stream = socket.getInputStream();
runReadLoop(stream, parser);
runReadLoop(stream, connection);
} catch (Exception e) {
if (!vCloseRequested) {
log.error("Error trying to open/read from connection: {}: {}", serverAddress, e.getMessage());
@ -92,7 +92,7 @@ public class BlockingClient implements MessageWriteTarget {
}
if (clientSet != null)
clientSet.remove(BlockingClient.this);
parser.connectionClosed();
connection.connectionClosed();
}
}
};
@ -103,10 +103,10 @@ public class BlockingClient implements MessageWriteTarget {
/**
* A blocking call that never returns, except by throwing an exception. It reads bytes from the input stream
* and feeds them to the provided {@link StreamParser}, for example, a {@link Peer}.
* and feeds them to the provided {@link StreamConnection}, for example, a {@link Peer}.
*/
public static void runReadLoop(InputStream stream, StreamParser parser) throws Exception {
ByteBuffer dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
public static void runReadLoop(InputStream stream, StreamConnection connection) throws Exception {
ByteBuffer dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(connection.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
byte[] readBuff = new byte[dbuf.capacity()];
while (true) {
// TODO Kill the message duplication here
@ -117,9 +117,9 @@ public class BlockingClient implements MessageWriteTarget {
dbuf.put(readBuff, 0, read);
// "flip" the buffer - setting the limit to the current position and setting position to 0
dbuf.flip();
// Use parser.receiveBytes's return value as a double-check that it stopped reading at the right
// Use connection.receiveBytes's return value as a double-check that it stopped reading at the right
// location
int bytesConsumed = parser.receiveBytes(dbuf);
int bytesConsumed = connection.receiveBytes(dbuf);
checkState(dbuf.position() == bytesConsumed);
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
// position)
@ -128,7 +128,7 @@ public class BlockingClient implements MessageWriteTarget {
}
/**
* Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()}
* Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()}
* event on the network-handling thread where all callbacks occur.
*/
@Override

View File

@ -55,11 +55,11 @@ public class BlockingClientManager extends AbstractIdleService implements Client
}
@Override
public ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamParser parser) {
public ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) {
try {
if (!isRunning())
throw new IllegalStateException();
return new BlockingClient(serverAddress, parser, connectTimeoutMillis, socketFactory, clients).getConnectFuture();
return new BlockingClient(serverAddress, connection, connectTimeoutMillis, socketFactory, clients).getConnectFuture();
} catch (IOException e) {
throw new RuntimeException(e); // This should only happen if we are, eg, out of system resources
}

View File

@ -30,10 +30,10 @@ import java.net.SocketAddress;
*/
public interface ClientConnectionManager extends Service {
/**
* Creates a new connection to the given address, with the given parser used to handle incoming data. Any errors
* Creates a new connection to the given address, with the given connection used to handle incoming data. Any errors
* that occur during connection will be returned in the given future, including errors that can occur immediately.
*/
ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamParser parser);
ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection);
/** Gets the number of connected peers */
int getConnectedClientCount();

View File

@ -57,7 +57,7 @@ class ConnectionHandler implements MessageWriteTarget {
@GuardedBy("lock") private final ByteBuffer readBuff;
@GuardedBy("lock") private final SocketChannel channel;
@GuardedBy("lock") private final SelectionKey key;
@GuardedBy("lock") StreamParser parser;
@GuardedBy("lock") StreamConnection connection;
@GuardedBy("lock") private boolean closeCalled = false;
@GuardedBy("lock") private long bytesToWriteRemaining = 0;
@ -65,30 +65,30 @@ class ConnectionHandler implements MessageWriteTarget {
private Set<ConnectionHandler> connectedHandlers;
public ConnectionHandler(StreamParserFactory parserFactory, SelectionKey key) throws IOException {
this(parserFactory.getNewParser(((SocketChannel) key.channel()).socket().getInetAddress(), ((SocketChannel) key.channel()).socket().getPort()), key);
if (parser == null)
throw new IOException("Parser factory.getNewParser returned null");
public ConnectionHandler(StreamConnectionFactory connectionFactory, SelectionKey key) throws IOException {
this(connectionFactory.getNewConnection(((SocketChannel) key.channel()).socket().getInetAddress(), ((SocketChannel) key.channel()).socket().getPort()), key);
if (connection == null)
throw new IOException("Parser factory.getNewConnection returned null");
}
private ConnectionHandler(@Nullable StreamParser parser, SelectionKey key) {
private ConnectionHandler(@Nullable StreamConnection connection, SelectionKey key) {
this.key = key;
this.channel = checkNotNull(((SocketChannel)key.channel()));
if (parser == null) {
if (connection == null) {
readBuff = null;
return;
}
this.parser = parser;
readBuff = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
parser.setWriteTarget(this); // May callback into us (eg closeConnection() now)
this.connection = connection;
readBuff = ByteBuffer.allocateDirect(Math.min(Math.max(connection.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
connection.setWriteTarget(this); // May callback into us (eg closeConnection() now)
connectedHandlers = null;
}
public ConnectionHandler(StreamParser parser, SelectionKey key, Set<ConnectionHandler> connectedHandlers) {
this(checkNotNull(parser), key);
public ConnectionHandler(StreamConnection connection, SelectionKey key, Set<ConnectionHandler> connectedHandlers) {
this(checkNotNull(connection), key);
// closeConnection() may have already happened because we invoked the other c'tor above, which called
// parser.setWriteTarget which might have re-entered already. In this case we shouldn't add ourselves
// connection.setWriteTarget which might have re-entered already. In this case we shouldn't add ourselves
// to the connectedHandlers set.
lock.lock();
try {
@ -191,7 +191,7 @@ class ConnectionHandler implements MessageWriteTarget {
}
if (callClosed) {
checkState(connectedHandlers == null || connectedHandlers.remove(this));
parser.connectionClosed();
connection.connectionClosed();
}
}
@ -208,7 +208,7 @@ class ConnectionHandler implements MessageWriteTarget {
return;
}
if (key.isReadable()) {
// Do a socket read and invoke the parser's receiveBytes message
// Do a socket read and invoke the connection's receiveBytes message
int read = handler.channel.read(handler.readBuff);
if (read == 0)
return; // Was probably waiting on a write
@ -219,8 +219,8 @@ class ConnectionHandler implements MessageWriteTarget {
}
// "flip" the buffer - setting the limit to the current position and setting position to 0
handler.readBuff.flip();
// Use parser.receiveBytes's return value as a check that it stopped reading at the right location
int bytesConsumed = checkNotNull(handler.parser).receiveBytes(handler.readBuff);
// Use connection.receiveBytes's return value as a check that it stopped reading at the right location
int bytesConsumed = checkNotNull(handler.connection).receiveBytes(handler.readBuff);
checkState(handler.readBuff.position() == bytesConsumed);
// Now drop the bytes which were read by compacting readBuff (resetting limit and keeping relative
// position)
@ -230,7 +230,7 @@ class ConnectionHandler implements MessageWriteTarget {
handler.tryWriteBytes();
} catch (Exception e) {
// This can happen eg if the channel closes while the thread is about to get killed
// (ClosedByInterruptException), or if handler.parser.receiveBytes throws something
// (ClosedByInterruptException), or if handler.connection.receiveBytes throws something
Throwable t = Throwables.getRootCause(e);
log.warn("Error handling SelectionKey: {}", t.getMessage() != null ? t.getMessage() : t.getClass().getName());
handler.closeConnection();

View File

@ -27,7 +27,7 @@ public interface MessageWriteTarget {
*/
void writeBytes(byte[] message) throws IOException;
/**
* Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()}
* Closes the connection to the server, triggering the {@link StreamConnection#connectionClosed()}
* event on the network-handling thread where all callbacks occur.
*/
void closeConnection();

View File

@ -26,7 +26,7 @@ import java.net.*;
import java.nio.*;
/**
* Creates a simple connection to a server using a {@link StreamParser} to process data.
* Creates a simple connection to a server using a {@link StreamConnection} to process data.
*/
public class NioClient implements MessageWriteTarget {
private static final Logger log = LoggerFactory.getLogger(NioClient.class);
@ -34,13 +34,13 @@ public class NioClient implements MessageWriteTarget {
private final Handler handler;
private final NioClientManager manager = new NioClientManager();
class Handler extends AbstractTimeoutHandler implements StreamParser {
private final StreamParser upstreamParser;
class Handler extends AbstractTimeoutHandler implements StreamConnection {
private final StreamConnection upstreamConnection;
private MessageWriteTarget writeTarget;
private boolean closeOnOpen = false;
private boolean closeCalled = false;
Handler(StreamParser upstreamParser, int connectTimeoutMillis) {
this.upstreamParser = upstreamParser;
Handler(StreamConnection upstreamConnection, int connectTimeoutMillis) {
this.upstreamConnection = upstreamConnection;
setSocketTimeout(connectTimeoutMillis);
setTimeoutEnabled(true);
}
@ -56,19 +56,19 @@ public class NioClient implements MessageWriteTarget {
manager.stopAsync();
if (!closeCalled) {
closeCalled = true;
upstreamParser.connectionClosed();
upstreamConnection.connectionClosed();
}
}
@Override
public synchronized void connectionOpened() {
if (!closeOnOpen)
upstreamParser.connectionOpened();
upstreamConnection.connectionOpened();
}
@Override
public int receiveBytes(ByteBuffer buff) throws Exception {
return upstreamParser.receiveBytes(buff);
return upstreamConnection.receiveBytes(buff);
}
@Override
@ -78,26 +78,26 @@ public class NioClient implements MessageWriteTarget {
else {
setTimeoutEnabled(false);
this.writeTarget = writeTarget;
upstreamParser.setWriteTarget(writeTarget);
upstreamConnection.setWriteTarget(writeTarget);
}
}
@Override
public int getMaxMessageSize() {
return upstreamParser.getMaxMessageSize();
return upstreamConnection.getMaxMessageSize();
}
}
/**
* <p>Creates a new client to the given server address using the given {@link StreamParser} to decode the data.
* The given parser <b>MUST</b> be unique to this object. This does not block while waiting for the connection to
* open, but will call either the {@link StreamParser#connectionOpened()} or
* {@link StreamParser#connectionClosed()} callback on the created network event processing thread.</p>
* <p>Creates a new client to the given server address using the given {@link StreamConnection} to decode the data.
* The given connection <b>MUST</b> be unique to this object. This does not block while waiting for the connection to
* open, but will call either the {@link StreamConnection#connectionOpened()} or
* {@link StreamConnection#connectionClosed()} callback on the created network event processing thread.</p>
*
* @param connectTimeoutMillis The connect timeout set on the connection (in milliseconds). 0 is interpreted as no
* timeout.
*/
public NioClient(final SocketAddress serverAddress, final StreamParser parser,
public NioClient(final SocketAddress serverAddress, final StreamConnection parser,
final int connectTimeoutMillis) throws IOException {
manager.startAsync();
manager.awaitRunning();

View File

@ -40,11 +40,11 @@ public class NioClientManager extends AbstractExecutionThreadService implements
class PendingConnect {
SocketChannel sc;
StreamParser parser;
StreamConnection connection;
SocketAddress address;
SettableFuture<SocketAddress> future = SettableFuture.create();
PendingConnect(SocketChannel sc, StreamParser parser, SocketAddress address) { this.sc = sc; this.parser = parser; this.address = address; }
PendingConnect(SocketChannel sc, StreamConnection connection, SocketAddress address) { this.sc = sc; this.connection = connection; this.address = address; }
}
final Queue<PendingConnect> newConnectionChannels = new LinkedBlockingQueue<PendingConnect>();
@ -57,14 +57,14 @@ public class NioClientManager extends AbstractExecutionThreadService implements
if (key.isValid() && key.isConnectable()) { // ie a client connection which has finished the initial connect process
// Create a ConnectionHandler and hook everything together
PendingConnect data = (PendingConnect) key.attachment();
StreamParser parser = data.parser;
StreamConnection connection = data.connection;
SocketChannel sc = (SocketChannel) key.channel();
ConnectionHandler handler = new ConnectionHandler(parser, key, connectedHandlers);
ConnectionHandler handler = new ConnectionHandler(connection, key, connectedHandlers);
try {
if (sc.finishConnect()) {
log.info("Successfully connected to {}", sc.socket().getRemoteSocketAddress());
key.interestOps((key.interestOps() | SelectionKey.OP_READ) & ~SelectionKey.OP_CONNECT).attach(handler);
parser.connectionOpened();
connection.connectionOpened();
data.future.set(data.address);
} else {
log.warn("Failed to connect to {}", sc.socket().getRemoteSocketAddress());
@ -145,15 +145,15 @@ public class NioClientManager extends AbstractExecutionThreadService implements
}
@Override
public ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamParser parser) {
public ListenableFuture<SocketAddress> openConnection(SocketAddress serverAddress, StreamConnection connection) {
if (!isRunning())
throw new IllegalStateException();
// Create a new connection, give it a parser as an attachment
// Create a new connection, give it a connection as an attachment
try {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.connect(serverAddress);
PendingConnect data = new PendingConnect(sc, parser, serverAddress);
PendingConnect data = new PendingConnect(sc, connection, serverAddress);
newConnectionChannels.offer(data);
selector.wakeup();
return data.future;

View File

@ -28,13 +28,13 @@ import com.google.common.util.concurrent.AbstractExecutionThreadService;
import org.slf4j.LoggerFactory;
/**
* Creates a simple server listener which listens for incoming client connections and uses a {@link StreamParser} to
* Creates a simple server listener which listens for incoming client connections and uses a {@link StreamConnection} to
* process data.
*/
public class NioServer extends AbstractExecutionThreadService {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(NioServer.class);
private final StreamParserFactory parserFactory;
private final StreamConnectionFactory connectionFactory;
private final ServerSocketChannel sc;
@VisibleForTesting final Selector selector;
@ -42,14 +42,14 @@ public class NioServer extends AbstractExecutionThreadService {
// Handle a SelectionKey which was selected
private void handleKey(Selector selector, SelectionKey key) throws IOException {
if (key.isValid() && key.isAcceptable()) {
// Accept a new connection, give it a parser as an attachment
// Accept a new connection, give it a stream connection as an attachment
SocketChannel newChannel = sc.accept();
newChannel.configureBlocking(false);
SelectionKey newKey = newChannel.register(selector, SelectionKey.OP_READ);
try {
ConnectionHandler handler = new ConnectionHandler(parserFactory, newKey);
ConnectionHandler handler = new ConnectionHandler(connectionFactory, newKey);
newKey.attach(handler);
handler.parser.connectionOpened();
handler.connection.connectionOpened();
} catch (IOException e) {
// This can happen if ConnectionHandler's call to get a new handler returned null
log.error("Error handling new connection", Throwables.getRootCause(e).getMessage());
@ -62,12 +62,12 @@ public class NioServer extends AbstractExecutionThreadService {
/**
* Creates a new server which is capable of listening for incoming connections and processing client provided data
* using {@link StreamParser}s created by the given {@link StreamParserFactory}
* using {@link StreamConnection}s created by the given {@link StreamConnectionFactory}
*
* @throws IOException If there is an issue opening the server socket or binding fails for some reason
*/
public NioServer(final StreamParserFactory parserFactory, InetSocketAddress bindAddress) throws IOException {
this.parserFactory = parserFactory;
public NioServer(final StreamConnectionFactory connectionFactory, InetSocketAddress bindAddress) throws IOException {
this.connectionFactory = connectionFactory;
sc = ServerSocketChannel.open();
sc.configureBlocking(false);

View File

@ -40,22 +40,24 @@ import static com.google.common.base.Preconditions.checkState;
*
* <p>Messages are encoded with a 4-byte signed integer (big endian) prefix to indicate their length followed by the
* serialized protobuf</p>
*
* <p>(Used to be called ProtobufParser)</p>
*/
public class ProtobufParser<MessageType extends MessageLite> extends AbstractTimeoutHandler implements StreamParser {
private static final Logger log = LoggerFactory.getLogger(ProtobufParser.class);
public class ProtobufConnection<MessageType extends MessageLite> extends AbstractTimeoutHandler implements StreamConnection {
private static final Logger log = LoggerFactory.getLogger(ProtobufConnection.class);
/**
* An interface which can be implemented to handle callbacks as new messages are generated and socket events occur.
* @param <MessageType> The protobuf type which is used on this socket.
* This <b>MUST</b> match the MessageType used in the parent {@link ProtobufParser}
* This <b>MUST</b> match the MessageType used in the parent {@link ProtobufConnection}
*/
public interface Listener<MessageType extends MessageLite> {
/** Called when a new protobuf is received from the remote side. */
void messageReceived(ProtobufParser<MessageType> handler, MessageType msg);
void messageReceived(ProtobufConnection<MessageType> handler, MessageType msg);
/** Called when the connection is opened and available for writing data to. */
void connectionOpen(ProtobufParser<MessageType> handler);
void connectionOpen(ProtobufConnection<MessageType> handler);
/** Called when the connection is closed and no more data should be provided. */
void connectionClosed(ProtobufParser<MessageType> handler);
void connectionClosed(ProtobufConnection<MessageType> handler);
}
// The callback listener
@ -73,7 +75,7 @@ public class ProtobufParser<MessageType extends MessageLite> extends AbstractTim
// attacking clients can be made to timeout/get blocked if they are sending crap to fill buffers.
@GuardedBy("lock") private int messageBytesOffset = 0;
@GuardedBy("lock") private byte[] messageBytes;
private final ReentrantLock lock = Threading.lock("ProtobufParser");
private final ReentrantLock lock = Threading.lock("ProtobufConnection");
@VisibleForTesting final AtomicReference<MessageWriteTarget> writeTarget = new AtomicReference<MessageWriteTarget>();
@ -88,7 +90,7 @@ public class ProtobufParser<MessageType extends MessageLite> extends AbstractTim
* @param timeoutMillis The timeout between messages before the connection is automatically closed. Only enabled
* after the connection is established.
*/
public ProtobufParser(Listener<MessageType> handler, MessageType prototype, int maxMessageSize, int timeoutMillis) {
public ProtobufConnection(Listener<MessageType> handler, MessageType prototype, int maxMessageSize, int timeoutMillis) {
this.handler = handler;
this.prototype = prototype;
this.maxMessageSize = Math.min(maxMessageSize, Integer.MAX_VALUE - 4);
@ -108,7 +110,7 @@ public class ProtobufParser<MessageType extends MessageLite> extends AbstractTim
}
/**
* Closes this connection, eventually triggering a {@link ProtobufParser.Listener#connectionClosed()} event.
* Closes this connection, eventually triggering a {@link ProtobufConnection.Listener#connectionClosed()} event.
*/
public void closeConnection() {
this.writeTarget.get().closeConnection();

View File

@ -21,8 +21,10 @@ import java.nio.ByteBuffer;
/**
* A generic handler which is used in {@link NioServer}, {@link NioClient} and {@link BlockingClient} to handle incoming
* data streams.
*
* Used to be callet StreamParser.
*/
public interface StreamParser {
public interface StreamConnection {
/** Called when the connection socket is closed */
void connectionClosed();
@ -31,7 +33,7 @@ public interface StreamParser {
/**
* <p>Called when new bytes are available from the remote end. This should only ever be called by the single
* writeTarget associated with any given StreamParser, multiple callers will likely confuse implementations.</p>
* writeTarget associated with any given StreamConnection, multiple callers will likely confuse implementations.</p>
*
* Implementers/callers must follow the following conventions exactly:
* <ul>
@ -51,7 +53,7 @@ public interface StreamParser {
int receiveBytes(ByteBuffer buff) throws Exception;
/**
* Called when this parser is attached to an upstream write target (ie a low-level connection handler). This
* Called when this connection is attached to an upstream write target (ie a low-level connection handler). This
* writeTarget should be stored and used to close the connection or write data to the socket.
*/
void setWriteTarget(MessageWriteTarget writeTarget);

View File

@ -20,14 +20,14 @@ import java.net.InetAddress;
import javax.annotation.Nullable;
/**
* A factory which generates new {@link StreamParser}s when a new connection is opened.
* A factory which generates new {@link StreamConnection}s when a new connection is opened.
*/
public interface StreamParserFactory {
public interface StreamConnectionFactory {
/**
* Returns a new handler or null to have the connection close.
* @param inetAddress The client's (IP) address
* @param port The remote port on the client side
*/
@Nullable
StreamParser getNewParser(InetAddress inetAddress, int port);
StreamConnection getNewConnection(InetAddress inetAddress, int port);
}

View File

@ -23,7 +23,7 @@ import org.bitcoinj.core.Sha256Hash;
import org.bitcoinj.core.Utils;
import org.bitcoinj.core.Wallet;
import org.bitcoinj.net.NioClient;
import org.bitcoinj.net.ProtobufParser;
import org.bitcoinj.net.ProtobufConnection;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@ -43,7 +43,7 @@ public class PaymentChannelClientConnection {
private final SettableFuture<PaymentChannelClientConnection> channelOpenFuture = SettableFuture.create();
private final PaymentChannelClient channelClient;
private final ProtobufParser<Protos.TwoWayChannelMessage> wireParser;
private final ProtobufConnection<Protos.TwoWayChannelMessage> wireParser;
/**
* Attempts to open a new connection to and open a payment channel with the given host and port, blocking until the
@ -128,9 +128,9 @@ public class PaymentChannelClientConnection {
});
// And glue back in the opposite direction - network to the channelClient.
wireParser = new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
wireParser = new ProtobufConnection<Protos.TwoWayChannelMessage>(new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
try {
channelClient.receiveMessage(msg);
} catch (InsufficientMoneyException e) {
@ -140,12 +140,12 @@ public class PaymentChannelClientConnection {
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<Protos.TwoWayChannelMessage> handler) {
channelClient.connectionOpen();
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<Protos.TwoWayChannelMessage> handler) {
channelClient.connectionClosed();
channelOpenFuture.setException(new PaymentChannelCloseException("The TCP socket died",
PaymentChannelCloseException.CloseReason.CONNECTION_CLOSED));
@ -238,7 +238,7 @@ public class PaymentChannelClientConnection {
// we defined above will be called, which in turn will call wireParser.closeConnection(), which in turn will invoke
// NioClient.closeConnection(), which will then close the socket triggering interruption of the network
// thread it had created. That causes the background thread to die, which on its way out calls
// ProtobufParser.connectionClosed which invokes the connectionClosed method we defined above which in turn
// ProtobufConnection.connectionClosed which invokes the connectionClosed method we defined above which in turn
// then configures the open-future correctly and closes the state object. Phew!
try {
channelClient.settle();

View File

@ -22,8 +22,8 @@ import org.bitcoinj.core.Sha256Hash;
import org.bitcoinj.core.TransactionBroadcaster;
import org.bitcoinj.core.Wallet;
import org.bitcoinj.net.NioServer;
import org.bitcoinj.net.ProtobufParser;
import org.bitcoinj.net.StreamParserFactory;
import org.bitcoinj.net.ProtobufConnection;
import org.bitcoinj.net.StreamConnectionFactory;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
@ -90,14 +90,14 @@ public class PaymentChannelServerListener {
}
});
protobufHandlerListener = new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
protobufHandlerListener = new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public synchronized void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public synchronized void messageReceived(ProtobufConnection<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
paymentChannelManager.receiveMessage(msg);
}
@Override
public synchronized void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public synchronized void connectionClosed(ProtobufConnection<Protos.TwoWayChannelMessage> handler) {
paymentChannelManager.connectionClosed();
if (closeReason != null)
eventHandler.channelClosed(closeReason);
@ -107,7 +107,7 @@ public class PaymentChannelServerListener {
}
@Override
public synchronized void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public synchronized void connectionOpen(ProtobufConnection<Protos.TwoWayChannelMessage> handler) {
ServerConnectionEventHandler eventHandler = eventHandlerFactory.onNewConnection(address);
if (eventHandler == null)
handler.closeConnection();
@ -118,7 +118,7 @@ public class PaymentChannelServerListener {
}
};
socketProtobufHandler = new ProtobufParser<Protos.TwoWayChannelMessage>
socketProtobufHandler = new ProtobufConnection<Protos.TwoWayChannelMessage>
(protobufHandlerListener, Protos.TwoWayChannelMessage.getDefaultInstance(), Short.MAX_VALUE, timeoutSeconds*1000);
}
@ -131,10 +131,10 @@ public class PaymentChannelServerListener {
private final PaymentChannelServer paymentChannelManager;
// The connection handler which puts/gets protobufs from the TCP socket
private final ProtobufParser<Protos.TwoWayChannelMessage> socketProtobufHandler;
private final ProtobufConnection<Protos.TwoWayChannelMessage> socketProtobufHandler;
// The listener which connects to socketProtobufHandler
private final ProtobufParser.Listener<Protos.TwoWayChannelMessage> protobufHandlerListener;
private final ProtobufConnection.Listener<Protos.TwoWayChannelMessage> protobufHandlerListener;
}
/**
@ -142,9 +142,9 @@ public class PaymentChannelServerListener {
* @throws Exception If binding to the given port fails (eg SocketException: Permission denied for privileged ports)
*/
public void bindAndStart(int port) throws Exception {
server = new NioServer(new StreamParserFactory() {
server = new NioServer(new StreamConnectionFactory() {
@Override
public ProtobufParser<Protos.TwoWayChannelMessage> getNewParser(InetAddress inetAddress, int port) {
public ProtobufConnection<Protos.TwoWayChannelMessage> getNewConnection(InetAddress inetAddress, int port) {
return new ServerHandler(new InetSocketAddress(inetAddress, port), timeoutSeconds).socketProtobufHandler;
}
}, new InetSocketAddress(port));

View File

@ -18,7 +18,7 @@ package org.bitcoinj.protocols.channels;
import org.bitcoinj.core.Coin;
import org.bitcoinj.core.Sha256Hash;
import org.bitcoinj.net.ProtobufParser;
import org.bitcoinj.net.ProtobufConnection;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
@ -31,10 +31,10 @@ import javax.annotation.Nullable;
* {@link PaymentChannelServerListener}
*/
public abstract class ServerConnectionEventHandler {
private ProtobufParser<Protos.TwoWayChannelMessage> connectionChannel;
private ProtobufConnection<Protos.TwoWayChannelMessage> connectionChannel;
// Called by ServerListener before channelOpen to set connectionChannel when it is ready to received application messages
// Also called with null to clear connectionChannel after channelClosed()
synchronized void setConnectionChannel(@Nullable ProtobufParser<Protos.TwoWayChannelMessage> connectionChannel) { this.connectionChannel = connectionChannel; }
synchronized void setConnectionChannel(@Nullable ProtobufConnection<Protos.TwoWayChannelMessage> connectionChannel) { this.connectionChannel = connectionChannel; }
/**
* <p>Closes the channel with the client (will generate a

View File

@ -111,10 +111,10 @@ public class TestWithNetworkConnections {
}
protected void startPeerServer(int i) throws IOException {
peerServers[i] = new NioServer(new StreamParserFactory() {
peerServers[i] = new NioServer(new StreamConnectionFactory() {
@Nullable
@Override
public StreamParser getNewParser(InetAddress inetAddress, int port) {
public StreamConnection getNewConnection(InetAddress inetAddress, int port) {
return new InboundMessageQueuer(params) {
@Override
public void connectionClosed() {

View File

@ -64,7 +64,7 @@ public class NetworkAbstractionTests {
channels = null;
}
private MessageWriteTarget openConnection(SocketAddress addr, ProtobufParser<Protos.TwoWayChannelMessage> parser) throws Exception {
private MessageWriteTarget openConnection(SocketAddress addr, ProtobufConnection<TwoWayChannelMessage> parser) throws Exception {
if (clientType == 0 || clientType == 1) {
channels.openConnection(addr, parser);
if (parser.writeTarget.get() == null)
@ -97,28 +97,28 @@ public class NetworkAbstractionTests {
final SettableFuture<Void> client2ConnectionOpened = SettableFuture.create();
final SettableFuture<Void> serverConnectionClosed = SettableFuture.create();
final SettableFuture<Void> client2Disconnected = SettableFuture.create();
NioServer server = new NioServer(new StreamParserFactory() {
NioServer server = new NioServer(new StreamConnectionFactory() {
boolean finishedFirst = false;
@Override
public ProtobufParser<TwoWayChannelMessage> getNewParser(InetAddress inetAddress, int port) {
public ProtobufConnection<TwoWayChannelMessage> getNewConnection(InetAddress inetAddress, int port) {
if (!finishedFirst) {
finishedFirst = true;
return null;
}
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
return new ProtobufConnection<TwoWayChannelMessage>(new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
handler.write(msg);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
serverConnectionOpen.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
serverConnectionClosed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
@ -127,20 +127,20 @@ public class NetworkAbstractionTests {
server.startAsync();
server.awaitRunning();
ProtobufParser<Protos.TwoWayChannelMessage> clientHandler = new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
ProtobufConnection<TwoWayChannelMessage> clientHandler = new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public synchronized void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public synchronized void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
fail.set(true);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
client1ConnectionOpened.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
client1Disconnected.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
@ -149,22 +149,22 @@ public class NetworkAbstractionTests {
client1ConnectionOpened.get();
client1Disconnected.get();
clientHandler = new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
clientHandler = new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public synchronized void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public synchronized void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
if (client2MessageReceived.isDone())
fail.set(true);
client2MessageReceived.set(msg);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
client2ConnectionOpened.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
client2Disconnected.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
@ -195,23 +195,23 @@ public class NetworkAbstractionTests {
final SettableFuture<Void> clientConnectionClosed = SettableFuture.create();
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage1Received = SettableFuture.create();
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage2Received = SettableFuture.create();
NioServer server = new NioServer(new StreamParserFactory() {
NioServer server = new NioServer(new StreamConnectionFactory() {
@Override
public ProtobufParser<TwoWayChannelMessage> getNewParser(InetAddress inetAddress, int port) {
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
public ProtobufConnection<TwoWayChannelMessage> getNewConnection(InetAddress inetAddress, int port) {
return new ProtobufConnection<TwoWayChannelMessage>(new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
handler.write(msg);
handler.write(msg);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
serverConnectionOpen.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
serverConnectionClosed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
@ -220,10 +220,10 @@ public class NetworkAbstractionTests {
server.startAsync();
server.awaitRunning();
ProtobufParser<Protos.TwoWayChannelMessage> clientHandler = new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
ProtobufConnection<TwoWayChannelMessage> clientHandler = new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public synchronized void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public synchronized void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
if (clientMessage1Received.isDone())
clientMessage2Received.set(msg);
else
@ -231,12 +231,12 @@ public class NetworkAbstractionTests {
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
clientConnectionOpen.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
clientConnectionClosed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
@ -273,17 +273,17 @@ public class NetworkAbstractionTests {
final SettableFuture<Void> clientConnection2Open = SettableFuture.create();
final SettableFuture<Void> serverConnection2Closed = SettableFuture.create();
final SettableFuture<Void> clientConnection2Closed = SettableFuture.create();
NioServer server = new NioServer(new StreamParserFactory() {
NioServer server = new NioServer(new StreamConnectionFactory() {
@Override
public ProtobufParser<Protos.TwoWayChannelMessage> getNewParser(InetAddress inetAddress, int port) {
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
public ProtobufConnection<TwoWayChannelMessage> getNewConnection(InetAddress inetAddress, int port) {
return new ProtobufConnection<TwoWayChannelMessage>(new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
fail.set(true);
}
@Override
public synchronized void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public synchronized void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
if (serverConnection1Open.isDone()) {
handler.setSocketTimeout(0);
serverConnection2Open.set(null);
@ -292,7 +292,7 @@ public class NetworkAbstractionTests {
}
@Override
public synchronized void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public synchronized void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
if (serverConnection1Closed.isDone()) {
serverConnection2Closed.set(null);
} else
@ -304,20 +304,20 @@ public class NetworkAbstractionTests {
server.startAsync();
server.awaitRunning();
openConnection(new InetSocketAddress("localhost", 4243), new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
openConnection(new InetSocketAddress("localhost", 4243), new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
fail.set(true);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
clientConnection1Open.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
clientConnection1Closed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0));
@ -329,20 +329,20 @@ public class NetworkAbstractionTests {
serverConnection1Closed.get();
long closeDelayFinish = System.currentTimeMillis();
ProtobufParser<Protos.TwoWayChannelMessage> client2Handler = new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
ProtobufConnection<TwoWayChannelMessage> client2Handler = new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
fail.set(true);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
clientConnection2Open.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
clientConnection2Closed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
@ -363,7 +363,7 @@ public class NetworkAbstractionTests {
@Test
public void largeDataTest() throws Exception {
/** Test various large-data handling, essentially testing {@link ProtobufParser#receiveBytes(java.nio.ByteBuffer)} */
/** Test various large-data handling, essentially testing {@link ProtobufConnection#receiveBytes(java.nio.ByteBuffer)} */
final SettableFuture<Void> serverConnectionOpen = SettableFuture.create();
final SettableFuture<Void> clientConnectionOpen = SettableFuture.create();
final SettableFuture<Void> serverConnectionClosed = SettableFuture.create();
@ -372,22 +372,22 @@ public class NetworkAbstractionTests {
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage2Received = SettableFuture.create();
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage3Received = SettableFuture.create();
final SettableFuture<Protos.TwoWayChannelMessage> clientMessage4Received = SettableFuture.create();
NioServer server = new NioServer(new StreamParserFactory() {
NioServer server = new NioServer(new StreamConnectionFactory() {
@Override
public ProtobufParser<Protos.TwoWayChannelMessage> getNewParser(InetAddress inetAddress, int port) {
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
public ProtobufConnection<TwoWayChannelMessage> getNewConnection(InetAddress inetAddress, int port) {
return new ProtobufConnection<TwoWayChannelMessage>(new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
handler.write(msg);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
serverConnectionOpen.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
serverConnectionClosed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 0x10000, 0);
@ -396,10 +396,10 @@ public class NetworkAbstractionTests {
server.startAsync();
server.awaitRunning();
ProtobufParser<Protos.TwoWayChannelMessage> clientHandler = new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
ProtobufConnection<TwoWayChannelMessage> clientHandler = new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public synchronized void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public synchronized void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
if (clientMessage1Received.isDone()) {
if (clientMessage2Received.isDone()) {
if (clientMessage3Received.isDone()) {
@ -415,12 +415,12 @@ public class NetworkAbstractionTests {
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
clientConnectionOpen.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
clientConnectionClosed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 0x10000, 0);
@ -523,17 +523,17 @@ public class NetworkAbstractionTests {
final SettableFuture<Protos.TwoWayChannelMessage> client1MessageReceived = SettableFuture.create();
final SettableFuture<Protos.TwoWayChannelMessage> client2MessageReceived = SettableFuture.create();
final SettableFuture<Protos.TwoWayChannelMessage> client3MessageReceived = SettableFuture.create();
NioServer server = new NioServer(new StreamParserFactory() {
NioServer server = new NioServer(new StreamConnectionFactory() {
@Override
public ProtobufParser<Protos.TwoWayChannelMessage> getNewParser(InetAddress inetAddress, int port) {
return new ProtobufParser<Protos.TwoWayChannelMessage>(new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
public ProtobufConnection<TwoWayChannelMessage> getNewConnection(InetAddress inetAddress, int port) {
return new ProtobufConnection<TwoWayChannelMessage>(new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
handler.write(msg);
}
@Override
public synchronized void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public synchronized void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
if (serverConnection1Open.isDone()) {
if (serverConnection2Open.isDone())
serverConnection3Open.set(null);
@ -544,7 +544,7 @@ public class NetworkAbstractionTests {
}
@Override
public synchronized void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public synchronized void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
if (serverConnectionClosed1.isDone()) {
if (serverConnectionClosed2.isDone()) {
checkState(!serverConnectionClosed3.isDone());
@ -560,20 +560,20 @@ public class NetworkAbstractionTests {
server.startAsync();
server.awaitRunning();
ProtobufParser<Protos.TwoWayChannelMessage> client1Handler = new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
ProtobufConnection<TwoWayChannelMessage> client1Handler = new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
client1MessageReceived.set(msg);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
client1ConnectionOpen.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
client1ConnectionClosed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
@ -582,20 +582,20 @@ public class NetworkAbstractionTests {
client1ConnectionOpen.get();
serverConnection1Open.get();
ProtobufParser<Protos.TwoWayChannelMessage> client2Handler = new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
ProtobufConnection<TwoWayChannelMessage> client2Handler = new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
client2MessageReceived.set(msg);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
client2ConnectionOpen.set(null);
}
@Override
public void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
client2ConnectionClosed.set(null);
}
}, Protos.TwoWayChannelMessage.getDefaultInstance(), 1000, 0);
@ -604,20 +604,20 @@ public class NetworkAbstractionTests {
client2ConnectionOpen.get();
serverConnection2Open.get();
ProtobufParser<Protos.TwoWayChannelMessage> client3Handler = new ProtobufParser<Protos.TwoWayChannelMessage>(
new ProtobufParser.Listener<Protos.TwoWayChannelMessage>() {
ProtobufConnection<TwoWayChannelMessage> client3Handler = new ProtobufConnection<TwoWayChannelMessage>(
new ProtobufConnection.Listener<Protos.TwoWayChannelMessage>() {
@Override
public void messageReceived(ProtobufParser<Protos.TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
public void messageReceived(ProtobufConnection<TwoWayChannelMessage> handler, Protos.TwoWayChannelMessage msg) {
client3MessageReceived.set(msg);
}
@Override
public void connectionOpen(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public void connectionOpen(ProtobufConnection<TwoWayChannelMessage> handler) {
client3ConnectionOpen.set(null);
}
@Override
public synchronized void connectionClosed(ProtobufParser<Protos.TwoWayChannelMessage> handler) {
public synchronized void connectionClosed(ProtobufConnection<TwoWayChannelMessage> handler) {
checkState(!client3ConnectionClosed.isDone());
client3ConnectionClosed.set(null);
}