3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-14 19:25:51 +00:00

Use Reentrant locks in a few more places, fix deadlocks(s) on close

This commit is contained in:
Matt Corallo 2013-07-02 16:26:15 +02:00 committed by Mike Hearn
parent 2d84b3c27b
commit c36e725d7d
3 changed files with 77 additions and 35 deletions

View File

@ -343,9 +343,9 @@ public class PaymentChannelServerState {
storedServerChannel = null; storedServerChannel = null;
StoredPaymentChannelServerStates channels = (StoredPaymentChannelServerStates) StoredPaymentChannelServerStates channels = (StoredPaymentChannelServerStates)
wallet.getExtensions().get(StoredPaymentChannelServerStates.EXTENSION_ID); wallet.getExtensions().get(StoredPaymentChannelServerStates.EXTENSION_ID);
channels.closeChannel(temp); // Calls this method again for us channels.closeChannel(temp); // May call this method again for us (if it wasn't the original caller)
checkState(state.compareTo(State.CLOSING) >= 0); if (state.compareTo(State.CLOSING) >= 0)
return closedFuture; return closedFuture;
} }
if (state.ordinal() < State.READY.ordinal()) { if (state.ordinal() < State.READY.ordinal()) {

View File

@ -18,9 +18,12 @@ package com.google.bitcoin.protocols.channels;
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import com.google.bitcoin.core.*; import com.google.bitcoin.core.*;
import com.google.bitcoin.utils.Locks;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import net.jcip.annotations.GuardedBy;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
@ -32,12 +35,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class StoredPaymentChannelServerStates implements WalletExtension { public class StoredPaymentChannelServerStates implements WalletExtension {
static final String EXTENSION_ID = StoredPaymentChannelServerStates.class.getName(); static final String EXTENSION_ID = StoredPaymentChannelServerStates.class.getName();
@VisibleForTesting final Map<Sha256Hash, StoredServerChannel> mapChannels = new HashMap<Sha256Hash, StoredServerChannel>(); @GuardedBy("lock") @VisibleForTesting final Map<Sha256Hash, StoredServerChannel> mapChannels = new HashMap<Sha256Hash, StoredServerChannel>();
private final Wallet wallet; private final Wallet wallet;
private final TransactionBroadcaster broadcaster; private final TransactionBroadcaster broadcaster;
private final Timer channelTimeoutHandler = new Timer(); private final Timer channelTimeoutHandler = new Timer();
private final ReentrantLock lock = Locks.lock("StoredPaymentChannelServerStates");
/** /**
* The offset between the refund transaction's lock time and the time channels will be automatically closed. * The offset between the refund transaction's lock time and the time channels will be automatically closed.
* This defines a window during which we must get the last payment transaction verified, ie it should allow time for * This defines a window during which we must get the last payment transaction verified, ie it should allow time for
@ -64,19 +69,25 @@ public class StoredPaymentChannelServerStates implements WalletExtension {
* <p>Removes the given channel from this set of {@link StoredServerChannel}s and notifies the wallet of a change to * <p>Removes the given channel from this set of {@link StoredServerChannel}s and notifies the wallet of a change to
* this wallet extension.</p> * this wallet extension.</p>
*/ */
public synchronized void closeChannel(StoredServerChannel channel) { public void closeChannel(StoredServerChannel channel) {
lock.lock();
try {
if (mapChannels.remove(channel.contract.getHash()) == null)
return;
} finally {
lock.unlock();
}
synchronized (channel) { synchronized (channel) {
if (channel.connectedHandler != null) if (channel.connectedHandler != null) // connectedHandler will be reset to null in connectionClosed
channel.connectedHandler.close(); // connectedHandler will be reset to null in connectionClosed channel.connectedHandler.close(); // Closes the actual connection, not the channel
try {//TODO add event listener to PaymentChannelServerStateManager try {//TODO add event listener to PaymentChannelServerStateManager
channel.getState(wallet, broadcaster).close(); // Closes the actual connection, not the channel channel.getState(wallet, broadcaster).close();
} catch (ValueOutOfRangeException e) { } catch (ValueOutOfRangeException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. e.printStackTrace();
} catch (VerificationException e) { } catch (VerificationException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. e.printStackTrace();
} }
channel.state = null; channel.state = null;
mapChannels.remove(channel.contract.getHash());
} }
wallet.addOrUpdateExtension(this); wallet.addOrUpdateExtension(this);
} }
@ -84,8 +95,13 @@ public class StoredPaymentChannelServerStates implements WalletExtension {
/** /**
* Gets the {@link StoredServerChannel} with the given channel id (ie contract transaction hash). * Gets the {@link StoredServerChannel} with the given channel id (ie contract transaction hash).
*/ */
public synchronized StoredServerChannel getChannel(Sha256Hash id) { public StoredServerChannel getChannel(Sha256Hash id) {
return mapChannels.get(id); lock.lock();
try {
return mapChannels.get(id);
} finally {
lock.unlock();
}
} }
/** /**
@ -95,16 +111,21 @@ public class StoredPaymentChannelServerStates implements WalletExtension {
* <p>Because there must be only one, canonical {@link StoredServerChannel} per channel, this method throws if the * <p>Because there must be only one, canonical {@link StoredServerChannel} per channel, this method throws if the
* channel is already present in the set of channels.</p> * channel is already present in the set of channels.</p>
*/ */
public synchronized void putChannel(final StoredServerChannel channel) { public void putChannel(final StoredServerChannel channel) {
checkArgument(mapChannels.put(channel.contract.getHash(), checkNotNull(channel)) == null); lock.lock();
channelTimeoutHandler.schedule(new TimerTask() { try {
@Override checkArgument(mapChannels.put(channel.contract.getHash(), checkNotNull(channel)) == null);
public void run() { channelTimeoutHandler.schedule(new TimerTask() {
closeChannel(channel); @Override
} public void run() {
// Add the difference between real time and Utils.now() so that test-cases can use a mock clock. closeChannel(channel);
}, new Date((channel.refundTransactionUnlockTimeSecs + CHANNEL_EXPIRE_OFFSET)*1000L }
+ (System.currentTimeMillis() - Utils.now().getTime()))); // Add the difference between real time and Utils.now() so that test-cases can use a mock clock.
}, new Date((channel.refundTransactionUnlockTimeSecs + CHANNEL_EXPIRE_OFFSET)*1000L
+ (System.currentTimeMillis() - Utils.now().getTime())));
} finally {
lock.unlock();
}
} }
@Override @Override
@ -118,7 +139,8 @@ public class StoredPaymentChannelServerStates implements WalletExtension {
} }
@Override @Override
public synchronized byte[] serializeWalletExtension() { public byte[] serializeWalletExtension() {
lock.lock();
try { try {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(out); ObjectOutputStream oos = new ObjectOutputStream(out);
@ -128,17 +150,24 @@ public class StoredPaymentChannelServerStates implements WalletExtension {
return out.toByteArray(); return out.toByteArray();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally {
lock.unlock();
} }
} }
@Override @Override
public synchronized void deserializeWalletExtension(Wallet containingWallet, byte[] data) throws Exception { public void deserializeWalletExtension(Wallet containingWallet, byte[] data) throws Exception {
checkArgument(containingWallet == wallet); lock.lock();
ByteArrayInputStream inStream = new ByteArrayInputStream(data); try {
ObjectInputStream ois = new ObjectInputStream(inStream); checkArgument(containingWallet == wallet);
while (inStream.available() > 0) { ByteArrayInputStream inStream = new ByteArrayInputStream(data);
StoredServerChannel channel = (StoredServerChannel)ois.readObject(); ObjectInputStream ois = new ObjectInputStream(inStream);
putChannel(channel); while (inStream.available() > 0) {
StoredServerChannel channel = (StoredServerChannel)ois.readObject();
putChannel(channel);
}
} finally {
lock.unlock();
} }
} }
} }

View File

@ -25,7 +25,9 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;
import com.google.bitcoin.utils.Locks;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -48,6 +50,7 @@ public class ProtobufServer {
private static final int BUFFER_SIZE_UPPER_BOUND = 65536; private static final int BUFFER_SIZE_UPPER_BOUND = 65536;
private class ConnectionHandler extends MessageWriteTarget { private class ConnectionHandler extends MessageWriteTarget {
private final ReentrantLock lock = Locks.lock("protobufServerConnectionHandler");
private final ByteBuffer dbuf; private final ByteBuffer dbuf;
private final SocketChannel channel; private final SocketChannel channel;
private final ProtobufParser parser; private final ProtobufParser parser;
@ -66,13 +69,16 @@ public class ProtobufServer {
} }
@Override @Override
synchronized void writeBytes(byte[] message) { void writeBytes(byte[] message) {
lock.lock();
try { try {
if (channel.write(ByteBuffer.wrap(message)) != message.length) if (channel.write(ByteBuffer.wrap(message)) != message.length)
throw new IOException("Couldn't write all of message to socket"); throw new IOException("Couldn't write all of message to socket");
} catch (IOException e) { } catch (IOException e) {
log.error("Error writing message to connection, closing connection", e); log.error("Error writing message to connection, closing connection", e);
closeConnection(); closeConnection();
} finally {
lock.unlock();
} }
} }
@ -86,10 +92,17 @@ public class ProtobufServer {
connectionClosed(); connectionClosed();
} }
private synchronized void connectionClosed() { private void connectionClosed() {
if (!closeCalled) boolean callClosed = false;
lock.lock();
try {
callClosed = !closeCalled;
closeCalled = true;
} finally {
lock.unlock();
}
if (callClosed)
parser.connectionClosed(); parser.connectionClosed();
closeCalled = true;
} }
} }