diff --git a/src/main/java/org/qortal/block/Block.java b/src/main/java/org/qortal/block/Block.java index 5fe005d6..a0cba9bb 100644 --- a/src/main/java/org/qortal/block/Block.java +++ b/src/main/java/org/qortal/block/Block.java @@ -1023,8 +1023,8 @@ public class Block { // If this block is much older than current online timestamp, then there's no point checking current online accounts List currentOnlineAccounts = onlineTimestamp < NTP.getTime() - OnlineAccountsManager.ONLINE_TIMESTAMP_MODULUS ? null - : OnlineAccountsManager.getInstance().getOnlineAccounts(); - List latestBlocksOnlineAccounts = OnlineAccountsManager.getInstance().getLatestBlocksOnlineAccounts(); + : OnlineAccountsManager.getInstance().getOnlineAccounts(onlineTimestamp); + List latestBlocksOnlineAccounts = OnlineAccountsManager.getInstance().getLatestBlocksOnlineAccounts(onlineTimestamp); // Extract online accounts' timestamp signatures from block data List onlineAccountsSignatures = BlockTransformer.decodeTimestampSignatures(this.blockData.getOnlineAccountsSignatures()); diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 0a011db5..d6be1b07 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1229,6 +1229,10 @@ public class Controller extends Thread { OnlineAccountsManager.getInstance().onNetworkOnlineAccountsV2Message(peer, message); break; + case GET_ONLINE_ACCOUNTS_V3: + OnlineAccountsManager.getInstance().onNetworkGetOnlineAccountsV3Message(peer, message); + break; + case GET_ARBITRARY_DATA: // Not currently supported break; diff --git a/src/main/java/org/qortal/controller/OnlineAccountsManager.java b/src/main/java/org/qortal/controller/OnlineAccountsManager.java index 58e4f64e..4e8b3c77 100644 --- a/src/main/java/org/qortal/controller/OnlineAccountsManager.java +++ b/src/main/java/org/qortal/controller/OnlineAccountsManager.java @@ -1,12 +1,13 @@ package org.qortal.controller; +import com.google.common.hash.HashCode; import com.google.common.primitives.Longs; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.qortal.account.Account; import org.qortal.account.PrivateKeyAccount; -import org.qortal.account.PublicKeyAccount; import org.qortal.block.BlockChain; +import org.qortal.crypto.Crypto; import org.qortal.data.account.MintingAccountData; import org.qortal.data.account.RewardShareData; import org.qortal.data.network.OnlineAccountData; @@ -18,212 +19,101 @@ import org.qortal.repository.Repository; import org.qortal.repository.RepositoryManager; import org.qortal.utils.Base58; import org.qortal.utils.NTP; +import org.qortal.utils.NamedThreadFactory; import java.util.*; +import java.util.concurrent.*; import java.util.stream.Collectors; -public class OnlineAccountsManager extends Thread { - - private class OurOnlineAccountsThread extends Thread { - - public void run() { - try { - while (!isStopping) { - Thread.sleep(10000L); - - // Refresh our online accounts signatures? - sendOurOnlineAccountsInfo(); - - } - } catch (InterruptedException e) { - // Fall through to exit thread - } - } - } - +public class OnlineAccountsManager { private static final Logger LOGGER = LogManager.getLogger(OnlineAccountsManager.class); - private static OnlineAccountsManager instance; + // 'Current' as in 'now' + + /** + * How long online accounts signatures last before they expire. + */ + public static final long ONLINE_TIMESTAMP_MODULUS = 5 * 60 * 1000L; + + /** + * How many 'current' timestamp-sets of online accounts we cache. + */ + private static final int MAX_CACHED_TIMESTAMP_SETS = 2; + + /** + * How many timestamp-sets of online accounts we cache for 'latest blocks'. + */ + private static final int MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS = 3; + + private static final long ONLINE_ACCOUNTS_QUEUE_INTERVAL = 100L; //ms + private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms + private static final long ONLINE_ACCOUNTS_LEGACY_BROADCAST_INTERVAL = 60 * 1000L; // ms + private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 10 * 1000L; // ms + + private static final long ONLINE_ACCOUNTS_V2_PEER_VERSION = 0x0300020000L; // v3.2.0 + private static final long ONLINE_ACCOUNTS_V3_PEER_VERSION = 0x03000200cbL; // v3.2.203 + + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4, new NamedThreadFactory("OnlineAccounts")); private volatile boolean isStopping = false; - // To do with online accounts list - private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms - private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 1 * 60 * 1000L; // ms - public static final long ONLINE_TIMESTAMP_MODULUS = 5 * 60 * 1000L; - private static final long LAST_SEEN_EXPIRY_PERIOD = (ONLINE_TIMESTAMP_MODULUS * 2) + (1 * 60 * 1000L); - /** How many (latest) blocks' worth of online accounts we cache */ - private static final int MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS = 2; - private static final long ONLINE_ACCOUNTS_V2_PEER_VERSION = 0x0300020000L; + private final Set onlineAccountsImportQueue = ConcurrentHashMap.newKeySet(); - private long onlineAccountsTasksTimestamp = Controller.startTime + ONLINE_ACCOUNTS_TASKS_INTERVAL; // ms + /** + * Cache of 'current' online accounts, keyed by timestamp + */ + private final Map> currentOnlineAccounts = new ConcurrentHashMap<>(); + /** + * Cache of hash-summary of 'current' online accounts, keyed by timestamp, then leading byte of public key. + *

+ * Inner map is also sorted using {@code Byte::compareUnsigned} as a comparator. + * This is critical for proper function of GET_ONLINE_ACCOUNTS_V3 protocol. + */ + private final Map> currentOnlineAccountsHashes = new ConcurrentHashMap<>(); - private final List onlineAccountsImportQueue = Collections.synchronizedList(new ArrayList<>()); + /** + * Cache of online accounts for latest blocks - not necessarily 'current' / now. + * Probably only accessed / modified by a single Synchronizer thread. + */ + private final Map> latestBlocksOnlineAccounts = new ConcurrentHashMap<>(); - - /** Cache of current 'online accounts' */ - List onlineAccounts = new ArrayList<>(); - /** Cache of latest blocks' online accounts */ - Deque> latestBlocksOnlineAccounts = new ArrayDeque<>(MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS); - - public OnlineAccountsManager() { - // TODO: make private, add these tasks to scheduled executor: - // send our online accounts every 10s - // expireOnlineAccounts every ONLINE_ACCOUNTS_CHECK_INTERVAL - // broadcastOnlineAccountsQuery every ONLINE_ACCOUNTS_BROADCAST_INTERVAL - // processOnlineAccountsImportQueue every 100ms? + public static long toOnlineAccountTimestamp(long timestamp) { + return (timestamp / ONLINE_TIMESTAMP_MODULUS) * ONLINE_TIMESTAMP_MODULUS; } - // TODO: convert to SingletonContainer a-la Network - public static synchronized OnlineAccountsManager getInstance() { - if (instance == null) { - instance = new OnlineAccountsManager(); - } - - return instance; + private OnlineAccountsManager() { } - // TODO: see constructor for more info - public void run() { + private static class SingletonContainer { + private static final OnlineAccountsManager INSTANCE = new OnlineAccountsManager(); + } - // Start separate thread to prepare our online accounts - // This could be converted to a thread pool later if more concurrency is needed - OurOnlineAccountsThread ourOnlineAccountsThread = new OurOnlineAccountsThread(); - ourOnlineAccountsThread.start(); + public static OnlineAccountsManager getInstance() { + return SingletonContainer.INSTANCE; + } - try { - while (!Controller.isStopping()) { - Thread.sleep(100L); + public void start() { + // Expire old online accounts signatures + executor.scheduleAtFixedRate(this::expireOldOnlineAccounts, ONLINE_ACCOUNTS_TASKS_INTERVAL, ONLINE_ACCOUNTS_TASKS_INTERVAL, TimeUnit.MILLISECONDS); - final Long now = NTP.getTime(); - if (now == null) { - continue; - } + // Send our online accounts + executor.scheduleAtFixedRate(this::sendOurOnlineAccountsInfo, ONLINE_ACCOUNTS_BROADCAST_INTERVAL, ONLINE_ACCOUNTS_BROADCAST_INTERVAL, TimeUnit.MILLISECONDS); - // Perform tasks to do with managing online accounts list - if (now >= onlineAccountsTasksTimestamp) { - onlineAccountsTasksTimestamp = now + ONLINE_ACCOUNTS_TASKS_INTERVAL; - performOnlineAccountsTasks(); - } + // Request online accounts from peers (legacy) + executor.scheduleAtFixedRate(this::requestLegacyRemoteOnlineAccounts, ONLINE_ACCOUNTS_LEGACY_BROADCAST_INTERVAL, ONLINE_ACCOUNTS_LEGACY_BROADCAST_INTERVAL, TimeUnit.MILLISECONDS); + // Request online accounts from peers (V3+) + executor.scheduleAtFixedRate(this::requestRemoteOnlineAccounts, ONLINE_ACCOUNTS_BROADCAST_INTERVAL, ONLINE_ACCOUNTS_BROADCAST_INTERVAL, TimeUnit.MILLISECONDS); - // Process queued online account verifications - this.processOnlineAccountsImportQueue(); - - } - } catch (InterruptedException e) { - // Fall through to exit thread - } - - ourOnlineAccountsThread.interrupt(); + // Process import queue + executor.scheduleWithFixedDelay(this::processOnlineAccountsImportQueue, ONLINE_ACCOUNTS_QUEUE_INTERVAL, ONLINE_ACCOUNTS_QUEUE_INTERVAL, TimeUnit.MILLISECONDS); } public void shutdown() { isStopping = true; - // TODO: convert interrrupt to executor.shutdownNow(); - this.interrupt(); - } - - - // Online accounts import queue - - private void processOnlineAccountsImportQueue() { - if (this.onlineAccountsImportQueue.isEmpty()) { - // Nothing to do - return; - } - - LOGGER.debug("Processing online accounts import queue (size: {})", this.onlineAccountsImportQueue.size()); - - try (final Repository repository = RepositoryManager.getRepository()) { - - List onlineAccountDataCopy = new ArrayList<>(this.onlineAccountsImportQueue); - for (OnlineAccountData onlineAccountData : onlineAccountDataCopy) { - if (isStopping) { - return; - } - - this.verifyAndAddAccount(repository, onlineAccountData); - - // Remove from queue - onlineAccountsImportQueue.remove(onlineAccountData); - } - - LOGGER.debug("Finished processing online accounts import queue"); - - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while verifying online accounts"), e); - } - } - - - // Utilities - - // TODO: split this into validateAccount() and addAccount() - private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException { - final Long now = NTP.getTime(); - if (now == null) - return; - - // TODO: don't create otherAccount, instead: - // byte[] rewardSharePublicKey = onlineAccountData.getPublicKey(); - PublicKeyAccount otherAccount = new PublicKeyAccount(repository, onlineAccountData.getPublicKey()); - - // Check timestamp is 'recent' here - if (Math.abs(onlineAccountData.getTimestamp() - now) > ONLINE_TIMESTAMP_MODULUS * 2) { - LOGGER.trace(() -> String.format("Rejecting online account %s with out of range timestamp %d", otherAccount.getAddress(), onlineAccountData.getTimestamp())); - return; - } - - // Verify - byte[] data = Longs.toByteArray(onlineAccountData.getTimestamp()); - // TODO: use Crypto.verify() static method directly - if (!otherAccount.verify(onlineAccountData.getSignature(), data)) { - LOGGER.trace(() -> String.format("Rejecting invalid online account %s", otherAccount.getAddress())); - return; - } - - // Qortal: check online account is actually reward-share - // TODO: use "rewardSharePublicKey" from above TODO - RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(onlineAccountData.getPublicKey()); - if (rewardShareData == null) { - // Reward-share doesn't even exist - probably not a good sign - LOGGER.trace(() -> String.format("Rejecting unknown online reward-share public key %s", Base58.encode(onlineAccountData.getPublicKey()))); - return; - } - - Account mintingAccount = new Account(repository, rewardShareData.getMinter()); - if (!mintingAccount.canMint()) { - // Minting-account component of reward-share can no longer mint - disregard - LOGGER.trace(() -> String.format("Rejecting online reward-share with non-minting account %s", mintingAccount.getAddress())); - return; - } - - // TODO: change this.onlineAccounts to a ConcurrentMap? Keyed by timestamp? - synchronized (this.onlineAccounts) { - OnlineAccountData existingAccountData = this.onlineAccounts.stream().filter(account -> Arrays.equals(account.getPublicKey(), onlineAccountData.getPublicKey())).findFirst().orElse(null); - - if (existingAccountData != null) { - if (existingAccountData.getTimestamp() < onlineAccountData.getTimestamp()) { - this.onlineAccounts.remove(existingAccountData); - - // TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58? - LOGGER.trace(() -> String.format("Updated online account %s with timestamp %d (was %d)", otherAccount.getAddress(), onlineAccountData.getTimestamp(), existingAccountData.getTimestamp())); - } else { - // TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58? - LOGGER.trace(() -> String.format("Not updating existing online account %s", otherAccount.getAddress())); - - return; - } - } else { - // TODO: change otherAccount.getAddress() to rewardSharePublicKey in Base58? - LOGGER.trace(() -> String.format("Added online account %s with timestamp %d", otherAccount.getAddress(), onlineAccountData.getTimestamp())); - } - - this.onlineAccounts.add(onlineAccountData); - // TODO: if we actually added a new account, then we need to rebuild our hashes-by-timestamp-then-byte for rewardSharePublicKey's leading byte also - } + executor.shutdownNow(); } + // Testing support public void ensureTestingAccountsOnline(PrivateKeyAccount... onlineAccounts) { if (!BlockChain.getInstance().isTestChain()) { LOGGER.warn("Ignoring attempt to ensure test account is online for non-test chain!"); @@ -237,61 +127,222 @@ public class OnlineAccountsManager extends Thread { final long onlineAccountsTimestamp = toOnlineAccountTimestamp(now); byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp); - // TODO: use new addAccount() method - synchronized (this.onlineAccounts) { - this.onlineAccounts.clear(); + Set replacementAccounts = new HashSet<>(); + for (PrivateKeyAccount onlineAccount : onlineAccounts) { + // Check mintingAccount is actually reward-share? - for (PrivateKeyAccount onlineAccount : onlineAccounts) { - // Check mintingAccount is actually reward-share? + byte[] signature = onlineAccount.sign(timestampBytes); + byte[] publicKey = onlineAccount.getPublicKey(); - byte[] signature = onlineAccount.sign(timestampBytes); - byte[] publicKey = onlineAccount.getPublicKey(); + OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey); + replacementAccounts.add(ourOnlineAccountData); + } - OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey); - this.onlineAccounts.add(ourOnlineAccountData); + this.currentOnlineAccounts.clear(); + addAccounts(replacementAccounts); + } + + // Online accounts import queue + + private void processOnlineAccountsImportQueue() { + if (this.onlineAccountsImportQueue.isEmpty()) + // Nothing to do + return; + + LOGGER.debug("Processing online accounts import queue (size: {})", this.onlineAccountsImportQueue.size()); + + Set onlineAccountsToAdd = new HashSet<>(); + try (final Repository repository = RepositoryManager.getRepository()) { + for (OnlineAccountData onlineAccountData : this.onlineAccountsImportQueue) { + if (isStopping) + return; + + boolean isValid = this.validateAccount(repository, onlineAccountData); + if (isValid) + onlineAccountsToAdd.add(onlineAccountData); + + // Remove from queue + onlineAccountsImportQueue.remove(onlineAccountData); + } + + LOGGER.debug("Finished validating online accounts import queue"); + } catch (DataException e) { + LOGGER.error("Repository issue while verifying online accounts", e); + } + + LOGGER.debug("Adding {} validated online accounts from import queue", onlineAccountsToAdd.size()); + addAccounts(onlineAccountsToAdd); + } + + // Utilities + + public static byte[] xorByteArrayInPlace(byte[] inplaceArray, byte[] otherArray) { + if (inplaceArray == null) + return Arrays.copyOf(otherArray, otherArray.length); + + // Start from index 1 to enforce static leading byte + for (int i = 1; i < otherArray.length; i++) + // inplaceArray[i] ^= otherArray[otherArray.length - i - 1]; + inplaceArray[i] ^= otherArray[i]; + + return inplaceArray; + } + + private boolean validateAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException { + final Long now = NTP.getTime(); + if (now == null) + return false; + + byte[] rewardSharePublicKey = onlineAccountData.getPublicKey(); + long onlineAccountTimestamp = onlineAccountData.getTimestamp(); + + // Check timestamp is 'recent' here + if (Math.abs(onlineAccountTimestamp - now) > ONLINE_TIMESTAMP_MODULUS * 2) { + LOGGER.trace(() -> String.format("Rejecting online account %s with out of range timestamp %d", Base58.encode(rewardSharePublicKey), onlineAccountTimestamp)); + return false; + } + + // Verify + byte[] data = Longs.toByteArray(onlineAccountData.getTimestamp()); + if (!Crypto.verify(rewardSharePublicKey, onlineAccountData.getSignature(), data)) { + LOGGER.trace(() -> String.format("Rejecting invalid online account %s", Base58.encode(rewardSharePublicKey))); + return false; + } + + // Qortal: check online account is actually reward-share + RewardShareData rewardShareData = repository.getAccountRepository().getRewardShare(rewardSharePublicKey); + if (rewardShareData == null) { + // Reward-share doesn't even exist - probably not a good sign + LOGGER.trace(() -> String.format("Rejecting unknown online reward-share public key %s", Base58.encode(rewardSharePublicKey))); + return false; + } + + Account mintingAccount = new Account(repository, rewardShareData.getMinter()); + if (!mintingAccount.canMint()) { + // Minting-account component of reward-share can no longer mint - disregard + LOGGER.trace(() -> String.format("Rejecting online reward-share with non-minting account %s", mintingAccount.getAddress())); + return false; + } + + return true; + } + + private void addAccounts(Set onlineAccountsToAdd) { + // For keeping track of which hashes to rebuild + Map> hashesToRebuild = new HashMap<>(); + + for (OnlineAccountData onlineAccountData : onlineAccountsToAdd) { + boolean isNewEntry = this.addAccount(onlineAccountData); + + if (isNewEntry) + hashesToRebuild.computeIfAbsent(onlineAccountData.getTimestamp(), k -> new HashSet<>()).add(onlineAccountData.getPublicKey()[0]); + } + + for (var entry : hashesToRebuild.entrySet()) { + Long timestamp = entry.getKey(); + + LOGGER.debug(String.format("Rehashing for timestamp %d and leading bytes %s", + timestamp, + entry.getValue().stream().sorted(Byte::compareUnsigned).map(leadingByte -> String.format("%02x", leadingByte)).collect(Collectors.joining(", ")) + ) + ); + + for (Byte leadingByte : entry.getValue()) { + byte[] pubkeyHash = currentOnlineAccounts.get(timestamp).stream() + .map(OnlineAccountData::getPublicKey) + .filter(publicKey -> leadingByte == publicKey[0]) + .reduce(null, OnlineAccountsManager::xorByteArrayInPlace); + + currentOnlineAccountsHashes.computeIfAbsent(timestamp, k -> new ConcurrentSkipListMap<>(Byte::compareUnsigned)).put(leadingByte, pubkeyHash); + + LOGGER.trace(() -> String.format("Rebuilt hash %s for timestamp %d and leading byte %02x using %d public keys", + HashCode.fromBytes(pubkeyHash), + timestamp, + leadingByte, + currentOnlineAccounts.get(timestamp).stream() + .map(OnlineAccountData::getPublicKey) + .filter(publicKey -> leadingByte == publicKey[0]) + .count() + )); } } } - private void performOnlineAccountsTasks() { + private boolean addAccount(OnlineAccountData onlineAccountData) { + byte[] rewardSharePublicKey = onlineAccountData.getPublicKey(); + long onlineAccountTimestamp = onlineAccountData.getTimestamp(); + + Set onlineAccounts = this.currentOnlineAccounts.computeIfAbsent(onlineAccountTimestamp, k -> ConcurrentHashMap.newKeySet()); + boolean isNewEntry = onlineAccounts.add(onlineAccountData); + + if (isNewEntry) + LOGGER.trace(() -> String.format("Added online account %s with timestamp %d", Base58.encode(rewardSharePublicKey), onlineAccountTimestamp)); + else + LOGGER.trace(() -> String.format("Not updating existing online account %s with timestamp %d", Base58.encode(rewardSharePublicKey), onlineAccountTimestamp)); + + return isNewEntry; + } + + /** + * Expire old entries. + */ + private void expireOldOnlineAccounts() { final Long now = NTP.getTime(); if (now == null) return; - // Expire old entries - final long cutoffThreshold = now - LAST_SEEN_EXPIRY_PERIOD; - synchronized (this.onlineAccounts) { - Iterator iterator = this.onlineAccounts.iterator(); - while (iterator.hasNext()) { - OnlineAccountData onlineAccountData = iterator.next(); - - if (onlineAccountData.getTimestamp() < cutoffThreshold) { - iterator.remove(); - - LOGGER.trace(() -> { - PublicKeyAccount otherAccount = new PublicKeyAccount(null, onlineAccountData.getPublicKey()); - return String.format("Removed expired online account %s with timestamp %d", otherAccount.getAddress(), onlineAccountData.getTimestamp()); - }); - } - } - } - - // Request data from other peers? - if ((this.onlineAccountsTasksTimestamp % ONLINE_ACCOUNTS_BROADCAST_INTERVAL) < ONLINE_ACCOUNTS_TASKS_INTERVAL) { - List safeOnlineAccounts; - synchronized (this.onlineAccounts) { - safeOnlineAccounts = new ArrayList<>(this.onlineAccounts); - } - - Message messageV1 = new GetOnlineAccountsMessage(safeOnlineAccounts); - Message messageV2 = new GetOnlineAccountsV2Message(safeOnlineAccounts); - - Network.getInstance().broadcast(peer -> - peer.getPeersVersion() >= ONLINE_ACCOUNTS_V2_PEER_VERSION ? messageV2 : messageV1 - ); - } + final long cutoffThreshold = now - MAX_CACHED_TIMESTAMP_SETS * ONLINE_TIMESTAMP_MODULUS; + this.currentOnlineAccounts.keySet().removeIf(timestamp -> timestamp < cutoffThreshold); + this.currentOnlineAccountsHashes.keySet().removeIf(timestamp -> timestamp < cutoffThreshold); } + /** + * Request data from other peers. (Pre-V3) + */ + private void requestLegacyRemoteOnlineAccounts() { + final Long now = NTP.getTime(); + if (now == null) + return; + + // Don't bother if we're not up to date + if (!Controller.getInstance().isUpToDate()) + return; + + List mergedOnlineAccounts = Set.copyOf(this.currentOnlineAccounts.values()).stream().flatMap(Set::stream).collect(Collectors.toList()); + + Message messageV2 = new GetOnlineAccountsV2Message(mergedOnlineAccounts); + + Network.getInstance().broadcast(peer -> + peer.getPeersVersion() < ONLINE_ACCOUNTS_V3_PEER_VERSION + ? messageV2 + : null + ); + } + + /** + * Request data from other peers. V3+ + */ + private void requestRemoteOnlineAccounts() { + final Long now = NTP.getTime(); + if (now == null) + return; + + // Don't bother if we're not up to date + if (!Controller.getInstance().isUpToDate()) + return; + + Message messageV3 = new GetOnlineAccountsV3Message(currentOnlineAccountsHashes); + + Network.getInstance().broadcast(peer -> + peer.getPeersVersion() >= ONLINE_ACCOUNTS_V3_PEER_VERSION + ? messageV3 + : null + ); + } + + /** + * Send online accounts that are minting on this node. + */ private void sendOurOnlineAccountsInfo() { final Long now = NTP.getTime(); if (now == null) { @@ -302,13 +353,12 @@ public class OnlineAccountsManager extends Thread { try (final Repository repository = RepositoryManager.getRepository()) { mintingAccounts = repository.getAccountRepository().getMintingAccounts(); - // We have no accounts, but don't reset timestamp + // We have no accounts to send if (mintingAccounts.isEmpty()) return; - // Only reward-share accounts allowed + // Only active reward-shares allowed Iterator iterator = mintingAccounts.iterator(); - int i = 0; while (iterator.hasNext()) { MintingAccountData mintingAccountData = iterator.next(); @@ -325,11 +375,6 @@ public class OnlineAccountsManager extends Thread { iterator.remove(); continue; } - - if (++i > 1+1) { - iterator.remove(); - continue; - } } } catch (DataException e) { LOGGER.warn(String.format("Repository issue trying to fetch minting accounts: %s", e.getMessage())); @@ -343,7 +388,6 @@ public class OnlineAccountsManager extends Thread { byte[] timestampBytes = Longs.toByteArray(onlineAccountsTimestamp); List ourOnlineAccounts = new ArrayList<>(); - MINTING_ACCOUNTS: for (MintingAccountData mintingAccountData : mintingAccounts) { PrivateKeyAccount mintingAccount = new PrivateKeyAccount(null, mintingAccountData.getPrivateKey()); @@ -352,28 +396,13 @@ public class OnlineAccountsManager extends Thread { // Our account is online OnlineAccountData ourOnlineAccountData = new OnlineAccountData(onlineAccountsTimestamp, signature, publicKey); - synchronized (this.onlineAccounts) { - Iterator iterator = this.onlineAccounts.iterator(); - while (iterator.hasNext()) { - OnlineAccountData existingOnlineAccountData = iterator.next(); - if (Arrays.equals(existingOnlineAccountData.getPublicKey(), ourOnlineAccountData.getPublicKey())) { - // If our online account is already present, with same timestamp, then move on to next mintingAccount - if (existingOnlineAccountData.getTimestamp() == onlineAccountsTimestamp) - continue MINTING_ACCOUNTS; - - // If our online account is already present, but with older timestamp, then remove it - iterator.remove(); - break; - } - } - - this.onlineAccounts.add(ourOnlineAccountData); + boolean isNewEntry = addAccount(ourOnlineAccountData); + if (isNewEntry) { + LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", Base58.encode(mintingAccount.getPublicKey()), onlineAccountsTimestamp)); + ourOnlineAccounts.add(ourOnlineAccountData); + hasInfoChanged = true; } - - LOGGER.trace(() -> String.format("Added our online account %s with timestamp %d", mintingAccount.getAddress(), onlineAccountsTimestamp)); - ourOnlineAccounts.add(ourOnlineAccountData); - hasInfoChanged = true; } if (!hasInfoChanged) @@ -381,52 +410,81 @@ public class OnlineAccountsManager extends Thread { Message messageV1 = new OnlineAccountsMessage(ourOnlineAccounts); Message messageV2 = new OnlineAccountsV2Message(ourOnlineAccounts); + Message messageV3 = new OnlineAccountsV2Message(ourOnlineAccounts); // TODO: V3 message Network.getInstance().broadcast(peer -> - peer.getPeersVersion() >= ONLINE_ACCOUNTS_V2_PEER_VERSION ? messageV2 : messageV1 + peer.getPeersVersion() >= ONLINE_ACCOUNTS_V3_PEER_VERSION + ? messageV3 + : peer.getPeersVersion() >= ONLINE_ACCOUNTS_V2_PEER_VERSION + ? messageV2 + : messageV1 ); - LOGGER.trace(() -> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp)); + LOGGER.debug("Broadcasted {} online account{} with timestamp {}", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp); } - public static long toOnlineAccountTimestamp(long timestamp) { - return (timestamp / ONLINE_TIMESTAMP_MODULUS) * ONLINE_TIMESTAMP_MODULUS; + /** + * Returns list of online accounts matching given timestamp. + */ + // Block::mint() - only wants online accounts with timestamp that matches block's timestamp so they can be added to new block + // Block::areOnlineAccountsValid() - only wants online accounts with timestamp that matches block's timestamp to avoid re-verifying sigs + public List getOnlineAccounts(long onlineTimestamp) { + return new ArrayList<>(Set.copyOf(this.currentOnlineAccounts.getOrDefault(onlineTimestamp, Collections.emptySet()))); } - /** Returns list of online accounts with timestamp recent enough to be considered currently online. */ + /** + * Returns list of online accounts with timestamp recent enough to be considered currently online. + */ + // API: calls this to return list of online accounts - probably expects ALL timestamps - but going to get 'current' from now on + // BlockMinter: only calls this to check whether returned list is empty or not, to determine whether minting is even possible or not public List getOnlineAccounts() { - final long onlineTimestamp = toOnlineAccountTimestamp(NTP.getTime()); + final Long now = NTP.getTime(); + if (now == null) + return Collections.emptyList(); - synchronized (this.onlineAccounts) { - return this.onlineAccounts.stream().filter(account -> account.getTimestamp() == onlineTimestamp).collect(Collectors.toList()); - } + final long onlineTimestamp = toOnlineAccountTimestamp(now); + + return getOnlineAccounts(onlineTimestamp); } + /** + * Returns cached, unmodifiable list of latest block's online accounts. + */ + // TODO: this needs tidying up - do we change method to only return latest timestamp's set? + // Block::areOnlineAccountsValid() - only wants online accounts with timestamp that matches latest / previous block's timestamp to avoid re-verifying sigs + public List getLatestBlocksOnlineAccounts(long blockOnlineTimestamp) { + Set onlineAccounts = this.latestBlocksOnlineAccounts.getOrDefault(blockOnlineTimestamp, Collections.emptySet()); - /** Returns cached, unmodifiable list of latest block's online accounts. */ - public List getLatestBlocksOnlineAccounts() { - synchronized (this.latestBlocksOnlineAccounts) { - return this.latestBlocksOnlineAccounts.peekFirst(); - } + return List.copyOf(onlineAccounts); } - /** Caches list of latest block's online accounts. Typically called by Block.process() */ + /** + * Caches list of latest block's online accounts. Typically called by Block.process() + */ + // TODO: is this simply a bulk add, like the import queue but blocking? Used by Synchronizer but could be for blocks that are quite historic? + // Block::process() - basically for adding latest block's online accounts to cache to avoid re-verifying when processing another block in the future public void pushLatestBlocksOnlineAccounts(List latestBlocksOnlineAccounts) { - synchronized (this.latestBlocksOnlineAccounts) { - if (this.latestBlocksOnlineAccounts.size() == MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS) - this.latestBlocksOnlineAccounts.pollLast(); + if (latestBlocksOnlineAccounts == null || latestBlocksOnlineAccounts.isEmpty()) + return; - this.latestBlocksOnlineAccounts.addFirst(latestBlocksOnlineAccounts == null - ? Collections.emptyList() - : Collections.unmodifiableList(latestBlocksOnlineAccounts)); - } + long timestamp = latestBlocksOnlineAccounts.get(0).getTimestamp(); + + this.latestBlocksOnlineAccounts.computeIfAbsent(timestamp, k -> ConcurrentHashMap.newKeySet()).addAll(latestBlocksOnlineAccounts); + + if (this.latestBlocksOnlineAccounts.size() > MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS) + this.latestBlocksOnlineAccounts.keySet().stream() + .sorted() + .findFirst() + .ifPresent(this.latestBlocksOnlineAccounts::remove); } - /** Reverts list of latest block's online accounts. Typically called by Block.orphan() */ + /** + * Reverts list of latest block's online accounts. Typically called by Block.orphan() + */ + // TODO: see above + // Block::orphan() - for removing latest block's online accounts from cache public void popLatestBlocksOnlineAccounts() { - synchronized (this.latestBlocksOnlineAccounts) { - this.latestBlocksOnlineAccounts.pollFirst(); - } + // NO-OP } @@ -438,45 +496,48 @@ public class OnlineAccountsManager extends Thread { List excludeAccounts = getOnlineAccountsMessage.getOnlineAccounts(); // Send online accounts info, excluding entries with matching timestamp & public key from excludeAccounts - List accountsToSend; - synchronized (this.onlineAccounts) { - accountsToSend = new ArrayList<>(this.onlineAccounts); - } + List accountsToSend = Set.copyOf(this.currentOnlineAccounts.values()).stream().flatMap(Set::stream).collect(Collectors.toList()); + int prefilterSize = accountsToSend.size(); Iterator iterator = accountsToSend.iterator(); - - SEND_ITERATOR: while (iterator.hasNext()) { OnlineAccountData onlineAccountData = iterator.next(); - for (int i = 0; i < excludeAccounts.size(); ++i) { - OnlineAccountData excludeAccountData = excludeAccounts.get(i); - + for (OnlineAccountData excludeAccountData : excludeAccounts) { if (onlineAccountData.getTimestamp() == excludeAccountData.getTimestamp() && Arrays.equals(onlineAccountData.getPublicKey(), excludeAccountData.getPublicKey())) { iterator.remove(); - continue SEND_ITERATOR; + break; } } } + if (accountsToSend.isEmpty()) + return; + Message onlineAccountsMessage = new OnlineAccountsMessage(accountsToSend); peer.sendMessage(onlineAccountsMessage); - LOGGER.trace(() -> String.format("Sent %d of our %d online accounts to %s", accountsToSend.size(), this.onlineAccounts.size(), peer)); + LOGGER.debug("Sent {} of our {} online accounts to {}", accountsToSend.size(), prefilterSize, peer); } public void onNetworkOnlineAccountsMessage(Peer peer, Message message) { OnlineAccountsMessage onlineAccountsMessage = (OnlineAccountsMessage) message; List peersOnlineAccounts = onlineAccountsMessage.getOnlineAccounts(); - LOGGER.trace(() -> String.format("Received %d online accounts from %s", peersOnlineAccounts.size(), peer)); + LOGGER.debug("Received {} online accounts from {}", peersOnlineAccounts.size(), peer); - try (final Repository repository = RepositoryManager.getRepository()) { - for (OnlineAccountData onlineAccountData : peersOnlineAccounts) - this.verifyAndAddAccount(repository, onlineAccountData); - } catch (DataException e) { - LOGGER.error(String.format("Repository issue while verifying online accounts from peer %s", peer), e); + int importCount = 0; + + // Add any online accounts to the queue that aren't already present + for (OnlineAccountData onlineAccountData : peersOnlineAccounts) { + boolean isNewEntry = onlineAccountsImportQueue.add(onlineAccountData); + + if (isNewEntry) + importCount++; } + + if (importCount > 0) + LOGGER.debug("Added {} online accounts to queue", importCount); } public void onNetworkGetOnlineAccountsV2Message(Peer peer, Message message) { @@ -485,58 +546,106 @@ public class OnlineAccountsManager extends Thread { List excludeAccounts = getOnlineAccountsMessage.getOnlineAccounts(); // Send online accounts info, excluding entries with matching timestamp & public key from excludeAccounts - List accountsToSend; - synchronized (this.onlineAccounts) { - accountsToSend = new ArrayList<>(this.onlineAccounts); - } + List accountsToSend = Set.copyOf(this.currentOnlineAccounts.values()).stream().flatMap(Set::stream).collect(Collectors.toList()); + int prefilterSize = accountsToSend.size(); Iterator iterator = accountsToSend.iterator(); - - SEND_ITERATOR: while (iterator.hasNext()) { OnlineAccountData onlineAccountData = iterator.next(); - for (int i = 0; i < excludeAccounts.size(); ++i) { - OnlineAccountData excludeAccountData = excludeAccounts.get(i); - + for (OnlineAccountData excludeAccountData : excludeAccounts) { if (onlineAccountData.getTimestamp() == excludeAccountData.getTimestamp() && Arrays.equals(onlineAccountData.getPublicKey(), excludeAccountData.getPublicKey())) { iterator.remove(); - continue SEND_ITERATOR; + break; } } } + if (accountsToSend.isEmpty()) + return; + Message onlineAccountsMessage = new OnlineAccountsV2Message(accountsToSend); peer.sendMessage(onlineAccountsMessage); - LOGGER.trace(() -> String.format("Sent %d of our %d online accounts to %s", accountsToSend.size(), this.onlineAccounts.size(), peer)); + LOGGER.debug("Sent {} of our {} online accounts to {}", accountsToSend.size(), prefilterSize, peer); } public void onNetworkOnlineAccountsV2Message(Peer peer, Message message) { OnlineAccountsV2Message onlineAccountsMessage = (OnlineAccountsV2Message) message; List peersOnlineAccounts = onlineAccountsMessage.getOnlineAccounts(); - LOGGER.debug(String.format("Received %d online accounts from %s", peersOnlineAccounts.size(), peer)); + LOGGER.debug("Received {} online accounts from {}", peersOnlineAccounts.size(), peer); int importCount = 0; // Add any online accounts to the queue that aren't already present for (OnlineAccountData onlineAccountData : peersOnlineAccounts) { + boolean isNewEntry = onlineAccountsImportQueue.add(onlineAccountData); - // Do we already know about this online account data? - if (onlineAccounts.contains(onlineAccountData)) { - continue; - } - - // Is it already in the import queue? - if (onlineAccountsImportQueue.contains(onlineAccountData)) { - continue; - } - - onlineAccountsImportQueue.add(onlineAccountData); - importCount++; + if (isNewEntry) + importCount++; } - LOGGER.debug(String.format("Added %d online accounts to queue", importCount)); + if (importCount > 0) + LOGGER.debug("Added {} online accounts to queue", importCount); + } + + public void onNetworkGetOnlineAccountsV3Message(Peer peer, Message message) { + GetOnlineAccountsV3Message getOnlineAccountsMessage = (GetOnlineAccountsV3Message) message; + + Map> peersHashes = getOnlineAccountsMessage.getHashesByTimestampThenByte(); + List outgoingOnlineAccounts = new ArrayList<>(); + + // Warning: no double-checking/fetching - we must be ConcurrentMap compatible! + // So no contains()-then-get() or multiple get()s on the same key/map. + // We also use getOrDefault() with emptySet() on currentOnlineAccounts in case corresponding timestamp entry isn't there. + for (var ourOuterMapEntry : currentOnlineAccountsHashes.entrySet()) { + Long timestamp = ourOuterMapEntry.getKey(); + + var ourInnerMap = ourOuterMapEntry.getValue(); + var peersInnerMap = peersHashes.get(timestamp); + + if (peersInnerMap == null) { + // Peer doesn't have this timestamp, so if it's valid (i.e. not too old) then we'd have to send all of ours + Set timestampsOnlineAccounts = this.currentOnlineAccounts.getOrDefault(timestamp, Collections.emptySet()); + outgoingOnlineAccounts.addAll(timestampsOnlineAccounts); + + LOGGER.debug(() -> String.format("Going to send all %d online accounts for timestamp %d", timestampsOnlineAccounts.size(), timestamp)); + } else { + // Quick cache of which leading bytes to send so we only have to filter once + Set outgoingLeadingBytes = new HashSet<>(); + + // We have entries for this timestamp so compare against peer's entries + for (var ourInnerMapEntry : ourInnerMap.entrySet()) { + Byte leadingByte = ourInnerMapEntry.getKey(); + byte[] peersHash = peersInnerMap.get(leadingByte); + + if (!Arrays.equals(ourInnerMapEntry.getValue(), peersHash)) { + // For this leading byte: hashes don't match or peer doesn't have entry + // Send all online accounts for this timestamp and leading byte + outgoingLeadingBytes.add(leadingByte); + } + } + + int beforeAddSize = outgoingOnlineAccounts.size(); + + this.currentOnlineAccounts.getOrDefault(timestamp, Collections.emptySet()).stream() + .filter(account -> outgoingLeadingBytes.contains(account.getPublicKey()[0])) + .forEach(outgoingOnlineAccounts::add); + + if (outgoingLeadingBytes.size() > beforeAddSize) + LOGGER.debug(String.format("Going to send %d online accounts for timestamp %d and leading bytes %s", + outgoingOnlineAccounts.size() - beforeAddSize, + timestamp, + outgoingLeadingBytes.stream().sorted(Byte::compareUnsigned).map(leadingByte -> String.format("%02x", leadingByte)).collect(Collectors.joining(", ")) + ) + ); + } + } + + Message onlineAccountsMessage = new OnlineAccountsV2Message(outgoingOnlineAccounts); // TODO: V3 message + peer.sendMessage(onlineAccountsMessage); + + LOGGER.debug("Sent {} online accounts to {}", outgoingOnlineAccounts.size(), peer); } } diff --git a/src/main/java/org/qortal/data/network/OnlineAccountData.java b/src/main/java/org/qortal/data/network/OnlineAccountData.java index 15792307..99c181ba 100644 --- a/src/main/java/org/qortal/data/network/OnlineAccountData.java +++ b/src/main/java/org/qortal/data/network/OnlineAccountData.java @@ -5,6 +5,7 @@ import java.util.Arrays; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlTransient; import org.qortal.account.PublicKeyAccount; @@ -16,6 +17,9 @@ public class OnlineAccountData { protected byte[] signature; protected byte[] publicKey; + @XmlTransient + private int hash; + // Constructors // necessary for JAXB serialization @@ -74,8 +78,13 @@ public class OnlineAccountData { @Override public int hashCode() { - // Pretty lazy implementation - return (int) this.timestamp; + int h = this.hash; + if (h == 0) { + this.hash = h = Long.hashCode(this.timestamp) + ^ Arrays.hashCode(this.publicKey) + ^ Arrays.hashCode(this.signature); + } + return h; } } diff --git a/src/test/java/org/qortal/test/network/OnlineAccountsV3Tests.java b/src/test/java/org/qortal/test/network/OnlineAccountsV3Tests.java index 3394213b..6136c1e1 100644 --- a/src/test/java/org/qortal/test/network/OnlineAccountsV3Tests.java +++ b/src/test/java/org/qortal/test/network/OnlineAccountsV3Tests.java @@ -1,17 +1,14 @@ package org.qortal.test.network; -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider; import org.junit.Ignore; import org.junit.Test; +import org.qortal.controller.OnlineAccountsManager; import org.qortal.data.network.OnlineAccountData; import org.qortal.network.message.*; import org.qortal.transform.Transformer; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.security.Security; import java.util.*; @@ -74,24 +71,12 @@ public class OnlineAccountsV3Tests { hashesByTimestampThenByte .computeIfAbsent(timestamp, k -> new HashMap<>()) - .compute(leadingByte, (k, v) -> xorByteArrayInPlace(v, onlineAccountData.getPublicKey())); + .compute(leadingByte, (k, v) -> OnlineAccountsManager.xorByteArrayInPlace(v, onlineAccountData.getPublicKey())); } return hashesByTimestampThenByte; } - // TODO: This needs to be moved - probably to be OnlineAccountsManager - private static byte[] xorByteArrayInPlace(byte[] inplaceArray, byte[] otherArray) { - if (inplaceArray == null) - return Arrays.copyOf(otherArray, otherArray.length); - - // Start from index 1 to enforce static leading byte - for (int i = 1; i < otherArray.length; i++) - inplaceArray[i] ^= otherArray[otherArray.length - i - 1]; - - return inplaceArray; - } - @Test public void testOnGetOnlineAccountsV3() { List ourOnlineAccounts = generateOnlineAccounts(false);