3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 10:15:52 +00:00

PeerGroup cleanup

This commit is contained in:
Miron Cuperman (devrandom) 2011-07-14 20:13:11 +00:00
parent 8e84d71308
commit d7d52cadd2
10 changed files with 244 additions and 84 deletions

View File

@ -0,0 +1,96 @@
/**
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.core;
import com.google.bitcoin.core.AbstractPeerEventListener;
import com.google.bitcoin.core.Block;
import com.google.bitcoin.core.Peer;
import java.util.concurrent.Semaphore;
/**
* Listen to chain download events and print useful informational messages.
*
* <p>progress, startDownload, doneDownload maybe be overridden to change the way the user
* is notified.
*
* <p>Methods are called with the event listener object locked so your
* implementation does not have to be thread safe.
*
* @author miron@google.com (Miron Cuperman a.k.a. devrandom)
*
*/
public class DownloadListener extends AbstractPeerEventListener {
private int originalBlocksLeft = -1;
private int lastPercent = -1;
Semaphore done = new Semaphore(0);
@Override
public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) {
if (blocksLeft == 0) {
doneDownload();
done.release();
}
if (blocksLeft <= 0)
return;
if (originalBlocksLeft < 0) {
startDownload(blocksLeft);
originalBlocksLeft = blocksLeft;
}
double pct = 100.0 - (100.0 * (blocksLeft / (double) originalBlocksLeft));
if ((int)pct != lastPercent) {
progress(pct);
lastPercent = (int)pct;
}
}
/**
* Called when download progress is made.
*
* @param pct the percentage of chain downloaded, estimated
*/
protected void progress(double pct) {
System.out.println(String.format("Chain download %d%% done", (int) pct));
}
/**
* Called when download is initiated.
*
* @param blocks the number of blocks to download, estimated
*/
protected void startDownload(int blocks) {
System.out.println("Downloading block chain of size " + blocks + ". " +
(lastPercent > 1000 ? "This may take a while." : ""));
}
/**
* Called when we are done downloading the block chain.
*/
protected void doneDownload() {
System.out.println("Done downloading block chain");
}
/**
* Wait for the chain to be downloaded.
*/
public void await() throws InterruptedException {
done.acquire();
}
}

View File

@ -53,7 +53,8 @@ public class NetworkConnection {
* Connect to the given IP address using the port specified as part of the network parameters. Once construction * Connect to the given IP address using the port specified as part of the network parameters. Once construction
* is complete a functioning network channel is set up and running. * is complete a functioning network channel is set up and running.
* *
* @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin. * @param peerAddress address to connect to. IPv6 is not currently supported by BitCoin. If
* port is not positive the default port from params is used.
* @param params Defines which network to connect to and details of the protocol. * @param params Defines which network to connect to and details of the protocol.
* @param bestHeight How many blocks are in our best chain * @param bestHeight How many blocks are in our best chain
* @param connectTimeout Timeout in milliseconds when initially connecting to peer * @param connectTimeout Timeout in milliseconds when initially connecting to peer
@ -106,6 +107,11 @@ public class NetworkConnection {
// Handshake is done! // Handshake is done!
} }
public NetworkConnection(InetAddress inetAddress, NetworkParameters params, int bestHeight, int connectTimeout)
throws IOException, ProtocolException {
this(new PeerAddress(inetAddress), params, bestHeight, connectTimeout);
}
/** /**
* Sends a "ping" message to the remote node. The protocol doesn't presently use this feature much. * Sends a "ping" message to the remote node. The protocol doesn't presently use this feature much.
* @throws IOException * @throws IOException

View File

@ -57,6 +57,8 @@ public class Peer {
/** /**
* Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that * Construct a peer that handles the given network connection and reads/writes from the given block chain. Note that
* communication won't occur until you call connect(). * communication won't occur until you call connect().
*
* @param bestHeight our current best chain height, to facilitate downloading
*/ */
public Peer(NetworkParameters params, PeerAddress address, int bestHeight, BlockChain blockChain) { public Peer(NetworkParameters params, PeerAddress address, int bestHeight, BlockChain blockChain) {
this.params = params; this.params = params;
@ -101,6 +103,7 @@ public class Peer {
* <p>connect() must be called first * <p>connect() must be called first
*/ */
public void run() { public void run() {
// This should be called in the network loop thread for this peer
if (conn == null) if (conn == null)
throw new RuntimeException("please call connect() first"); throw new RuntimeException("please call connect() first");
@ -143,6 +146,7 @@ public class Peer {
} }
private void processBlock(Block m) throws IOException { private void processBlock(Block m) throws IOException {
// This should called in the network loop thread for this peer
try { try {
// Was this block requested by getBlock()? // Was this block requested by getBlock()?
synchronized (pendingGetBlockFutures) { synchronized (pendingGetBlockFutures) {
@ -162,7 +166,9 @@ public class Peer {
if (blockChain.add(m)) { if (blockChain.add(m)) {
// The block was successfully linked into the chain. Notify the user of our progress. // The block was successfully linked into the chain. Notify the user of our progress.
for (PeerEventListener listener : eventListeners) { for (PeerEventListener listener : eventListeners) {
listener.onBlocksDownloaded(this, getPeerBlocksToGet()); synchronized (listener) {
listener.onBlocksDownloaded(this, m, getPeerBlocksToGet());
}
} }
} else { } else {
// This block is unconnected - we don't know how to get from it back to the genesis block yet. That // This block is unconnected - we don't know how to get from it back to the genesis block yet. That
@ -176,14 +182,16 @@ public class Peer {
} }
} catch (VerificationException e) { } catch (VerificationException e) {
// We don't want verification failures to kill the thread. // We don't want verification failures to kill the thread.
log.warn("block verification failed", e); log.warn("Block verification failed", e);
} catch (ScriptException e) { } catch (ScriptException e) {
// We don't want script failures to kill the thread. // We don't want script failures to kill the thread.
log.warn("script exception", e); log.warn("Script exception", e);
} }
} }
private void processInv(InventoryMessage inv) throws IOException { private void processInv(InventoryMessage inv) throws IOException {
// This should be called in the network loop thread for this peer
// The peer told us about some blocks or transactions they have. For now we only care about blocks. // The peer told us about some blocks or transactions they have. For now we only care about blocks.
// Note that as we don't actually want to store the entire block chain or even the headers of the block // Note that as we don't actually want to store the entire block chain or even the headers of the block
// chain, we may end up requesting blocks we already requested before. This shouldn't (in theory) happen // chain, we may end up requesting blocks we already requested before. This shouldn't (in theory) happen
@ -284,6 +292,7 @@ public class Peer {
/** Called by the Peer when the result has arrived. Completes the task. */ /** Called by the Peer when the result has arrived. Completes the task. */
void setResult(T result) { void setResult(T result) {
// This should be called in the network loop thread for this peer
this.result = result; this.result = result;
// Now release the thread that is waiting. We don't need to synchronize here as the latch establishes // Now release the thread that is waiting. We don't need to synchronize here as the latch establishes
// a memory barrier. // a memory barrier.
@ -348,7 +357,9 @@ public class Peer {
*/ */
public void startBlockChainDownload() throws IOException { public void startBlockChainDownload() throws IOException {
for (PeerEventListener listener : eventListeners) { for (PeerEventListener listener : eventListeners) {
listener.onBlocksDownloaded(this, getPeerBlocksToGet()); synchronized (listener) {
listener.onBlocksDownloaded(this, null, getPeerBlocksToGet());
}
} }
if (getPeerBlocksToGet() > 0) { if (getPeerBlocksToGet() > 0) {

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.math.BigInteger; import java.math.BigInteger;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Date; import java.util.Date;
@ -38,10 +39,16 @@ public class PeerAddress extends Message {
BigInteger services; BigInteger services;
long time; long time;
/**
* Construct a peer address from a serialized payload.
*/
public PeerAddress(NetworkParameters params, byte[] payload, int offset, int protocolVersion) throws ProtocolException { public PeerAddress(NetworkParameters params, byte[] payload, int offset, int protocolVersion) throws ProtocolException {
super(params, payload, offset, protocolVersion); super(params, payload, offset, protocolVersion);
} }
/**
* Construct a peer address from a memorized or hardcoded address.
*/
public PeerAddress(InetAddress addr, int port, int protocolVersion) { public PeerAddress(InetAddress addr, int port, int protocolVersion) {
this.addr = addr; this.addr = addr;
this.port = port; this.port = port;
@ -57,6 +64,10 @@ public class PeerAddress extends Message {
this(addr, 0); this(addr, 0);
} }
public PeerAddress(InetSocketAddress addr) {
this(addr.getAddress(), addr.getPort());
}
@Override @Override
public void bitcoinSerializeToStream(OutputStream stream) throws IOException { public void bitcoinSerializeToStream(OutputStream stream) throws IOException {
if (protocolVersion >= 31402) { if (protocolVersion >= 31402) {

View File

@ -19,13 +19,23 @@ package com.google.bitcoin.core;
/** /**
* Implementing a PeerEventListener allows you to learn when significant Peer communication * Implementing a PeerEventListener allows you to learn when significant Peer communication
* has occurred. * has occurred.
*
* <p>Methods are called with the event listener object locked so your
* implementation does not have to be thread safe.
*
* @author miron@google.com (Miron Cuperman a.k.a devrandom)
*
*/ */
public interface PeerEventListener { public interface PeerEventListener {
/** /**
* This is called on a Peer thread when a block is received. * This is called on a Peer thread when a block is received. It is also called when a download
* is started with the initial number of blocks to be downloaded.
* *
* @param peer The peer receiving the block * <p>The block may have transactions or may be a header only once getheaders is implemented
* @param blocksLeft The number of blocks left to download *
* @param peer the peer receiving the block
* @param block the downloaded block, or null if this is the initial callback
* @param blocksLeft the number of blocks left to download
*/ */
public void onBlocksDownloaded(Peer peer, int blocksLeft); public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft);
} }

View File

@ -17,6 +17,8 @@
package com.google.bitcoin.core; package com.google.bitcoin.core;
import com.google.bitcoin.discovery.PeerDiscovery;
import com.google.bitcoin.discovery.PeerDiscoveryException;
import com.google.bitcoin.store.BlockStore; import com.google.bitcoin.store.BlockStore;
import com.google.bitcoin.store.BlockStoreException; import com.google.bitcoin.store.BlockStoreException;
@ -24,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
@ -36,6 +39,18 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Maintain a number of connections to peers.
*
* <p>PeerGroup tries to maintain a constant number of connections to a set of distinct peers.
* Each peer runs a network listener in its own thread. When a connection is lost, a new peer
* will be tried after a delay as long as the number of connections less than the maximum.
*
* <p>Connections are made to addresses from a provided list. When that list is exhausted,
* we start again from the head of the list.
*
* <p>The PeerGroup can broadcast a transaction to the currently connected set of peers. It can
* also handle download of the blockchain from peers, restarting the process when peers die.
*
* @author miron@google.com (Miron Cuperman a.k.a devrandom) * @author miron@google.com (Miron Cuperman a.k.a devrandom)
* *
*/ */
@ -48,21 +63,33 @@ public class PeerGroup {
// Maximum number of connections this peerGroup will make // Maximum number of connections this peerGroup will make
private int maxConnections; private int maxConnections;
// Addresses to try to connect to, excluding active peers
private BlockingQueue<PeerAddress> inactives; private BlockingQueue<PeerAddress> inactives;
// Connection initiation thread
private Thread thread; private Thread thread;
// True if the connection initiation thread should be running
private boolean running; private boolean running;
// A pool of threads for peers, of size maxConnection
private ThreadPoolExecutor executor; private ThreadPoolExecutor executor;
// Currently active peers
private Set<Peer> peers;
// The peer we are currently downloading the chain from
private Peer downloadPeer;
// Callback for events related to chain download
private PeerEventListener downloadListener;
private NetworkParameters params; private NetworkParameters params;
private BlockStore blockStore; private BlockStore blockStore;
private BlockChain chain; private BlockChain chain;
private Set<Peer> peers;
private Peer downloadPeer;
private PeerEventListener downloadListener;
/** /**
* Create a PeerGroup
*
* @param maxConnections the maximum number of peer connections that this group will try to make.
* Depending on the environment, this is normally between 1 and 10.
*/ */
public PeerGroup(int maxConnections, BlockStore blockStore, NetworkParameters params, BlockChain chain) { public PeerGroup(int maxConnections, BlockStore blockStore, NetworkParameters params,
BlockChain chain) {
this.maxConnections = maxConnections; this.maxConnections = maxConnections;
this.blockStore = blockStore; this.blockStore = blockStore;
this.params = params; this.params = params;
@ -79,9 +106,26 @@ public class PeerGroup {
/** Add an address to the list of potential peers to connect to */ /** Add an address to the list of potential peers to connect to */
public void addAddress(PeerAddress peerAddress) { public void addAddress(PeerAddress peerAddress) {
// TODO(miron) consider deduplication
inactives.add(peerAddress); inactives.add(peerAddress);
} }
/** Add addresses from a discovery source to the list of potential peers to connect to */
public void addPeerDiscovery(PeerDiscovery peerDiscovery) {
// TODO(miron) consider remembering the discovery source and retrying occasionally
InetSocketAddress[] addresses;
try {
addresses = peerDiscovery.getPeers();
} catch (PeerDiscoveryException e) {
log.error("Failed to discover peer addresses from discovery source", e);
return;
}
for (int i = 0; i < addresses.length; i++) {
inactives.add(new PeerAddress(addresses[i]));
}
}
/** Starts the background thread that makes connections. */ /** Starts the background thread that makes connections. */
public void start() { public void start() {
this.thread = new Thread(new PeerExecutionRunnable(), "Peer group thread"); this.thread = new Thread(new PeerExecutionRunnable(), "Peer group thread");
@ -91,6 +135,9 @@ public class PeerGroup {
/** /**
* Stop this PeerGroup * Stop this PeerGroup
*
* <p>The peer group will be asynchronously shut down. After it is shut down
* all peers will be disconnected and no threads will be running.
*/ */
public synchronized void stop() { public synchronized void stop() {
if (running) { if (running) {
@ -120,6 +167,10 @@ public class PeerGroup {
/** /**
* Repeatedly get the next peer address from the inactive queue * Repeatedly get the next peer address from the inactive queue
* and try to connect. * and try to connect.
*
* <p>We can be terminated with Thread.interrupt. When an interrupt is received,
* we will ask the executor to shutdown and ask each peer to disconnect. At that point
* no threads or network connections will be active.
*/ */
@Override @Override
public void run() { public void run() {
@ -165,7 +216,8 @@ public class PeerGroup {
peer.run(); peer.run();
} }
finally { finally {
// In all cases, put the address back on the queue // In all cases, put the address back on the queue.
// We will retry this peer after all other peers have been tried.
inactives.add(address); inactives.add(address);
peers.remove(peer); peers.remove(peer);
handlePeerDeath(peer); handlePeerDeath(peer);
@ -174,11 +226,13 @@ public class PeerGroup {
}; };
executor.execute(command); executor.execute(command);
break; break;
} } catch (RejectedExecutionException e) {
catch (RejectedExecutionException e) {
// Reached maxConnections, try again after a delay // Reached maxConnections, try again after a delay
} catch (BlockStoreException e) { } catch (BlockStoreException e) {
log.error("block store corrupt?", e); // Fatal error
log.error("Block store corrupt?", e);
running = false;
break;
} }
// If we got here, we should retry this address because an error unrelated // If we got here, we should retry this address because an error unrelated
@ -193,6 +247,8 @@ public class PeerGroup {
* *
* <p>If no peers are currently connected, the download will be started * <p>If no peers are currently connected, the download will be started
* once a peer starts. If the peer dies, the download will resume with another peer. * once a peer starts. If the peer dies, the download will resume with another peer.
*
* @param listener a listener for chain download events, may not be null
*/ */
public synchronized void startBlockChainDownload(PeerEventListener listener) { public synchronized void startBlockChainDownload(PeerEventListener listener) {
this.downloadListener = listener; this.downloadListener = listener;
@ -213,7 +269,7 @@ public class PeerGroup {
} }
} }
private void startBlockChainDownloadFromPeer(Peer peer) { private synchronized void startBlockChainDownloadFromPeer(Peer peer) {
peer.addEventListener(downloadListener); peer.addEventListener(downloadListener);
try { try {
peer.startBlockChainDownload(); peer.startBlockChainDownload();
@ -231,9 +287,7 @@ public class PeerGroup {
final String namePrefix; final String namePrefix;
PeerGroupThreadFactory() { PeerGroupThreadFactory() {
SecurityManager s = System.getSecurityManager(); group = Thread.currentThread().getThreadGroup();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "PeerGroup-" + namePrefix = "PeerGroup-" +
poolNumber.getAndIncrement() + poolNumber.getAndIncrement() +
"-thread-"; "-thread-";

View File

@ -422,19 +422,45 @@ public class Wallet implements Serializable {
} }
/** /**
* Sends coins to the given address, via the given {@link PeerGroup}. Change is returned to the first key in the wallet. * Sends coins to the given address, via the given {@link PeerGroup}.
* Change is returned to the first key in the wallet.
*
* @param to Which address to send coins to. * @param to Which address to send coins to.
* @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this. * @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this.
* @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins.
* @throws IOException if there was a problem broadcasting the transaction * @throws IOException if there was a problem broadcasting the transaction
*/ */
public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException { public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins)
throws IOException {
Transaction tx = createSend(to, nanocoins); Transaction tx = createSend(to, nanocoins);
if (tx == null) // Not enough money! :-( if (tx == null) // Not enough money! :-(
return null; return null;
if (peerGroup.broadcastTransaction(tx)) { if (!peerGroup.broadcastTransaction(tx)) {
confirmSend(tx); throw new IOException("Failed to broadcast tx to all connected peers");
} }
// TODO - retry logic
confirmSend(tx);
return tx;
}
/**
* Sends coins to the given address, via the given {@link Peer}.
* Change is returned to the first key in the wallet.
*
* @param to Which address to send coins to.
* @param nanocoins How many nanocoins to send. You can use Utils.toNanoCoins() to calculate this.
* @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins.
* @throws IOException if there was a problem broadcasting the transaction
*/
public synchronized Transaction sendCoins(Peer peer, Address to, BigInteger nanocoins)
throws IOException {
Transaction tx = createSend(to, nanocoins);
if (tx == null) // Not enough money! :-(
return null;
peer.broadcastTransaction(tx);
confirmSend(tx);
return tx; return tx;
} }

View File

@ -1,55 +0,0 @@
/**
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.examples;
import com.google.bitcoin.core.Peer;
import com.google.bitcoin.core.PeerEventListener;
import java.util.concurrent.Semaphore;
class DownloadListener implements PeerEventListener {
private int originalBlocksLeft = -1;
private int lastPercent = -1;
Semaphore done = new Semaphore(0);
@Override
public void onBlocksDownloaded(Peer peer, int blocksLeft) {
if (blocksLeft == 0) {
System.out.println("Done downloading block chain");
done.release();
}
if (blocksLeft <= 0)
return;
if (originalBlocksLeft < 0) {
System.out.println("Downloading block chain of size " + blocksLeft + ". " +
(lastPercent > 1000 ? "This may take a while." : ""));
originalBlocksLeft = blocksLeft;
}
double pct = 100.0 - (100.0 * (blocksLeft / (double) originalBlocksLeft));
if ((int)pct != lastPercent) {
System.out.println(String.format("Chain download %d%% done", (int) pct));
lastPercent = (int)pct;
}
}
public void await() throws InterruptedException {
done.acquire();
}
}

View File

@ -18,6 +18,7 @@ package com.google.bitcoin.examples;
import com.google.bitcoin.core.Address; import com.google.bitcoin.core.Address;
import com.google.bitcoin.core.BlockChain; import com.google.bitcoin.core.BlockChain;
import com.google.bitcoin.core.DownloadListener;
import com.google.bitcoin.core.ECKey; import com.google.bitcoin.core.ECKey;
import com.google.bitcoin.core.NetworkParameters; import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.PeerAddress; import com.google.bitcoin.core.PeerAddress;

View File

@ -17,7 +17,7 @@
package com.google.bitcoin.examples; package com.google.bitcoin.examples;
import com.google.bitcoin.core.BlockChain; import com.google.bitcoin.core.BlockChain;
import com.google.bitcoin.core.NetworkConnection; import com.google.bitcoin.core.DownloadListener;
import com.google.bitcoin.core.NetworkParameters; import com.google.bitcoin.core.NetworkParameters;
import com.google.bitcoin.core.PeerAddress; import com.google.bitcoin.core.PeerAddress;
import com.google.bitcoin.core.PeerGroup; import com.google.bitcoin.core.PeerGroup;