diff --git a/core/src/main/java/com/google/bitcoin/core/TransactionBroadcast.java b/core/src/main/java/com/google/bitcoin/core/TransactionBroadcast.java index 9be9ab89..7e622c63 100644 --- a/core/src/main/java/com/google/bitcoin/core/TransactionBroadcast.java +++ b/core/src/main/java/com/google/bitcoin/core/TransactionBroadcast.java @@ -17,11 +17,17 @@ package com.google.bitcoin.core; import com.google.bitcoin.utils.Threading; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.List; +import java.util.Random; + /** * Represents a single transaction broadcast that we are performing. A broadcast occurs after a new transaction is created * (typically by a {@link Wallet} and needs to be sent to the network. A broadcast can succeed or fail. A success is @@ -37,6 +43,11 @@ public class TransactionBroadcast { private final PeerGroup peerGroup; private final Transaction tx; private int minConnections; + private int numWaitingFor, numToBroadcastTo; + + // Used for the peers permutation: unit tests replace this to make themselves deterministic. + @VisibleForTesting static Random random = new Random(); + private Transaction pinnedTx; public TransactionBroadcast(PeerGroup peerGroup, Transaction tx) { this.peerGroup = peerGroup; @@ -65,15 +76,17 @@ public class TransactionBroadcast { // This can be called immediately if we already have enough. Otherwise it'll be called from a peer // thread. - // Pick a peer to be the lucky recipient of our tx. This can race if the peer we pick dies immediately. - final Peer somePeer = peerGroup.getDownloadPeer(); - log.info("broadcastTransaction: Enough peers, adding {} to the memory pool and sending to {}", - tx.getHashAsString(), somePeer); - final Transaction pinnedTx = peerGroup.getMemoryPool().seen(tx, somePeer.getAddress()); + // We will send the tx simultaneously to half the connected peers and wait to hear back from at least half + // of the other half, i.e., with 4 peers connected we will send the tx to 2 randomly chosen peers, and then + // wait for it to show up on one of the other two. This will be taken as sign of network acceptance. As can + // be seen, 4 peers is probably too little - it doesn't taken many broken peers for tx propagation to have + // a big effect. + List peers = peerGroup.getConnectedPeers(); // snapshots + pinnedTx = peerGroup.getMemoryPool().seen(tx, peers.get(0).getAddress()); // Prepare to send the transaction by adding a listener that'll be called when confidence changes. // Only bother with this if we might actually hear back: if (minConnections > 1) - pinnedTx.getConfidence().addEventListener(new ConfidenceChange(pinnedTx)); + pinnedTx.getConfidence().addEventListener(new ConfidenceChange()); // Satoshis code sends an inv in this case and then lets the peer request the tx data. We just // blast out the TX here for a couple of reasons. Firstly it's simpler: in the case where we have // just a single connection we don't have to wait for getdata to be received and handled before @@ -82,9 +95,21 @@ public class TransactionBroadcast { // transaction or not. However, we are not a fully validating node and this is advertised in // our version message, as SPV nodes cannot relay it doesn't give away any additional information // to skip the inv here - we wouldn't send invs anyway. - // - // TODO: The peer we picked might be dead by now. If we can't write the message, pick again and retry. - somePeer.sendMessage(pinnedTx); + int numConnected = peers.size(); + numToBroadcastTo = Math.max(1, peers.size() / 2); + numWaitingFor = (int) Math.ceil((peers.size() - numToBroadcastTo) / 2.0); + Collections.shuffle(peers, random); + peers = peers.subList(0, numToBroadcastTo); + log.info("broadcastTransaction: We have {} peers, adding {} to the memory pool and sending to {} peers, will wait for {}: {}", + numConnected, tx.getHashAsString(), numToBroadcastTo, numWaitingFor, Joiner.on(",").join(peers)); + for (Peer peer : peers) { + try { + peer.sendMessage(pinnedTx); + peerGroup.getMemoryPool().seen(pinnedTx, peer.getAddress()); + } catch (Exception e) { + log.error("Caught exception sending to {}", peer, e); + } + } // If we've been limited to talk to only one peer, we can't wait to hear back because the // remote peer won't tell us about transactions we just announced to it for obvious reasons. // So we just have to assume we're done, at that point. This happens when we're not given @@ -93,16 +118,9 @@ public class TransactionBroadcast { future.set(pinnedTx); } } - } private class ConfidenceChange implements TransactionConfidence.Listener { - private final Transaction pinnedTx; - - public ConfidenceChange(Transaction pinnedTx) { - this.pinnedTx = pinnedTx; - } - public void onConfidenceChanged(Transaction tx, ChangeReason reason) { // The number of peers that announced this tx has gone up. final TransactionConfidence conf = tx.getConfidence(); @@ -110,24 +128,24 @@ public class TransactionBroadcast { boolean mined = tx.getAppearsInHashes() != null; log.info("broadcastTransaction: {}: TX {} seen by {} peers{}", reason, pinnedTx.getHashAsString(), numSeenPeers, mined ? " and mined" : ""); - if (!(numSeenPeers >= minConnections || mined)) - return; - // We've seen the min required number of peers announce the transaction, or it was included - // in a block. Normally we'd expect to see it fully propagate before it gets mined, but - // it can be that a block is solved very soon after broadcast, and it's also possible that - // due to version skew and changes in the relay rules our transaction is not going to - // fully propagate yet can get mined anyway. - // - // Note that we can't wait for the current number of connected peers right now because we - // could have added more peers after the broadcast took place, which means they won't - // have seen the transaction. In future when peers sync up their memory pools after they - // connect we could come back and change this. - // - // We're done! It's important that the PeerGroup lock is not held (by this thread) at this - // point to avoid triggering inversions when the Future completes. - log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString()); - tx.getConfidence().removeEventListener(this); - future.set(pinnedTx); // RE-ENTRANCY POINT + if (numSeenPeers >= numWaitingFor + numToBroadcastTo || mined) { + // We've seen the min required number of peers announce the transaction, or it was included + // in a block. Normally we'd expect to see it fully propagate before it gets mined, but + // it can be that a block is solved very soon after broadcast, and it's also possible that + // due to version skew and changes in the relay rules our transaction is not going to + // fully propagate yet can get mined anyway. + // + // Note that we can't wait for the current number of connected peers right now because we + // could have added more peers after the broadcast took place, which means they won't + // have seen the transaction. In future when peers sync up their memory pools after they + // connect we could come back and change this. + // + // We're done! It's important that the PeerGroup lock is not held (by this thread) at this + // point to avoid triggering inversions when the Future completes. + log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString()); + tx.getConfidence().removeEventListener(this); + future.set(pinnedTx); // RE-ENTRANCY POINT + } } } } diff --git a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java index 7116dad3..916d007d 100644 --- a/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java +++ b/core/src/test/java/com/google/bitcoin/core/PeerGroupTest.java @@ -33,17 +33,16 @@ import java.util.Set; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import static com.google.common.base.Preconditions.checkNotNull; import static org.junit.Assert.*; + +// TX announcement and broadcast is tested in TransactionBroadcastTest. + public class PeerGroupTest extends TestWithPeerGroup { - static final NetworkParameters params = UnitTestParams.get(); - @Override @Before public void setUp() throws Exception { super.setUp(new MemoryBlockStore(UnitTestParams.get())); - peerGroup.addWallet(wallet); } @@ -254,74 +253,6 @@ public class PeerGroupTest extends TestWithPeerGroup { assertTrue(tx.getConfidence().wasBroadcastBy(peerOf(p3).getAddress())); } - @Test - public void announce() throws Exception { - // Make sure we can create spends, and that they are announced. Then do the same with offline mode. - - // Set up connections and block chain. - FakeChannel p1 = connectPeer(1, new VersionMessage(params, 2)); - FakeChannel p2 = connectPeer(2); - - assertNotNull(peerGroup.getDownloadPeer()); - - control.replay(); - - peerGroup.setMinBroadcastConnections(2); - - // Send ourselves a bit of money. - Block b1 = TestUtils.makeSolvedTestBlock(blockStore, address); - inbound(p1, b1); - assertNull(outbound(p1)); - - assertEquals(Utils.toNanoCoins(50, 0), wallet.getBalance()); - - // Check that the wallet informs us of changes in confidence as the transaction ripples across the network. - final Transaction[] transactions = new Transaction[1]; - wallet.addEventListener(new AbstractWalletEventListener() { - @Override - public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) { - transactions[0] = tx; - } - }); - - // Now create a spend, and expect the announcement on p1. - Address dest = new ECKey().toAddress(params); - Wallet.SendResult sendResult = wallet.sendCoins(peerGroup, dest, Utils.toNanoCoins(1, 0)); - assertNotNull(sendResult.tx); - Threading.waitForUserCode(); - assertFalse(sendResult.broadcastComplete.isDone()); - assertEquals(transactions[0], sendResult.tx); - assertEquals(transactions[0].getConfidence().numBroadcastPeers(), 1); - transactions[0] = null; - Transaction t1 = (Transaction) outbound(p1); - assertNotNull(t1); - // 49 BTC in change. - assertEquals(Utils.toNanoCoins(49, 0), t1.getValueSentToMe(wallet)); - // The future won't complete until it's heard back from the network on p2. - InventoryMessage inv = new InventoryMessage(params); - inv.addTransaction(t1); - inbound(p2, inv); - Threading.waitForUserCode(); - assertTrue(sendResult.broadcastComplete.isDone()); - assertEquals(transactions[0], sendResult.tx); - assertEquals(2, transactions[0].getConfidence().numBroadcastPeers()); - // Confirm it. - Block b2 = TestUtils.createFakeBlock(blockStore, t1).block; - inbound(p1, b2); - assertNull(outbound(p1)); - - // Do the same thing with an offline transaction. - peerGroup.removeWallet(wallet); - Wallet.SendRequest req = Wallet.SendRequest.to(dest, Utils.toNanoCoins(2, 0)); - req.ensureMinRequiredFee = false; - Transaction t3 = checkNotNull(wallet.sendCoinsOffline(req)); - assertNull(outbound(p1)); // Nothing sent. - // Add the wallet to the peer group (simulate initialization). Transactions should be announced. - peerGroup.addWallet(wallet); - // Transaction announced to the first peer. - assertEquals(t3.getHash(), ((Transaction) outbound(p1)).getHash()); - } - @Test public void testWalletCatchupTime() throws Exception { // Check the fast catchup time was initialized to something around the current runtime minus a week. diff --git a/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java b/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java index 9940f529..0776397d 100644 --- a/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java +++ b/core/src/test/java/com/google/bitcoin/core/TestWithPeerGroup.java @@ -16,6 +16,7 @@ package com.google.bitcoin.core; +import com.google.bitcoin.params.UnitTestParams; import com.google.bitcoin.store.BlockStore; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.*; @@ -28,6 +29,7 @@ import static org.junit.Assert.assertTrue; * Utility class that makes it easy to work with mock NetworkConnections in PeerGroups. */ public class TestWithPeerGroup extends TestWithNetworkConnections { + protected static final NetworkParameters params = UnitTestParams.get(); protected PeerGroup peerGroup; protected VersionMessage remoteVersionMessage; diff --git a/core/src/test/java/com/google/bitcoin/core/TransactionBroadcastTest.java b/core/src/test/java/com/google/bitcoin/core/TransactionBroadcastTest.java new file mode 100644 index 00000000..10c48672 --- /dev/null +++ b/core/src/test/java/com/google/bitcoin/core/TransactionBroadcastTest.java @@ -0,0 +1,131 @@ +/** + * Copyright 2013 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.bitcoin.core; + +import com.google.bitcoin.params.UnitTestParams; +import com.google.bitcoin.store.MemoryBlockStore; +import com.google.bitcoin.utils.TestUtils; +import com.google.bitcoin.utils.Threading; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.junit.Assert.*; + +public class TransactionBroadcastTest extends TestWithPeerGroup { + @Override + @Before + public void setUp() throws Exception { + super.setUp(new MemoryBlockStore(UnitTestParams.get())); + peerGroup.addWallet(wallet); + // Fix the random permutation that TransactionBroadcast uses to shuffle the peers. + TransactionBroadcast.random = new Random(0); + peerGroup.setMinBroadcastConnections(2); + } + + @Test + public void fourPeers() throws Exception { + FakeChannel[] channels = { connectPeer(1), connectPeer(2), connectPeer(3), connectPeer(4) }; + Transaction tx = new Transaction(params); + TransactionBroadcast broadcast = new TransactionBroadcast(peerGroup, tx); + ListenableFuture future = broadcast.broadcast(); + assertFalse(future.isDone()); + // We expect two peers to receive a tx message, and at least one of the others must announce for the future to + // complete successfully. + Message[] messages = { + (Message) outbound(channels[0]), + (Message) outbound(channels[1]), + (Message) outbound(channels[2]), + (Message) outbound(channels[3]) + }; + // 0 and 3 are randomly selected to receive the broadcast. + assertEquals(tx, messages[0]); + assertEquals(tx, messages[3]); + assertNull(messages[1]); + assertNull(messages[2]); + Threading.waitForUserCode(); + assertFalse(future.isDone()); + inbound(channels[1], InventoryMessage.with(tx)); + Threading.waitForUserCode(); + assertTrue(future.isDone()); + } + + @Test + public void peerGroupWalletIntegration() throws Exception { + // Make sure we can create spends, and that they are announced. Then do the same with offline mode. + + // Set up connections and block chain. + FakeChannel p1 = connectPeer(1, new VersionMessage(params, 2)); + FakeChannel p2 = connectPeer(2); + + // Send ourselves a bit of money. + Block b1 = TestUtils.makeSolvedTestBlock(blockStore, address); + inbound(p1, b1); + assertNull(outbound(p1)); + + assertEquals(Utils.toNanoCoins(50, 0), wallet.getBalance()); + + // Check that the wallet informs us of changes in confidence as the transaction ripples across the network. + final Transaction[] transactions = new Transaction[1]; + wallet.addEventListener(new AbstractWalletEventListener() { + @Override + public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) { + transactions[0] = tx; + } + }); + + // Now create a spend, and expect the announcement on p1. + Address dest = new ECKey().toAddress(params); + Wallet.SendResult sendResult = wallet.sendCoins(peerGroup, dest, Utils.toNanoCoins(1, 0)); + assertNotNull(sendResult.tx); + Threading.waitForUserCode(); + assertFalse(sendResult.broadcastComplete.isDone()); + assertEquals(transactions[0], sendResult.tx); + assertEquals(transactions[0].getConfidence().numBroadcastPeers(), 1); + transactions[0] = null; + Transaction t1 = (Transaction) outbound(p1); + assertNotNull(t1); + // 49 BTC in change. + assertEquals(Utils.toNanoCoins(49, 0), t1.getValueSentToMe(wallet)); + // The future won't complete until it's heard back from the network on p2. + InventoryMessage inv = new InventoryMessage(params); + inv.addTransaction(t1); + inbound(p2, inv); + Threading.waitForUserCode(); + assertTrue(sendResult.broadcastComplete.isDone()); + assertEquals(transactions[0], sendResult.tx); + assertEquals(2, transactions[0].getConfidence().numBroadcastPeers()); + // Confirm it. + Block b2 = TestUtils.createFakeBlock(blockStore, t1).block; + inbound(p1, b2); + assertNull(outbound(p1)); + + // Do the same thing with an offline transaction. + peerGroup.removeWallet(wallet); + Wallet.SendRequest req = Wallet.SendRequest.to(dest, Utils.toNanoCoins(2, 0)); + req.ensureMinRequiredFee = false; + Transaction t3 = checkNotNull(wallet.sendCoinsOffline(req)); + assertNull(outbound(p1)); // Nothing sent. + // Add the wallet to the peer group (simulate initialization). Transactions should be announced. + peerGroup.addWallet(wallet); + // Transaction announced to the first peer. + assertEquals(t3.getHash(), ((Transaction) outbound(p1)).getHash()); + } +}