3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 18:25:51 +00:00

Peer: Allow peer listeners to handle exceptions thrown in the peer thread.

This commit is contained in:
Mike Hearn 2013-03-30 19:28:58 +00:00
parent aac6a7daf7
commit d92f944564
4 changed files with 107 additions and 58 deletions

View File

@ -45,4 +45,8 @@ public class AbstractPeerEventListener implements PeerEventListener {
public List<Message> getData(Peer peer, GetDataMessage m) {
return null;
}
@Override
public void onException(Throwable throwable) {
}
}

View File

@ -243,68 +243,80 @@ public class Peer {
}
private void processMessage(MessageEvent e, Message m) throws IOException, VerificationException, ProtocolException {
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (PeerEventListener listener : eventListeners) {
m = listener.onPreMessageReceived(this, m);
if (m == null) break;
}
if (m == null) return;
try {
// Allow event listeners to filter the message stream. Listeners are allowed to drop messages by
// returning null.
for (PeerEventListener listener : eventListeners) {
m = listener.onPreMessageReceived(this, m);
if (m == null) break;
}
if (m == null) return;
// If we are in the middle of receiving transactions as part of a filtered block push from the remote node,
// and we receive something that's not a transaction, then we're done.
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
endFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
// If we are in the middle of receiving transactions as part of a filtered block push from the remote node,
// and we receive something that's not a transaction, then we're done.
if (currentFilteredBlock != null && !(m instanceof Transaction)) {
endFilteredBlock(currentFilteredBlock);
currentFilteredBlock = null;
}
if (m instanceof NotFoundMessage) {
// This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool.
// Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next.
processNotFoundMessage((NotFoundMessage) m);
} else if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
} else if (m instanceof FilteredBlock) {
startFilteredBlock((FilteredBlock) m);
} else if (m instanceof Transaction) {
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) {
// We don't care about addresses of the network right now. But in future,
// we should save them in the wallet so we don't put too much load on the seed nodes and can
// properly explore the network.
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
vPeerVersionMessage = (VersionMessage) m;
for (PeerLifecycleListener listener : lifecycleListeners)
listener.onPeerConnected(this);
final int version = vMinProtocolVersion;
if (vPeerVersionMessage.clientVersion < version) {
log.warn("Connected to a peer speaking protocol version {} but need {}, closing",
vPeerVersionMessage.clientVersion, version);
e.getChannel().close();
if (m instanceof NotFoundMessage) {
// This is sent to us when we did a getdata on some transactions that aren't in the peers memory pool.
// Because NotFoundMessage is a subclass of InventoryMessage, the test for it must come before the next.
processNotFoundMessage((NotFoundMessage) m);
} else if (m instanceof InventoryMessage) {
processInv((InventoryMessage) m);
} else if (m instanceof Block) {
processBlock((Block) m);
} else if (m instanceof FilteredBlock) {
startFilteredBlock((FilteredBlock) m);
} else if (m instanceof Transaction) {
processTransaction((Transaction) m);
} else if (m instanceof GetDataMessage) {
processGetData((GetDataMessage) m);
} else if (m instanceof AddressMessage) {
// We don't care about addresses of the network right now. But in future,
// we should save them in the wallet so we don't put too much load on the seed nodes and can
// properly explore the network.
} else if (m instanceof HeadersMessage) {
processHeaders((HeadersMessage) m);
} else if (m instanceof AlertMessage) {
processAlert((AlertMessage) m);
} else if (m instanceof VersionMessage) {
vPeerVersionMessage = (VersionMessage) m;
for (PeerLifecycleListener listener : lifecycleListeners)
listener.onPeerConnected(this);
final int version = vMinProtocolVersion;
if (vPeerVersionMessage.clientVersion < version) {
log.warn("Connected to a peer speaking protocol version {} but need {}, closing",
vPeerVersionMessage.clientVersion, version);
e.getChannel().close();
}
} else if (m instanceof VersionAck) {
if (vPeerVersionMessage == null) {
throw new ProtocolException("got a version ack before version");
}
if (isAcked) {
throw new ProtocolException("got more than one version ack");
}
isAcked = true;
} else if (m instanceof Ping) {
if (((Ping) m).hasNonce())
sendMessage(new Pong(((Ping) m).getNonce()));
} else if (m instanceof Pong) {
processPong((Pong)m);
} else {
log.warn("Received unhandled message: {}", m);
}
} else if (m instanceof VersionAck) {
if (vPeerVersionMessage == null) {
throw new ProtocolException("got a version ack before version");
} catch (Throwable throwable) {
log.warn("Caught exception in peer thread: {}", throwable.getMessage());
throwable.printStackTrace();
for (PeerEventListener listener : eventListeners) {
try {
listener.onException(throwable);
} catch (Exception e1) {
e1.printStackTrace();
}
}
if (isAcked) {
throw new ProtocolException("got more than one version ack");
}
isAcked = true;
} else if (m instanceof Ping) {
if (((Ping) m).hasNonce())
sendMessage(new Pong(((Ping) m).getNonce()));
} else if (m instanceof Pong) {
processPong((Pong)m);
} else {
log.warn("Received unhandled message: {}", m);
}
}

View File

@ -78,4 +78,11 @@ public interface PeerEventListener {
* items as possible which appear in the {@link GetDataMessage}, or null if you're not interested in responding.
*/
public List<Message> getData(Peer peer, GetDataMessage m);
/**
* Called if there is an exception thrown in a Netty worker thread whilst processing an inbound message. You
* can use this to report crashes of the peer threads back to your apps website, for instance. After this callback
* runs the peer will be disconnected. Any exceptions thrown by this method will be logged and ignored.
*/
public void onException(Throwable throwable);
}

View File

@ -17,6 +17,7 @@
package com.google.bitcoin.core;
import com.google.bitcoin.core.Peer.PeerHandler;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import org.easymock.Capture;
import org.easymock.CaptureType;
@ -766,6 +767,31 @@ public class PeerTest extends TestWithNetworkConnections {
peer.setMinProtocolVersion(500);
}
@Test
public void exceptionListener() throws Exception {
wallet.addEventListener(new AbstractWalletEventListener() {
@Override
public void onCoinsReceived(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) {
throw new NullPointerException("boo!");
}
});
final Throwable[] throwables = new Throwable[1];
peer.addEventListener(new AbstractPeerEventListener() {
@Override
public void onException(Throwable throwable) {
throwables[0] = throwable;
}
});
control.replay();
connect();
Transaction t1 = new Transaction(unitTestParams);
t1.addInput(new TransactionInput(unitTestParams, t1, new byte[]{}));
t1.addOutput(Utils.toNanoCoins(1, 0), wallet.getChangeAddress());
inbound(peer, t1);
inbound(peer, new NotFoundMessage(unitTestParams, Lists.newArrayList(new InventoryItem(InventoryItem.Type.Transaction, t1.getInput(0).getHash()))));
assertTrue(throwables[0] instanceof NullPointerException);
}
// TODO: Use generics here to avoid unnecessary casting.
private Message outbound() {
List<DownstreamMessageEvent> messages = event.getValues();