3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-19 05:35:49 +00:00

Slightly different attempt to fix thread safety issue in PeerGroup to in #278 - make connectTo always locked. It used to be that we couldn't do this but there are no comments reminding me why not, and unit tests + wallettemplate seem happy with it being locked, so I think changes in the network code since then have probably removed this issue.

This commit is contained in:
Mike Hearn 2014-11-24 13:33:21 +01:00
parent 69de1f01ac
commit 9814a6caba

View File

@ -341,6 +341,7 @@ public class PeerGroup implements TransactionBroadcaster {
@SuppressWarnings("FieldAccessNotGuarded") // only called when inactives is accessed, and lock is held then. @SuppressWarnings("FieldAccessNotGuarded") // only called when inactives is accessed, and lock is held then.
@Override @Override
public int compare(PeerAddress a, PeerAddress b) { public int compare(PeerAddress a, PeerAddress b) {
checkState(lock.isHeldByCurrentThread());
int result = backoffMap.get(a).compareTo(backoffMap.get(b)); int result = backoffMap.get(a).compareTo(backoffMap.get(b));
// Sort by port if otherwise equals - for testing // Sort by port if otherwise equals - for testing
if (result == 0) if (result == 0)
@ -479,10 +480,10 @@ public class PeerGroup implements TransactionBroadcaster {
executor.schedule(this, delay, TimeUnit.MILLISECONDS); executor.schedule(this, delay, TimeUnit.MILLISECONDS);
return; return;
} }
connectTo(addrToTry, false, vConnectTimeoutMillis);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
connectTo(addrToTry, false, vConnectTimeoutMillis);
if (countConnectedAndPendingPeers() < getMaxConnections()) { if (countConnectedAndPendingPeers() < getMaxConnections()) {
executor.execute(this); // Try next peer immediately. executor.execute(this); // Try next peer immediately.
} }
@ -1096,9 +1097,14 @@ public class PeerGroup implements TransactionBroadcaster {
*/ */
@Nullable @Nullable
public Peer connectTo(InetSocketAddress address) { public Peer connectTo(InetSocketAddress address) {
lock.lock();
try {
PeerAddress peerAddress = new PeerAddress(address); PeerAddress peerAddress = new PeerAddress(address);
backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams)); backoffMap.put(peerAddress, new ExponentialBackoff(peerBackoffParams));
return connectTo(peerAddress, true, vConnectTimeoutMillis); return connectTo(peerAddress, true, vConnectTimeoutMillis);
} finally {
lock.unlock();
}
} }
/** /**
@ -1106,9 +1112,14 @@ public class PeerGroup implements TransactionBroadcaster {
*/ */
@Nullable @Nullable
public Peer connectToLocalHost() { public Peer connectToLocalHost() {
lock.lock();
try {
final PeerAddress localhost = PeerAddress.localhost(params); final PeerAddress localhost = PeerAddress.localhost(params);
backoffMap.put(localhost, new ExponentialBackoff(peerBackoffParams)); backoffMap.put(localhost, new ExponentialBackoff(peerBackoffParams));
return connectTo(localhost, true, vConnectTimeoutMillis); return connectTo(localhost, true, vConnectTimeoutMillis);
} finally {
lock.unlock();
}
} }
/** /**
@ -1119,8 +1130,9 @@ public class PeerGroup implements TransactionBroadcaster {
* explicitly requested. * explicitly requested.
* @return Peer or null. * @return Peer or null.
*/ */
@Nullable @Nullable @GuardedBy("lock")
protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections, int connectTimeoutMillis) { protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections, int connectTimeoutMillis) {
checkState(lock.isHeldByCurrentThread());
VersionMessage ver = getVersionMessage().duplicate(); VersionMessage ver = getVersionMessage().duplicate();
ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight(); ver.bestHeight = chain == null ? 0 : chain.getBestChainHeight();
ver.time = Utils.currentTimeSeconds(); ver.time = Utils.currentTimeSeconds();
@ -1140,16 +1152,10 @@ public class PeerGroup implements TransactionBroadcaster {
peer.setSocketTimeout(connectTimeoutMillis); peer.setSocketTimeout(connectTimeoutMillis);
// When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on // When the channel has connected and version negotiated successfully, handleNewPeer will end up being called on
// a worker thread. // a worker thread.
if (incrementMaxConnections) { if (incrementMaxConnections) {
// We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new // We don't use setMaxConnections here as that would trigger a recursive attempt to establish a new
// outbound connection. // outbound connection.
lock.lock();
try {
maxConnections++; maxConnections++;
} finally {
lock.unlock();
}
} }
return peer; return peer;
} }
@ -1461,7 +1467,8 @@ public class PeerGroup implements TransactionBroadcaster {
} }
final SettableFuture<List<Peer>> future = SettableFuture.create(); final SettableFuture<List<Peer>> future = SettableFuture.create();
addEventListener(new AbstractPeerEventListener() { addEventListener(new AbstractPeerEventListener() {
@Override public void onPeerConnected(Peer peer, int peerCount) { @Override
public void onPeerConnected(Peer peer, int peerCount) {
final List<Peer> peers = findPeersOfAtLeastVersion(protocolVersion); final List<Peer> peers = findPeersOfAtLeastVersion(protocolVersion);
if (peers.size() >= numPeers) { if (peers.size() >= numPeers) {
future.set(peers); future.set(peers);