3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-14 11:15:51 +00:00

Abstract out SelectionKey processing and logic from NioServer.

This commit is contained in:
Matt Corallo 2013-07-10 23:27:01 +02:00 committed by Mike Hearn
parent 9980903572
commit 8448296f5f
2 changed files with 131 additions and 94 deletions

View File

@ -0,0 +1,129 @@
/*
* Copyright 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.protocols.niowrapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.locks.ReentrantLock;
import com.google.bitcoin.utils.Threading;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/**
* A simple connection handler which handles all the business logic of a connection
*/
class ConnectionHandler extends MessageWriteTarget {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(ConnectionHandler.class);
private static final int BUFFER_SIZE_LOWER_BOUND = 4096;
private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
private final ReentrantLock lock = Threading.lock("nioConnectionHandler");
private final ByteBuffer dbuf;
private final SocketChannel channel;
final StreamParser parser;
private boolean closeCalled = false;
ConnectionHandler(StreamParserFactory parserFactory, SocketChannel channel) throws IOException {
this.channel = checkNotNull(channel);
StreamParser newParser = parserFactory.getNewParser(channel.socket().getInetAddress(), channel.socket().getPort());
if (newParser == null) {
closeConnection();
throw new IOException("Parser factory.getNewParser returned null");
}
this.parser = newParser;
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(parser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
newParser.setWriteTarget(this);
}
@Override
void writeBytes(byte[] message) {
lock.lock();
try {
if (channel.write(ByteBuffer.wrap(message)) != message.length)
throw new IOException("Couldn't write all of message to socket");
} catch (IOException e) {
log.error("Error writing message to connection, closing connection", e);
closeConnection();
} finally {
lock.unlock();
}
}
@Override
void closeConnection() {
try {
channel.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
connectionClosed();
}
private void connectionClosed() {
boolean callClosed = false;
lock.lock();
try {
callClosed = !closeCalled;
closeCalled = true;
} finally {
lock.unlock();
}
if (callClosed)
parser.connectionClosed();
}
// Handle a SelectionKey which was selected
static void handleKey(SelectionKey key) throws IOException {
ConnectionHandler handler = ((ConnectionHandler)key.attachment());
try {
if (!key.isValid() && handler != null)
handler.closeConnection(); // Key has been cancelled, make sure the socket gets closed
else if (handler != null && key.isReadable()) {
// Do a socket read and invoke the parser's receiveBytes message
int read = handler.channel.read(handler.dbuf);
if (read == 0)
return; // Should probably never happen, but just in case it actually can just return 0
else if (read == -1) { // Socket was closed
key.cancel();
handler.closeConnection();
return;
}
// "flip" the buffer - setting the limit to the current position and setting position to 0
handler.dbuf.flip();
// Use parser.receiveBytes's return value as a check that it stopped reading at the right location
int bytesConsumed = handler.parser.receiveBytes(handler.dbuf);
checkState(handler.dbuf.position() == bytesConsumed);
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
// position)
handler.dbuf.compact();
}
} catch (Exception e) {
// This can happen eg if the channel closes while the tread is about to get killed
// (ClosedByInterruptException), or if parser.parser.receiveBytes throws something
log.error("Error handling SelectionKey", e);
if (handler != null)
handler.closeConnection();
}
}
}

View File

@ -18,16 +18,13 @@ package com.google.bitcoin.protocols.niowrapper;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;
import com.google.bitcoin.utils.Threading;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.LoggerFactory;
@ -46,106 +43,17 @@ public class NioServer {
@VisibleForTesting final Thread handlerThread;
private final ServerSocketChannel sc;
private static final int BUFFER_SIZE_LOWER_BOUND = 4096;
private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
private class ConnectionHandler extends MessageWriteTarget {
private final ReentrantLock lock = Threading.lock("nioServerConnectionHandler");
private final ByteBuffer dbuf;
private final SocketChannel channel;
private final StreamParser parser;
private boolean closeCalled = false;
ConnectionHandler(SocketChannel channel) throws IOException {
this.channel = checkNotNull(channel);
StreamParser newParser = parserFactory.getNewParser(channel.socket().getInetAddress(), channel.socket().getPort());
if (newParser == null) {
closeConnection();
throw new IOException("Parser factory.getNewParser returned null");
}
this.parser = newParser;
dbuf = ByteBuffer.allocateDirect(Math.min(Math.max(newParser.getMaxMessageSize(), BUFFER_SIZE_LOWER_BOUND), BUFFER_SIZE_UPPER_BOUND));
newParser.setWriteTarget(this);
}
@Override
void writeBytes(byte[] message) {
lock.lock();
try {
if (channel.write(ByteBuffer.wrap(message)) != message.length)
throw new IOException("Couldn't write all of message to socket");
} catch (IOException e) {
log.error("Error writing message to connection, closing connection", e);
closeConnection();
} finally {
lock.unlock();
}
}
@Override
void closeConnection() {
try {
channel.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
connectionClosed();
}
private void connectionClosed() {
boolean callClosed = false;
lock.lock();
try {
callClosed = !closeCalled;
closeCalled = true;
} finally {
lock.unlock();
}
if (callClosed)
parser.connectionClosed();
}
}
// Handle a SelectionKey which was selected
private void handleKey(Selector selector, SelectionKey key) throws IOException {
if (key.isValid() && key.isAcceptable()) {
// Accept a new connection, give it a parser as an attachment
SocketChannel newChannel = sc.accept();
newChannel.configureBlocking(false);
ConnectionHandler handler = new ConnectionHandler(newChannel);
ConnectionHandler handler = new ConnectionHandler(parserFactory, newChannel);
newChannel.register(selector, SelectionKey.OP_READ).attach(handler);
handler.parser.connectionOpened();
} else { // Got a closing channel or a channel to a client connection
ConnectionHandler handler = ((ConnectionHandler)key.attachment());
try {
if (!key.isValid() && handler != null)
handler.closeConnection(); // Key has been cancelled, make sure the socket gets closed
else if (handler != null && key.isReadable()) {
// Do a socket read and invoke the parser's receiveBytes message
int read = handler.channel.read(handler.dbuf);
if (read == 0)
return; // Should probably never happen, but just in case it actually can just return 0
else if (read == -1) { // Socket was closed
key.cancel();
handler.closeConnection();
return;
}
// "flip" the buffer - setting the limit to the current position and setting position to 0
handler.dbuf.flip();
// Use parser.receiveBytes's return value as a check that it stopped reading at the right location
int bytesConsumed = handler.parser.receiveBytes(handler.dbuf);
checkState(handler.dbuf.position() == bytesConsumed);
// Now drop the bytes which were read by compacting dbuf (resetting limit and keeping relative
// position)
handler.dbuf.compact();
}
} catch (Exception e) {
// This can happen eg if the channel closes while the tread is about to get killed
// (ClosedByInterruptException), or if parser.parser.receiveBytes throws something
log.error("Error handling SelectionKey", e);
if (handler != null)
handler.closeConnection();
}
ConnectionHandler.handleKey(key);
}
}