3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-13 18:55:52 +00:00

BlockingClient: make it easier to manually start a read/write loop.

This commit is contained in:
Mike Hearn 2015-07-22 22:27:19 +02:00
parent 12f3d2dbca
commit 551d01a625

View File

@ -42,7 +42,6 @@ public class BlockingClient implements MessageWriteTarget {
private static final int BUFFER_SIZE_LOWER_BOUND = 4096; private static final int BUFFER_SIZE_LOWER_BOUND = 4096;
private static final int BUFFER_SIZE_UPPER_BOUND = 65536; private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
private final ByteBuffer dbuf;
private Socket socket; private Socket socket;
private volatile boolean vCloseRequested = false; private volatile boolean vCloseRequested = false;
private SettableFuture<SocketAddress> connectFuture; private SettableFuture<SocketAddress> connectFuture;
@ -60,11 +59,11 @@ public class BlockingClient implements MessageWriteTarget {
* @param clientSet A set which this object will add itself to after initialization, and then remove itself from * @param clientSet A set which this object will add itself to after initialization, and then remove itself from
*/ */
public BlockingClient(final SocketAddress serverAddress, final StreamParser parser, public BlockingClient(final SocketAddress serverAddress, final StreamParser parser,
final int connectTimeoutMillis, final SocketFactory socketFactory, @Nullable final Set<BlockingClient> clientSet) throws IOException { final int connectTimeoutMillis, final SocketFactory socketFactory,
@Nullable final Set<BlockingClient> clientSet) throws IOException {
connectFuture = SettableFuture.create(); connectFuture = SettableFuture.create();
// Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make // Try to fit at least one message in the network buffer, but place an upper and lower limit on its size to make
// sure it doesnt get too large or have to call read too often. // sure it doesnt get too large or have to call read too often.
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
parser.setWriteTarget(this); parser.setWriteTarget(this);
socket = socketFactory.createSocket(); socket = socketFactory.createSocket();
final Context context = Context.get(); final Context context = Context.get();
@ -79,25 +78,7 @@ public class BlockingClient implements MessageWriteTarget {
parser.connectionOpened(); parser.connectionOpened();
connectFuture.set(serverAddress); connectFuture.set(serverAddress);
InputStream stream = socket.getInputStream(); InputStream stream = socket.getInputStream();
byte[] readBuff = new byte[dbuf.capacity()]; runReadLoop(stream, parser);
while (true) {
// TODO Kill the message duplication here
checkState(dbuf.remaining() > 0 && dbuf.remaining() <= readBuff.length);
int read = stream.read(readBuff, 0, Math.max(1, Math.min(dbuf.remaining(), stream.available())));
if (read == -1)
return;
dbuf.put(readBuff, 0, read);
// "flip" the buffer - setting the limit to the current position and setting position to 0
dbuf.flip();
// Use parser.receiveBytes's return value as a double-check that it stopped reading at the right
// location
int bytesConsumed = parser.receiveBytes(dbuf);
checkState(dbuf.position() == bytesConsumed);
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
// position)
dbuf.compact();
}
} catch (Exception e) { } catch (Exception e) {
if (!vCloseRequested) { if (!vCloseRequested) {
log.error("Error trying to open/read from connection: {}: {}", serverAddress, e.getMessage()); log.error("Error trying to open/read from connection: {}: {}", serverAddress, e.getMessage());
@ -120,6 +101,32 @@ public class BlockingClient implements MessageWriteTarget {
t.start(); t.start();
} }
/**
* A blocking call that never returns, except by throwing an exception. It reads bytes from the input stream
* and feeds them to the provided {@link StreamParser}, for example, a {@link Peer}.
*/
public static void runReadLoop(InputStream stream, StreamParser parser) throws Exception {
ByteBuffer dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
byte[] readBuff = new byte[dbuf.capacity()];
while (true) {
// TODO Kill the message duplication here
checkState(dbuf.remaining() > 0 && dbuf.remaining() <= readBuff.length);
int read = stream.read(readBuff, 0, Math.max(1, Math.min(dbuf.remaining(), stream.available())));
if (read == -1)
return;
dbuf.put(readBuff, 0, read);
// "flip" the buffer - setting the limit to the current position and setting position to 0
dbuf.flip();
// Use parser.receiveBytes's return value as a double-check that it stopped reading at the right
// location
int bytesConsumed = parser.receiveBytes(dbuf);
checkState(dbuf.position() == bytesConsumed);
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
// position)
dbuf.compact();
}
}
/** /**
* Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()} * Closes the connection to the server, triggering the {@link StreamParser#connectionClosed()}
* event on the network-handling thread where all callbacks occur. * event on the network-handling thread where all callbacks occur.