Browse Source

OnlineAccountsV3:

Move online account cache code from Block into OnlineAccountsManager, simplifying Block code and removing duplicated caches from Block also.
This tidies up those remaining set-based getters in OnlineAccountsManager.
No need for currentOnlineAccountsHashes's inner Map to be sorted so addAccounts() creates new ConcurentHashMap insteaad of ConcurrentSkipListMap.

Changed GetOnlineAccountsV3Message to use a single byte for count of hashes as it can only be 1 to 256.
256 is represented by 0.

Comments tidy-up.
Change v3 broadcast interval from 10s to 15s.
reduce-reward-shares
catbref 2 years ago
parent
commit
712c4463f7
  1. 44
      src/main/java/org/qortal/block/Block.java
  2. 90
      src/main/java/org/qortal/controller/OnlineAccountsManager.java
  3. 36
      src/main/java/org/qortal/network/message/GetOnlineAccountsV3Message.java

44
src/main/java/org/qortal/block/Block.java

@ -221,11 +221,10 @@ public class Block {
return accountAmount;
}
}
/** Always use getExpandedAccounts() to access this, as it's lazy-instantiated. */
private List<ExpandedAccount> cachedExpandedAccounts = null;
/** Opportunistic cache of this block's valid online accounts. Only created by call to isValid(). */
private List<OnlineAccountData> cachedValidOnlineAccounts = null;
/** Opportunistic cache of this block's valid online reward-shares. Only created by call to isValid(). */
private List<RewardShareData> cachedOnlineRewardShares = null;
@ -1020,42 +1019,31 @@ public class Block {
long onlineTimestamp = this.blockData.getOnlineAccountsTimestamp();
byte[] onlineTimestampBytes = Longs.toByteArray(onlineTimestamp);
// If this block is much older than current online timestamp, then there's no point checking current online accounts
List<OnlineAccountData> currentOnlineAccounts = onlineTimestamp < NTP.getTime() - OnlineAccountsManager.ONLINE_TIMESTAMP_MODULUS
? null
: OnlineAccountsManager.getInstance().getOnlineAccounts(onlineTimestamp);
List<OnlineAccountData> latestBlocksOnlineAccounts = OnlineAccountsManager.getInstance().getLatestBlocksOnlineAccounts(onlineTimestamp);
// Extract online accounts' timestamp signatures from block data
List<byte[]> onlineAccountsSignatures = BlockTransformer.decodeTimestampSignatures(this.blockData.getOnlineAccountsSignatures());
// We'll build up a list of online accounts to hand over to Controller if block is added to chain
// and this will become latestBlocksOnlineAccounts (above) to reduce CPU load when we process next block...
List<OnlineAccountData> ourOnlineAccounts = new ArrayList<>();
// Convert
Set<OnlineAccountData> onlineAccounts = new HashSet<>();
for (int i = 0; i < onlineAccountsSignatures.size(); ++i) {
byte[] signature = onlineAccountsSignatures.get(i);
byte[] publicKey = onlineRewardShares.get(i).getRewardSharePublicKey();
OnlineAccountData onlineAccountData = new OnlineAccountData(onlineTimestamp, signature, publicKey);
ourOnlineAccounts.add(onlineAccountData);
onlineAccounts.add(onlineAccountData);
}
// If signature is still current then no need to perform Ed25519 verify
if (currentOnlineAccounts != null && currentOnlineAccounts.remove(onlineAccountData))
// remove() returned true, so online account still current
// and one less entry in currentOnlineAccounts to check next time
continue;
// Remove those already validated & cached by online accounts manager - no need to re-validate them
OnlineAccountsManager.getInstance().removeKnown(onlineAccounts, onlineTimestamp);
// If signature was okay in latest block then no need to perform Ed25519 verify
if (latestBlocksOnlineAccounts != null && latestBlocksOnlineAccounts.contains(onlineAccountData))
continue;
if (!Crypto.verify(publicKey, signature, onlineTimestampBytes))
// Validate the rest
for (OnlineAccountData onlineAccount : onlineAccounts)
if (!Crypto.verify(onlineAccount.getPublicKey(), onlineAccount.getSignature(), onlineTimestampBytes))
return ValidationResult.ONLINE_ACCOUNT_SIGNATURE_INCORRECT;
}
// We've validated these, so allow online accounts manager to cache
OnlineAccountsManager.getInstance().addBlocksOnlineAccounts(onlineAccounts, onlineTimestamp);
// All online accounts valid, so save our list of online accounts for potential later use
this.cachedValidOnlineAccounts = ourOnlineAccounts;
this.cachedOnlineRewardShares = onlineRewardShares;
return ValidationResult.OK;
@ -1426,9 +1414,6 @@ public class Block {
postBlockTidy();
// Give Controller our cached, valid online accounts data (if any) to help reduce CPU load for next block
OnlineAccountsManager.getInstance().pushLatestBlocksOnlineAccounts(this.cachedValidOnlineAccounts);
// Log some debugging info relating to the block weight calculation
this.logDebugInfo();
}
@ -1644,9 +1629,6 @@ public class Block {
this.blockData.setHeight(null);
postBlockTidy();
// Remove any cached, valid online accounts data from Controller
OnlineAccountsManager.getInstance().popLatestBlocksOnlineAccounts();
}
protected void orphanTransactionsFromBlock() throws DataException {

90
src/main/java/org/qortal/controller/OnlineAccountsManager.java

@ -6,6 +6,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.account.Account;
import org.qortal.account.PrivateKeyAccount;
import org.qortal.block.Block;
import org.qortal.block.BlockChain;
import org.qortal.crypto.Crypto;
import org.qortal.data.account.MintingAccountData;
@ -49,7 +50,7 @@ public class OnlineAccountsManager {
private static final long ONLINE_ACCOUNTS_QUEUE_INTERVAL = 100L; //ms
private static final long ONLINE_ACCOUNTS_TASKS_INTERVAL = 10 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_LEGACY_BROADCAST_INTERVAL = 60 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 10 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_BROADCAST_INTERVAL = 15 * 1000L; // ms
private static final long ONLINE_ACCOUNTS_V2_PEER_VERSION = 0x0300020000L; // v3.2.0
private static final long ONLINE_ACCOUNTS_V3_PEER_VERSION = 0x03000200cbL; // v3.2.203
@ -65,17 +66,14 @@ public class OnlineAccountsManager {
private final Map<Long, Set<OnlineAccountData>> currentOnlineAccounts = new ConcurrentHashMap<>();
/**
* Cache of hash-summary of 'current' online accounts, keyed by timestamp, then leading byte of public key.
* <p>
* Inner map is also sorted using {@code Byte::compareUnsigned} as a comparator.
* This is critical for proper function of GET_ONLINE_ACCOUNTS_V3 protocol.
*/
private final Map<Long, Map<Byte, byte[]>> currentOnlineAccountsHashes = new ConcurrentHashMap<>();
/**
* Cache of online accounts for latest blocks - not necessarily 'current' / now.
* Probably only accessed / modified by a single Synchronizer thread.
* <i>Probably</i> only accessed / modified by a single Synchronizer thread.
*/
private final Map<Long, Set<OnlineAccountData>> latestBlocksOnlineAccounts = new ConcurrentHashMap<>();
private final SortedMap<Long, Set<OnlineAccountData>> latestBlocksOnlineAccounts = new ConcurrentSkipListMap<>();
public static long toOnlineAccountTimestamp(long timestamp) {
return (timestamp / ONLINE_TIMESTAMP_MODULUS) * ONLINE_TIMESTAMP_MODULUS;
@ -157,7 +155,7 @@ public class OnlineAccountsManager {
if (isStopping)
return;
boolean isValid = this.validateAccount(repository, onlineAccountData);
boolean isValid = this.isValidCurrentAccount(repository, onlineAccountData);
if (isValid)
onlineAccountsToAdd.add(onlineAccountData);
@ -170,7 +168,7 @@ public class OnlineAccountsManager {
LOGGER.error("Repository issue while verifying online accounts", e);
}
LOGGER.debug("Adding {} validated online accounts from import queue", onlineAccountsToAdd.size());
LOGGER.debug("Merging {} validated online accounts from import queue", onlineAccountsToAdd.size());
addAccounts(onlineAccountsToAdd);
}
@ -182,13 +180,12 @@ public class OnlineAccountsManager {
// Start from index 1 to enforce static leading byte
for (int i = 1; i < otherArray.length; i++)
// inplaceArray[i] ^= otherArray[otherArray.length - i - 1];
inplaceArray[i] ^= otherArray[i];
return inplaceArray;
}
private boolean validateAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException {
private static boolean isValidCurrentAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException {
final Long now = NTP.getTime();
if (now == null)
return false;
@ -202,7 +199,7 @@ public class OnlineAccountsManager {
return false;
}
// Verify
// Verify signature
byte[] data = Longs.toByteArray(onlineAccountData.getTimestamp());
if (!Crypto.verify(rewardSharePublicKey, onlineAccountData.getSignature(), data)) {
LOGGER.trace(() -> String.format("Rejecting invalid online account %s", Base58.encode(rewardSharePublicKey)));
@ -241,7 +238,7 @@ public class OnlineAccountsManager {
for (var entry : hashesToRebuild.entrySet()) {
Long timestamp = entry.getKey();
LOGGER.debug(String.format("Rehashing for timestamp %d and leading bytes %s",
LOGGER.debug(() -> String.format("Rehashing for timestamp %d and leading bytes %s",
timestamp,
entry.getValue().stream().sorted(Byte::compareUnsigned).map(leadingByte -> String.format("%02x", leadingByte)).collect(Collectors.joining(", "))
)
@ -253,7 +250,7 @@ public class OnlineAccountsManager {
.filter(publicKey -> leadingByte == publicKey[0])
.reduce(null, OnlineAccountsManager::xorByteArrayInPlace);
currentOnlineAccountsHashes.computeIfAbsent(timestamp, k -> new ConcurrentSkipListMap<>(Byte::compareUnsigned)).put(leadingByte, pubkeyHash);
currentOnlineAccountsHashes.computeIfAbsent(timestamp, k -> new ConcurrentHashMap<>()).put(leadingByte, pubkeyHash);
LOGGER.trace(() -> String.format("Rebuilt hash %s for timestamp %d and leading byte %02x using %d public keys",
HashCode.fromBytes(pubkeyHash),
@ -447,44 +444,53 @@ public class OnlineAccountsManager {
return getOnlineAccounts(onlineTimestamp);
}
/**
* Returns cached, unmodifiable list of latest block's online accounts.
*/
// TODO: this needs tidying up - do we change method to only return latest timestamp's set?
// Block::areOnlineAccountsValid() - only wants online accounts with timestamp that matches latest / previous block's timestamp to avoid re-verifying sigs
public List<OnlineAccountData> getLatestBlocksOnlineAccounts(long blockOnlineTimestamp) {
Set<OnlineAccountData> onlineAccounts = this.latestBlocksOnlineAccounts.getOrDefault(blockOnlineTimestamp, Collections.emptySet());
return List.copyOf(onlineAccounts);
}
// Block processing
/**
* Caches list of latest block's online accounts. Typically called by Block.process()
* Removes previously validated entries from block's online accounts.
* <p>
* Checks both 'current' and block caches.
* <p>
* Typically called by {@link Block#areOnlineAccountsValid()}
*/
// TODO: is this simply a bulk add, like the import queue but blocking? Used by Synchronizer but could be for blocks that are quite historic?
// Block::process() - basically for adding latest block's online accounts to cache to avoid re-verifying when processing another block in the future
public void pushLatestBlocksOnlineAccounts(List<OnlineAccountData> latestBlocksOnlineAccounts) {
if (latestBlocksOnlineAccounts == null || latestBlocksOnlineAccounts.isEmpty())
return;
public void removeKnown(Set<OnlineAccountData> blocksOnlineAccounts, Long timestamp) {
Set<OnlineAccountData> onlineAccounts = this.currentOnlineAccounts.get(timestamp);
long timestamp = latestBlocksOnlineAccounts.get(0).getTimestamp();
// If not 'current' timestamp - try block cache instead
if (onlineAccounts == null)
onlineAccounts = this.latestBlocksOnlineAccounts.get(timestamp);
this.latestBlocksOnlineAccounts.computeIfAbsent(timestamp, k -> ConcurrentHashMap.newKeySet()).addAll(latestBlocksOnlineAccounts);
if (this.latestBlocksOnlineAccounts.size() > MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS)
this.latestBlocksOnlineAccounts.keySet().stream()
.sorted()
.findFirst()
.ifPresent(this.latestBlocksOnlineAccounts::remove);
if (onlineAccounts != null)
blocksOnlineAccounts.removeAll(onlineAccounts);
}
/**
* Reverts list of latest block's online accounts. Typically called by Block.orphan()
* Adds block's online accounts to one of OnlineAccountManager's caches.
* <p>
* It is assumed that the online accounts have been verified.
* <p>
* Typically called by {@link Block#areOnlineAccountsValid()}
*/
// TODO: see above
// Block::orphan() - for removing latest block's online accounts from cache
public void popLatestBlocksOnlineAccounts() {
// NO-OP
public void addBlocksOnlineAccounts(Set<OnlineAccountData> blocksOnlineAccounts, Long timestamp) {
// We want to add to 'current' in preference if possible
if (this.currentOnlineAccounts.containsKey(timestamp)) {
addAccounts(blocksOnlineAccounts);
return;
}
// Add to block cache instead
this.latestBlocksOnlineAccounts.computeIfAbsent(timestamp, k -> ConcurrentHashMap.newKeySet())
.addAll(blocksOnlineAccounts);
// If block cache has grown too large then we need to trim.
if (this.latestBlocksOnlineAccounts.size() > MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS) {
// However, be careful to trim the opposite end to the entry we just added!
Long firstKey = this.latestBlocksOnlineAccounts.firstKey();
if (!firstKey.equals(timestamp))
this.latestBlocksOnlineAccounts.remove(firstKey);
else
this.latestBlocksOnlineAccounts.remove(this.latestBlocksOnlineAccounts.lastKey());
}
}

36
src/main/java/org/qortal/network/message/GetOnlineAccountsV3Message.java

@ -1,6 +1,5 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.qortal.transform.Transformer;
@ -11,11 +10,15 @@ import java.util.*;
/**
* For requesting online accounts info from remote peer, given our list of online accounts.
*
* Different format to V1 and V2:
* V1 is: number of entries, then timestamp + pubkey for each entry
* V2 is: groups of: number of entries, timestamp, then pubkey for each entry
* V3 is: groups of: timestamp, number of entries (one per leading byte), then hash(pubkeys) for each entry
* <p></p>
* Different format to V1 and V2:<br>
* <ul>
* <li>V1 is: number of entries, then timestamp + pubkey for each entry</li>
* <li>V2 is: groups of: number of entries, timestamp, then pubkey for each entry</li>
* <li>V3 is: groups of: timestamp, number of entries (one per leading byte), then hash(pubkeys) for each entry</li>
* </ul>
* <p></p>
* End
*/
public class GetOnlineAccountsV3Message extends Message {
@ -32,8 +35,7 @@ public class GetOnlineAccountsV3Message extends Message {
}
// We should know exactly how many bytes to allocate now
int byteSize = hashesByTimestampThenByte.size() * (Transformer.TIMESTAMP_LENGTH + Transformer.INT_LENGTH)
+ Transformer.TIMESTAMP_LENGTH /* trailing zero entry indicates end of entries */;
int byteSize = hashesByTimestampThenByte.size() * (Transformer.TIMESTAMP_LENGTH + Transformer.BYTE_LENGTH);
byteSize += hashesByTimestampThenByte.values()
.stream()
@ -50,15 +52,13 @@ public class GetOnlineAccountsV3Message extends Message {
var innerMap = outerMapEntry.getValue();
bytes.write(Ints.toByteArray(innerMap.size()));
// Number of entries: 1 - 256, where 256 is represented by 0
bytes.write(innerMap.size() & 0xFF);
for (byte[] hashBytes : innerMap.values()) {
bytes.write(hashBytes);
}
}
// end of records
bytes.write(Longs.toByteArray(0L));
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
@ -85,13 +85,15 @@ public class GetOnlineAccountsV3Message extends Message {
Map<Long, Map<Byte, byte[]>> hashesByTimestampThenByte = new HashMap<>();
while (true) {
while (bytes.hasRemaining()) {
long timestamp = bytes.getLong();
if (timestamp == 0)
// Zero timestamp indicates end of records
break;
int hashCount = bytes.getInt();
int hashCount = bytes.get();
if (hashCount <= 0)
// 256 is represented by 0.
// Also converts negative signed value (e.g. -1) to proper positive unsigned value (255)
hashCount += 256;
Map<Byte, byte[]> hashesByByte = new HashMap<>();
for (int i = 0; i < hashCount; ++i) {

Loading…
Cancel
Save