3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 18:25:51 +00:00

Move message processing out of the handler class and into the top-level Peer class.

This commit is contained in:
Mike Hearn 2013-03-11 12:25:10 +01:00
parent b2ab3e1c43
commit bffc85fa24

View File

@ -232,81 +232,7 @@ public class Peer {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Message m = (Message)e.getMessage();
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (PeerEventListener listener : eventListeners) {
m = listener.onPreMessageReceived(Peer.this, m);
if (m == null) break;
}
if (m == null) return;
lock.lock();
try {
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
processFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
} finally {
lock.unlock();
}
if (m instanceof NotFoundMessage) {
// This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool.
// Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next.
processNotFoundMessage((NotFoundMessage) m);
} else if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
} else if (m instanceof FilteredBlock) {
// Filtered blocks come before the data that they refer to, so stash it here and then fill it out as
// messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another
// FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after
// a getblocks, to force the non-tx message path.
lock.lock();
try {
currentFilteredBlock = (FilteredBlock) m;
} finally {
lock.unlock();
}
} else if (m instanceof Transaction) {
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) {
// We don't care about addresses of the network right now. But in future,
// we should save them in the wallet so we don't put too much load on the seed nodes and can
// properly explore the network.
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
peerVersionMessage.set((VersionMessage) m);
for (PeerLifecycleListener listener : lifecycleListeners)
listener.onPeerConnected(Peer.this);
if (getPeerVersionMessage().clientVersion < minProtocolVersion) {
log.warn("Connected to a peer speaking protocol version {} but need {}, closing",
getPeerVersionMessage().clientVersion, minProtocolVersion);
e.getChannel().close();
}
} else if (m instanceof VersionAck) {
if (getPeerVersionMessage() == null) {
throw new ProtocolException("got a version ack before version");
}
if (isAcked) {
throw new ProtocolException("got more than one version ack");
}
isAcked = true;
} else if (m instanceof Ping) {
if (((Ping) m).hasNonce())
sendMessage(new Pong(((Ping) m).getNonce()));
} else if (m instanceof Pong) {
processPong((Pong)m);
} else {
log.warn("Received unhandled message: {}", m);
}
processMessage(e, m);
}
public Peer getPeer() {
@ -314,6 +240,84 @@ public class Peer {
}
}
private void processMessage(MessageEvent e, Message m) throws IOException, VerificationException, ProtocolException {
checkNotLocked(lock);
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (PeerEventListener listener : eventListeners) {
m = listener.onPreMessageReceived(this, m);
if (m == null) break;
}
if (m == null) return;
lock.lock();
try {
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
processFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
} finally {
lock.unlock();
}
if (m instanceof NotFoundMessage) {
// This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool.
// Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next.
processNotFoundMessage((NotFoundMessage) m);
} else if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
} else if (m instanceof FilteredBlock) {
// Filtered blocks come before the data that they refer to, so stash it here and then fill it out as
// messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another
// FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after
// a getblocks, to force the non-tx message path.
lock.lock();
try {
currentFilteredBlock = (FilteredBlock) m;
} finally {
lock.unlock();
}
} else if (m instanceof Transaction) {
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) {
// We don't care about addresses of the network right now. But in future,
// we should save them in the wallet so we don't put too much load on the seed nodes and can
// properly explore the network.
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
peerVersionMessage.set((VersionMessage) m);
for (PeerLifecycleListener listener : lifecycleListeners)
listener.onPeerConnected(this);
if (getPeerVersionMessage().clientVersion < minProtocolVersion) {
log.warn("Connected to a peer speaking protocol version {} but need {}, closing",
getPeerVersionMessage().clientVersion, minProtocolVersion);
e.getChannel().close();
}
} else if (m instanceof VersionAck) {
if (getPeerVersionMessage() == null) {
throw new ProtocolException("got a version ack before version");
}
if (isAcked) {
throw new ProtocolException("got more than one version ack");
}
isAcked = true;
} else if (m instanceof Ping) {
if (((Ping) m).hasNonce())
sendMessage(new Pong(((Ping) m).getNonce()));
} else if (m instanceof Pong) {
processPong((Pong)m);
} else {
log.warn("Received unhandled message: {}", m);
}
}
private void processNotFoundMessage(NotFoundMessage m) {
// This does not need to be locked.