3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 18:25:51 +00:00

Rewrite TxConfidenceTable to actually track confidence objects. Make Transaction.getConfidence() fetch the object from the table via the context. This allows us to simplify large chunks of code quite considerably. Note that confidence listeners are not yet pinning the attached confidence to the root set, so the same old bugs remain if you don't pin the tx object. That will be fixed next.

This commit is contained in:
Mike Hearn 2015-02-27 20:28:22 +01:00
parent f1d2a48a2e
commit 04bc624a91
13 changed files with 151 additions and 303 deletions

View File

@ -46,6 +46,7 @@ public class BlockChain extends AbstractBlockChain {
*/ */
public BlockChain(NetworkParameters params, Wallet wallet, BlockStore blockStore) throws BlockStoreException { public BlockChain(NetworkParameters params, Wallet wallet, BlockStore blockStore) throws BlockStoreException {
this(params, new ArrayList<BlockChainListener>(), blockStore); this(params, new ArrayList<BlockChainListener>(), blockStore);
Context.getOrCreate();
if (wallet != null) if (wallet != null)
addWallet(wallet); addWallet(wallet);
} }

View File

@ -121,6 +121,12 @@ public class Peer extends PeerSocketHandler {
// It is important to avoid a nasty edge case where we can end up with parallel chain downloads proceeding // It is important to avoid a nasty edge case where we can end up with parallel chain downloads proceeding
// simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us. // simultaneously if we were to receive a newly solved block whilst parts of the chain are streaming to us.
private final HashSet<Sha256Hash> pendingBlockDownloads = new HashSet<Sha256Hash>(); private final HashSet<Sha256Hash> pendingBlockDownloads = new HashSet<Sha256Hash>();
// Keep references to TransactionConfidence objects for transactions that were announced by a remote peer, but
// which we haven't downloaded yet. These objects are de-duplicated by the TxConfidenceTable class.
// Once the tx is downloaded (by some peer), the Transaction object that is created will have a reference to
// the confidence object held inside it, and it's then up to the event listeners that receive the Transaction
// to keep it pinned to the root set if they care about this data.
private final HashSet<TransactionConfidence> pendingTxDownloads = new HashSet<TransactionConfidence>();
// The lowest version number we're willing to accept. Lower than this will result in an immediate disconnect. // The lowest version number we're willing to accept. Lower than this will result in an immediate disconnect.
private volatile int vMinProtocolVersion = Pong.MIN_PROTOCOL_VERSION; private volatile int vMinProtocolVersion = Pong.MIN_PROTOCOL_VERSION;
// When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here // When an API user explicitly requests a block or transaction from a peer, the InventoryItem is put here
@ -595,19 +601,23 @@ public class Peer extends PeerSocketHandler {
} }
} }
private void processTransaction(Transaction tx) throws VerificationException { private void processTransaction(final Transaction tx) throws VerificationException {
// Check a few basic syntax issues to ensure the received TX isn't nonsense. // Check a few basic syntax issues to ensure the received TX isn't nonsense.
tx.verify(); tx.verify();
final Transaction fTx;
lock.lock(); lock.lock();
try { try {
log.debug("{}: Received tx {}", getAddress(), tx.getHashAsString()); log.debug("{}: Received tx {}", getAddress(), tx.getHashAsString());
// We may get back a different transaction object.
fTx = context.getConfidenceTable().seen(tx, getAddress());
// Label the transaction as coming in from the P2P network (as opposed to being created by us, direct import, // Label the transaction as coming in from the P2P network (as opposed to being created by us, direct import,
// etc). This helps the wallet decide how to risk analyze it later. // etc). This helps the wallet decide how to risk analyze it later.
fTx.getConfidence().setSource(TransactionConfidence.Source.NETWORK); //
if (maybeHandleRequestedData(fTx)) { // Additionally, by invoking tx.getConfidence(), this tx now pins the confidence data into the heap, meaning
// we can stop holding a reference to the confidence object ourselves. It's up to event listeners on the
// Peer to stash the tx object somewhere if they want to keep receiving updates about network propagation
// and so on.
TransactionConfidence confidence = tx.getConfidence();
confidence.setSource(TransactionConfidence.Source.NETWORK);
pendingTxDownloads.remove(confidence);
if (maybeHandleRequestedData(tx)) {
return; return;
} }
if (currentFilteredBlock != null) { if (currentFilteredBlock != null) {
@ -623,7 +633,7 @@ public class Peer extends PeerSocketHandler {
// It's a broadcast transaction. Tell all wallets about this tx so they can check if it's relevant or not. // It's a broadcast transaction. Tell all wallets about this tx so they can check if it's relevant or not.
for (final Wallet wallet : wallets) { for (final Wallet wallet : wallets) {
try { try {
if (wallet.isPendingTransactionRelevant(fTx)) { if (wallet.isPendingTransactionRelevant(tx)) {
if (vDownloadTxDependencies) { if (vDownloadTxDependencies) {
// This transaction seems interesting to us, so let's download its dependencies. This has // This transaction seems interesting to us, so let's download its dependencies. This has
// several purposes: we can check that the sender isn't attacking us by engaging in protocol // several purposes: we can check that the sender isn't attacking us by engaging in protocol
@ -640,15 +650,14 @@ public class Peer extends PeerSocketHandler {
// through transactions that have confirmed, as getdata on the remote peer also checks // through transactions that have confirmed, as getdata on the remote peer also checks
// relay memory not only the mempool. Unfortunately we have no way to know that here. In // relay memory not only the mempool. Unfortunately we have no way to know that here. In
// practice it should not matter much. // practice it should not matter much.
Futures.addCallback(downloadDependencies(fTx), new FutureCallback<List<Transaction>>() { Futures.addCallback(downloadDependencies(tx), new FutureCallback<List<Transaction>>() {
@Override @Override
public void onSuccess(List<Transaction> dependencies) { public void onSuccess(List<Transaction> dependencies) {
try { try {
log.info("{}: Dependency download complete!", getAddress()); log.info("{}: Dependency download complete!", getAddress());
wallet.receivePending(fTx, dependencies); wallet.receivePending(tx, dependencies);
} catch (VerificationException e) { } catch (VerificationException e) {
log.error("{}: Wallet failed to process pending transaction {}", getAddress(), log.error("{}: Wallet failed to process pending transaction {}", getAddress(), tx.getHash());
fTx.getHashAsString());
log.error("Error was: ", e); log.error("Error was: ", e);
// Not much more we can do at this point. // Not much more we can do at this point.
} }
@ -656,13 +665,13 @@ public class Peer extends PeerSocketHandler {
@Override @Override
public void onFailure(Throwable throwable) { public void onFailure(Throwable throwable) {
log.error("Could not download dependencies of tx {}", fTx.getHashAsString()); log.error("Could not download dependencies of tx {}", tx.getHashAsString());
log.error("Error was: ", throwable); log.error("Error was: ", throwable);
// Not much more we can do at this point. // Not much more we can do at this point.
} }
}); });
} else { } else {
wallet.receivePending(fTx, null); wallet.receivePending(tx, null);
} }
} }
} catch (VerificationException e) { } catch (VerificationException e) {
@ -679,7 +688,7 @@ public class Peer extends PeerSocketHandler {
registration.executor.execute(new Runnable() { registration.executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
registration.listener.onTransaction(Peer.this, fTx); registration.listener.onTransaction(Peer.this, tx);
} }
}); });
} }
@ -732,23 +741,13 @@ public class Peer extends PeerSocketHandler {
// We want to recursively grab its dependencies. This is so listeners can learn important information like // We want to recursively grab its dependencies. This is so listeners can learn important information like
// whether a transaction is dependent on a timelocked transaction or has an unexpectedly deep dependency tree // whether a transaction is dependent on a timelocked transaction or has an unexpectedly deep dependency tree
// or depends on a no-fee transaction. // or depends on a no-fee transaction.
//
// Firstly find any that are already in the memory pool so if they weren't garbage collected yet, they won't // We may end up requesting transactions that we've already downloaded and thrown away here.
// be deleted. Use COW sets to make unit tests deterministic and because they are small. It's slower for
// the case of transactions with tons of inputs.
Set<Transaction> dependencies = new CopyOnWriteArraySet<Transaction>();
Set<Sha256Hash> needToRequest = new CopyOnWriteArraySet<Sha256Hash>(); Set<Sha256Hash> needToRequest = new CopyOnWriteArraySet<Sha256Hash>();
for (TransactionInput input : tx.getInputs()) { for (TransactionInput input : tx.getInputs()) {
// There may be multiple inputs that connect to the same transaction. // There may be multiple inputs that connect to the same transaction.
Sha256Hash hash = input.getOutpoint().getHash(); needToRequest.add(input.getOutpoint().getHash());
Transaction dep = context.getConfidenceTable().get(hash);
if (dep == null) {
needToRequest.add(hash);
} else {
dependencies.add(dep);
} }
}
results.addAll(dependencies);
lock.lock(); lock.lock();
try { try {
// Build the request for the missing dependencies. // Build the request for the missing dependencies.
@ -764,10 +763,6 @@ public class Peer extends PeerSocketHandler {
futures.add(req.future); futures.add(req.future);
getDataFutures.add(req); getDataFutures.add(req);
} }
// The transactions we already grabbed out of the mempool must still be considered by the code below.
for (Transaction dep : dependencies) {
futures.add(Futures.immediateFuture(dep));
}
ListenableFuture<List<Transaction>> successful = Futures.successfulAsList(futures); ListenableFuture<List<Transaction>> successful = Futures.successfulAsList(futures);
Futures.addCallback(successful, new FutureCallback<List<Transaction>>() { Futures.addCallback(successful, new FutureCallback<List<Transaction>>() {
@Override @Override
@ -1063,15 +1058,18 @@ public class Peer extends PeerSocketHandler {
// peers run at different speeds. However to conserve bandwidth on mobile devices we try to only download a // peers run at different speeds. However to conserve bandwidth on mobile devices we try to only download a
// transaction once. This means we can miss broadcasts if the peer disconnects between sending us an inv and // transaction once. This means we can miss broadcasts if the peer disconnects between sending us an inv and
// sending us the transaction: currently we'll never try to re-fetch after a timeout. // sending us the transaction: currently we'll never try to re-fetch after a timeout.
if (context.getConfidenceTable().maybeWasSeen(item.hash)) { //
// The line below can trigger confidence listeners.
TransactionConfidence conf = context.getConfidenceTable().seen(item.hash, this.getAddress());
if (conf.numBroadcastPeers() > 1) {
// Some other peer already announced this so don't download. // Some other peer already announced this so don't download.
it.remove(); it.remove();
} else { } else {
log.debug("{}: getdata on tx {}", getAddress(), item.hash); log.debug("{}: getdata on tx {}", getAddress(), item.hash);
getdata.addItem(item); getdata.addItem(item);
// Register with the garbage collector that we care about the confidence data for a while.
pendingTxDownloads.add(conf);
} }
// This can trigger transaction confidence listeners.
context.getConfidenceTable().seen(item.hash, this.getAddress());
} }
// If we are requesting filteredblocks we have to send a ping after the getdata so that we have a clear // If we are requesting filteredblocks we have to send a ping after the getdata so that we have a clear

View File

@ -522,22 +522,15 @@ public class PeerGroup implements TransactionBroadcaster {
Iterator<InventoryItem> it = items.iterator(); Iterator<InventoryItem> it = items.iterator();
while (it.hasNext()) { while (it.hasNext()) {
InventoryItem item = it.next(); InventoryItem item = it.next();
// Check the confidence pool first.
Transaction tx = chain != null ? Context.get().getConfidenceTable().get(item.hash) : null;
if (tx != null) {
transactions.add(tx);
it.remove();
} else {
// Check the wallets. // Check the wallets.
for (Wallet w : wallets) { for (Wallet w : wallets) {
tx = w.getTransaction(item.hash); Transaction tx = w.getTransaction(item.hash);
if (tx == null) continue; if (tx == null) continue;
transactions.add(tx); transactions.add(tx);
it.remove(); it.remove();
break; break;
} }
} }
}
return transactions; return transactions;
} finally { } finally {
lock.unlock(); lock.unlock();

View File

@ -1122,15 +1122,14 @@ public class Transaction extends ChildMessage implements Serializable {
/** Returns the confidence object that is owned by this transaction object. */ /** Returns the confidence object that is owned by this transaction object. */
public synchronized TransactionConfidence getConfidence() { public synchronized TransactionConfidence getConfidence() {
if (confidence == null) { if (confidence == null)
confidence = new TransactionConfidence(getHash()); confidence = Context.get().getConfidenceTable().getOrCreate(getHash()) ;
}
return confidence; return confidence;
} }
/** Check if the transaction has a known confidence */ /** Check if the transaction has a known confidence */
public synchronized boolean hasConfidence() { public synchronized boolean hasConfidence() {
return confidence != null && confidence.getConfidenceType() != TransactionConfidence.ConfidenceType.UNKNOWN; return getConfidence().getConfidenceType() != TransactionConfidence.ConfidenceType.UNKNOWN;
} }
@Override @Override

View File

@ -48,8 +48,6 @@ public class TransactionBroadcast {
@VisibleForTesting @VisibleForTesting
public static Random random = new Random(); public static Random random = new Random();
private Transaction pinnedTx;
// Tracks which nodes sent us a reject message about this broadcast, if any. Useful for debugging. // Tracks which nodes sent us a reject message about this broadcast, if any. Useful for debugging.
private Map<Peer, RejectMessage> rejects = Collections.synchronizedMap(new HashMap<Peer, RejectMessage>()); private Map<Peer, RejectMessage> rejects = Collections.synchronizedMap(new HashMap<Peer, RejectMessage>());
@ -128,13 +126,10 @@ public class TransactionBroadcast {
// be seen, 4 peers is probably too little - it doesn't taken many broken peers for tx propagation to have // be seen, 4 peers is probably too little - it doesn't taken many broken peers for tx propagation to have
// a big effect. // a big effect.
List<Peer> peers = peerGroup.getConnectedPeers(); // snapshots List<Peer> peers = peerGroup.getConnectedPeers(); // snapshots
// We intern the tx here so we are using a canonical version of the object (as it's unfortunately mutable).
// TODO: Once confidence state is moved out of Transaction we can kill off this step.
pinnedTx = Context.get().getConfidenceTable().intern(tx);
// Prepare to send the transaction by adding a listener that'll be called when confidence changes. // Prepare to send the transaction by adding a listener that'll be called when confidence changes.
// Only bother with this if we might actually hear back: // Only bother with this if we might actually hear back:
if (minConnections > 1) if (minConnections > 1)
pinnedTx.getConfidence().addEventListener(new ConfidenceChange()); tx.getConfidence().addEventListener(new ConfidenceChange());
// Satoshis code sends an inv in this case and then lets the peer request the tx data. We just // Satoshis code sends an inv in this case and then lets the peer request the tx data. We just
// blast out the TX here for a couple of reasons. Firstly it's simpler: in the case where we have // blast out the TX here for a couple of reasons. Firstly it's simpler: in the case where we have
// just a single connection we don't have to wait for getdata to be received and handled before // just a single connection we don't have to wait for getdata to be received and handled before
@ -152,7 +147,7 @@ public class TransactionBroadcast {
log.info("Sending to {} peers, will wait for {}, sending to: {}", numToBroadcastTo, numWaitingFor, Joiner.on(",").join(peers)); log.info("Sending to {} peers, will wait for {}, sending to: {}", numToBroadcastTo, numWaitingFor, Joiner.on(",").join(peers));
for (Peer peer : peers) { for (Peer peer : peers) {
try { try {
peer.sendMessage(pinnedTx); peer.sendMessage(tx);
// We don't record the peer as having seen the tx in the memory pool because we want to track only // We don't record the peer as having seen the tx in the memory pool because we want to track only
// how many peers announced to us. // how many peers announced to us.
} catch (Exception e) { } catch (Exception e) {
@ -165,7 +160,7 @@ public class TransactionBroadcast {
// any peer discovery source and the user just calls connectTo() once. // any peer discovery source and the user just calls connectTo() once.
if (minConnections == 1) { if (minConnections == 1) {
peerGroup.removeEventListener(rejectionListener); peerGroup.removeEventListener(rejectionListener);
future.set(pinnedTx); future.set(tx);
} }
} }
} }
@ -176,7 +171,7 @@ public class TransactionBroadcast {
// The number of peers that announced this tx has gone up. // The number of peers that announced this tx has gone up.
int numSeenPeers = conf.numBroadcastPeers() + rejects.size(); int numSeenPeers = conf.numBroadcastPeers() + rejects.size();
boolean mined = tx.getAppearsInHashes() != null; boolean mined = tx.getAppearsInHashes() != null;
log.info("broadcastTransaction: {}: TX {} seen by {} peers{}", reason, pinnedTx.getHashAsString(), log.info("broadcastTransaction: {}: TX {} seen by {} peers{}", reason, tx.getHashAsString(),
numSeenPeers, mined ? " and mined" : ""); numSeenPeers, mined ? " and mined" : "");
// Progress callback on the requested thread. // Progress callback on the requested thread.
@ -196,10 +191,10 @@ public class TransactionBroadcast {
// //
// We're done! It's important that the PeerGroup lock is not held (by this thread) at this // We're done! It's important that the PeerGroup lock is not held (by this thread) at this
// point to avoid triggering inversions when the Future completes. // point to avoid triggering inversions when the Future completes.
log.info("broadcastTransaction: {} complete", pinnedTx.getHashAsString()); log.info("broadcastTransaction: {} complete", tx.getHash());
peerGroup.removeEventListener(rejectionListener); peerGroup.removeEventListener(rejectionListener);
conf.removeEventListener(this); conf.removeEventListener(this);
future.set(pinnedTx); // RE-ENTRANCY POINT future.set(tx); // RE-ENTRANCY POINT
} }
} }
} }

View File

@ -17,18 +17,14 @@
package org.bitcoinj.core; package org.bitcoinj.core;
import com.google.common.collect.Sets; import com.google.common.collect.*;
import org.bitcoinj.utils.ListenerRegistration; import com.google.common.util.concurrent.*;
import org.bitcoinj.utils.Threading; import org.bitcoinj.utils.*;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import javax.annotation.Nullable; import javax.annotation.*;
import java.io.Serializable; import java.io.*;
import java.util.ListIterator; import java.util.*;
import java.util.Set; import java.util.concurrent.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import static com.google.common.base.Preconditions.*; import static com.google.common.base.Preconditions.*;
@ -261,8 +257,7 @@ public class TransactionConfidence implements Serializable {
/** /**
* Called by a {@link Peer} when a transaction is pending and announced by a peer. The more peers announce the * Called by a {@link Peer} when a transaction is pending and announced by a peer. The more peers announce the
* transaction, the more peers have validated it (assuming your internet connection is not being intercepted). * transaction, the more peers have validated it (assuming your internet connection is not being intercepted).
* If confidence is currently unknown, sets it to {@link ConfidenceType#PENDING}. Listeners will be * If confidence is currently unknown, sets it to {@link ConfidenceType#PENDING}.
* invoked in this case.
* *
* @param address IP address of the peer, used as a proxy for identity. * @param address IP address of the peer, used as a proxy for identity.
*/ */

View File

@ -16,22 +16,15 @@
package org.bitcoinj.core; package org.bitcoinj.core;
import org.bitcoinj.utils.Threading; import org.bitcoinj.utils.*;
import org.slf4j.Logger; import org.slf4j.*;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable; import javax.annotation.*;
import java.lang.ref.Reference; import java.lang.ref.*;
import java.lang.ref.ReferenceQueue; import java.util.*;
import java.lang.ref.WeakReference; import java.util.concurrent.locks.*;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
/** /**
* <p>Tracks transactions that are being announced across the network. Typically one is created for you by a * <p>Tracks transactions that are being announced across the network. Typically one is created for you by a
@ -48,38 +41,21 @@ public class TxConfidenceTable {
private static final Logger log = LoggerFactory.getLogger(TxConfidenceTable.class); private static final Logger log = LoggerFactory.getLogger(TxConfidenceTable.class);
protected ReentrantLock lock = Threading.lock("txconfidencetable"); protected ReentrantLock lock = Threading.lock("txconfidencetable");
// For each transaction we may have seen: private static class WeakConfidenceReference extends WeakReference<TransactionConfidence> {
// - only its hash in an inv packet
// - the full transaction itself, if we asked for it to be sent to us (or a peer sent it regardless), or if we
// sent it.
//
// Before we see the full transaction, we need to track how many peers advertised it, so we can estimate its
// confidence pre-chain inclusion assuming an un-tampered with network connection. After we see the full transaction
// we need to switch from tracking that data in the Entry to tracking it in the TransactionConfidence object itself.
private static class WeakTransactionReference extends WeakReference<Transaction> {
public Sha256Hash hash; public Sha256Hash hash;
public WeakTransactionReference(Transaction tx, ReferenceQueue<Transaction> queue) { public WeakConfidenceReference(TransactionConfidence confidence, ReferenceQueue<TransactionConfidence> queue) {
super(tx, queue); super(confidence, queue);
hash = tx.getHash(); hash = confidence.getTransactionHash();
} }
} }
private static class Entry { private LinkedHashMap<Sha256Hash, WeakConfidenceReference> table;
// Invariants: one of the two fields must be null, to indicate which is used.
Set<PeerAddress> addresses;
// We keep a weak reference to the transaction. This means that if no other bit of code finds the transaction
// worth keeping around it will drop out of memory and we will, at some point, forget about it, which means
// both addresses and tx.get() will be null. When this happens the WeakTransactionReference appears in the queue
// allowing us to delete the associated entry (the tx itself has already gone away).
WeakTransactionReference tx;
}
private LinkedHashMap<Sha256Hash, Entry> table;
// This ReferenceQueue gets entries added to it when they are only weakly reachable, ie, the TxConfidenceTable is the // This ReferenceQueue gets entries added to it when they are only weakly reachable, ie, the TxConfidenceTable is the
// only thing that is tracking the transaction anymore. We check it from time to time and delete table entries // only thing that is tracking the confidence data anymore. We check it from time to time and delete table entries
// corresponding to expired transactions. In this way memory usage of the system is in line with however many // corresponding to expired transactions. In this way memory usage of the system is in line with however many
// transactions you actually care to track the confidence of. We can still end up with lots of hashes being stored // transactions you actually care to track the confidence of. We can still end up with lots of hashes being stored
// if our peers flood us with invs but the MAX_SIZE param caps this. // if our peers flood us with invs but the MAX_SIZE param caps this.
private ReferenceQueue<Transaction> referenceQueue; private ReferenceQueue<TransactionConfidence> referenceQueue;
/** The max size of a table created with the no-args constructor. */ /** The max size of a table created with the no-args constructor. */
public static final int MAX_SIZE = 1000; public static final int MAX_SIZE = 1000;
@ -90,15 +66,15 @@ public class TxConfidenceTable {
* @param size Max number of transactions to track. The table will fill up to this size then stop growing. * @param size Max number of transactions to track. The table will fill up to this size then stop growing.
*/ */
public TxConfidenceTable(final int size) { public TxConfidenceTable(final int size) {
table = new LinkedHashMap<Sha256Hash, Entry>() { table = new LinkedHashMap<Sha256Hash, WeakConfidenceReference>() {
@Override @Override
protected boolean removeEldestEntry(Map.Entry<Sha256Hash, TxConfidenceTable.Entry> entry) { protected boolean removeEldestEntry(Map.Entry<Sha256Hash, WeakConfidenceReference> entry) {
// An arbitrary choice to stop the memory used by tracked transactions getting too huge in the event // An arbitrary choice to stop the memory used by tracked transactions getting too huge in the event
// of some kind of DoS attack. // of some kind of DoS attack.
return size() > size; return size() > size;
} }
}; };
referenceQueue = new ReferenceQueue<Transaction>(); referenceQueue = new ReferenceQueue<TransactionConfidence>();
} }
/** /**
@ -118,10 +94,10 @@ public class TxConfidenceTable {
private void cleanTable() { private void cleanTable() {
lock.lock(); lock.lock();
try { try {
Reference<? extends Transaction> ref; Reference<? extends TransactionConfidence> ref;
while ((ref = referenceQueue.poll()) != null) { while ((ref = referenceQueue.poll()) != null) {
// Find which transaction got deleted by the GC. // Find which transaction got deleted by the GC.
WeakTransactionReference txRef = (WeakTransactionReference) ref; WeakConfidenceReference txRef = (WeakConfidenceReference) ref;
// And remove the associated map entry so the other bits of memory can also be reclaimed. // And remove the associated map entry so the other bits of memory can also be reclaimed.
table.remove(txRef.hash); table.remove(txRef.hash);
} }
@ -137,25 +113,17 @@ public class TxConfidenceTable {
lock.lock(); lock.lock();
try { try {
cleanTable(); cleanTable();
Entry entry = table.get(txHash); WeakConfidenceReference entry = table.get(txHash);
if (entry == null) { if (entry == null) {
// No such TX known. return 0; // No such TX known.
return 0;
} else if (entry.tx == null) {
// We've seen at least one peer announce with an inv.
checkNotNull(entry.addresses);
return entry.addresses.size();
} else { } else {
final Transaction tx = entry.tx.get(); TransactionConfidence confidence = entry.get();
if (tx == null) { if (confidence == null) {
// We previously downloaded this transaction, but nothing cared about it so the garbage collector threw // Such a TX hash was seen, but nothing seemed to care so we ended up throwing away the data.
// it away. We also deleted the set that tracked which peers had seen it. Treat this case as a zero and
// just delete it from the map.
table.remove(txHash); table.remove(txHash);
return 0; return 0;
} else { } else {
checkState(entry.addresses == null); return confidence.numBroadcastPeers();
return tx.getConfidence().numBroadcastPeers();
} }
} }
} finally { } finally {
@ -164,153 +132,62 @@ public class TxConfidenceTable {
} }
/** /**
* Puts the tx into the table and returns either it, or a different Transaction object that has the same hash. * Called by peers when they see a transaction advertised in an "inv" message. It passes the data on to the relevant
* Unlike seen and the other methods, this one does not imply that a tx has been announced by a peer and does * {@link org.bitcoinj.core.TransactionConfidence} object, creating it if needed.
* not mark it as such. *
* @return the number of peers that have now announced this hash (including the caller)
*/ */
public Transaction intern(Transaction tx) { public TransactionConfidence seen(Sha256Hash hash, PeerAddress byPeer) {
TransactionConfidence confidence;
lock.lock(); lock.lock();
try { {
cleanTable(); cleanTable();
Entry entry = table.get(tx.getHash()); confidence = getOrCreate(hash);
if (entry != null) { confidence.markBroadcastBy(byPeer);
// This TX or its hash have been previously interned.
if (entry.tx != null) {
// We already interned it (but may have thrown it away).
checkState(entry.addresses == null);
// We only want one canonical object instance for a transaction no matter how many times it is
// deserialized.
Transaction transaction = entry.tx.get();
if (transaction != null) {
// We saw it before and kept it around. Hand back the canonical copy.
tx = transaction;
} }
return tx;
} else {
// We received a transaction that we have previously seen announced but not downloaded until now.
checkNotNull(entry.addresses);
entry.tx = new WeakTransactionReference(tx, referenceQueue);
Set<PeerAddress> addrs = entry.addresses;
entry.addresses = null;
TransactionConfidence confidence = tx.getConfidence();
log.debug("Adding tx [{}] {} to the confidence table",
confidence.numBroadcastPeers(), tx.getHashAsString());
for (PeerAddress a : addrs) {
markBroadcast(a, tx);
}
return tx;
}
} else {
// This often happens when we are downloading a Bloom filtered chain, or recursively downloading
// dependencies of a relevant transaction (see Peer.downloadDependencies).
log.debug("Provided with a downloaded transaction we didn't see announced yet: {}", tx.getHashAsString());
entry = new Entry();
entry.tx = new WeakTransactionReference(tx, referenceQueue);
table.put(tx.getHash(), entry);
return tx;
}
} finally {
lock.unlock(); lock.unlock();
}
}
/**
* Called by peers when they receive a "tx" message containing a valid serialized transaction.
* @param tx The TX deserialized from the wire.
* @param byPeer The Peer that received it.
* @return An object that is semantically the same TX but may be a different object instance.
*/
public Transaction seen(Transaction tx, PeerAddress byPeer) {
lock.lock();
try {
final Transaction interned = intern(tx);
markBroadcast(byPeer, interned);
return interned;
} finally {
lock.unlock();
}
}
/**
* Called by peers when they see a transaction advertised in an "inv" message. It either will increase the
* confidence of the pre-existing transaction or will just keep a record of the address for future usage.
*/
public void seen(Sha256Hash hash, PeerAddress byPeer) {
lock.lock();
try {
cleanTable();
Entry entry = table.get(hash);
if (entry != null) {
// This TX or its hash have been previously announced.
if (entry.tx != null) {
checkState(entry.addresses == null);
Transaction tx = entry.tx.get();
if (tx != null) {
markBroadcast(byPeer, tx);
log.debug("{}: Peer announced transaction we have seen before [{}] {}",
byPeer, tx.getConfidence().numBroadcastPeers(), tx.getHashAsString());
} else {
// The inv is telling us about a transaction that we previously downloaded, and threw away
// because nothing found it interesting enough to keep around. So do nothing.
}
} else {
checkNotNull(entry.addresses);
entry.addresses.add(byPeer);
log.debug("{}: Peer announced transaction we have seen announced before [{}] {}",
byPeer, entry.addresses.size(), hash);
}
} else {
// This TX has never been seen before.
entry = new Entry();
// TODO: Using hashsets here is inefficient compared to just having an array.
entry.addresses = new HashSet<PeerAddress>();
entry.addresses.add(byPeer);
table.put(hash, entry);
log.info("{}: Peer announced new transaction [1] {}", byPeer, hash);
}
} finally {
lock.unlock();
}
}
private void markBroadcast(PeerAddress byPeer, Transaction tx) {
checkState(lock.isHeldByCurrentThread());
final TransactionConfidence confidence = tx.getConfidence();
if (confidence.markBroadcastBy(byPeer))
confidence.queueListeners(TransactionConfidence.Listener.ChangeReason.SEEN_PEERS); confidence.queueListeners(TransactionConfidence.Listener.ChangeReason.SEEN_PEERS);
return confidence;
} }
/** /**
* Returns the {@link Transaction} for the given hash if we have downloaded it, or null if that hash is unknown or * Returns the {@link TransactionConfidence} for the given hash if we have downloaded it, or null if that tx hash
* we only saw advertisements for it yet or it has been downloaded but garbage collected due to nowhere else * is unknown to the system at this time.
* holding a reference to it. */
public TransactionConfidence getOrCreate(Sha256Hash hash) {
checkNotNull(hash);
lock.lock();
try {
WeakConfidenceReference reference = table.get(hash);
if (reference != null) {
TransactionConfidence confidence = reference.get();
if (confidence != null)
return confidence;
}
TransactionConfidence newConfidence = new TransactionConfidence(hash);
table.put(hash, new WeakConfidenceReference(newConfidence, referenceQueue));
return newConfidence;
} finally {
lock.unlock();
}
}
/**
* Returns the {@link TransactionConfidence} for the given hash if we have downloaded it, or null if that tx hash
* is unknown to the system at this time.
*/ */
@Nullable @Nullable
public Transaction get(Sha256Hash hash) { public TransactionConfidence get(Sha256Hash hash) {
lock.lock(); lock.lock();
try { try {
Entry entry = table.get(hash); WeakConfidenceReference ref = table.get(hash);
if (entry == null) return null; // Unknown. if (ref == null)
if (entry.tx == null) return null; // Seen but only in advertisements. return null;
if (entry.tx.get() == null) return null; // Was downloaded but garbage collected. TransactionConfidence confidence = ref.get();
Transaction tx = entry.tx.get(); if (confidence != null)
checkNotNull(tx); return confidence;
return tx; else
} finally { return null;
lock.unlock();
}
}
/**
* Returns true if the TX identified by hash has been seen before (ie, in an inv). Note that a transaction that
* was broadcast, downloaded and nothing kept a reference to it will eventually be cleared out by the garbage
* collector and wasSeen() will return false - it does not keep a permanent record of every hash ever broadcast.
*/
public boolean maybeWasSeen(Sha256Hash hash) {
lock.lock();
try {
Entry entry = table.get(hash);
return entry != null;
} finally { } finally {
lock.unlock(); lock.unlock();
} }

View File

@ -149,6 +149,7 @@ public class Wallet extends BaseTaggableObject implements Serializable, BlockCha
// A list of scripts watched by this wallet. // A list of scripts watched by this wallet.
private Set<Script> watchedScripts; private Set<Script> watchedScripts;
protected final Context context;
protected final NetworkParameters params; protected final NetworkParameters params;
@Nullable private Sha256Hash lastBlockSeenHash; @Nullable private Sha256Hash lastBlockSeenHash;
@ -246,6 +247,7 @@ public class Wallet extends BaseTaggableObject implements Serializable, BlockCha
// TODO: When this class moves to the Wallet package, along with the protobuf serializer, then hide this. // TODO: When this class moves to the Wallet package, along with the protobuf serializer, then hide this.
/** For internal use only. */ /** For internal use only. */
public Wallet(NetworkParameters params, KeyChainGroup keyChainGroup) { public Wallet(NetworkParameters params, KeyChainGroup keyChainGroup) {
this.context = Context.getOrCreate();
this.params = checkNotNull(params); this.params = checkNotNull(params);
this.keychain = checkNotNull(keyChainGroup); this.keychain = checkNotNull(keyChainGroup);
if (params == UnitTestParams.get()) if (params == UnitTestParams.get())

View File

@ -58,10 +58,10 @@ public class ChainSplitTest {
Utils.setMockClock(); // Use mock clock Utils.setMockClock(); // Use mock clock
Wallet.SendRequest.DEFAULT_FEE_PER_KB = Coin.ZERO; Wallet.SendRequest.DEFAULT_FEE_PER_KB = Coin.ZERO;
unitTestParams = UnitTestParams.get(); unitTestParams = UnitTestParams.get();
blockStore = new MemoryBlockStore(unitTestParams);
wallet = new Wallet(unitTestParams); wallet = new Wallet(unitTestParams);
ECKey key1 = wallet.freshReceiveKey(); ECKey key1 = wallet.freshReceiveKey();
ECKey key2 = wallet.freshReceiveKey(); ECKey key2 = wallet.freshReceiveKey();
blockStore = new MemoryBlockStore(unitTestParams);
chain = new BlockChain(unitTestParams, wallet, blockStore); chain = new BlockChain(unitTestParams, wallet, blockStore);
coinsTo = key1.toAddress(unitTestParams); coinsTo = key1.toAddress(unitTestParams);
coinsTo2 = key2.toAddress(unitTestParams); coinsTo2 = key2.toAddress(unitTestParams);
@ -204,7 +204,7 @@ public class ChainSplitTest {
chain.add(b3); chain.add(b3);
chain.add(b4); chain.add(b4);
// b4 causes a re-org that should make our spend go pending again. // b4 causes a re-org that should make our spend go pending again.
assertEquals(valueOf(40, 0), wallet.getBalance()); assertEquals(valueOf(40, 0), wallet.getBalance(Wallet.BalanceType.ESTIMATED));
assertEquals(ConfidenceType.PENDING, spend.getConfidence().getConfidenceType()); assertEquals(ConfidenceType.PENDING, spend.getConfidence().getConfidenceType());
} }
@ -339,9 +339,7 @@ public class ChainSplitTest {
wallet.addEventListener(new AbstractWalletEventListener() { wallet.addEventListener(new AbstractWalletEventListener() {
@Override @Override
public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) { public void onTransactionConfidenceChanged(Wallet wallet, Transaction tx) {
super.onTransactionConfidenceChanged(wallet, tx); if (tx.getConfidence().getConfidenceType() == TransactionConfidence.ConfidenceType.DEAD) {
if (tx.getConfidence().getConfidenceType() ==
TransactionConfidence.ConfidenceType.DEAD) {
eventDead[0] = tx; eventDead[0] = tx;
eventReplacement[0] = tx.getConfidence().getOverridingTransaction(); eventReplacement[0] = tx.getConfidence().getOverridingTransaction();
} }

View File

@ -376,8 +376,7 @@ public class PeerGroupTest extends TestWithPeerGroup {
// Peer 2 advertises the tx but does not receive it yet. // Peer 2 advertises the tx but does not receive it yet.
inbound(p2, inv); inbound(p2, inv);
assertTrue(outbound(p2) instanceof GetDataMessage); assertTrue(outbound(p2) instanceof GetDataMessage);
assertEquals(0, tx.getConfidence().numBroadcastPeers()); assertEquals(1, tx.getConfidence().numBroadcastPeers());
assertTrue(Context.get().getConfidenceTable().maybeWasSeen(tx.getHash()));
assertNull(event[0]); assertNull(event[0]);
// Peer 1 advertises the tx, we don't do anything as it's already been requested. // Peer 1 advertises the tx, we don't do anything as it's already been requested.
inbound(p1, inv); inbound(p1, inv);

View File

@ -292,7 +292,7 @@ public class PeerTest extends TestWithNetworkConnections {
GetDataMessage message = (GetDataMessage)outbound(writeTarget); GetDataMessage message = (GetDataMessage)outbound(writeTarget);
assertEquals(1, message.getItems().size()); assertEquals(1, message.getItems().size());
assertEquals(tx.getHash(), message.getItems().get(0).hash); assertEquals(tx.getHash(), message.getItems().get(0).hash);
assertTrue(confidenceTable.maybeWasSeen(tx.getHash())); assertNotEquals(0, tx.getConfidence().numBroadcastPeers());
// Advertising to peer2 results in no getdata message. // Advertising to peer2 results in no getdata message.
inbound(writeTarget2, inv); inbound(writeTarget2, inv);
@ -554,7 +554,7 @@ public class PeerTest extends TestWithNetworkConnections {
} }
}, Threading.SAME_THREAD); }, Threading.SAME_THREAD);
// Make the some fake transactions in the following graph: // Make some fake transactions in the following graph:
// t1 -> t2 -> [t5] // t1 -> t2 -> [t5]
// -> t3 -> t4 -> [t6] // -> t3 -> t4 -> [t6]
// -> [t7] // -> [t7]
@ -601,10 +601,6 @@ public class PeerTest extends TestWithNetworkConnections {
assertEquals(t3.getHash(), getdata.getItems().get(1).hash); assertEquals(t3.getHash(), getdata.getItems().get(1).hash);
assertEquals(someHash, getdata.getItems().get(2).hash); assertEquals(someHash, getdata.getItems().get(2).hash);
assertEquals(anotherHash, getdata.getItems().get(3).hash); assertEquals(anotherHash, getdata.getItems().get(3).hash);
// For some random reason, t4 is delivered at this point before it's needed - perhaps it was a Bloom filter
// false positive. We do this to check that the mempool is being checked for seen transactions before
// requesting them.
inbound(writeTarget, t4);
// Deliver the requested transactions. // Deliver the requested transactions.
inbound(writeTarget, t2); inbound(writeTarget, t2);
inbound(writeTarget, t3); inbound(writeTarget, t3);
@ -621,6 +617,10 @@ public class PeerTest extends TestWithNetworkConnections {
notFound.addItem(new InventoryItem(InventoryItem.Type.Transaction, t5)); notFound.addItem(new InventoryItem(InventoryItem.Type.Transaction, t5));
inbound(writeTarget, notFound); inbound(writeTarget, notFound);
assertFalse(futures.isDone()); assertFalse(futures.isDone());
// Request t4 ...
getdata = (GetDataMessage) outbound(writeTarget);
assertEquals(t4.getHash(), getdata.getItems().get(0).hash);
inbound(writeTarget, t4);
// Continue to explore the t4 branch and ask for t6, which is in the chain. // Continue to explore the t4 branch and ask for t6, which is in the chain.
getdata = (GetDataMessage) outbound(writeTarget); getdata = (GetDataMessage) outbound(writeTarget);
assertEquals(t6, getdata.getItems().get(0).hash); assertEquals(t6, getdata.getItems().get(0).hash);

View File

@ -16,64 +16,51 @@
package org.bitcoinj.core; package org.bitcoinj.core;
import org.bitcoinj.params.UnitTestParams; import org.bitcoinj.params.*;
import org.bitcoinj.testing.FakeTxBuilder; import org.bitcoinj.testing.*;
import org.bitcoinj.utils.BriefLogFormatter; import org.bitcoinj.utils.*;
import org.junit.Before; import org.junit.*;
import org.junit.Test;
import java.net.InetAddress; import java.net.*;
import static org.bitcoinj.core.Coin.COIN; import static org.bitcoinj.core.Coin.*;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
public class MemoryPoolTest { public class TxConfidenceTableTest {
private NetworkParameters params = UnitTestParams.get(); private NetworkParameters params = UnitTestParams.get();
private Transaction tx1, tx2; private Transaction tx1, tx2;
private PeerAddress address1, address2, address3; private PeerAddress address1, address2, address3;
private TxConfidenceTable table;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
BriefLogFormatter.init(); BriefLogFormatter.init();
tx1 = FakeTxBuilder.createFakeTx(params, COIN, new ECKey().toAddress(params)); table = Context.getOrCreate().getConfidenceTable();
tx2 = new Transaction(params, tx1.bitcoinSerialize());
Address to = new ECKey().toAddress(params);
Address change = new ECKey().toAddress(params);
tx1 = FakeTxBuilder.createFakeTxWithChangeAddress(params, COIN, to, change);
tx2 = FakeTxBuilder.createFakeTxWithChangeAddress(params, COIN, to, change);
assertEquals(tx1.getHash(), tx2.getHash());
address1 = new PeerAddress(InetAddress.getByAddress(new byte[] { 127, 0, 0, 1 })); address1 = new PeerAddress(InetAddress.getByAddress(new byte[] { 127, 0, 0, 1 }));
address2 = new PeerAddress(InetAddress.getByAddress(new byte[] { 127, 0, 0, 2 })); address2 = new PeerAddress(InetAddress.getByAddress(new byte[] { 127, 0, 0, 2 }));
address3 = new PeerAddress(InetAddress.getByAddress(new byte[] { 127, 0, 0, 3 })); address3 = new PeerAddress(InetAddress.getByAddress(new byte[] { 127, 0, 0, 3 }));
} }
@Test
public void canonicalInstance() throws Exception {
TxConfidenceTable table = new TxConfidenceTable();
// Check that if we repeatedly send it the same transaction but with different objects, we get back the same
// canonical instance with the confidences update.
assertEquals(0, table.numBroadcastPeers(tx1.getHash()));
assertEquals(tx1, table.seen(tx1, address1));
assertEquals(1, tx1.getConfidence().numBroadcastPeers());
assertEquals(1, table.numBroadcastPeers(tx1.getHash()));
assertEquals(tx1, table.seen(tx2, address2));
assertEquals(2, tx1.getConfidence().numBroadcastPeers());
assertEquals(2, table.numBroadcastPeers(tx1.getHash()));
assertEquals(tx1, table.get(tx1.getHash()));
}
@Test @Test
public void invAndDownload() throws Exception { public void invAndDownload() throws Exception {
TxConfidenceTable table = new TxConfidenceTable();
// Base case: we see a transaction announced twice and then download it. The count is in the confidence object. // Base case: we see a transaction announced twice and then download it. The count is in the confidence object.
assertEquals(0, table.numBroadcastPeers(tx1.getHash())); assertEquals(0, table.numBroadcastPeers(tx1.getHash()));
table.seen(tx1.getHash(), address1); table.seen(tx1.getHash(), address1);
assertEquals(1, table.numBroadcastPeers(tx1.getHash())); assertEquals(1, table.numBroadcastPeers(tx1.getHash()));
assertTrue(table.maybeWasSeen(tx1.getHash()));
table.seen(tx1.getHash(), address2); table.seen(tx1.getHash(), address2);
assertEquals(2, table.numBroadcastPeers(tx1.getHash())); assertEquals(2, table.numBroadcastPeers(tx1.getHash()));
Transaction t = table.seen(tx1, address1); assertEquals(2, tx2.getConfidence().numBroadcastPeers());
assertEquals(2, t.getConfidence().numBroadcastPeers());
// And now we see another inv. // And now we see another inv.
table.seen(tx1.getHash(), address3); table.seen(tx1.getHash(), address3);
assertEquals(3, t.getConfidence().numBroadcastPeers()); assertEquals(3, tx2.getConfidence().numBroadcastPeers());
assertEquals(3, table.numBroadcastPeers(tx1.getHash())); assertEquals(3, table.numBroadcastPeers(tx1.getHash()));
} }
} }

View File

@ -48,7 +48,11 @@ public class SendMoneyController {
try { try {
Coin amount = Coin.parseCoin(amountEdit.getText()); Coin amount = Coin.parseCoin(amountEdit.getText());
Address destination = new Address(Main.params, address.getText()); Address destination = new Address(Main.params, address.getText());
Wallet.SendRequest req = Wallet.SendRequest.to(destination, amount); Wallet.SendRequest req;
if (amount.equals(Main.bitcoin.wallet().getBalance()))
req = Wallet.SendRequest.emptyWallet(destination);
else
req = Wallet.SendRequest.to(destination, amount);
req.aesKey = aesKey; req.aesKey = aesKey;
sendResult = Main.bitcoin.wallet().sendCoins(req); sendResult = Main.bitcoin.wallet().sendCoins(req);
Futures.addCallback(sendResult.broadcastComplete, new FutureCallback<Transaction>() { Futures.addCallback(sendResult.broadcastComplete, new FutureCallback<Transaction>() {