From bffc85fa2409bf9a2aaa487a14fb15e45af37ff5 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Mon, 11 Mar 2013 12:25:10 +0100 Subject: [PATCH] Move message processing out of the handler class and into the top-level Peer class. --- .../java/com/google/bitcoin/core/Peer.java | 154 +++++++++--------- 1 file changed, 79 insertions(+), 75 deletions(-) diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index a04360c3..621b7cd1 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -232,81 +232,7 @@ public class Peer { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Message m = (Message)e.getMessage(); - - // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by - // returning null. - for (PeerEventListener listener : eventListeners) { - m = listener.onPreMessageReceived(Peer.this, m); - if (m == null) break; - } - if (m == null) return; - - lock.lock(); - try { - if (currentFilteredBlock != null && !(m instanceof Transaction)) { - processFilteredBlock(currentFilteredBlock); - currentFilteredBlock = null; - } - } finally { - lock.unlock(); - } - - if (m instanceof NotFoundMessage) { - // This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool. - // Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next. - processNotFoundMessage((NotFoundMessage) m); - } else if (m instanceof InventoryMessage) { - processInv((InventoryMessage) m); - } else if (m instanceof Block) { - processBlock((Block) m); - } else if (m instanceof FilteredBlock) { - // Filtered blocks come before the data that they refer to, so stash it here and then fill it out as - // messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another - // FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after - // a getblocks, to force the non-tx message path. - lock.lock(); - try { - currentFilteredBlock = (FilteredBlock) m; - } finally { - lock.unlock(); - } - } else if (m instanceof Transaction) { - processTransaction((Transaction) m); - } else if (m instanceof GetDataMessage) { - processGetData((GetDataMessage) m); - } else if (m instanceof AddressMessage) { - // We don't care about addresses of the network right now. But in future, - // we should save them in the wallet so we don't put too much load on the seed nodes and can - // properly explore the network. - } else if (m instanceof HeadersMessage) { - processHeaders((HeadersMessage) m); - } else if (m instanceof AlertMessage) { - processAlert((AlertMessage) m); - } else if (m instanceof VersionMessage) { - peerVersionMessage.set((VersionMessage) m); - for (PeerLifecycleListener listener : lifecycleListeners) - listener.onPeerConnected(Peer.this); - if (getPeerVersionMessage().clientVersion < minProtocolVersion) { - log.warn("Connected to a peer speaking protocol version {} but need {}, closing", - getPeerVersionMessage().clientVersion, minProtocolVersion); - e.getChannel().close(); - } - } else if (m instanceof VersionAck) { - if (getPeerVersionMessage() == null) { - throw new ProtocolException("got a version ack before version"); - } - if (isAcked) { - throw new ProtocolException("got more than one version ack"); - } - isAcked = true; - } else if (m instanceof Ping) { - if (((Ping) m).hasNonce()) - sendMessage(new Pong(((Ping) m).getNonce())); - } else if (m instanceof Pong) { - processPong((Pong)m); - } else { - log.warn("Received unhandled message: {}", m); - } + processMessage(e, m); } public Peer getPeer() { @@ -314,6 +240,84 @@ public class Peer { } } + private void processMessage(MessageEvent e, Message m) throws IOException, VerificationException, ProtocolException { + checkNotLocked(lock); + // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by + // returning null. + for (PeerEventListener listener : eventListeners) { + m = listener.onPreMessageReceived(this, m); + if (m == null) break; + } + if (m == null) return; + + lock.lock(); + try { + if (currentFilteredBlock != null && !(m instanceof Transaction)) { + processFilteredBlock(currentFilteredBlock); + currentFilteredBlock = null; + } + } finally { + lock.unlock(); + } + + if (m instanceof NotFoundMessage) { + // This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool. + // Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next. + processNotFoundMessage((NotFoundMessage) m); + } else if (m instanceof InventoryMessage) { + processInv((InventoryMessage) m); + } else if (m instanceof Block) { + processBlock((Block) m); + } else if (m instanceof FilteredBlock) { + // Filtered blocks come before the data that they refer to, so stash it here and then fill it out as + // messages stream in. We'll call processFilteredBlock when a non-tx message arrives (eg, another + // FilteredBlock) or when a tx that isn't needed by that block is found. A ping message is sent after + // a getblocks, to force the non-tx message path. + lock.lock(); + try { + currentFilteredBlock = (FilteredBlock) m; + } finally { + lock.unlock(); + } + } else if (m instanceof Transaction) { + processTransaction((Transaction) m); + } else if (m instanceof GetDataMessage) { + processGetData((GetDataMessage) m); + } else if (m instanceof AddressMessage) { + // We don't care about addresses of the network right now. But in future, + // we should save them in the wallet so we don't put too much load on the seed nodes and can + // properly explore the network. + } else if (m instanceof HeadersMessage) { + processHeaders((HeadersMessage) m); + } else if (m instanceof AlertMessage) { + processAlert((AlertMessage) m); + } else if (m instanceof VersionMessage) { + peerVersionMessage.set((VersionMessage) m); + for (PeerLifecycleListener listener : lifecycleListeners) + listener.onPeerConnected(this); + if (getPeerVersionMessage().clientVersion < minProtocolVersion) { + log.warn("Connected to a peer speaking protocol version {} but need {}, closing", + getPeerVersionMessage().clientVersion, minProtocolVersion); + e.getChannel().close(); + } + } else if (m instanceof VersionAck) { + if (getPeerVersionMessage() == null) { + throw new ProtocolException("got a version ack before version"); + } + if (isAcked) { + throw new ProtocolException("got more than one version ack"); + } + isAcked = true; + } else if (m instanceof Ping) { + if (((Ping) m).hasNonce()) + sendMessage(new Pong(((Ping) m).getNonce())); + } else if (m instanceof Pong) { + processPong((Pong)m); + } else { + log.warn("Received unhandled message: {}", m); + } + } + private void processNotFoundMessage(NotFoundMessage m) { // This does not need to be locked.