3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-12 18:25:51 +00:00

Payment channels: make ProtobufParser use finer grained, cycle detecting locking.

This commit is contained in:
Mike Hearn 2013-11-01 13:00:26 +01:00
parent c36989cfce
commit 06ac0105f3
3 changed files with 86 additions and 71 deletions

View File

@ -134,7 +134,7 @@ public class PaymentChannelClientConnection {
* @throws IllegalStateException If the channel has been closed or is not yet open * @throws IllegalStateException If the channel has been closed or is not yet open
* (see {@link PaymentChannelClientConnection#getChannelOpenFuture()} for the second) * (see {@link PaymentChannelClientConnection#getChannelOpenFuture()} for the second)
*/ */
public synchronized void incrementPayment(BigInteger size) throws ValueOutOfRangeException, IllegalStateException { public void incrementPayment(BigInteger size) throws ValueOutOfRangeException, IllegalStateException {
channelClient.incrementPayment(size); channelClient.incrementPayment(size);
} }
@ -145,7 +145,7 @@ public class PaymentChannelClientConnection {
* <p>Note that if you call any methods which update state directly the server will not be notified and channel * <p>Note that if you call any methods which update state directly the server will not be notified and channel
* initialization logic in the connection may fail unexpectedly.</p> * initialization logic in the connection may fail unexpectedly.</p>
*/ */
public synchronized PaymentChannelClientState state() { public PaymentChannelClientState state() {
return channelClient.state(); return channelClient.state();
} }
@ -153,7 +153,7 @@ public class PaymentChannelClientConnection {
* Closes the connection, notifying the server it should close the channel by broadcasting the most recent payment * Closes the connection, notifying the server it should close the channel by broadcasting the most recent payment
* transaction. * transaction.
*/ */
public synchronized void close() { public void close() {
// Shutdown is a little complicated. // Shutdown is a little complicated.
// //
// This call will cause the CLOSE message to be written to the wire, and then the destroyConnection() method that // This call will cause the CLOSE message to be written to the wire, and then the destroyConnection() method that

View File

@ -17,14 +17,18 @@
package com.google.bitcoin.protocols.niowrapper; package com.google.bitcoin.protocols.niowrapper;
import com.google.bitcoin.core.Utils; import com.google.bitcoin.core.Utils;
import com.google.bitcoin.utils.Threading;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLite;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
@ -66,10 +70,11 @@ public class ProtobufParser<MessageType extends MessageLite> extends AbstractTim
// a smaller network buffer per client and only allocate more memory when we need it to deserialize large messages. // a smaller network buffer per client and only allocate more memory when we need it to deserialize large messages.
// Though this is not in of itself a DoS protection, it allows for handling more legitimate clients per server and // Though this is not in of itself a DoS protection, it allows for handling more legitimate clients per server and
// attacking clients can be made to timeout/get blocked if they are sending crap to fill buffers. // attacking clients can be made to timeout/get blocked if they are sending crap to fill buffers.
private int messageBytesOffset = 0; @GuardedBy("lock") private int messageBytesOffset = 0;
private byte[] messageBytes; @GuardedBy("lock") private byte[] messageBytes;
private final ReentrantLock lock = Threading.lock("ProtobufParser");
private MessageWriteTarget writeTarget; private final AtomicReference<MessageWriteTarget> writeTarget = new AtomicReference<MessageWriteTarget>();
/** /**
* Creates a new protobuf handler. * Creates a new protobuf handler.
@ -91,9 +96,9 @@ public class ProtobufParser<MessageType extends MessageLite> extends AbstractTim
} }
@Override @Override
public synchronized void setWriteTarget(MessageWriteTarget writeTarget) { public void setWriteTarget(MessageWriteTarget writeTarget) {
checkState(this.writeTarget == null); // Only allow it to be set once.
this.writeTarget = checkNotNull(writeTarget); checkState(this.writeTarget.getAndSet(checkNotNull(writeTarget)) == null);
} }
@Override @Override
@ -104,8 +109,8 @@ public class ProtobufParser<MessageType extends MessageLite> extends AbstractTim
/** /**
* Closes this connection, eventually triggering a {@link ProtobufParser.Listener#connectionClosed()} event. * Closes this connection, eventually triggering a {@link ProtobufParser.Listener#connectionClosed()} event.
*/ */
public synchronized void closeConnection() { public void closeConnection() {
this.writeTarget.closeConnection(); this.writeTarget.get().closeConnection();
} }
@Override @Override
@ -123,65 +128,70 @@ public class ProtobufParser<MessageType extends MessageLite> extends AbstractTim
} }
@Override @Override
public synchronized int receiveBytes(ByteBuffer buff) throws Exception { public int receiveBytes(ByteBuffer buff) throws Exception {
if (messageBytes != null) { lock.lock();
// Just keep filling up the currently being worked on message try {
int bytesToGet = Math.min(messageBytes.length - messageBytesOffset, buff.remaining()); if (messageBytes != null) {
buff.get(messageBytes, messageBytesOffset, bytesToGet); // Just keep filling up the currently being worked on message
messageBytesOffset += bytesToGet; int bytesToGet = Math.min(messageBytes.length - messageBytesOffset, buff.remaining());
if (messageBytesOffset == messageBytes.length) { buff.get(messageBytes, messageBytesOffset, bytesToGet);
// Filled up our buffer, decode the message messageBytesOffset += bytesToGet;
deserializeMessage(ByteBuffer.wrap(messageBytes)); if (messageBytesOffset == messageBytes.length) {
messageBytes = null; // Filled up our buffer, decode the message
if (buff.hasRemaining()) deserializeMessage(ByteBuffer.wrap(messageBytes));
return bytesToGet + receiveBytes(buff); messageBytes = null;
if (buff.hasRemaining())
return bytesToGet + receiveBytes(buff);
}
return bytesToGet;
} }
return bytesToGet;
// If we cant read the length prefix yet, give up
if (buff.remaining() < 4)
return 0;
// Read one integer in big endian
buff.order(ByteOrder.BIG_ENDIAN);
final int len = buff.getInt();
// If length is larger than the maximum message size (or is negative/overflows) throw an exception and close the
// connection
if (len > maxMessageSize || len + 4 < 4)
throw new IllegalStateException("Message too large or length underflowed");
// If the buffer's capacity is less than the next messages length + 4 (length prefix), we must use messageBytes
// as a temporary buffer to store the message
if (buff.capacity() < len + 4) {
messageBytes = new byte[len];
// Now copy all remaining bytes into the new buffer, set messageBytesOffset and tell the caller how many
// bytes we consumed
int bytesToRead = buff.remaining();
buff.get(messageBytes, 0, bytesToRead);
messageBytesOffset = bytesToRead;
return bytesToRead + 4;
}
if (buff.remaining() < len) {
// Wait until the whole message is available in the buffer
buff.position(buff.position() - 4); // Make sure the buffer's position is right at the end
return 0;
}
// Temporarily limit the buffer to the size of the message so that the protobuf decode doesn't get messed up
int limit = buff.limit();
buff.limit(buff.position() + len);
deserializeMessage(buff);
checkState(buff.remaining() == 0);
buff.limit(limit); // Reset the limit in case we have to recurse
// If there are still bytes remaining, see if we can pull out another message since we won't get called again
if (buff.hasRemaining())
return len + 4 + receiveBytes(buff);
else
return len + 4;
} finally {
lock.unlock();
} }
// If we cant read the length prefix yet, give up
if (buff.remaining() < 4)
return 0;
// Read one integer in big endian
buff.order(ByteOrder.BIG_ENDIAN);
final int len = buff.getInt();
// If length is larger than the maximum message size (or is negative/overflows) throw an exception and close the
// connection
if (len > maxMessageSize || len + 4 < 4)
throw new IllegalStateException("Message too large or length underflowed");
// If the buffer's capacity is less than the next messages length + 4 (length prefix), we must use messageBytes
// as a temporary buffer to store the message
if (buff.capacity() < len + 4) {
messageBytes = new byte[len];
// Now copy all remaining bytes into the new buffer, set messageBytesOffset and tell the caller how many
// bytes we consumed
int bytesToRead = buff.remaining();
buff.get(messageBytes, 0, bytesToRead);
messageBytesOffset = bytesToRead;
return bytesToRead + 4;
}
if (buff.remaining() < len) {
// Wait until the whole message is available in the buffer
buff.position(buff.position() - 4); // Make sure the buffer's position is right at the end
return 0;
}
// Temporarily limit the buffer to the size of the message so that the protobuf decode doesn't get messed up
int limit = buff.limit();
buff.limit(buff.position() + len);
deserializeMessage(buff);
checkState(buff.remaining() == 0);
buff.limit(limit); // Reset the limit in case we have to recurse
// If there are still bytes remaining, see if we can pull out another message since we won't get called again
if (buff.hasRemaining())
return len + 4 + receiveBytes(buff);
else
return len + 4;
} }
@Override @Override
@ -202,14 +212,15 @@ public class ProtobufParser<MessageType extends MessageLite> extends AbstractTim
* *
* @throws IllegalStateException If the encoded message is larger than the maximum message size. * @throws IllegalStateException If the encoded message is larger than the maximum message size.
*/ */
public synchronized void write(MessageType msg) throws IllegalStateException { public void write(MessageType msg) throws IllegalStateException {
byte[] messageBytes = msg.toByteArray(); byte[] messageBytes = msg.toByteArray();
checkState(messageBytes.length <= maxMessageSize); checkState(messageBytes.length <= maxMessageSize);
byte[] messageLength = new byte[4]; byte[] messageLength = new byte[4];
Utils.uint32ToByteArrayBE(messageBytes.length, messageLength, 0); Utils.uint32ToByteArrayBE(messageBytes.length, messageLength, 0);
try { try {
writeTarget.writeBytes(messageLength); MessageWriteTarget target = writeTarget.get();
writeTarget.writeBytes(messageBytes); target.writeBytes(messageLength);
target.writeBytes(messageBytes);
} catch (IOException e) { } catch (IOException e) {
closeConnection(); closeConnection();
} }

View File

@ -1829,6 +1829,7 @@ public class WalletTest extends TestWithWallet {
wallet.addEventListener(new AbstractWalletEventListener() { wallet.addEventListener(new AbstractWalletEventListener() {
@Override @Override
public void onCoinsReceived(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { public void onCoinsReceived(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) {
log.info("onCoinsReceived 1");
throw new RuntimeException("barf"); throw new RuntimeException("barf");
} }
}); });
@ -1836,12 +1837,15 @@ public class WalletTest extends TestWithWallet {
wallet.addEventListener(new AbstractWalletEventListener() { wallet.addEventListener(new AbstractWalletEventListener() {
@Override @Override
public void onCoinsReceived(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) { public void onCoinsReceived(Wallet wallet, Transaction tx, BigInteger prevBalance, BigInteger newBalance) {
log.info("onCoinsReceived 2");
flag.incrementAndGet(); flag.incrementAndGet();
} }
}); });
sendMoneyToWallet(Utils.toNanoCoins(1, 0), AbstractBlockChain.NewBlockType.BEST_CHAIN); sendMoneyToWallet(Utils.toNanoCoins(1, 0), AbstractBlockChain.NewBlockType.BEST_CHAIN);
log.info("Wait for user thread");
Threading.waitForUserCode(); Threading.waitForUserCode();
log.info("... and test flag.");
assertEquals(1, flag.get()); assertEquals(1, flag.get());
} }