blocks minted adjustments removal is a new feature trigger

primary names are now used throughout the chat repository

numerous message handlers have been optimized, many message handlers are now getting added to a list and scheduled for processing and when they get processed, the database gets queried significantly less, because the message requests and responses are getting batched together for database access rather than querying the database one by one, the thread limits for these message types have been significantly increased, because each individual thread coming in does very little, all it does is add the message to a list to be scheduled at a later time
This commit is contained in:
kennycud 2025-06-06 19:01:09 -07:00
parent 7ccd06e5c3
commit 8e0e455d41
36 changed files with 1598 additions and 649 deletions

View File

@ -2093,12 +2093,12 @@ public String finalizeUpload(
}
} catch (IOException | ApiException | DataException e) {
LOGGER.warn(String.format("Unable to load %s %s: %s", service, name, e.getMessage()), e);
LOGGER.warn(String.format("Unable to load %s %s: %s", service, name, e.getMessage()));
if (!response.isCommitted()) {
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.FILE_NOT_FOUND, e.getMessage());
}
} catch (NumberFormatException e) {
LOGGER.warn(String.format("Invalid range for %s %s: %s", service, name, e.getMessage()), e);
LOGGER.warn(String.format("Invalid range for %s %s: %s", service, name, e.getMessage()));
if (!response.isCommitted()) {
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_DATA, e.getMessage());
}

View File

@ -12,7 +12,9 @@ import org.bouncycastle.util.Strings;
import org.json.simple.JSONObject;
import org.qortal.api.model.CrossChainTradeLedgerEntry;
import org.qortal.api.model.crosschain.BitcoinyTBDRequest;
import org.qortal.asset.Asset;
import org.qortal.crosschain.*;
import org.qortal.data.account.AccountBalanceData;
import org.qortal.data.at.ATData;
import org.qortal.data.at.ATStateData;
import org.qortal.data.crosschain.*;
@ -30,6 +32,7 @@ import java.io.Writer;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -234,6 +237,9 @@ public class CrossChainUtils {
return bitcoiny.getBlockchainProvider().removeServer(server);
}
public static ChainableServer getCurrentServer( Bitcoiny bitcoiny ) {
return bitcoiny.getBlockchainProvider().getCurrentServer();
}
/**
* Set Current Server
*
@ -773,4 +779,46 @@ public class CrossChainUtils {
entries.add(ledgerEntry);
}
}
public static List<CrossChainTradeData> populateTradeDataList(Repository repository, ACCT acct, List<ATData> atDataList) throws DataException {
if(atDataList.isEmpty()) return new ArrayList<>(0);
List<ATStateData> latestATStates
= repository.getATRepository()
.getLatestATStates(
atDataList.stream()
.map(ATData::getATAddress)
.collect(Collectors.toList())
);
Map<String, ATStateData> atStateDataByAtAddress
= latestATStates.stream().collect(Collectors.toMap(ATStateData::getATAddress, Function.identity()));
Map<String, ATData> atDataByAtAddress
= atDataList.stream().collect(Collectors.toMap(ATData::getATAddress, Function.identity()));
Map<String, Long> balanceByAtAddress
= repository
.getAccountRepository()
.getBalances(new ArrayList<>(atDataByAtAddress.keySet()), Asset.QORT)
.stream().collect(Collectors.toMap(AccountBalanceData::getAddress, AccountBalanceData::getBalance));
List<CrossChainTradeData> crossChainTradeDataList = new ArrayList<>(latestATStates.size());
for( ATStateData atStateData : latestATStates ) {
ATData atData = atDataByAtAddress.get(atStateData.getATAddress());
crossChainTradeDataList.add(
acct.populateTradeData(
repository,
atData.getCreatorPublicKey(),
atData.getCreation(),
atStateData,
OptionalLong.of(balanceByAtAddress.get(atStateData.getATAddress()))
)
);
}
return crossChainTradeDataList;
}
}

View File

@ -1723,7 +1723,15 @@ public class Block {
accountData.setBlocksMinted(accountData.getBlocksMinted() + 1);
LOGGER.trace(() -> String.format("Block minter %s up to %d minted block%s", accountData.getAddress(), accountData.getBlocksMinted(), (accountData.getBlocksMinted() != 1 ? "s" : "")));
final int effectiveBlocksMinted = accountData.getBlocksMinted() + accountData.getBlocksMintedAdjustment() + accountData.getBlocksMintedPenalty();
int blocksMintedAdjustment
=
(this.blockData.getHeight() > BlockChain.getInstance().getMintedBlocksAdjustmentRemovalHeight())
?
0
:
accountData.getBlocksMintedAdjustment();
final int effectiveBlocksMinted = accountData.getBlocksMinted() + blocksMintedAdjustment + accountData.getBlocksMintedPenalty();
for (int newLevel = maximumLevel; newLevel >= 0; --newLevel)
if (effectiveBlocksMinted >= cumulativeBlocksByLevel.get(newLevel)) {
@ -2131,7 +2139,15 @@ public class Block {
accountData.setBlocksMinted(accountData.getBlocksMinted() - 1);
LOGGER.trace(() -> String.format("Block minter %s down to %d minted block%s", accountData.getAddress(), accountData.getBlocksMinted(), (accountData.getBlocksMinted() != 1 ? "s" : "")));
final int effectiveBlocksMinted = accountData.getBlocksMinted() + accountData.getBlocksMintedAdjustment() + accountData.getBlocksMintedPenalty();
int blocksMintedAdjustment
=
(this.blockData.getHeight() > BlockChain.getInstance().getMintedBlocksAdjustmentRemovalHeight())
?
0
:
accountData.getBlocksMintedAdjustment();
final int effectiveBlocksMinted = accountData.getBlocksMinted() + blocksMintedAdjustment + accountData.getBlocksMintedPenalty();
for (int newLevel = maximumLevel; newLevel >= 0; --newLevel)
if (effectiveBlocksMinted >= cumulativeBlocksByLevel.get(newLevel)) {

View File

@ -93,7 +93,8 @@ public class BlockChain {
nullGroupMembershipHeight,
ignoreLevelForRewardShareHeight,
adminQueryFixHeight,
multipleNamesPerAccountHeight
multipleNamesPerAccountHeight,
mintedBlocksAdjustmentRemovalHeight
}
// Custom transaction fees
@ -695,6 +696,10 @@ public class BlockChain {
return this.featureTriggers.get(FeatureTrigger.multipleNamesPerAccountHeight.name()).intValue();
}
public int getMintedBlocksAdjustmentRemovalHeight() {
return this.featureTriggers.get(FeatureTrigger.mintedBlocksAdjustmentRemovalHeight.name()).intValue();
}
// More complex getters for aspects that change by height or timestamp
public long getRewardAtHeight(int ourHeight) {

View File

@ -2,6 +2,7 @@ package org.qortal.controller;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.arbitrary.PeerMessage;
import org.qortal.data.block.BlockData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.Network;
@ -20,7 +21,11 @@ import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;
public class TransactionImporter extends Thread {
@ -50,6 +55,10 @@ public class TransactionImporter extends Thread {
/** Cached list of unconfirmed transactions, used when counting per creator. This is replaced regularly */
public static List<TransactionData> unconfirmedTransactionsCache = null;
public TransactionImporter() {
signatureMessageScheduler.scheduleAtFixedRate(this::processNetworkTransactionSignaturesMessage, 60, 1, TimeUnit.SECONDS);
getTransactionMessageScheduler.scheduleAtFixedRate(this::processNetworkGetTransactionMessages, 60, 1, TimeUnit.SECONDS);
}
public static synchronized TransactionImporter getInstance() {
if (instance == null) {
@ -371,36 +380,104 @@ public class TransactionImporter extends Thread {
}
}
// List to collect messages
private final List<PeerMessage> getTransactionMessageList = new ArrayList<>();
// Lock to synchronize access to the list
private final Object getTransactionMessageLock = new Object();
// Scheduled executor service to process messages every second
private final ScheduledExecutorService getTransactionMessageScheduler = Executors.newScheduledThreadPool(1);
public void onNetworkGetTransactionMessage(Peer peer, Message message) {
GetTransactionMessage getTransactionMessage = (GetTransactionMessage) message;
byte[] signature = getTransactionMessage.getSignature();
try (final Repository repository = RepositoryManager.getRepository()) {
synchronized (getTransactionMessageLock) {
getTransactionMessageList.add(new PeerMessage(peer, message));
}
}
private void processNetworkGetTransactionMessages() {
try {
List<PeerMessage> messagesToProcess;
synchronized (getTransactionMessageLock) {
messagesToProcess = new ArrayList<>(getTransactionMessageList);
getTransactionMessageList.clear();
}
if( messagesToProcess.isEmpty() ) return;
Map<String, PeerMessage> peerMessageBySignature58 = new HashMap<>(messagesToProcess.size());
for( PeerMessage peerMessage : messagesToProcess ) {
GetTransactionMessage getTransactionMessage = (GetTransactionMessage) peerMessage.getMessage();
byte[] signature = getTransactionMessage.getSignature();
peerMessageBySignature58.put(Base58.encode(signature), peerMessage);
}
// Firstly check the sig-valid transactions that are currently queued for import
TransactionData transactionData = this.getCachedSigValidTransactions().stream()
.filter(t -> Arrays.equals(signature, t.getSignature()))
.findFirst().orElse(null);
Map<String, TransactionData> transactionsCachedBySignature58
= this.getCachedSigValidTransactions().stream()
.collect(Collectors.toMap(t -> Base58.encode(t.getSignature()), Function.identity()));
if (transactionData == null) {
Map<Boolean, List<Map.Entry<String, PeerMessage>>> transactionsCachedBySignature58Partition
= peerMessageBySignature58.entrySet().stream()
.collect(Collectors.partitioningBy(entry -> transactionsCachedBySignature58.containsKey(entry.getKey())));
List<byte[]> signaturesNeeded
= transactionsCachedBySignature58Partition.get(false).stream()
.map(Map.Entry::getValue)
.map(PeerMessage::getMessage)
.map(message -> (GetTransactionMessage) message)
.map(GetTransactionMessage::getSignature)
.collect(Collectors.toList());
// transaction found in the import queue
Map<String, TransactionData> transactionsToSendBySignature58 = new HashMap<>(messagesToProcess.size());
for( Map.Entry<String, PeerMessage> entry : transactionsCachedBySignature58Partition.get(true)) {
transactionsToSendBySignature58.put(entry.getKey(), transactionsCachedBySignature58.get(entry.getKey()));
}
if( !signaturesNeeded.isEmpty() ) {
// Not found in import queue, so try the database
transactionData = repository.getTransactionRepository().fromSignature(signature);
try (final Repository repository = RepositoryManager.getRepository()) {
transactionsToSendBySignature58.putAll(
repository.getTransactionRepository().fromSignatures(signaturesNeeded).stream()
.collect(Collectors.toMap(transactionData -> Base58.encode(transactionData.getSignature()), Function.identity()))
);
} catch (DataException e) {
LOGGER.error(e.getMessage(), e);
}
}
if (transactionData == null) {
// Still not found - so we don't have this transaction
LOGGER.debug(() -> String.format("Ignoring GET_TRANSACTION request from peer %s for unknown transaction %s", peer, Base58.encode(signature)));
// Send no response at all???
return;
}
for( final Map.Entry<String, TransactionData> entry : transactionsToSendBySignature58.entrySet() ) {
Message transactionMessage = new TransactionMessage(transactionData);
PeerMessage peerMessage = peerMessageBySignature58.get(entry.getKey());
final Message message = peerMessage.getMessage();
final Peer peer = peerMessage.getPeer();
Runnable sendTransactionMessageRunner = () -> sendTransactionMessage(entry.getKey(), entry.getValue(), message, peer);
Thread sendTransactionMessageThread = new Thread(sendTransactionMessageRunner);
sendTransactionMessageThread.start();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(),e);
}
}
private static void sendTransactionMessage(String signature58, TransactionData data, Message message, Peer peer) {
try {
Message transactionMessage = new TransactionMessage(data);
transactionMessage.setId(message.getId());
if (!peer.sendMessage(transactionMessage))
peer.disconnect("failed to send transaction");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending transaction %s to peer %s", Base58.encode(signature), peer), e);
} catch (TransformationException e) {
LOGGER.error(String.format("Serialization issue while sending transaction %s to peer %s", Base58.encode(signature), peer), e);
}
catch (TransformationException e) {
LOGGER.error(String.format("Serialization issue while sending transaction %s to peer %s", signature58, peer), e);
}
catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
@ -421,44 +498,86 @@ public class TransactionImporter extends Thread {
}
}
// List to collect messages
private final List<PeerMessage> signatureMessageList = new ArrayList<>();
// Lock to synchronize access to the list
private final Object signatureMessageLock = new Object();
// Scheduled executor service to process messages every second
private final ScheduledExecutorService signatureMessageScheduler = Executors.newScheduledThreadPool(1);
public void onNetworkTransactionSignaturesMessage(Peer peer, Message message) {
TransactionSignaturesMessage transactionSignaturesMessage = (TransactionSignaturesMessage) message;
List<byte[]> signatures = transactionSignaturesMessage.getSignatures();
synchronized (signatureMessageLock) {
signatureMessageList.add(new PeerMessage(peer, message));
}
}
try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) {
String signature58 = Base58.encode(signature);
if (invalidUnconfirmedTransactions.containsKey(signature58)) {
// Previously invalid transaction - don't keep requesting it
// It will be periodically removed from invalidUnconfirmedTransactions to allow for rechecks
continue;
}
public void processNetworkTransactionSignaturesMessage() {
// Ignore if this transaction is in the queue
if (incomingTransactionQueueContains(signature)) {
LOGGER.trace(() -> String.format("Ignoring existing queued transaction %s from peer %s", Base58.encode(signature), peer));
continue;
}
try {
List<PeerMessage> messagesToProcess;
synchronized (signatureMessageLock) {
messagesToProcess = new ArrayList<>(signatureMessageList);
signatureMessageList.clear();
}
// Do we have it already? (Before requesting transaction data itself)
if (repository.getTransactionRepository().exists(signature)) {
LOGGER.trace(() -> String.format("Ignoring existing transaction %s from peer %s", Base58.encode(signature), peer));
continue;
}
Map<String, byte[]> signatureBySignature58 = new HashMap<>(messagesToProcess.size() * 10);
Map<String, Peer> peerBySignature58 = new HashMap<>( messagesToProcess.size() * 10 );
// Check isInterrupted() here and exit fast
if (Thread.currentThread().isInterrupted())
return;
for( PeerMessage peerMessage : messagesToProcess ) {
// Fetch actual transaction data from peer
Message getTransactionMessage = new GetTransactionMessage(signature);
if (!peer.sendMessage(getTransactionMessage)) {
peer.disconnect("failed to request transaction");
return;
TransactionSignaturesMessage transactionSignaturesMessage = (TransactionSignaturesMessage) peerMessage.getMessage();
List<byte[]> signatures = transactionSignaturesMessage.getSignatures();
for (byte[] signature : signatures) {
String signature58 = Base58.encode(signature);
if (invalidUnconfirmedTransactions.containsKey(signature58)) {
// Previously invalid transaction - don't keep requesting it
// It will be periodically removed from invalidUnconfirmedTransactions to allow for rechecks
continue;
}
// Ignore if this transaction is in the queue
if (incomingTransactionQueueContains(signature)) {
LOGGER.trace(() -> String.format("Ignoring existing queued transaction %s from peer %s", Base58.encode(signature), peerMessage.getPeer()));
continue;
}
signatureBySignature58.put(signature58, signature);
peerBySignature58.put(signature58, peerMessage.getPeer());
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e);
if( !signatureBySignature58.isEmpty() ) {
try (final Repository repository = RepositoryManager.getRepository()) {
// remove signatures in db already
repository.getTransactionRepository()
.fromSignatures(new ArrayList<>(signatureBySignature58.values())).stream()
.map(TransactionData::getSignature)
.map(signature -> Base58.encode(signature))
.forEach(signature58 -> signatureBySignature58.remove(signature58));
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer"), e);
}
}
// Check isInterrupted() here and exit fast
if (Thread.currentThread().isInterrupted())
return;
for (Map.Entry<String, byte[]> entry : signatureBySignature58.entrySet()) {
Peer peer = peerBySignature58.get(entry.getKey());
// Fetch actual transaction data from peer
Message getTransactionMessage = new GetTransactionMessage(entry.getValue());
if (peer != null && !peer.sendMessage(getTransactionMessage)) {
peer.disconnect("failed to request transaction");
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

View File

@ -25,6 +25,10 @@ import org.qortal.utils.NTP;
import org.qortal.utils.Triple;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.qortal.controller.arbitrary.ArbitraryDataFileManager.MAX_FILE_HASH_RESPONSES;
@ -73,6 +77,8 @@ public class ArbitraryDataFileListManager {
private ArbitraryDataFileListManager() {
getArbitraryDataFileListMessageScheduler.scheduleAtFixedRate(this::processNetworkGetArbitraryDataFileListMessage, 60, 1, TimeUnit.SECONDS);
arbitraryDataFileListMessageScheduler.scheduleAtFixedRate(this::processNetworkArbitraryDataFileListMessage, 60, 1, TimeUnit.SECONDS);
}
public static ArbitraryDataFileListManager getInstance() {
@ -413,70 +419,116 @@ public class ArbitraryDataFileListManager {
// Network handlers
// List to collect messages
private final List<PeerMessage> arbitraryDataFileListMessageList = new ArrayList<>();
// Lock to synchronize access to the list
private final Object arbitraryDataFileListMessageLock = new Object();
// Scheduled executor service to process messages every second
private final ScheduledExecutorService arbitraryDataFileListMessageScheduler = Executors.newScheduledThreadPool(1);
public void onNetworkArbitraryDataFileListMessage(Peer peer, Message message) {
// Don't process if QDN is disabled
if (!Settings.getInstance().isQdnEnabled()) {
return;
}
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
LOGGER.debug("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size());
if (LOGGER.isDebugEnabled() && arbitraryDataFileListMessage.getRequestTime() != null) {
long totalRequestTime = NTP.getTime() - arbitraryDataFileListMessage.getRequestTime();
LOGGER.debug("totalRequestTime: {}, requestHops: {}, peerAddress: {}, isRelayPossible: {}",
totalRequestTime, arbitraryDataFileListMessage.getRequestHops(),
arbitraryDataFileListMessage.getPeerAddress(), arbitraryDataFileListMessage.isRelayPossible());
synchronized (arbitraryDataFileListMessageLock) {
arbitraryDataFileListMessageList.add(new PeerMessage(peer, message));
}
}
// Do we have a pending request for this data?
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
if (request == null || request.getA() == null) {
return;
}
boolean isRelayRequest = (request.getB() != null);
private void processNetworkArbitraryDataFileListMessage() {
// Does this message's signature match what we're expecting?
byte[] signature = arbitraryDataFileListMessage.getSignature();
String signature58 = Base58.encode(signature);
if (!request.getA().equals(signature58)) {
return;
}
try {
List<PeerMessage> messagesToProcess;
synchronized (arbitraryDataFileListMessageLock) {
messagesToProcess = new ArrayList<>(arbitraryDataFileListMessageList);
arbitraryDataFileListMessageList.clear();
}
List<byte[]> hashes = arbitraryDataFileListMessage.getHashes();
if (hashes == null || hashes.isEmpty()) {
return;
}
if (messagesToProcess.isEmpty()) return;
ArbitraryTransactionData arbitraryTransactionData = null;
Map<String, PeerMessage> peerMessageBySignature58 = new HashMap<>(messagesToProcess.size());
Map<String, byte[]> signatureBySignature58 = new HashMap<>(messagesToProcess.size());
Map<String, Boolean> isRelayRequestBySignature58 = new HashMap<>(messagesToProcess.size());
Map<String, List<byte[]>> hashesBySignature58 = new HashMap<>(messagesToProcess.size());
Map<String, Triple<String, Peer, Long>> requestBySignature58 = new HashMap<>(messagesToProcess.size());
// Check transaction exists and hashes are correct
try (final Repository repository = RepositoryManager.getRepository()) {
TransactionData transactionData = repository.getTransactionRepository().fromSignature(signature);
if (!(transactionData instanceof ArbitraryTransactionData))
for (PeerMessage peerMessage : messagesToProcess) {
Peer peer = peerMessage.getPeer();
Message message = peerMessage.getMessage();
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
LOGGER.debug("Received hash list from peer {} with {} hashes", peer, arbitraryDataFileListMessage.getHashes().size());
if (LOGGER.isDebugEnabled() && arbitraryDataFileListMessage.getRequestTime() != null) {
long totalRequestTime = NTP.getTime() - arbitraryDataFileListMessage.getRequestTime();
LOGGER.debug("totalRequestTime: {}, requestHops: {}, peerAddress: {}, isRelayPossible: {}",
totalRequestTime, arbitraryDataFileListMessage.getRequestHops(),
arbitraryDataFileListMessage.getPeerAddress(), arbitraryDataFileListMessage.isRelayPossible());
}
// Do we have a pending request for this data?
Triple<String, Peer, Long> request = arbitraryDataFileListRequests.get(message.getId());
if (request == null || request.getA() == null) {
continue;
}
boolean isRelayRequest = (request.getB() != null);
// Does this message's signature match what we're expecting?
byte[] signature = arbitraryDataFileListMessage.getSignature();
String signature58 = Base58.encode(signature);
if (!request.getA().equals(signature58)) {
continue;
}
List<byte[]> hashes = arbitraryDataFileListMessage.getHashes();
if (hashes == null || hashes.isEmpty()) {
continue;
}
peerMessageBySignature58.put(signature58, peerMessage);
signatureBySignature58.put(signature58, signature);
isRelayRequestBySignature58.put(signature58, isRelayRequest);
hashesBySignature58.put(signature58, hashes);
requestBySignature58.put(signature58, request);
}
if (signatureBySignature58.isEmpty()) return;
List<ArbitraryTransactionData> arbitraryTransactionDataList;
// Check transaction exists and hashes are correct
try (final Repository repository = RepositoryManager.getRepository()) {
arbitraryTransactionDataList
= repository.getTransactionRepository()
.fromSignatures(new ArrayList<>(signatureBySignature58.values())).stream()
.filter(data -> data instanceof ArbitraryTransactionData)
.map(data -> (ArbitraryTransactionData) data)
.collect(Collectors.toList());
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list"), e);
return;
}
arbitraryTransactionData = (ArbitraryTransactionData) transactionData;
for (ArbitraryTransactionData arbitraryTransactionData : arbitraryTransactionDataList) {
// // Load data file(s)
// ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromTransactionData(arbitraryTransactionData);
//
// // Check all hashes exist
// for (byte[] hash : hashes) {
// //LOGGER.debug("Received hash {}", Base58.encode(hash));
// if (!arbitraryDataFile.containsChunk(hash)) {
// // Check the hash against the complete file
// if (!Arrays.equals(arbitraryDataFile.getHash(), hash)) {
// LOGGER.info("Received non-matching chunk hash {} for signature {}. This could happen if we haven't obtained the metadata file yet.", Base58.encode(hash), signature58);
// return;
// }
// }
// }
byte[] signature = arbitraryTransactionData.getSignature();
String signature58 = Base58.encode(signature);
if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) {
Long now = NTP.getTime();
List<byte[]> hashes = hashesBySignature58.get(signature58);
PeerMessage peerMessage = peerMessageBySignature58.get(signature58);
Peer peer = peerMessage.getPeer();
Message message = peerMessage.getMessage();
ArbitraryDataFileListMessage arbitraryDataFileListMessage = (ArbitraryDataFileListMessage) message;
Boolean isRelayRequest = isRelayRequestBySignature58.get(signature58);
if (!isRelayRequest || !Settings.getInstance().isRelayModeEnabled()) {
Long now = NTP.getTime();
if (ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.size() < MAX_FILE_HASH_RESPONSES) {
// Keep track of the hashes this peer reports to have access to
for (byte[] hash : hashes) {
String hash58 = Base58.encode(hash);
@ -487,233 +539,303 @@ public class ArbitraryDataFileListManager {
ArbitraryFileListResponseInfo responseInfo = new ArbitraryFileListResponseInfo(hash58, signature58,
peer, now, arbitraryDataFileListMessage.getRequestTime(), requestHops);
ArbitraryDataFileManager.getInstance().arbitraryDataFileHashResponses.add(responseInfo);
ArbitraryDataFileManager.getInstance().addResponse(responseInfo);
}
// Keep track of the source peer, for direct connections
if (arbitraryDataFileListMessage.getPeerAddress() != null) {
ArbitraryDataFileManager.getInstance().addDirectConnectionInfoIfUnique(
new ArbitraryDirectConnectionInfo(signature, arbitraryDataFileListMessage.getPeerAddress(), hashes, now));
}
}
// Keep track of the source peer, for direct connections
if (arbitraryDataFileListMessage.getPeerAddress() != null) {
ArbitraryDataFileManager.getInstance().addDirectConnectionInfoIfUnique(
new ArbitraryDirectConnectionInfo(signature, arbitraryDataFileListMessage.getPeerAddress(), hashes, now));
}
}
// Forwarding
if (isRelayRequest && Settings.getInstance().isRelayModeEnabled()) {
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while finding arbitrary transaction data list for peer %s", peer), e);
}
boolean isBlocked = (arbitraryTransactionData == null || ListUtils.isNameBlocked(arbitraryTransactionData.getName()));
if (!isBlocked) {
Triple<String, Peer, Long> request = requestBySignature58.get(signature58);
Peer requestingPeer = request.getB();
if (requestingPeer != null) {
Long requestTime = arbitraryDataFileListMessage.getRequestTime();
Integer requestHops = arbitraryDataFileListMessage.getRequestHops();
// Forwarding
if (isRelayRequest && Settings.getInstance().isRelayModeEnabled()) {
boolean isBlocked = (arbitraryTransactionData == null || ListUtils.isNameBlocked(arbitraryTransactionData.getName()));
if (!isBlocked) {
Peer requestingPeer = request.getB();
if (requestingPeer != null) {
Long requestTime = arbitraryDataFileListMessage.getRequestTime();
Integer requestHops = arbitraryDataFileListMessage.getRequestHops();
// Add each hash to our local mapping so we know who to ask later
Long now = NTP.getTime();
for (byte[] hash : hashes) {
String hash58 = Base58.encode(hash);
ArbitraryRelayInfo relayInfo = new ArbitraryRelayInfo(hash58, signature58, peer, now, requestTime, requestHops);
ArbitraryDataFileManager.getInstance().addToRelayMap(relayInfo);
}
// Add each hash to our local mapping so we know who to ask later
Long now = NTP.getTime();
for (byte[] hash : hashes) {
String hash58 = Base58.encode(hash);
ArbitraryRelayInfo relayInfo = new ArbitraryRelayInfo(hash58, signature58, peer, now, requestTime, requestHops);
ArbitraryDataFileManager.getInstance().addToRelayMap(relayInfo);
}
// Bump requestHops if it exists
if (requestHops != null) {
requestHops++;
}
// Bump requestHops if it exists
if (requestHops != null) {
requestHops++;
}
ArbitraryDataFileListMessage forwardArbitraryDataFileListMessage;
ArbitraryDataFileListMessage forwardArbitraryDataFileListMessage;
// Remove optional parameters if the requesting peer doesn't support it yet
// A message with less statistical data is better than no message at all
if (!requestingPeer.isAtLeastVersion(MIN_PEER_VERSION_FOR_FILE_LIST_STATS)) {
forwardArbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
} else {
forwardArbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes, requestTime, requestHops,
arbitraryDataFileListMessage.getPeerAddress(), arbitraryDataFileListMessage.isRelayPossible());
}
forwardArbitraryDataFileListMessage.setId(message.getId());
// Remove optional parameters if the requesting peer doesn't support it yet
// A message with less statistical data is better than no message at all
if (!requestingPeer.isAtLeastVersion(MIN_PEER_VERSION_FOR_FILE_LIST_STATS)) {
forwardArbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
} else {
forwardArbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes, requestTime, requestHops,
arbitraryDataFileListMessage.getPeerAddress(), arbitraryDataFileListMessage.isRelayPossible());
}
forwardArbitraryDataFileListMessage.setId(message.getId());
// Forward to requesting peer
LOGGER.debug("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer);
if (!requestingPeer.sendMessage(forwardArbitraryDataFileListMessage)) {
requestingPeer.disconnect("failed to forward arbitrary data file list");
// Forward to requesting peer
LOGGER.debug("Forwarding file list with {} hashes to requesting peer: {}", hashes.size(), requestingPeer);
if (!requestingPeer.sendMessage(forwardArbitraryDataFileListMessage)) {
requestingPeer.disconnect("failed to forward arbitrary data file list");
}
}
}
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
// List to collect messages
private final List<PeerMessage> getArbitraryDataFileListMessageList = new ArrayList<>();
// Lock to synchronize access to the list
private final Object getArbitraryDataFileListMessageLock = new Object();
// Scheduled executor service to process messages every second
private final ScheduledExecutorService getArbitraryDataFileListMessageScheduler = Executors.newScheduledThreadPool(1);
public void onNetworkGetArbitraryDataFileListMessage(Peer peer, Message message) {
// Don't respond if QDN is disabled
if (!Settings.getInstance().isQdnEnabled()) {
return;
}
Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet();
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
byte[] signature = getArbitraryDataFileListMessage.getSignature();
String signature58 = Base58.encode(signature);
Long now = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, now);
// If we've seen this request recently, then ignore
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) {
LOGGER.trace("Ignoring hash list request from peer {} for signature {}", peer, signature58);
return;
synchronized (getArbitraryDataFileListMessageLock) {
getArbitraryDataFileListMessageList.add(new PeerMessage(peer, message));
}
}
List<byte[]> requestedHashes = getArbitraryDataFileListMessage.getHashes();
int hashCount = requestedHashes != null ? requestedHashes.size() : 0;
String requestingPeer = getArbitraryDataFileListMessage.getRequestingPeer();
private void processNetworkGetArbitraryDataFileListMessage() {
if (requestingPeer != null) {
LOGGER.debug("Received hash list request with {} hashes from peer {} (requesting peer {}) for signature {}", hashCount, peer, requestingPeer, signature58);
}
else {
LOGGER.debug("Received hash list request with {} hashes from peer {} for signature {}", hashCount, peer, signature58);
}
try {
List<PeerMessage> messagesToProcess;
synchronized (getArbitraryDataFileListMessageLock) {
messagesToProcess = new ArrayList<>(getArbitraryDataFileListMessageList);
getArbitraryDataFileListMessageList.clear();
}
List<byte[]> hashes = new ArrayList<>();
ArbitraryTransactionData transactionData = null;
boolean allChunksExist = false;
boolean hasMetadata = false;
if (messagesToProcess.isEmpty()) return;
try (final Repository repository = RepositoryManager.getRepository()) {
Map<String, byte[]> signatureBySignature58 = new HashMap<>(messagesToProcess.size());
Map<String, List<byte[]>> requestedHashesBySignature58 = new HashMap<>(messagesToProcess.size());
Map<String, String> requestingPeerBySignature58 = new HashMap<>(messagesToProcess.size());
Map<String, Long> nowBySignature58 = new HashMap<>((messagesToProcess.size()));
Map<String, PeerMessage> peerMessageBySignature58 = new HashMap<>(messagesToProcess.size());
// Firstly we need to lookup this file on chain to get a list of its hashes
transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature);
if (transactionData instanceof ArbitraryTransactionData) {
for (PeerMessage messagePeer : messagesToProcess) {
Controller.getInstance().stats.getArbitraryDataFileListMessageStats.requests.incrementAndGet();
Message message = messagePeer.message;
Peer peer = messagePeer.peer;
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
byte[] signature = getArbitraryDataFileListMessage.getSignature();
String signature58 = Base58.encode(signature);
Long now = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, now);
// If we've seen this request recently, then ignore
if (arbitraryDataFileListRequests.putIfAbsent(message.getId(), newEntry) != null) {
LOGGER.trace("Ignoring hash list request from peer {} for signature {}", peer, signature58);
continue;
}
List<byte[]> requestedHashes = getArbitraryDataFileListMessage.getHashes();
int hashCount = requestedHashes != null ? requestedHashes.size() : 0;
String requestingPeer = getArbitraryDataFileListMessage.getRequestingPeer();
if (requestingPeer != null) {
LOGGER.debug("Received hash list request with {} hashes from peer {} (requesting peer {}) for signature {}", hashCount, peer, requestingPeer, signature58);
} else {
LOGGER.debug("Received hash list request with {} hashes from peer {} for signature {}", hashCount, peer, signature58);
}
signatureBySignature58.put(signature58, signature);
requestedHashesBySignature58.put(signature58, requestedHashes);
requestingPeerBySignature58.put(signature58, requestingPeer);
nowBySignature58.put(signature58, now);
peerMessageBySignature58.put(signature58, messagePeer);
}
if (signatureBySignature58.isEmpty()) {
return;
}
List<byte[]> hashes = new ArrayList<>();
boolean allChunksExist = false;
boolean hasMetadata = false;
List<ArbitraryTransactionData> transactionDataList;
try (final Repository repository = RepositoryManager.getRepository()) {
// Firstly we need to lookup this file on chain to get a list of its hashes
transactionDataList
= repository.getTransactionRepository()
.fromSignatures(new ArrayList<>(signatureBySignature58.values())).stream()
.filter(data -> data instanceof ArbitraryTransactionData)
.map(data -> (ArbitraryTransactionData) data)
.collect(Collectors.toList());
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer"), e);
return;
}
for (ArbitraryTransactionData transactionData : transactionDataList) {
byte[] signature = transactionData.getSignature();
String signature58 = Base58.encode(signature);
List<byte[]> requestedHashes = requestedHashesBySignature58.get(signature58);
// Check if we're even allowed to serve data for this transaction
if (ArbitraryDataStorageManager.getInstance().canStoreData(transactionData)) {
// Load file(s) and add any that exist to the list of hashes
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromTransactionData(transactionData);
try {
// Load file(s) and add any that exist to the list of hashes
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile.fromTransactionData(transactionData);
// If the peer didn't supply a hash list, we need to return all hashes for this transaction
if (requestedHashes == null || requestedHashes.isEmpty()) {
requestedHashes = new ArrayList<>();
// If the peer didn't supply a hash list, we need to return all hashes for this transaction
if (requestedHashes == null || requestedHashes.isEmpty()) {
requestedHashes = new ArrayList<>();
// Add the metadata file
if (arbitraryDataFile.getMetadataHash() != null) {
requestedHashes.add(arbitraryDataFile.getMetadataHash());
hasMetadata = true;
// Add the metadata file
if (arbitraryDataFile.getMetadataHash() != null) {
requestedHashes.add(arbitraryDataFile.getMetadataHash());
hasMetadata = true;
}
// Add the chunk hashes
if (!arbitraryDataFile.getChunkHashes().isEmpty()) {
requestedHashes.addAll(arbitraryDataFile.getChunkHashes());
}
// Add complete file if there are no hashes
else {
requestedHashes.add(arbitraryDataFile.getHash());
}
}
// Add the chunk hashes
if (!arbitraryDataFile.getChunkHashes().isEmpty()) {
requestedHashes.addAll(arbitraryDataFile.getChunkHashes());
}
// Add complete file if there are no hashes
else {
requestedHashes.add(arbitraryDataFile.getHash());
// Assume all chunks exists, unless one can't be found below
allChunksExist = true;
for (byte[] requestedHash : requestedHashes) {
ArbitraryDataFileChunk chunk = ArbitraryDataFileChunk.fromHash(requestedHash, signature);
if (chunk.exists()) {
hashes.add(chunk.getHash());
//LOGGER.trace("Added hash {}", chunk.getHash58());
} else {
LOGGER.trace("Couldn't add hash {} because it doesn't exist", chunk.getHash58());
allChunksExist = false;
}
}
} catch (DataException e) {
LOGGER.error(e.getMessage(), e);
}
}
// If the only file we have is the metadata then we shouldn't respond. Most nodes will already have that,
// or can use the separate metadata protocol to fetch it. This should greatly reduce network spam.
if (hasMetadata && hashes.size() == 1) {
hashes.clear();
}
PeerMessage peerMessage = peerMessageBySignature58.get(signature58);
Peer peer = peerMessage.getPeer();
Message message = peerMessage.getMessage();
Long now = nowBySignature58.get(signature58);
// We should only respond if we have at least one hash
String requestingPeer = requestingPeerBySignature58.get(signature58);
if (!hashes.isEmpty()) {
// Firstly we should keep track of the requesting peer, to allow for potential direct connections later
ArbitraryDataFileManager.getInstance().addRecentDataRequest(requestingPeer);
// We have all the chunks, so update requests map to reflect that we've sent it
// There is no need to keep track of the request, as we can serve all the chunks
if (allChunksExist) {
Triple<String, Peer, Long> newEntry = new Triple<>(null, null, now);
arbitraryDataFileListRequests.put(message.getId(), newEntry);
}
// Assume all chunks exists, unless one can't be found below
allChunksExist = true;
String ourAddress = Network.getInstance().getOurExternalIpAddressAndPort();
ArbitraryDataFileListMessage arbitraryDataFileListMessage;
// Remove optional parameters if the requesting peer doesn't support it yet
// A message with less statistical data is better than no message at all
if (!peer.isAtLeastVersion(MIN_PEER_VERSION_FOR_FILE_LIST_STATS)) {
arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
} else {
arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature,
hashes, NTP.getTime(), 0, ourAddress, true);
}
arbitraryDataFileListMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
LOGGER.debug("Couldn't send list of hashes");
peer.disconnect("failed to send list of hashes");
continue;
}
if (allChunksExist) {
// Nothing left to do, so return to prevent any unnecessary forwarding from occurring
LOGGER.debug("No need for any forwarding because file list request is fully served");
continue;
}
}
// We may need to forward this request on
boolean isBlocked = (transactionData == null || ListUtils.isNameBlocked(transactionData.getName()));
if (Settings.getInstance().isRelayModeEnabled() && !isBlocked) {
// In relay mode - so ask our other peers if they have it
GetArbitraryDataFileListMessage getArbitraryDataFileListMessage = (GetArbitraryDataFileListMessage) message;
long requestTime = getArbitraryDataFileListMessage.getRequestTime();
int requestHops = getArbitraryDataFileListMessage.getRequestHops() + 1;
long totalRequestTime = now - requestTime;
if (totalRequestTime < RELAY_REQUEST_MAX_DURATION) {
// Relay request hasn't timed out yet, so can potentially be rebroadcast
if (requestHops < RELAY_REQUEST_MAX_HOPS) {
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
Message relayGetArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature, hashes, requestTime, requestHops, requestingPeer);
relayGetArbitraryDataFileListMessage.setId(message.getId());
LOGGER.debug("Rebroadcasting hash list request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}", peer, Base58.encode(signature), totalRequestTime, requestHops);
Network.getInstance().broadcast(
broadcastPeer ->
!broadcastPeer.isAtLeastVersion(RELAY_MIN_PEER_VERSION) ? null :
broadcastPeer == peer || Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost()) ? null : relayGetArbitraryDataFileListMessage
);
for (byte[] requestedHash : requestedHashes) {
ArbitraryDataFileChunk chunk = ArbitraryDataFileChunk.fromHash(requestedHash, signature);
if (chunk.exists()) {
hashes.add(chunk.getHash());
//LOGGER.trace("Added hash {}", chunk.getHash58());
} else {
LOGGER.trace("Couldn't add hash {} because it doesn't exist", chunk.getHash58());
allChunksExist = false;
// This relay request has reached the maximum number of allowed hops
}
} else {
// This relay request has timed out
}
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary file list for peer %s", peer), e);
}
// If the only file we have is the metadata then we shouldn't respond. Most nodes will already have that,
// or can use the separate metadata protocol to fetch it. This should greatly reduce network spam.
if (hasMetadata && hashes.size() == 1) {
hashes.clear();
}
// We should only respond if we have at least one hash
if (!hashes.isEmpty()) {
// Firstly we should keep track of the requesting peer, to allow for potential direct connections later
ArbitraryDataFileManager.getInstance().addRecentDataRequest(requestingPeer);
// We have all the chunks, so update requests map to reflect that we've sent it
// There is no need to keep track of the request, as we can serve all the chunks
if (allChunksExist) {
newEntry = new Triple<>(null, null, now);
arbitraryDataFileListRequests.put(message.getId(), newEntry);
}
String ourAddress = Network.getInstance().getOurExternalIpAddressAndPort();
ArbitraryDataFileListMessage arbitraryDataFileListMessage;
// Remove optional parameters if the requesting peer doesn't support it yet
// A message with less statistical data is better than no message at all
if (!peer.isAtLeastVersion(MIN_PEER_VERSION_FOR_FILE_LIST_STATS)) {
arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature, hashes);
} else {
arbitraryDataFileListMessage = new ArbitraryDataFileListMessage(signature,
hashes, NTP.getTime(), 0, ourAddress, true);
}
arbitraryDataFileListMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryDataFileListMessage)) {
LOGGER.debug("Couldn't send list of hashes");
peer.disconnect("failed to send list of hashes");
return;
}
LOGGER.debug("Sent list of hashes (count: {})", hashes.size());
if (allChunksExist) {
// Nothing left to do, so return to prevent any unnecessary forwarding from occurring
LOGGER.debug("No need for any forwarding because file list request is fully served");
return;
}
}
// We may need to forward this request on
boolean isBlocked = (transactionData == null || ListUtils.isNameBlocked(transactionData.getName()));
if (Settings.getInstance().isRelayModeEnabled() && !isBlocked) {
// In relay mode - so ask our other peers if they have it
long requestTime = getArbitraryDataFileListMessage.getRequestTime();
int requestHops = getArbitraryDataFileListMessage.getRequestHops() + 1;
long totalRequestTime = now - requestTime;
if (totalRequestTime < RELAY_REQUEST_MAX_DURATION) {
// Relay request hasn't timed out yet, so can potentially be rebroadcast
if (requestHops < RELAY_REQUEST_MAX_HOPS) {
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
Message relayGetArbitraryDataFileListMessage = new GetArbitraryDataFileListMessage(signature, hashes, requestTime, requestHops, requestingPeer);
relayGetArbitraryDataFileListMessage.setId(message.getId());
LOGGER.debug("Rebroadcasting hash list request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}", peer, Base58.encode(signature), totalRequestTime, requestHops);
Network.getInstance().broadcast(
broadcastPeer ->
!broadcastPeer.isAtLeastVersion(RELAY_MIN_PEER_VERSION) ? null :
broadcastPeer == peer || Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost()) ? null : relayGetArbitraryDataFileListMessage
);
}
else {
// This relay request has reached the maximum number of allowed hops
}
}
else {
// This relay request has timed out
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

View File

@ -25,6 +25,8 @@ import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class ArbitraryDataFileManager extends Thread {
@ -48,7 +50,7 @@ public class ArbitraryDataFileManager extends Thread {
/**
* List to keep track of any arbitrary data file hash responses
*/
public final List<ArbitraryFileListResponseInfo> arbitraryDataFileHashResponses = Collections.synchronizedList(new ArrayList<>());
private final List<ArbitraryFileListResponseInfo> arbitraryDataFileHashResponses = Collections.synchronizedList(new ArrayList<>());
/**
* List to keep track of peers potentially available for direct connections, based on recent requests
@ -67,6 +69,7 @@ public class ArbitraryDataFileManager extends Thread {
private ArbitraryDataFileManager() {
this.arbitraryDataFileHashResponseScheduler.scheduleAtFixedRate( this::processResponses, 60, 1, TimeUnit.SECONDS);
}
public static ArbitraryDataFileManager getInstance() {
@ -81,13 +84,6 @@ public class ArbitraryDataFileManager extends Thread {
Thread.currentThread().setName("Arbitrary Data File Manager");
try {
// Use a fixed thread pool to execute the arbitrary data file requests
int threadCount = 5;
ExecutorService arbitraryDataFileRequestExecutor = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
arbitraryDataFileRequestExecutor.execute(new ArbitraryDataFileRequestThread());
}
while (!isStopping) {
// Nothing to do yet
Thread.sleep(1000);
@ -112,7 +108,6 @@ public class ArbitraryDataFileManager extends Thread {
final long relayMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_RELAY_TIMEOUT;
arbitraryRelayMap.removeIf(entry -> entry == null || entry.getTimestamp() == null || entry.getTimestamp() < relayMinimumTimestamp);
arbitraryDataFileHashResponses.removeIf(entry -> entry.getTimestamp() < relayMinimumTimestamp);
final long directConnectionInfoMinimumTimestamp = now - ArbitraryDataManager.getInstance().ARBITRARY_DIRECT_CONNECTION_INFO_TIMEOUT;
directConnectionInfo.removeIf(entry -> entry.getTimestamp() < directConnectionInfoMinimumTimestamp);
@ -125,8 +120,7 @@ public class ArbitraryDataFileManager extends Thread {
// Fetch data files by hash
public boolean fetchArbitraryDataFiles(Repository repository,
Peer peer,
public boolean fetchArbitraryDataFiles(Peer peer,
byte[] signature,
ArbitraryTransactionData arbitraryTransactionData,
List<byte[]> hashes) throws DataException {
@ -151,16 +145,10 @@ public class ArbitraryDataFileManager extends Thread {
if (receivedArbitraryDataFile != null) {
LOGGER.debug("Received data file {} from peer {}. Time taken: {} ms", receivedArbitraryDataFile.getHash58(), peer, (endTime-startTime));
receivedAtLeastOneFile = true;
// Remove this hash from arbitraryDataFileHashResponses now that we have received it
arbitraryDataFileHashResponses.remove(hash58);
}
else {
LOGGER.debug("Peer {} didn't respond with data file {} for signature {}. Time taken: {} ms", peer, Base58.encode(hash), Base58.encode(signature), (endTime-startTime));
// Remove this hash from arbitraryDataFileHashResponses now that we have failed to receive it
arbitraryDataFileHashResponses.remove(hash58);
// Stop asking for files from this peer
break;
}
@ -169,10 +157,6 @@ public class ArbitraryDataFileManager extends Thread {
LOGGER.trace("Already requesting data file {} for signature {} from peer {}", arbitraryDataFile, Base58.encode(signature), peer);
}
}
else {
// Remove this hash from arbitraryDataFileHashResponses because we have a local copy
arbitraryDataFileHashResponses.remove(hash58);
}
}
if (receivedAtLeastOneFile) {
@ -191,6 +175,38 @@ public class ArbitraryDataFileManager extends Thread {
return receivedAtLeastOneFile;
}
// Lock to synchronize access to the list
private final Object arbitraryDataFileHashResponseLock = new Object();
// Scheduled executor service to process messages every second
private final ScheduledExecutorService arbitraryDataFileHashResponseScheduler = Executors.newScheduledThreadPool(1);
public void addResponse( ArbitraryFileListResponseInfo responseInfo ) {
synchronized (arbitraryDataFileHashResponseLock) {
this.arbitraryDataFileHashResponses.add(responseInfo);
}
}
private void processResponses() {
try {
List<ArbitraryFileListResponseInfo> responsesToProcess;
synchronized (arbitraryDataFileHashResponseLock) {
responsesToProcess = new ArrayList<>(arbitraryDataFileHashResponses);
arbitraryDataFileHashResponses.clear();
}
if (responsesToProcess.isEmpty()) return;
Long now = NTP.getTime();
ArbitraryDataFileRequestThread.getInstance().processFileHashes(now, responsesToProcess, this);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
private ArbitraryDataFile fetchArbitraryDataFile(Peer peer, Peer requestingPeer, ArbitraryTransactionData arbitraryTransactionData, byte[] signature, byte[] hash, Message originalMessage) throws DataException {
ArbitraryDataFile existingFile = ArbitraryDataFile.fromHash(hash, signature);
boolean fileAlreadyExists = existingFile.exists();

View File

@ -4,127 +4,172 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.data.arbitrary.ArbitraryFileListResponseInfo;
import org.qortal.data.arbitrary.ArbitraryResourceData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.event.DataMonitorEvent;
import org.qortal.event.EventBus;
import org.qortal.network.Peer;
import org.qortal.network.message.MessageType;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.utils.ArbitraryTransactionUtils;
import org.qortal.utils.Base58;
import org.qortal.utils.NTP;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.NORM_PRIORITY;
public class ArbitraryDataFileRequestThread implements Runnable {
public class ArbitraryDataFileRequestThread {
private static final Logger LOGGER = LogManager.getLogger(ArbitraryDataFileRequestThread.class);
public ArbitraryDataFileRequestThread() {
private ConcurrentHashMap<String, ExecutorService> executorByPeer = new ConcurrentHashMap<>();
private ArbitraryDataFileRequestThread() {
cleanupExecutorByPeerScheduler.scheduleAtFixedRate(this::cleanupExecutorsByPeer, 1, 1, TimeUnit.MINUTES);
}
@Override
public void run() {
Thread.currentThread().setName("Arbitrary Data File Request Thread");
Thread.currentThread().setPriority(NORM_PRIORITY);
private static ArbitraryDataFileRequestThread instance = null;
public static ArbitraryDataFileRequestThread getInstance() {
if( instance == null ) {
instance = new ArbitraryDataFileRequestThread();
}
return instance;
}
private final ScheduledExecutorService cleanupExecutorByPeerScheduler = Executors.newScheduledThreadPool(1);
private void cleanupExecutorsByPeer() {
try {
while (!Controller.isStopping()) {
Long now = NTP.getTime();
this.processFileHashes(now);
}
} catch (InterruptedException e) {
// Fall-through to exit thread...
this.executorByPeer.forEach((key, value) -> {
if (value instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) value;
if (threadPoolExecutor.getActiveCount() == 0) {
if (this.executorByPeer.computeIfPresent(key, (k, v) -> null) == null) {
LOGGER.info("removed executor: peer = " + key);
}
}
} else {
LOGGER.warn("casting issue in cleanup");
}
});
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
private void processFileHashes(Long now) throws InterruptedException {
public void processFileHashes(Long now, List<ArbitraryFileListResponseInfo> responseInfos, ArbitraryDataFileManager arbitraryDataFileManager) {
if (Controller.isStopping()) {
return;
}
ArbitraryDataFileManager arbitraryDataFileManager = ArbitraryDataFileManager.getInstance();
String signature58 = null;
String hash58 = null;
Peer peer = null;
boolean shouldProcess = false;
List<ArbitraryFileListResponseInfo> toProcess = new ArrayList<>(responseInfos.size());
synchronized (arbitraryDataFileManager.arbitraryDataFileHashResponses) {
if (!arbitraryDataFileManager.arbitraryDataFileHashResponses.isEmpty()) {
Map<String, ArbitraryFileListResponseInfo> responseInfoByHash58 = new HashMap<>(responseInfos.size());
Map<String, byte[]> signatureBySignature58 = new HashMap<>(toProcess.size());
Map<String, List<ArbitraryFileListResponseInfo>> responseInfoBySignature58 = new HashMap<>();
// Sort by lowest number of node hops first
Comparator<ArbitraryFileListResponseInfo> lowestHopsFirstComparator =
Comparator.comparingInt(ArbitraryFileListResponseInfo::getRequestHops);
arbitraryDataFileManager.arbitraryDataFileHashResponses.sort(lowestHopsFirstComparator);
for( ArbitraryFileListResponseInfo responseInfo : responseInfos) {
Iterator iterator = arbitraryDataFileManager.arbitraryDataFileHashResponses.iterator();
while (iterator.hasNext()) {
if (Controller.isStopping()) {
return;
}
if( responseInfo == null ) continue;
ArbitraryFileListResponseInfo responseInfo = (ArbitraryFileListResponseInfo) iterator.next();
if (responseInfo == null) {
iterator.remove();
continue;
}
hash58 = responseInfo.getHash58();
peer = responseInfo.getPeer();
signature58 = responseInfo.getSignature58();
Long timestamp = responseInfo.getTimestamp();
if (now - timestamp >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || signature58 == null || peer == null) {
// Ignore - to be deleted
iterator.remove();
continue;
}
// Skip if already requesting, but don't remove, as we might want to retry later
if (arbitraryDataFileManager.arbitraryDataFileRequests.containsKey(hash58)) {
// Already requesting - leave this attempt for later
continue;
}
// We want to process this file
shouldProcess = true;
iterator.remove();
break;
}
if (Controller.isStopping()) {
return;
}
Peer peer = responseInfo.getPeer();
// if relay timeout, then move on
if (now - responseInfo.getTimestamp() >= ArbitraryDataManager.ARBITRARY_RELAY_TIMEOUT || responseInfo.getSignature58() == null || peer == null) {
continue;
}
// Skip if already requesting, but don't remove, as we might want to retry later
if (arbitraryDataFileManager.arbitraryDataFileRequests.containsKey(responseInfo.getHash58())) {
// Already requesting - leave this attempt for later
arbitraryDataFileManager.addResponse(responseInfo); // don't remove -> adding back, beacause it was removed already above
continue;
}
byte[] hash = Base58.decode(responseInfo.getHash58());
byte[] signature = Base58.decode(responseInfo.getSignature58());
// check for null
if (signature == null || hash == null || peer == null) {
continue;
}
// We want to process this file, store and map data to process later
toProcess.add(responseInfo);
responseInfoByHash58.put(responseInfo.getHash58(), responseInfo);
signatureBySignature58.put(responseInfo.getSignature58(), signature);
responseInfoBySignature58
.computeIfAbsent(responseInfo.getSignature58(), signature58 -> new ArrayList<>())
.add(responseInfo);
}
if (!shouldProcess) {
// Nothing to do
Thread.sleep(1000L);
return;
}
// if there are no signatures, then there is nothing to process and nothing query the database
if( signatureBySignature58.isEmpty() ) return;
byte[] hash = Base58.decode(hash58);
byte[] signature = Base58.decode(signature58);
List<ArbitraryTransactionData> arbitraryTransactionDataList = new ArrayList<>();
// Fetch the transaction data
try (final Repository repository = RepositoryManager.getRepository()) {
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils.fetchTransactionData(repository, signature);
if (arbitraryTransactionData == null) {
return;
}
if (signature == null || hash == null || peer == null || arbitraryTransactionData == null) {
return;
}
LOGGER.trace("Fetching file {} from peer {} via request thread...", hash58, peer);
arbitraryDataFileManager.fetchArbitraryDataFiles(repository, peer, signature, arbitraryTransactionData, Arrays.asList(hash));
arbitraryTransactionDataList.addAll(
ArbitraryTransactionUtils.fetchTransactionDataList(repository, new ArrayList<>(signatureBySignature58.values())));
} catch (DataException e) {
LOGGER.debug("Unable to process file hashes: {}", e.getMessage());
LOGGER.warn("Unable to fetch transaction data: {}", e.getMessage());
}
if( !arbitraryTransactionDataList.isEmpty() ) {
long start = System.currentTimeMillis();
for(ArbitraryTransactionData data : arbitraryTransactionDataList ) {
String signature58 = Base58.encode(data.getSignature());
for( ArbitraryFileListResponseInfo responseInfo : responseInfoBySignature58.get(signature58)) {
Runnable fetcher = () -> arbitraryDataFileFetcher(arbitraryDataFileManager, responseInfo, data);
this.executorByPeer
.computeIfAbsent(
responseInfo.getPeer().toString(),
peer -> Executors.newFixedThreadPool(
Settings.getInstance().getMaxThreadsForMessageType(MessageType.GET_ARBITRARY_DATA_FILE))
)
.execute(fetcher);
}
}
long timeLapse = System.currentTimeMillis() - start;
}
}
}
private void arbitraryDataFileFetcher(ArbitraryDataFileManager arbitraryDataFileManager, ArbitraryFileListResponseInfo responseInfo, ArbitraryTransactionData arbitraryTransactionData) {
try {
arbitraryDataFileManager.fetchArbitraryDataFiles(
responseInfo.getPeer(),
arbitraryTransactionData.getSignature(),
arbitraryTransactionData,
Arrays.asList(Base58.decode(responseInfo.getHash58()))
);
} catch (DataException e) {
LOGGER.warn("Unable to process file hashes: {}", e.getMessage());
}
}
}

View File

@ -24,6 +24,11 @@ import org.qortal.utils.Triple;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.qortal.controller.arbitrary.ArbitraryDataFileListManager.*;
@ -61,6 +66,7 @@ public class ArbitraryMetadataManager {
private ArbitraryMetadataManager() {
scheduler.scheduleAtFixedRate(this::processNetworkGetArbitraryMetadataMessage, 60, 1, TimeUnit.SECONDS);
}
public static ArbitraryMetadataManager getInstance() {
@ -371,107 +377,160 @@ public class ArbitraryMetadataManager {
}
}
// List to collect messages
private final List<PeerMessage> messageList = new ArrayList<>();
// Lock to synchronize access to the list
private final Object lock = new Object();
// Scheduled executor service to process messages every second
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public void onNetworkGetArbitraryMetadataMessage(Peer peer, Message message) {
// Don't respond if QDN is disabled
if (!Settings.getInstance().isQdnEnabled()) {
return;
}
Controller.getInstance().stats.getArbitraryMetadataMessageStats.requests.incrementAndGet();
GetArbitraryMetadataMessage getArbitraryMetadataMessage = (GetArbitraryMetadataMessage) message;
byte[] signature = getArbitraryMetadataMessage.getSignature();
String signature58 = Base58.encode(signature);
Long now = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, now);
// If we've seen this request recently, then ignore
if (arbitraryMetadataRequests.putIfAbsent(message.getId(), newEntry) != null) {
LOGGER.debug("Ignoring metadata request from peer {} for signature {}", peer, signature58);
return;
}
LOGGER.debug("Received metadata request from peer {} for signature {}", peer, signature58);
ArbitraryTransactionData transactionData = null;
ArbitraryDataFile metadataFile = null;
try (final Repository repository = RepositoryManager.getRepository()) {
// Firstly we need to lookup this file on chain to get its metadata hash
transactionData = (ArbitraryTransactionData)repository.getTransactionRepository().fromSignature(signature);
if (transactionData instanceof ArbitraryTransactionData) {
// Check if we're even allowed to serve metadata for this transaction
if (ArbitraryDataStorageManager.getInstance().canStoreData(transactionData)) {
byte[] metadataHash = transactionData.getMetadataHash();
if (metadataHash != null) {
// Load metadata file
metadataFile = ArbitraryDataFile.fromHash(metadataHash, signature);
}
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary metadata for peer %s", peer), e);
}
// We should only respond if we have the metadata file
if (metadataFile != null && metadataFile.exists()) {
// We have the metadata file, so update requests map to reflect that we've sent it
newEntry = new Triple<>(null, null, now);
arbitraryMetadataRequests.put(message.getId(), newEntry);
ArbitraryMetadataMessage arbitraryMetadataMessage = new ArbitraryMetadataMessage(signature, metadataFile);
arbitraryMetadataMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryMetadataMessage)) {
LOGGER.debug("Couldn't send metadata");
peer.disconnect("failed to send metadata");
return;
}
LOGGER.debug("Sent metadata");
// Nothing left to do, so return to prevent any unnecessary forwarding from occurring
LOGGER.debug("No need for any forwarding because metadata request is fully served");
return;
}
// We may need to forward this request on
boolean isBlocked = (transactionData == null || ListUtils.isNameBlocked(transactionData.getName()));
if (Settings.getInstance().isRelayModeEnabled() && !isBlocked) {
// In relay mode - so ask our other peers if they have it
long requestTime = getArbitraryMetadataMessage.getRequestTime();
int requestHops = getArbitraryMetadataMessage.getRequestHops() + 1;
long totalRequestTime = now - requestTime;
if (totalRequestTime < RELAY_REQUEST_MAX_DURATION) {
// Relay request hasn't timed out yet, so can potentially be rebroadcast
if (requestHops < RELAY_REQUEST_MAX_HOPS) {
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
Message relayGetArbitraryMetadataMessage = new GetArbitraryMetadataMessage(signature, requestTime, requestHops);
relayGetArbitraryMetadataMessage.setId(message.getId());
LOGGER.debug("Rebroadcasting metadata request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}", peer, Base58.encode(signature), totalRequestTime, requestHops);
Network.getInstance().broadcast(
broadcastPeer ->
!broadcastPeer.isAtLeastVersion(RELAY_MIN_PEER_VERSION) ? null :
broadcastPeer == peer || Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost()) ? null : relayGetArbitraryMetadataMessage);
}
else {
// This relay request has reached the maximum number of allowed hops
}
}
else {
// This relay request has timed out
}
synchronized (lock) {
messageList.add(new PeerMessage(peer, message));
}
}
private void processNetworkGetArbitraryMetadataMessage() {
try {
List<PeerMessage> messagesToProcess;
synchronized (lock) {
messagesToProcess = new ArrayList<>(messageList);
messageList.clear();
}
Map<String, byte[]> signatureBySignature58 = new HashMap<>((messagesToProcess.size()));
Map<String, Long> nowBySignature58 = new HashMap<>(messagesToProcess.size());
Map<String,PeerMessage> peerMessageBySignature58 = new HashMap<>(messagesToProcess.size());
for( PeerMessage peerMessage : messagesToProcess) {
Controller.getInstance().stats.getArbitraryMetadataMessageStats.requests.incrementAndGet();
GetArbitraryMetadataMessage getArbitraryMetadataMessage = (GetArbitraryMetadataMessage) peerMessage.message;
byte[] signature = getArbitraryMetadataMessage.getSignature();
String signature58 = Base58.encode(signature);
Long now = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peerMessage.peer, now);
// If we've seen this request recently, then ignore
if (arbitraryMetadataRequests.putIfAbsent(peerMessage.message.getId(), newEntry) != null) {
LOGGER.debug("Ignoring metadata request from peer {} for signature {}", peerMessage.peer, signature58);
continue;
}
LOGGER.debug("Received metadata request from peer {} for signature {}", peerMessage.peer, signature58);
signatureBySignature58.put(signature58, signature);
nowBySignature58.put(signature58, now);
peerMessageBySignature58.put(signature58, peerMessage);
}
if( signatureBySignature58.isEmpty() ) return;
List<TransactionData> transactionDataList;
try (final Repository repository = RepositoryManager.getRepository()) {
// Firstly we need to lookup this file on chain to get its metadata hash
transactionDataList = repository.getTransactionRepository().fromSignatures(new ArrayList(signatureBySignature58.values()));
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary transactions"), e);
return;
}
Map<String, ArbitraryTransactionData> dataBySignature58
= transactionDataList.stream()
.filter(data -> data instanceof ArbitraryTransactionData)
.map(ArbitraryTransactionData.class::cast)
.collect(Collectors.toMap(data -> Base58.encode(data.getSignature()), Function.identity()));
for(Map.Entry<String, ArbitraryTransactionData> entry : dataBySignature58.entrySet()) {
String signature58 = entry.getKey();
ArbitraryTransactionData transactionData = entry.getValue();
try {
// Check if we're even allowed to serve metadata for this transaction
if (ArbitraryDataStorageManager.getInstance().canStoreData(transactionData)) {
byte[] metadataHash = transactionData.getMetadataHash();
if (metadataHash != null) {
// Load metadata file
ArbitraryDataFile metadataFile = ArbitraryDataFile.fromHash(metadataHash, transactionData.getSignature());
// We should only respond if we have the metadata file
if (metadataFile != null && metadataFile.exists()) {
PeerMessage peerMessage = peerMessageBySignature58.get(signature58);
Message message = peerMessage.message;
Peer peer = peerMessage.peer;
// We have the metadata file, so update requests map to reflect that we've sent it
Triple newEntry = new Triple<>(null, null, nowBySignature58.get(signature58));
arbitraryMetadataRequests.put(message.getId(), newEntry);
ArbitraryMetadataMessage arbitraryMetadataMessage = new ArbitraryMetadataMessage(entry.getValue().getSignature(), metadataFile);
arbitraryMetadataMessage.setId(message.getId());
if (!peer.sendMessage(arbitraryMetadataMessage)) {
LOGGER.debug("Couldn't send metadata");
peer.disconnect("failed to send metadata");
continue;
}
LOGGER.debug("Sent metadata");
// Nothing left to do, so return to prevent any unnecessary forwarding from occurring
LOGGER.debug("No need for any forwarding because metadata request is fully served");
}
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while fetching arbitrary metadata"), e);
}
// We may need to forward this request on
boolean isBlocked = (transactionDataList == null || ListUtils.isNameBlocked(transactionData.getName()));
if (Settings.getInstance().isRelayModeEnabled() && !isBlocked) {
// In relay mode - so ask our other peers if they have it
PeerMessage peerMessage = peerMessageBySignature58.get(signature58);
GetArbitraryMetadataMessage getArbitraryMetadataMessage = (GetArbitraryMetadataMessage) peerMessage.message;
long requestTime = getArbitraryMetadataMessage.getRequestTime();
int requestHops = getArbitraryMetadataMessage.getRequestHops() + 1;
long totalRequestTime = nowBySignature58.get(signature58) - requestTime;
if (totalRequestTime < RELAY_REQUEST_MAX_DURATION) {
// Relay request hasn't timed out yet, so can potentially be rebroadcast
if (requestHops < RELAY_REQUEST_MAX_HOPS) {
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
byte[] signature = signatureBySignature58.get(signature58);
Message relayGetArbitraryMetadataMessage = new GetArbitraryMetadataMessage(signature, requestTime, requestHops);
relayGetArbitraryMetadataMessage.setId(getArbitraryMetadataMessage.getId());
Peer peer = peerMessage.peer;
LOGGER.debug("Rebroadcasting metadata request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}", peer, Base58.encode(signature), totalRequestTime, requestHops);
Network.getInstance().broadcast(
broadcastPeer ->
!broadcastPeer.isAtLeastVersion(RELAY_MIN_PEER_VERSION) ? null :
broadcastPeer == peer || Objects.equals(broadcastPeer.getPeerData().getAddress().getHost(), peer.getPeerData().getAddress().getHost()) ? null : relayGetArbitraryMetadataMessage);
} else {
// This relay request has reached the maximum number of allowed hops
}
} else {
// This relay request has timed out
}
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,22 @@
package org.qortal.controller.arbitrary;
import org.qortal.network.Peer;
import org.qortal.network.message.Message;
public class PeerMessage {
Peer peer;
Message message;
public PeerMessage(Peer peer, Message message) {
this.peer = peer;
this.message = message;
}
public Peer getPeer() {
return peer;
}
public Message getMessage() {
return message;
}
}

View File

@ -8,6 +8,7 @@ import org.qortal.account.PrivateKeyAccount;
import org.qortal.api.model.crosschain.TradeBotCreateRequest;
import org.qortal.controller.Controller;
import org.qortal.controller.Synchronizer;
import org.qortal.controller.arbitrary.PeerMessage;
import org.qortal.controller.tradebot.AcctTradeBot.ResponseResult;
import org.qortal.crosschain.*;
import org.qortal.crypto.Crypto;
@ -37,7 +38,12 @@ import org.qortal.utils.NTP;
import java.awt.TrayIcon.MessageType;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Performing cross-chain trading steps on behalf of user.
@ -118,6 +124,9 @@ public class TradeBot implements Listener {
private Map<String, Long> validTrades = new HashMap<>();
private TradeBot() {
tradePresenceMessageScheduler.scheduleAtFixedRate( this::processTradePresencesMessages, 60, 1, TimeUnit.SECONDS);
EventBus.INSTANCE.addListener(event -> TradeBot.getInstance().listen(event));
}
@ -551,77 +560,139 @@ public class TradeBot implements Listener {
}
}
// List to collect messages
private final List<PeerMessage> tradePresenceMessageList = new ArrayList<>();
// Lock to synchronize access to the list
private final Object tradePresenceMessageLock = new Object();
// Scheduled executor service to process messages every second
private final ScheduledExecutorService tradePresenceMessageScheduler = Executors.newScheduledThreadPool(1);
public void onTradePresencesMessage(Peer peer, Message message) {
TradePresencesMessage tradePresencesMessage = (TradePresencesMessage) message;
List<TradePresenceData> peersTradePresences = tradePresencesMessage.getTradePresences();
synchronized (tradePresenceMessageLock) {
tradePresenceMessageList.add(new PeerMessage(peer, message));
}
}
long now = NTP.getTime();
// Timestamps before this are too far into the past
long pastThreshold = now;
// Timestamps after this are too far into the future
long futureThreshold = now + PRESENCE_LIFETIME;
public void processTradePresencesMessages() {
Map<ByteArray, Supplier<ACCT>> acctSuppliersByCodeHash = SupportedBlockchain.getAcctMap();
try {
List<PeerMessage> messagesToProcess;
synchronized (tradePresenceMessageLock) {
messagesToProcess = new ArrayList<>(tradePresenceMessageList);
tradePresenceMessageList.clear();
}
int newCount = 0;
if( messagesToProcess.isEmpty() ) return;
try (final Repository repository = RepositoryManager.getRepository()) {
for (TradePresenceData peersTradePresence : peersTradePresences) {
long timestamp = peersTradePresence.getTimestamp();
Map<Peer, List<TradePresenceData>> tradePresencesByPeer = new HashMap<>(messagesToProcess.size());
// Ignore if timestamp is out of bounds
if (timestamp < pastThreshold || timestamp > futureThreshold) {
if (timestamp < pastThreshold)
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too old vs {}",
peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold
);
else
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too new vs {}",
peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold
// map all trade presences from the messages to their peer
for( PeerMessage peerMessage : messagesToProcess ) {
TradePresencesMessage tradePresencesMessage = (TradePresencesMessage) peerMessage.getMessage();
List<TradePresenceData> peersTradePresences = tradePresencesMessage.getTradePresences();
tradePresencesByPeer.put(peerMessage.getPeer(), peersTradePresences);
}
long now = NTP.getTime();
// Timestamps before this are too far into the past
long pastThreshold = now;
// Timestamps after this are too far into the future
long futureThreshold = now + PRESENCE_LIFETIME;
Map<ByteArray, Supplier<ACCT>> acctSuppliersByCodeHash = SupportedBlockchain.getAcctMap();
int newCount = 0;
Map<String, List<Peer>> peersByAtAddress = new HashMap<>(tradePresencesByPeer.size());
Map<String, TradePresenceData> tradePresenceByAtAddress = new HashMap<>(tradePresencesByPeer.size());
// for each batch of trade presence data from a peer, validate and populate the maps declared above
for ( Map.Entry<Peer, List<TradePresenceData>> entry: tradePresencesByPeer.entrySet()) {
Peer peer = entry.getKey();
for( TradePresenceData peersTradePresence : entry.getValue() ) {
// TradePresenceData peersTradePresence
long timestamp = peersTradePresence.getTimestamp();
// Ignore if timestamp is out of bounds
if (timestamp < pastThreshold || timestamp > futureThreshold) {
if (timestamp < pastThreshold)
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too old vs {}",
peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold
);
else
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is too new vs {}",
peersTradePresence.getAtAddress(), peer, timestamp, pastThreshold
);
continue;
}
ByteArray pubkeyByteArray = ByteArray.wrap(peersTradePresence.getPublicKey());
// Ignore if we've previously verified this timestamp+publickey combo or sent timestamp is older
TradePresenceData existingTradeData = this.safeAllTradePresencesByPubkey.get(pubkeyByteArray);
if (existingTradeData != null && timestamp <= existingTradeData.getTimestamp()) {
if (timestamp == existingTradeData.getTimestamp())
LOGGER.trace("Ignoring trade presence {} from peer {} as we have verified timestamp {} before",
peersTradePresence.getAtAddress(), peer, timestamp
);
else
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is older than latest {}",
peersTradePresence.getAtAddress(), peer, timestamp, existingTradeData.getTimestamp()
);
continue;
}
// Check timestamp signature
byte[] timestampSignature = peersTradePresence.getSignature();
byte[] timestampBytes = Longs.toByteArray(timestamp);
byte[] publicKey = peersTradePresence.getPublicKey();
if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) {
LOGGER.trace("Ignoring trade presence {} from peer {} as signature failed to verify",
peersTradePresence.getAtAddress(), peer
);
continue;
continue;
}
peersByAtAddress.computeIfAbsent(peersTradePresence.getAtAddress(), address -> new ArrayList<>()).add(peer);
tradePresenceByAtAddress.put(peersTradePresence.getAtAddress(), peersTradePresence);
}
}
ByteArray pubkeyByteArray = ByteArray.wrap(peersTradePresence.getPublicKey());
if( tradePresenceByAtAddress.isEmpty() ) return;
// Ignore if we've previously verified this timestamp+publickey combo or sent timestamp is older
TradePresenceData existingTradeData = this.safeAllTradePresencesByPubkey.get(pubkeyByteArray);
if (existingTradeData != null && timestamp <= existingTradeData.getTimestamp()) {
if (timestamp == existingTradeData.getTimestamp())
LOGGER.trace("Ignoring trade presence {} from peer {} as we have verified timestamp {} before",
peersTradePresence.getAtAddress(), peer, timestamp
);
else
LOGGER.trace("Ignoring trade presence {} from peer {} as timestamp {} is older than latest {}",
peersTradePresence.getAtAddress(), peer, timestamp, existingTradeData.getTimestamp()
);
List<ATData> atDataList;
try (final Repository repository = RepositoryManager.getRepository()) {
atDataList = repository.getATRepository().fromATAddresses( new ArrayList<>(tradePresenceByAtAddress.keySet()) );
} catch (DataException e) {
LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e);
return;
}
continue;
}
Map<String, Supplier<ACCT>> supplierByAtAddress = new HashMap<>(atDataList.size());
// Check timestamp signature
byte[] timestampSignature = peersTradePresence.getSignature();
byte[] timestampBytes = Longs.toByteArray(timestamp);
byte[] publicKey = peersTradePresence.getPublicKey();
if (!Crypto.verify(publicKey, timestampSignature, timestampBytes)) {
LOGGER.trace("Ignoring trade presence {} from peer {} as signature failed to verify",
peersTradePresence.getAtAddress(), peer
);
List<ATData> validatedAtDataList = new ArrayList<>(atDataList.size());
continue;
}
// for each trade
for( ATData atData : atDataList ) {
ATData atData = repository.getATRepository().fromATAddress(peersTradePresence.getAtAddress());
TradePresenceData peersTradePresence = tradePresenceByAtAddress.get(atData.getATAddress());
if (atData == null || atData.getIsFrozen() || atData.getIsFinished()) {
if (atData == null)
LOGGER.trace("Ignoring trade presence {} from peer {} as AT doesn't exist",
peersTradePresence.getAtAddress(), peer
LOGGER.trace("Ignoring trade presence {} from peer as AT doesn't exist",
peersTradePresence.getAtAddress()
);
else
LOGGER.trace("Ignoring trade presence {} from peer {} as AT is frozen or finished",
peersTradePresence.getAtAddress(), peer
LOGGER.trace("Ignoring trade presence {} from peer as AT is frozen or finished",
peersTradePresence.getAtAddress()
);
continue;
@ -630,51 +701,87 @@ public class TradeBot implements Listener {
ByteArray atCodeHash = ByteArray.wrap(atData.getCodeHash());
Supplier<ACCT> acctSupplier = acctSuppliersByCodeHash.get(atCodeHash);
if (acctSupplier == null) {
LOGGER.trace("Ignoring trade presence {} from peer {} as AT isn't a known ACCT?",
peersTradePresence.getAtAddress(), peer
LOGGER.trace("Ignoring trade presence {} from peer as AT isn't a known ACCT?",
peersTradePresence.getAtAddress()
);
continue;
}
CrossChainTradeData tradeData = acctSupplier.get().populateTradeData(repository, atData);
if (tradeData == null) {
LOGGER.trace("Ignoring trade presence {} from peer {} as trade data not found?",
peersTradePresence.getAtAddress(), peer
);
continue;
}
// Convert signer's public key to address form
String signerAddress = peersTradePresence.getTradeAddress();
// Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form)
if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) {
LOGGER.trace("Ignoring trade presence {} from peer {} as signer isn't Alice or Bob?",
peersTradePresence.getAtAddress(), peer
);
continue;
}
// This is new to us
this.allTradePresencesByPubkey.put(pubkeyByteArray, peersTradePresence);
++newCount;
LOGGER.trace("Added trade presence {} from peer {} with timestamp {}",
peersTradePresence.getAtAddress(), peer, timestamp
);
EventBus.INSTANCE.notify(new TradePresenceEvent(peersTradePresence));
validatedAtDataList.add(atData);
}
} catch (DataException e) {
LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e);
}
if (newCount > 0) {
LOGGER.debug("New trade presences: {}, all trade presences: {}", newCount, allTradePresencesByPubkey.size());
rebuildSafeAllTradePresences();
// populated data for each trade
List<CrossChainTradeData> crossChainTradeDataList;
// validated trade data grouped by code (cross chain coin)
Map<ByteArray, List<ATData>> atDataByCodeHash
= validatedAtDataList.stream().collect(
Collectors.groupingBy(data -> ByteArray.wrap(data.getCodeHash())));
try (final Repository repository = RepositoryManager.getRepository()) {
crossChainTradeDataList = new ArrayList<>();
// for each code (cross chain coin), get each trade, then populate trade data
for( Map.Entry<ByteArray, List<ATData>> entry : atDataByCodeHash.entrySet() ) {
Supplier<ACCT> acctSupplier = acctSuppliersByCodeHash.get(entry.getKey());
crossChainTradeDataList.addAll(
acctSupplier.get().populateTradeDataList(
repository,
entry.getValue()
)
.stream().filter( data -> data != null )
.collect(Collectors.toList())
);
}
} catch (DataException e) {
LOGGER.error("Couldn't process TRADE_PRESENCES message due to repository issue", e);
return;
}
// for each populated trade data, validate and fire event
for( CrossChainTradeData tradeData : crossChainTradeDataList ) {
List<Peer> peers = peersByAtAddress.get(tradeData.qortalAtAddress);
for( Peer peer : peers ) {
TradePresenceData peersTradePresence = tradePresenceByAtAddress.get(tradeData.qortalAtAddress);
// Convert signer's public key to address form
String signerAddress = peersTradePresence.getTradeAddress();
// Signer's public key (in address form) must match Bob's / Alice's trade public key (in address form)
if (!signerAddress.equals(tradeData.qortalCreatorTradeAddress) && !signerAddress.equals(tradeData.qortalPartnerAddress)) {
LOGGER.trace("Ignoring trade presence {} from peer {} as signer isn't Alice or Bob?",
peersTradePresence.getAtAddress(), peer
);
continue;
}
ByteArray pubkeyByteArray = ByteArray.wrap(peersTradePresence.getPublicKey());
// This is new to us
this.allTradePresencesByPubkey.put(pubkeyByteArray, peersTradePresence);
++newCount;
LOGGER.trace("Added trade presence {} from peer {} with timestamp {}",
peersTradePresence.getAtAddress(), peer, tradeData.creationTimestamp
);
EventBus.INSTANCE.notify(new TradePresenceEvent(peersTradePresence));
}
}
if (newCount > 0) {
LOGGER.info("New trade presences: {}, all trade presences: {}", newCount, allTradePresencesByPubkey.size());
rebuildSafeAllTradePresences();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

View File

@ -6,6 +6,9 @@ import org.qortal.data.crosschain.CrossChainTradeData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import java.util.List;
import java.util.OptionalLong;
public interface ACCT {
public byte[] getCodeBytesHash();
@ -16,8 +19,12 @@ public interface ACCT {
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException;
public List<CrossChainTradeData> populateTradeDataList(Repository respository, List<ATData> atDataList) throws DataException;
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException;
CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException;
public byte[] buildCancelMessage(String creatorQortalAddress);
public byte[] findSecretA(Repository repository, CrossChainTradeData crossChainTradeData) throws DataException;

View File

@ -4,6 +4,7 @@ import com.google.common.hash.HashCode;
import com.google.common.primitives.Bytes;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -19,6 +20,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -608,7 +610,14 @@ public class BitcoinACCTv1 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -617,13 +626,14 @@ public class BitcoinACCTv1 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -636,8 +646,13 @@ public class BitcoinACCTv1 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -6,6 +6,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -21,6 +22,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -569,7 +571,14 @@ public class BitcoinACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -578,13 +587,14 @@ public class BitcoinACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -597,8 +607,13 @@ public class BitcoinACCTv3 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -6,6 +6,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -21,6 +22,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -569,7 +571,14 @@ public class DigibyteACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -578,13 +587,14 @@ public class DigibyteACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -597,8 +607,13 @@ public class DigibyteACCTv3 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -6,6 +6,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -21,6 +22,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -566,7 +568,14 @@ public class DogecoinACCTv1 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -575,13 +584,14 @@ public class DogecoinACCTv1 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -594,8 +604,13 @@ public class DogecoinACCTv1 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -6,6 +6,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -21,6 +22,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -569,7 +571,14 @@ public class DogecoinACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -578,13 +587,14 @@ public class DogecoinACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -597,8 +607,13 @@ public class DogecoinACCTv3 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -4,6 +4,7 @@ import com.google.common.hash.HashCode;
import com.google.common.primitives.Bytes;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -19,6 +20,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -559,7 +561,14 @@ public class LitecoinACCTv1 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -568,13 +577,14 @@ public class LitecoinACCTv1 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -587,8 +597,13 @@ public class LitecoinACCTv1 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -4,6 +4,7 @@ import com.google.common.hash.HashCode;
import com.google.common.primitives.Bytes;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -19,6 +20,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -562,7 +564,14 @@ public class LitecoinACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -571,13 +580,14 @@ public class LitecoinACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -590,8 +600,13 @@ public class LitecoinACCTv3 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -52,12 +52,7 @@ public class PirateChain extends Bitcoiny {
public Collection<Server> getServers() {
return Arrays.asList(
// Servers chosen on NO BASIS WHATSOEVER from various sources!
new Server("lightd.pirate.black", Server.ConnectionType.SSL, 443),
new Server("wallet-arrr1.qortal.online", Server.ConnectionType.SSL, 443),
new Server("wallet-arrr2.qortal.online", Server.ConnectionType.SSL, 443),
new Server("wallet-arrr3.qortal.online", Server.ConnectionType.SSL, 443),
new Server("wallet-arrr4.qortal.online", Server.ConnectionType.SSL, 443),
new Server("wallet-arrr5.qortal.online", Server.ConnectionType.SSL, 443)
new Server("lightd.pirate.black", Server.ConnectionType.SSL, 443)
);
}

View File

@ -4,6 +4,7 @@ import com.google.common.hash.HashCode;
import com.google.common.primitives.Bytes;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -19,6 +20,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -580,7 +582,14 @@ public class PirateChainACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -589,13 +598,14 @@ public class PirateChainACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -608,8 +618,13 @@ public class PirateChainACCTv3 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -8,6 +8,7 @@ import org.bouncycastle.util.encoders.DecoderException;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.controller.PirateChainWalletController;
import org.qortal.crypto.Crypto;
import org.qortal.settings.Settings;
@ -67,8 +68,8 @@ public class PirateWallet {
}
// Pick a random server
PirateLightClient.Server server = this.getRandomServer();
String serverUri = String.format("https://%s:%d/", server.hostname, server.port);
ChainableServer server = PirateChain.getInstance().blockchainProvider.getCurrentServer();
String serverUri = String.format("https://%s:%d/", server.getHostName(), server.getPort());
// Pirate library uses base64 encoding
String entropy64 = Base64.toBase64String(this.entropyBytes);

View File

@ -6,6 +6,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.ciyam.at.*;
import org.qortal.account.Account;
import org.qortal.api.resource.CrossChainUtils;
import org.qortal.asset.Asset;
import org.qortal.at.QortalFunctionCode;
import org.qortal.crypto.Crypto;
@ -21,6 +22,7 @@ import org.qortal.utils.BitTwiddling;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import static org.ciyam.at.OpCode.calcOffset;
@ -569,7 +571,14 @@ public class RavencoinACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATData atData) throws DataException {
ATStateData atStateData = repository.getATRepository().getLatestATState(atData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
@Override
public List<CrossChainTradeData> populateTradeDataList(Repository repository, List<ATData> atDataList) throws DataException {
List<CrossChainTradeData> crossChainTradeDataList = CrossChainUtils.populateTradeDataList(repository, this, atDataList);
return crossChainTradeDataList;
}
/**
@ -578,13 +587,14 @@ public class RavencoinACCTv3 implements ACCT {
@Override
public CrossChainTradeData populateTradeData(Repository repository, ATStateData atStateData) throws DataException {
ATData atData = repository.getATRepository().fromATAddress(atStateData.getATAddress());
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData);
return populateTradeData(repository, atData.getCreatorPublicKey(), atData.getCreation(), atStateData, OptionalLong.empty());
}
/**
* Returns CrossChainTradeData with useful info extracted from AT.
*/
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData) throws DataException {
@Override
public CrossChainTradeData populateTradeData(Repository repository, byte[] creatorPublicKey, long creationTimestamp, ATStateData atStateData, OptionalLong optionalBalance) throws DataException {
byte[] addressBytes = new byte[25]; // for general use
String atAddress = atStateData.getATAddress();
@ -597,8 +607,13 @@ public class RavencoinACCTv3 implements ACCT {
tradeData.qortalCreator = Crypto.toAddress(creatorPublicKey);
tradeData.creationTimestamp = creationTimestamp;
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
if(optionalBalance.isPresent()) {
tradeData.qortBalance = optionalBalance.getAsLong();
}
else {
Account atAccount = new Account(repository, atAddress);
tradeData.qortBalance = atAccount.getConfirmedBalance(Asset.QORT);
}
byte[] stateData = atStateData.getStateData();
ByteBuffer dataByteBuffer = ByteBuffer.wrap(stateData);

View File

@ -982,7 +982,7 @@ public class Network {
if (maxThreadsForMessageType != null) {
Integer threadCount = threadsPerMessageType.get(message.getType());
if (threadCount != null && threadCount >= maxThreadsForMessageType) {
LOGGER.trace("Discarding {} message as there are already {} active threads", message.getType().name(), threadCount);
LOGGER.warn("Discarding {} message as there are already {} active threads", message.getType().name(), threadCount);
return;
}
}

View File

@ -14,6 +14,8 @@ public interface ATRepository {
/** Returns ATData using AT's address or null if none found */
public ATData fromATAddress(String atAddress) throws DataException;
public List<ATData> fromATAddresses(List<String> atAddresses) throws DataException;
/** Returns where AT with passed address exists in repository */
public boolean exists(String atAddress) throws DataException;
@ -62,6 +64,8 @@ public interface ATRepository {
*/
public ATStateData getLatestATState(String atAddress) throws DataException;
public List<ATStateData> getLatestATStates(List<String> collect) throws DataException;
/**
* Returns final ATStateData for ATs matching codeHash (required)
* and specific data segment value (optional).

View File

@ -130,6 +130,8 @@ public interface AccountRepository {
*/
public AccountBalanceData getBalance(String address, long assetId) throws DataException;
public List<AccountBalanceData> getBalances(List<String> addresses, long assetId) throws DataException;
/** Returns all account balances for given assetID, optionally excluding zero balances. */
public List<AccountBalanceData> getAssetBalances(long assetId, Boolean excludeZero) throws DataException;

View File

@ -18,6 +18,8 @@ public interface TransactionRepository {
public TransactionData fromSignature(byte[] signature) throws DataException;
public List<TransactionData> fromSignatures(List<byte[]> signatures) throws DataException;
public TransactionData fromReference(byte[] reference) throws DataException;
public TransactionData fromHeightAndSequence(int height, int sequence) throws DataException;
@ -351,4 +353,5 @@ public interface TransactionRepository {
public void delete(TransactionData transactionData) throws DataException;
}

View File

@ -15,8 +15,12 @@ import org.qortal.utils.ByteArray;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.qortal.data.account.AccountData;
@ -76,6 +80,63 @@ public class HSQLDBATRepository implements ATRepository {
}
}
@Override
public List<ATData> fromATAddresses(List<String> atAddresses) throws DataException {
String sql = "SELECT creator, created_when, version, asset_id, code_bytes, code_hash, "
+ "is_sleeping, sleep_until_height, is_finished, had_fatal_error, "
+ "is_frozen, frozen_balance, sleep_until_message_timestamp, AT_address "
+ "FROM ATs "
+ "WHERE AT_address IN ("
+ String.join(", ", Collections.nCopies(atAddresses.size(), "?"))
+ ")"
;
List<ATData> list;
try (ResultSet resultSet = this.repository.checkedExecute(sql, atAddresses.toArray(new String[atAddresses.size()]))) {
if (resultSet == null) {
return new ArrayList<>(0);
}
list = new ArrayList<>(atAddresses.size());
do {
byte[] creatorPublicKey = resultSet.getBytes(1);
long created = resultSet.getLong(2);
int version = resultSet.getInt(3);
long assetId = resultSet.getLong(4);
byte[] codeBytes = resultSet.getBytes(5); // Actually BLOB
byte[] codeHash = resultSet.getBytes(6);
boolean isSleeping = resultSet.getBoolean(7);
Integer sleepUntilHeight = resultSet.getInt(8);
if (sleepUntilHeight == 0 && resultSet.wasNull())
sleepUntilHeight = null;
boolean isFinished = resultSet.getBoolean(9);
boolean hadFatalError = resultSet.getBoolean(10);
boolean isFrozen = resultSet.getBoolean(11);
Long frozenBalance = resultSet.getLong(12);
if (frozenBalance == 0 && resultSet.wasNull())
frozenBalance = null;
Long sleepUntilMessageTimestamp = resultSet.getLong(13);
if (sleepUntilMessageTimestamp == 0 && resultSet.wasNull())
sleepUntilMessageTimestamp = null;
String atAddress = resultSet.getString(14);
list.add(new ATData(atAddress, creatorPublicKey, created, version, assetId, codeBytes, codeHash,
isSleeping, sleepUntilHeight, isFinished, hadFatalError, isFrozen, frozenBalance,
sleepUntilMessageTimestamp));
} while ( resultSet.next());
return list;
} catch (SQLException e) {
throw new DataException("Unable to fetch AT from repository", e);
}
}
@Override
public boolean exists(String atAddress) throws DataException {
try {
@ -403,6 +464,56 @@ public class HSQLDBATRepository implements ATRepository {
}
}
@Override
public List<ATStateData> getLatestATStates(List<String> atAddresses) throws DataException{
String sql = "SELECT height, state_data, state_hash, fees, is_initial, sleep_until_message_timestamp, AT_address "
+ "FROM ATStates "
+ "JOIN ATStatesData USING (AT_address, height) "
+ "WHERE ATStates.AT_address IN ("
+ String.join(", ", Collections.nCopies(atAddresses.size(), "?"))
+ ")";
List<ATStateData> stateDataList;
try (ResultSet resultSet = this.repository.checkedExecute(sql, atAddresses.toArray(new String[atAddresses.size()]))) {
if (resultSet == null)
return new ArrayList<>(0);
stateDataList = new ArrayList<>();
do {
int height = resultSet.getInt(1);
byte[] stateData = resultSet.getBytes(2); // Actually BLOB
byte[] stateHash = resultSet.getBytes(3);
long fees = resultSet.getLong(4);
boolean isInitial = resultSet.getBoolean(5);
Long sleepUntilMessageTimestamp = resultSet.getLong(6);
if (sleepUntilMessageTimestamp == 0 && resultSet.wasNull())
sleepUntilMessageTimestamp = null;
String atAddress = resultSet.getString(7);
stateDataList.add(new ATStateData(atAddress, height, stateData, stateHash, fees, isInitial, sleepUntilMessageTimestamp));
} while( resultSet.next());
} catch (SQLException e) {
throw new DataException("Unable to fetch latest AT state from repository", e);
}
Map<String, List<ATStateData>> stateDataByAtAddress
= stateDataList.stream()
.collect(Collectors.groupingBy(ATStateData::getATAddress));
List<ATStateData> latestForEachAtAddress
= stateDataByAtAddress.values().stream()
.map(list -> list.stream()
.max(Comparator.comparing(ATStateData::getHeight))
.orElse(null))
.filter(obj -> obj != null)
.collect(Collectors.toList());
return latestForEachAtAddress;
}
@Override
public List<ATStateData> getMatchingFinalATStates(byte[] codeHash, byte[] buyerPublicKey, byte[] sellerPublicKey, Boolean isFinished,
Integer dataByteOffset, Long expectedValue, Integer minimumFinalHeight,

View File

@ -407,6 +407,39 @@ public class HSQLDBAccountRepository implements AccountRepository {
}
}
@Override
public List<AccountBalanceData> getBalances(List<String> addresses, long assetId) throws DataException {
StringBuffer sql = new StringBuffer();
sql.append("SELECT balance, account, asset_id FROM AccountBalances ");
sql.append("WHERE account IN (");
sql.append(String.join(", ", Collections.nCopies(addresses.size(), "?")));
sql.append(")");
try (ResultSet resultSet = this.repository.checkedExecute(sql.toString(), addresses.toArray(new String[addresses.size()]))) {
if (resultSet == null)
return new ArrayList<>(0);
List<AccountBalanceData> balances = new ArrayList<>(addresses.size());
do {
long balance = resultSet.getLong(1);
String address = resultSet.getString(2);
Long assetIdResult = resultSet.getLong(3);
if( assetIdResult != assetId ) {
LOGGER.warn("assetIdResult = " + assetIdResult);
continue;
}
balances.add(new AccountBalanceData(address, assetId, balance) );
} while( resultSet.next());
return balances;
} catch (SQLException e) {
throw new DataException("Unable to fetch account balance from repository", e);
}
}
@Override
public List<AccountBalanceData> getAssetBalances(long assetId, Boolean excludeZero) throws DataException {
StringBuilder sql = new StringBuilder(1024);

View File

@ -40,13 +40,25 @@ public class HSQLDBChatRepository implements ChatRepository {
StringBuilder sql = new StringBuilder(1024);
String tableName;
// if the PrimaryTable is available, then use it
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
LOGGER.debug("using PrimaryNames for chat transactions");
tableName = "PrimaryNames";
}
else {
LOGGER.debug("using Names for chat transactions");
tableName = "Names";
}
sql.append("SELECT created_when, tx_group_id, Transactions.reference, creator, "
+ "sender, SenderNames.name, recipient, RecipientNames.name, "
+ "chat_reference, data, is_text, is_encrypted, signature "
+ "FROM ChatTransactions "
+ "JOIN Transactions USING (signature) "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient ");
+ "LEFT OUTER JOIN " + tableName + " AS SenderNames ON SenderNames.owner = sender "
+ "LEFT OUTER JOIN " + tableName + " AS RecipientNames ON RecipientNames.owner = recipient ");
// WHERE clauses
@ -152,11 +164,11 @@ public class HSQLDBChatRepository implements ChatRepository {
// if the PrimaryTable is available, then use it
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
LOGGER.info("using PrimaryNames for chat transactions");
LOGGER.debug("using PrimaryNames for chat transactions");
tableName = "PrimaryNames";
}
else {
LOGGER.info("using Names for chat transactions");
LOGGER.debug("using Names for chat transactions");
tableName = "Names";
}
@ -202,6 +214,18 @@ public class HSQLDBChatRepository implements ChatRepository {
}
private List<GroupChat> getActiveGroupChats(String address, Encoding encoding, Boolean hasChatReference) throws DataException {
String tableName;
// if the PrimaryTable is available, then use it
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
LOGGER.debug("using PrimaryNames for chat transactions");
tableName = "PrimaryNames";
}
else {
LOGGER.debug("using Names for chat transactions");
tableName = "Names";
}
// Find groups where address is a member and potential latest message details
String groupsSql = "SELECT group_id, group_name, latest_timestamp, sender, sender_name, signature, data "
+ "FROM GroupMembers "
@ -210,7 +234,7 @@ public class HSQLDBChatRepository implements ChatRepository {
+ "SELECT created_when AS latest_timestamp, sender, name AS sender_name, signature, data "
+ "FROM ChatTransactions "
+ "JOIN Transactions USING (signature) "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "LEFT OUTER JOIN " + tableName + " AS SenderNames ON SenderNames.owner = sender "
// NOTE: We need to qualify "Groups.group_id" here to avoid "General error" bug in HSQLDB v2.5.0
+ "WHERE tx_group_id = Groups.group_id AND type = " + TransactionType.CHAT.value + " ";
@ -254,7 +278,7 @@ public class HSQLDBChatRepository implements ChatRepository {
String grouplessSql = "SELECT created_when, sender, SenderNames.name, signature, data "
+ "FROM ChatTransactions "
+ "JOIN Transactions USING (signature) "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "LEFT OUTER JOIN " + tableName + " AS SenderNames ON SenderNames.owner = sender "
+ "WHERE tx_group_id = 0 "
+ "AND recipient IS NULL ";
@ -294,6 +318,18 @@ public class HSQLDBChatRepository implements ChatRepository {
}
private List<DirectChat> getActiveDirectChats(String address, Boolean hasChatReference) throws DataException {
String tableName;
// if the PrimaryTable is available, then use it
if( this.repository.getBlockRepository().getBlockchainHeight() > BlockChain.getInstance().getMultipleNamesPerAccountHeight()) {
LOGGER.debug("using PrimaryNames for chat transactions");
tableName = "PrimaryNames";
}
else {
LOGGER.debug("using Names for chat transactions");
tableName = "Names";
}
// Find chat messages involving address
String directSql = "SELECT other_address, name, latest_timestamp, sender, sender_name "
+ "FROM ("
@ -307,7 +343,7 @@ public class HSQLDBChatRepository implements ChatRepository {
+ "SELECT created_when AS latest_timestamp, sender, name AS sender_name "
+ "FROM ChatTransactions "
+ "NATURAL JOIN Transactions "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "LEFT OUTER JOIN " + tableName + " AS SenderNames ON SenderNames.owner = sender "
+ "WHERE (sender = other_address AND recipient = ?) "
+ "OR (sender = ? AND recipient = other_address) ";
@ -323,7 +359,7 @@ public class HSQLDBChatRepository implements ChatRepository {
directSql += "ORDER BY created_when DESC "
+ "LIMIT 1"
+ ") AS LatestMessages "
+ "LEFT OUTER JOIN Names ON owner = other_address";
+ "LEFT OUTER JOIN " + tableName + " ON owner = other_address";
Object[] bindParams = new Object[] { address, address, address, address };

View File

@ -155,6 +155,58 @@ public class HSQLDBTransactionRepository implements TransactionRepository {
}
}
@Override
public List<TransactionData> fromSignatures(List<byte[]> signatures) throws DataException {
StringBuffer sql = new StringBuffer();
sql.append("SELECT type, reference, creator, created_when, fee, tx_group_id, block_height, approval_status, approval_height, signature ");
sql.append("FROM Transactions WHERE signature IN (");
sql.append(String.join(", ", Collections.nCopies(signatures.size(), "?")));
sql.append(")");
List<TransactionData> list;
try (ResultSet resultSet = this.repository.checkedExecute(sql.toString(), signatures.toArray(new byte[0][]))) {
if (resultSet == null) {
return new ArrayList<>(0);
}
list = new ArrayList<>(signatures.size());
do {
TransactionType type = TransactionType.valueOf(resultSet.getInt(1));
byte[] reference = resultSet.getBytes(2);
byte[] creatorPublicKey = resultSet.getBytes(3);
long timestamp = resultSet.getLong(4);
Long fee = resultSet.getLong(5);
if (fee == 0 && resultSet.wasNull())
fee = null;
int txGroupId = resultSet.getInt(6);
Integer blockHeight = resultSet.getInt(7);
if (blockHeight == 0 && resultSet.wasNull())
blockHeight = null;
ApprovalStatus approvalStatus = ApprovalStatus.valueOf(resultSet.getInt(8));
Integer approvalHeight = resultSet.getInt(9);
if (approvalHeight == 0 && resultSet.wasNull())
approvalHeight = null;
byte[] signature = resultSet.getBytes(10);
BaseTransactionData baseTransactionData = new BaseTransactionData(timestamp, txGroupId, reference, creatorPublicKey, fee, approvalStatus, blockHeight, approvalHeight, signature);
list.add( fromBase(type, baseTransactionData) );
} while( resultSet.next());
return list;
} catch (SQLException e) {
throw new DataException("Unable to fetch transactions from repository", e);
}
}
@Override
public TransactionData fromReference(byte[] reference) throws DataException {
String sql = "SELECT type, signature, creator, created_when, fee, tx_group_id, block_height, approval_status, approval_height "

View File

@ -759,14 +759,14 @@ public class Settings {
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE", 5));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA", 5));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA_FILE_LIST", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE_LIST", 5));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_DATA_FILE_LIST", 50));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_DATA_FILE_LIST", 50));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_SIGNATURES", 5));
maxThreadsPerMessageType.add(new ThreadLimit("ARBITRARY_METADATA", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_METADATA", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_TRANSACTION", 10));
maxThreadsPerMessageType.add(new ThreadLimit("TRANSACTION_SIGNATURES", 5));
maxThreadsPerMessageType.add(new ThreadLimit("TRADE_PRESENCES", 5));
maxThreadsPerMessageType.add(new ThreadLimit("GET_ARBITRARY_METADATA", 50));
maxThreadsPerMessageType.add(new ThreadLimit("GET_TRANSACTION", 50));
maxThreadsPerMessageType.add(new ThreadLimit("TRANSACTION_SIGNATURES", 50));
maxThreadsPerMessageType.add(new ThreadLimit("TRADE_PRESENCES", 50));
}
// Getters / setters

View File

@ -131,16 +131,12 @@ public class ArbitraryIndexUtils {
)
);
LOGGER.info("processed indices by term: count = " + indicesByTerm.size());
// lock, clear old, load new
synchronized( IndexCache.getInstance().getIndicesByTerm() ) {
IndexCache.getInstance().getIndicesByTerm().clear();
IndexCache.getInstance().getIndicesByTerm().putAll(indicesByTerm);
}
LOGGER.info("loaded indices by term");
LOGGER.debug("processing indices by issuer ...");
Map<String, List<ArbitraryDataIndexDetail>> indicesByIssuer
= indexDetails.stream().collect(
@ -154,15 +150,11 @@ public class ArbitraryIndexUtils {
)
);
LOGGER.info("processed indices by issuer: count = " + indicesByIssuer.size());
// lock, clear old, load new
synchronized( IndexCache.getInstance().getIndicesByIssuer() ) {
IndexCache.getInstance().getIndicesByIssuer().clear();
IndexCache.getInstance().getIndicesByIssuer().putAll(indicesByIssuer);
}
LOGGER.info("loaded indices by issuer");
}
}
@ -221,7 +213,6 @@ public class ArbitraryIndexUtils {
}
}
}
Thread.sleep(3000L);
}
java.nio.file.Path outputPath = arbitraryDataReader.getFilePath();

View File

@ -48,6 +48,24 @@ public class ArbitraryTransactionUtils {
}
}
public static List<ArbitraryTransactionData> fetchTransactionDataList(final Repository repository, final List<byte[]> signature) {
try {
List<TransactionData> transactions = repository.getTransactionRepository().fromSignatures(signature);
List<ArbitraryTransactionData> list
= transactions.stream()
.filter( transaction -> transaction instanceof ArbitraryTransactionData )
.map( transactionData -> (ArbitraryTransactionData) transactionData)
.collect(Collectors.toList());
return list;
} catch (DataException e) {
LOGGER.error("Repository issue when fetching arbitrary transaction data", e);
return null;
}
}
public static ArbitraryTransactionData fetchLatestPut(Repository repository, ArbitraryTransactionData arbitraryTransactionData) {
if (arbitraryTransactionData == null) {
return null;

View File

@ -120,7 +120,8 @@
"nullGroupMembershipHeight": 2012800,
"ignoreLevelForRewardShareHeight": 2012800,
"adminQueryFixHeight": 2012800,
"multipleNamesPerAccountHeight": 9999999
"multipleNamesPerAccountHeight": 9999999,
"mintedBlocksAdjustmentRemovalHeight": 9999999
},
"checkpoints": [
{ "height": 1136300, "signature": "3BbwawEF2uN8Ni5ofpJXkukoU8ctAPxYoFB7whq9pKfBnjfZcpfEJT4R95NvBDoTP8WDyWvsUvbfHbcr9qSZuYpSKZjUQTvdFf6eqznHGEwhZApWfvXu6zjGCxYCp65F4jsVYYJjkzbjmkCg5WAwN5voudngA23kMK6PpTNygapCzXt" }

View File

@ -115,7 +115,8 @@
"ignoreLevelForRewardShareHeight": 9999999999999,
"nullGroupMembershipHeight": 20,
"adminQueryFixHeight": 9999999999999,
"multipleNamesPerAccountHeight": 10
"multipleNamesPerAccountHeight": 10,
"mintedBlocksAdjustmentRemovalHeight": 9999999999999
},
"genesisInfo": {
"version": 4,