From d92f944564b93674e958a28f91458abd62ca47cf Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Sat, 30 Mar 2013 19:28:58 +0000 Subject: [PATCH] Peer: Allow peer listeners to handle exceptions thrown in the peer thread. --- .../core/AbstractPeerEventListener.java | 4 + .../java/com/google/bitcoin/core/Peer.java | 128 ++++++++++-------- .../bitcoin/core/PeerEventListener.java | 7 + .../com/google/bitcoin/core/PeerTest.java | 26 ++++ 4 files changed, 107 insertions(+), 58 deletions(-) diff --git a/core/src/main/java/com/google/bitcoin/core/AbstractPeerEventListener.java b/core/src/main/java/com/google/bitcoin/core/AbstractPeerEventListener.java index ac084f27..7bf9659e 100644 --- a/core/src/main/java/com/google/bitcoin/core/AbstractPeerEventListener.java +++ b/core/src/main/java/com/google/bitcoin/core/AbstractPeerEventListener.java @@ -45,4 +45,8 @@ public class AbstractPeerEventListener implements PeerEventListener { public List getData(Peer peer, GetDataMessage m) { return null; } + + @Override + public void onException(Throwable throwable) { + } } diff --git a/core/src/main/java/com/google/bitcoin/core/Peer.java b/core/src/main/java/com/google/bitcoin/core/Peer.java index 6cfef0fa..9f882441 100644 --- a/core/src/main/java/com/google/bitcoin/core/Peer.java +++ b/core/src/main/java/com/google/bitcoin/core/Peer.java @@ -243,68 +243,80 @@ public class Peer { } private void processMessage(MessageEvent e, Message m) throws IOException, VerificationException, ProtocolException { - // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by - // returning null. - for (PeerEventListener listener : eventListeners) { - m = listener.onPreMessageReceived(this, m); - if (m == null) break; - } - if (m == null) return; + try { + // Allow event listeners to filter the message stream. Listeners are allowed to drop messages by + // returning null. + for (PeerEventListener listener : eventListeners) { + m = listener.onPreMessageReceived(this, m); + if (m == null) break; + } + if (m == null) return; - // If we are in the middle of receiving transactions as part of a filtered block push from the remote node, - // and we receive something that's not a transaction, then we're done. - if (currentFilteredBlock != null && !(m instanceof Transaction)) { - endFilteredBlock(currentFilteredBlock); - currentFilteredBlock = null; - } + // If we are in the middle of receiving transactions as part of a filtered block push from the remote node, + // and we receive something that's not a transaction, then we're done. + if (currentFilteredBlock != null && !(m instanceof Transaction)) { + endFilteredBlock(currentFilteredBlock); + currentFilteredBlock = null; + } - if (m instanceof NotFoundMessage) { - // This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool. - // Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next. - processNotFoundMessage((NotFoundMessage) m); - } else if (m instanceof InventoryMessage) { - processInv((InventoryMessage) m); - } else if (m instanceof Block) { - processBlock((Block) m); - } else if (m instanceof FilteredBlock) { - startFilteredBlock((FilteredBlock) m); - } else if (m instanceof Transaction) { - processTransaction((Transaction) m); - } else if (m instanceof GetDataMessage) { - processGetData((GetDataMessage) m); - } else if (m instanceof AddressMessage) { - // We don't care about addresses of the network right now. But in future, - // we should save them in the wallet so we don't put too much load on the seed nodes and can - // properly explore the network. - } else if (m instanceof HeadersMessage) { - processHeaders((HeadersMessage) m); - } else if (m instanceof AlertMessage) { - processAlert((AlertMessage) m); - } else if (m instanceof VersionMessage) { - vPeerVersionMessage = (VersionMessage) m; - for (PeerLifecycleListener listener : lifecycleListeners) - listener.onPeerConnected(this); - final int version = vMinProtocolVersion; - if (vPeerVersionMessage.clientVersion < version) { - log.warn("Connected to a peer speaking protocol version {} but need {}, closing", - vPeerVersionMessage.clientVersion, version); - e.getChannel().close(); + if (m instanceof NotFoundMessage) { + // This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool. + // Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next. + processNotFoundMessage((NotFoundMessage) m); + } else if (m instanceof InventoryMessage) { + processInv((InventoryMessage) m); + } else if (m instanceof Block) { + processBlock((Block) m); + } else if (m instanceof FilteredBlock) { + startFilteredBlock((FilteredBlock) m); + } else if (m instanceof Transaction) { + processTransaction((Transaction) m); + } else if (m instanceof GetDataMessage) { + processGetData((GetDataMessage) m); + } else if (m instanceof AddressMessage) { + // We don't care about addresses of the network right now. But in future, + // we should save them in the wallet so we don't put too much load on the seed nodes and can + // properly explore the network. + } else if (m instanceof HeadersMessage) { + processHeaders((HeadersMessage) m); + } else if (m instanceof AlertMessage) { + processAlert((AlertMessage) m); + } else if (m instanceof VersionMessage) { + vPeerVersionMessage = (VersionMessage) m; + for (PeerLifecycleListener listener : lifecycleListeners) + listener.onPeerConnected(this); + final int version = vMinProtocolVersion; + if (vPeerVersionMessage.clientVersion < version) { + log.warn("Connected to a peer speaking protocol version {} but need {}, closing", + vPeerVersionMessage.clientVersion, version); + e.getChannel().close(); + } + } else if (m instanceof VersionAck) { + if (vPeerVersionMessage == null) { + throw new ProtocolException("got a version ack before version"); + } + if (isAcked) { + throw new ProtocolException("got more than one version ack"); + } + isAcked = true; + } else if (m instanceof Ping) { + if (((Ping) m).hasNonce()) + sendMessage(new Pong(((Ping) m).getNonce())); + } else if (m instanceof Pong) { + processPong((Pong)m); + } else { + log.warn("Received unhandled message: {}", m); } - } else if (m instanceof VersionAck) { - if (vPeerVersionMessage == null) { - throw new ProtocolException("got a version ack before version"); + } catch (Throwable throwable) { + log.warn("Caught exception in peer thread: {}", throwable.getMessage()); + throwable.printStackTrace(); + for (PeerEventListener listener : eventListeners) { + try { + listener.onException(throwable); + } catch (Exception e1) { + e1.printStackTrace(); + } } - if (isAcked) { - throw new ProtocolException("got more than one version ack"); - } - isAcked = true; - } else if (m instanceof Ping) { - if (((Ping) m).hasNonce()) - sendMessage(new Pong(((Ping) m).getNonce())); - } else if (m instanceof Pong) { - processPong((Pong)m); - } else { - log.warn("Received unhandled message: {}", m); } } diff --git a/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java b/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java index dc854e15..8895b08d 100644 --- a/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java +++ b/core/src/main/java/com/google/bitcoin/core/PeerEventListener.java @@ -78,4 +78,11 @@ public interface PeerEventListener { * items as possible which appear in the {@link GetDataMessage}, or null if you're not interested in responding. */ public List getData(Peer peer, GetDataMessage m); + + /** + * Called if there is an exception thrown in a Netty worker thread whilst processing an inbound message. You + * can use this to report crashes of the peer threads back to your apps website, for instance. After this callback + * runs the peer will be disconnected. Any exceptions thrown by this method will be logged and ignored. + */ + public void onException(Throwable throwable); } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerTest.java b/core/src/test/java/com/google/bitcoin/core/PeerTest.java index 70e2a96e..6935369b 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerTest.java @@ -17,6 +17,7 @@ package com.google.bitcoin.core; import com.google.bitcoin.core.Peer.PeerHandler; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import org.easymock.Capture; import org.easymock.CaptureType; @@ -766,6 +767,31 @@ public class PeerTest extends TestWithNetworkConnections { peer.setMinProtocolVersion(500); } + @Test + public void exceptionListener() throws Exception { + wallet.addEventListener(new AbstractWalletEventListener() { + @Override + public void onCoinsReceived(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { + throw new NullPointerException("boo!"); + } + }); + final Throwable[] throwables = new Throwable[1]; + peer.addEventListener(new AbstractPeerEventListener() { + @Override + public void onException(Throwable throwable) { + throwables[0] = throwable; + } + }); + control.replay(); + connect(); + Transaction t1 = new Transaction(unitTestParams); + t1.addInput(new TransactionInput(unitTestParams, t1, new byte[]{})); + t1.addOutput(Utils.toNanoCoins(1, 0), wallet.getChangeAddress()); + inbound(peer, t1); + inbound(peer, new NotFoundMessage(unitTestParams, Lists.newArrayList(new InventoryItem(InventoryItem.Type.Transaction, t1.getInput(0).getHash())))); + assertTrue(throwables[0] instanceof NullPointerException); + } + // TODO: Use generics here to avoid unnecessary casting. private Message outbound() { List messages = event.getValues();