diff --git a/src/main/java/org/qortal/controller/OnlineAccountsManager.java b/src/main/java/org/qortal/controller/OnlineAccountsManager.java index 70b04e56..58e4f64e 100644 --- a/src/main/java/org/qortal/controller/OnlineAccountsManager.java +++ b/src/main/java/org/qortal/controller/OnlineAccountsManager.java @@ -67,9 +67,14 @@ public class OnlineAccountsManager extends Thread { Deque> latestBlocksOnlineAccounts = new ArrayDeque<>(MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS); public OnlineAccountsManager() { - + // TODO: make private, add these tasks to scheduled executor: + // send our online accounts every 10s + // expireOnlineAccounts every ONLINE_ACCOUNTS_CHECK_INTERVAL + // broadcastOnlineAccountsQuery every ONLINE_ACCOUNTS_BROADCAST_INTERVAL + // processOnlineAccountsImportQueue every 100ms? } + // TODO: convert to SingletonContainer a-la Network public static synchronized OnlineAccountsManager getInstance() { if (instance == null) { instance = new OnlineAccountsManager(); @@ -78,6 +83,7 @@ public class OnlineAccountsManager extends Thread { return instance; } + // TODO: see constructor for more info public void run() { // Start separate thread to prepare our online accounts @@ -113,6 +119,7 @@ public class OnlineAccountsManager extends Thread { public void shutdown() { isStopping = true; + // TODO: convert interrrupt to executor.shutdownNow(); this.interrupt(); } @@ -151,11 +158,14 @@ public class OnlineAccountsManager extends Thread { // Utilities + // TODO: split this into validateAccount() and addAccount() private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException { final Long now = NTP.getTime(); if (now == null) return; + // TODO: don't create otherAccount, instead: + // byte[] rewardSharePublicKey = onlineAccountData.getPublicKey(); PublicKeyAccount otherAccount = new PublicKeyAccount(repository, onlineAccountData.getPublicKey()); // Check timestamp is 'recent' here @@ -166,12 +176,14 @@ public class OnlineAccountsManager extends Thread { // Verify byte[] data = Longs.toByteArray(onlineAccountData.getTimestamp()); + // TODO: use Crypto.verify() static method directly if (!otherAccount.verify(onlineAccountData.getSignature(), data)) { LOGGER.trace(() -> String.format("Rejecting invalid online account %s", otherAccount.getAddress())); return; } // Qortal: check online account is actually reward-share + // TODO: use "rewardSharePublicKey" from above TODO RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(onlineAccountData.getPublicKey()); if (rewardShareData == null) { // Reward-share doesn't even exist - probably not a good sign @@ -186,6 +198,7 @@ public class OnlineAccountsManager extends Thread { return; } + // TODO: change this.onlineAccounts to a ConcurrentMap? Keyed by timestamp? synchronized (this.onlineAccounts) { OnlineAccountData existingAccountData = this.onlineAccounts.stream().filter(account -> Arrays.equals(account.getPublicKey(), onlineAccountData.getPublicKey())).findFirst().orElse(null); @@ -193,17 +206,21 @@ public class OnlineAccountsManager extends Thread { if (existingAccountData.getTimestamp() < onlineAccountData.getTimestamp()) { this.onlineAccounts.remove(existingAccountData); + // TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58? LOGGER.trace(() -> String.format("Updated online account %s with timestamp %d (was %d)", otherAccount.getAddress(), onlineAccountData.getTimestamp(), existingAccountData.getTimestamp())); } else { + // TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58? LOGGER.trace(() -> String.format("Not updating existing online account %s", otherAccount.getAddress())); return; } } else { + // TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58? LOGGER.trace(() -> String.format("Added online account %s with timestamp %d", otherAccount.getAddress(), onlineAccountData.getTimestamp())); } this.onlineAccounts.add(onlineAccountData); + // TODO: if we actually added a new account, then we need to rebuild our hashes-by-timestamp-then-byte for rewardSharePublicKey's leading byte also } } @@ -220,6 +237,7 @@ public class OnlineAccountsManager extends Thread { final long onlineAccountsTimestamp = toOnlineAccountTimestamp(now); byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp); + // TODO: use new addAccount() method synchronized (this.onlineAccounts) { this.onlineAccounts.clear(); diff --git a/src/main/java/org/qortal/network/message/GetOnlineAccountsV3Message.java b/src/main/java/org/qortal/network/message/GetOnlineAccountsV3Message.java new file mode 100644 index 00000000..02fed2a9 --- /dev/null +++ b/src/main/java/org/qortal/network/message/GetOnlineAccountsV3Message.java @@ -0,0 +1,110 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +/** + * For requesting online accounts info from remote peer, given our list of online accounts. + * + * Different format to V1 and V2: + * V1 is: number of entries, then timestamp + pubkey for each entry + * V2 is: groups of: number of entries, timestamp, then pubkey for each entry + * V3 is: groups of: timestamp, number of entries (one per leading byte), then hash(pubkeys) for each entry + */ +public class GetOnlineAccountsV3Message extends Message { + + private static final Map> EMPTY_ONLINE_ACCOUNTS = Collections.emptyMap(); + private Map> hashesByTimestampThenByte; + + public GetOnlineAccountsV3Message(Map> hashesByTimestampThenByte) { + super(MessageType.GET_ONLINE_ACCOUNTS_V3); + + // If we don't have ANY online accounts then it's an easier construction... + if (hashesByTimestampThenByte.isEmpty()) { + this.dataBytes = EMPTY_DATA_BYTES; + return; + } + + // We should know exactly how many bytes to allocate now + int byteSize = hashesByTimestampThenByte.size() * (Transformer.TIMESTAMP_LENGTH + Transformer.INT_LENGTH) + + Transformer.TIMESTAMP_LENGTH /* trailing zero entry indicates end of entries */; + + byteSize += hashesByTimestampThenByte.values() + .stream() + .mapToInt(map -> map.size() * Transformer.PUBLIC_KEY_LENGTH) + .sum(); + + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + // Warning: no double-checking/fetching! We must be ConcurrentMap compatible. + // So no contains() then get() or multiple get()s on the same key/map. + try { + for (var outerMapEntry : hashesByTimestampThenByte.entrySet()) { + bytes.write(Longs.toByteArray(outerMapEntry.getKey())); + + var innerMap = outerMapEntry.getValue(); + + bytes.write(Ints.toByteArray(innerMap.size())); + + for (byte[] hashBytes : innerMap.values()) { + bytes.write(hashBytes); + } + } + + // end of records + bytes.write(Longs.toByteArray(0L)); + } catch (IOException e) { + throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream"); + } + + this.dataBytes = bytes.toByteArray(); + this.checksumBytes = Message.generateChecksum(this.dataBytes); + } + + private GetOnlineAccountsV3Message(int id, Map> hashesByTimestampThenByte) { + super(id, MessageType.GET_ONLINE_ACCOUNTS_V3); + + this.hashesByTimestampThenByte = hashesByTimestampThenByte; + } + + public Map> getHashesByTimestampThenByte() { + return this.hashesByTimestampThenByte; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) { + // 'empty' case + if (!bytes.hasRemaining()) { + return new GetOnlineAccountsV3Message(id, EMPTY_ONLINE_ACCOUNTS); + } + + Map> hashesByTimestampThenByte = new HashMap<>(); + + while (true) { + long timestamp = bytes.getLong(); + if (timestamp == 0) + // Zero timestamp indicates end of records + break; + + int hashCount = bytes.getInt(); + Map hashesByByte = new HashMap<>(); + + for (int i = 0; i < hashCount; ++i) { + byte[] publicKeyHash = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKeyHash); + + hashesByByte.put(publicKeyHash[0], publicKeyHash); + } + + hashesByTimestampThenByte.put(timestamp, hashesByByte); + } + + return new GetOnlineAccountsV3Message(id, hashesByTimestampThenByte); + } + +} diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index f752b5b9..d8467d90 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -46,6 +46,7 @@ public abstract class Message { private static final int MAX_DATA_SIZE = 10 * 1024 * 1024; // 10MB protected static final byte[] EMPTY_DATA_BYTES = new byte[0]; + private static final ByteBuffer EMPTY_READ_ONLY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_DATA_BYTES).asReadOnlyBuffer(); protected int id; protected final MessageType type; @@ -126,7 +127,7 @@ public abstract class Message { if (dataSize > 0 && dataSize + CHECKSUM_LENGTH > readOnlyBuffer.remaining()) return null; - ByteBuffer dataSlice = null; + ByteBuffer dataSlice = EMPTY_READ_ONLY_BYTE_BUFFER; if (dataSize > 0) { byte[] expectedChecksum = new byte[CHECKSUM_LENGTH]; readOnlyBuffer.get(expectedChecksum); diff --git a/src/main/java/org/qortal/network/message/MessageType.java b/src/main/java/org/qortal/network/message/MessageType.java index c2ae7676..de711dc3 100644 --- a/src/main/java/org/qortal/network/message/MessageType.java +++ b/src/main/java/org/qortal/network/message/MessageType.java @@ -46,6 +46,8 @@ public enum MessageType { GET_ONLINE_ACCOUNTS(81, GetOnlineAccountsMessage::fromByteBuffer), ONLINE_ACCOUNTS_V2(82, OnlineAccountsV2Message::fromByteBuffer), GET_ONLINE_ACCOUNTS_V2(83, GetOnlineAccountsV2Message::fromByteBuffer), + // ONLINE_ACCOUNTS_V3(84, OnlineAccountsV3Message::fromByteBuffer), + GET_ONLINE_ACCOUNTS_V3(85, GetOnlineAccountsV3Message::fromByteBuffer), ARBITRARY_DATA(90, ArbitraryDataMessage::fromByteBuffer), GET_ARBITRARY_DATA(91, GetArbitraryDataMessage::fromByteBuffer), diff --git a/src/test/java/org/qortal/test/network/OnlineAccountsV3Tests.java b/src/test/java/org/qortal/test/network/OnlineAccountsV3Tests.java new file mode 100644 index 00000000..3394213b --- /dev/null +++ b/src/test/java/org/qortal/test/network/OnlineAccountsV3Tests.java @@ -0,0 +1,225 @@ +package org.qortal.test.network; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider; +import org.junit.Ignore; +import org.junit.Test; +import org.qortal.data.network.OnlineAccountData; +import org.qortal.network.message.*; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.Security; +import java.util.*; + +import static org.junit.Assert.*; + +public class OnlineAccountsV3Tests { + + private static final Random RANDOM = new Random(); + static { + // This must go before any calls to LogManager/Logger + System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager"); + + Security.insertProviderAt(new BouncyCastleProvider(), 0); + Security.insertProviderAt(new BouncyCastleJsseProvider(), 1); + } + + @Ignore("For informational use") + @Test + public void compareV2ToV3() throws MessageException { + List onlineAccounts = generateOnlineAccounts(false); + + // How many of each timestamp and leading byte (of public key) + Map> hashesByTimestampThenByte = convertToHashMaps(onlineAccounts); + + byte[] v3DataBytes = new GetOnlineAccountsV3Message(hashesByTimestampThenByte).toBytes(); + int v3ByteSize = v3DataBytes.length; + + byte[] v2DataBytes = new GetOnlineAccountsV2Message(onlineAccounts).toBytes(); + int v2ByteSize = v2DataBytes.length; + + int numTimestamps = hashesByTimestampThenByte.size(); + System.out.printf("For %d accounts split across %d timestamp%s: V2 size %d vs V3 size %d%n", + onlineAccounts.size(), + numTimestamps, + numTimestamps != 1 ? "s" : "", + v2ByteSize, + v3ByteSize + ); + + for (var outerMapEntry : hashesByTimestampThenByte.entrySet()) { + long timestamp = outerMapEntry.getKey(); + + var innerMap = outerMapEntry.getValue(); + + System.out.printf("For timestamp %d: %d / 256 slots used.%n", + timestamp, + innerMap.size() + ); + } + } + + private Map> convertToHashMaps(List onlineAccounts) { + // How many of each timestamp and leading byte (of public key) + Map> hashesByTimestampThenByte = new HashMap<>(); + + for (OnlineAccountData onlineAccountData : onlineAccounts) { + Long timestamp = onlineAccountData.getTimestamp(); + Byte leadingByte = onlineAccountData.getPublicKey()[0]; + + hashesByTimestampThenByte + .computeIfAbsent(timestamp, k -> new HashMap<>()) + .compute(leadingByte, (k, v) -> xorByteArrayInPlace(v, onlineAccountData.getPublicKey())); + } + + return hashesByTimestampThenByte; + } + + // TODO: This needs to be moved - probably to be OnlineAccountsManager + private static byte[] xorByteArrayInPlace(byte[] inplaceArray, byte[] otherArray) { + if (inplaceArray == null) + return Arrays.copyOf(otherArray, otherArray.length); + + // Start from index 1 to enforce static leading byte + for (int i = 1; i < otherArray.length; i++) + inplaceArray[i] ^= otherArray[otherArray.length - i - 1]; + + return inplaceArray; + } + + @Test + public void testOnGetOnlineAccountsV3() { + List ourOnlineAccounts = generateOnlineAccounts(false); + List peersOnlineAccounts = generateOnlineAccounts(false); + + Map> ourConvertedHashes = convertToHashMaps(ourOnlineAccounts); + Map> peersConvertedHashes = convertToHashMaps(peersOnlineAccounts); + + List mockReply = new ArrayList<>(); + + // Warning: no double-checking/fetching - we must be ConcurrentMap compatible! + // So no contains()-then-get() or multiple get()s on the same key/map. + for (var ourOuterMapEntry : ourConvertedHashes.entrySet()) { + Long timestamp = ourOuterMapEntry.getKey(); + + var ourInnerMap = ourOuterMapEntry.getValue(); + var peersInnerMap = peersConvertedHashes.get(timestamp); + + if (peersInnerMap == null) { + // Peer doesn't have this timestamp, so if it's valid (i.e. not too old) then we'd have to send all of ours + for (Byte leadingByte : ourInnerMap.keySet()) + mockReply.add(timestamp + ":" + leadingByte); + } else { + // We have entries for this timestamp so compare against peer's entries + for (var ourInnerMapEntry : ourInnerMap.entrySet()) { + Byte leadingByte = ourInnerMapEntry.getKey(); + byte[] peersHash = peersInnerMap.get(leadingByte); + + if (!Arrays.equals(ourInnerMapEntry.getValue(), peersHash)) { + // We don't match peer, or peer doesn't have - send all online accounts for this timestamp and leading byte + mockReply.add(timestamp + ":" + leadingByte); + } + } + } + } + + int numOurTimestamps = ourConvertedHashes.size(); + System.out.printf("We have %d accounts split across %d timestamp%s%n", + ourOnlineAccounts.size(), + numOurTimestamps, + numOurTimestamps != 1 ? "s" : "" + ); + + int numPeerTimestamps = peersConvertedHashes.size(); + System.out.printf("Peer sent %d accounts split across %d timestamp%s%n", + peersOnlineAccounts.size(), + numPeerTimestamps, + numPeerTimestamps != 1 ? "s" : "" + ); + + System.out.printf("We need to send: %d%n%s%n", mockReply.size(), String.join(", ", mockReply)); + } + + @Test + public void testSerialization() throws MessageException { + List onlineAccountsOut = generateOnlineAccounts(true); + Map> hashesByTimestampThenByteOut = convertToHashMaps(onlineAccountsOut); + + validateSerialization(hashesByTimestampThenByteOut); + } + + @Test + public void testEmptySerialization() throws MessageException { + Map> hashesByTimestampThenByteOut = Collections.emptyMap(); + validateSerialization(hashesByTimestampThenByteOut); + + hashesByTimestampThenByteOut = new HashMap<>(); + validateSerialization(hashesByTimestampThenByteOut); + } + + private void validateSerialization(Map> hashesByTimestampThenByteOut) throws MessageException { + Message messageOut = new GetOnlineAccountsV3Message(hashesByTimestampThenByteOut); + byte[] messageBytes = messageOut.toBytes(); + + ByteBuffer byteBuffer = ByteBuffer.wrap(messageBytes).asReadOnlyBuffer(); + + GetOnlineAccountsV3Message messageIn = (GetOnlineAccountsV3Message) Message.fromByteBuffer(byteBuffer); + + Map> hashesByTimestampThenByteIn = messageIn.getHashesByTimestampThenByte(); + + Set timestampsIn = hashesByTimestampThenByteIn.keySet(); + Set timestampsOut = hashesByTimestampThenByteOut.keySet(); + assertEquals("timestamp count mismatch", timestampsOut.size(), timestampsIn.size()); + assertTrue("timestamps mismatch", timestampsIn.containsAll(timestampsOut)); + + for (Long timestamp : timestampsIn) { + Map hashesByByteIn = hashesByTimestampThenByteIn.get(timestamp); + Map hashesByByteOut = hashesByTimestampThenByteOut.get(timestamp); + assertNotNull("timestamp entry missing", hashesByByteOut); + + Set leadingBytesIn = hashesByByteIn.keySet(); + Set leadingBytesOut = hashesByByteOut.keySet(); + assertEquals("leading byte entry count mismatch", leadingBytesOut.size(), leadingBytesIn.size()); + assertTrue("leading byte entry mismatch", leadingBytesIn.containsAll(leadingBytesOut)); + + for (Byte leadingByte : leadingBytesOut) { + byte[] bytesIn = hashesByByteIn.get(leadingByte); + byte[] bytesOut = hashesByByteOut.get(leadingByte); + + assertTrue("pubkey hash mismatch", Arrays.equals(bytesOut, bytesIn)); + } + } + } + + private List generateOnlineAccounts(boolean withSignatures) { + List onlineAccounts = new ArrayList<>(); + + int numTimestamps = RANDOM.nextInt(2) + 1; // 1 or 2 + + for (int t = 0; t < numTimestamps; ++t) { + long timestamp = 1 << 31 + (t + 1) << 12; + int numAccounts = RANDOM.nextInt(3000); + + for (int a = 0; a < numAccounts; ++a) { + byte[] sig = null; + if (withSignatures) { + sig = new byte[Transformer.SIGNATURE_LENGTH]; + RANDOM.nextBytes(sig); + } + + byte[] pubkey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + RANDOM.nextBytes(pubkey); + + onlineAccounts.add(new OnlineAccountData(timestamp, sig, pubkey)); + } + } + + return onlineAccounts; + } + +}