3
0
mirror of https://github.com/Qortal/qortal.git synced 2025-02-12 10:15:49 +00:00

NTP and performance changes + fixes.

New NTP class now runs as a simplistic NTP client, repeatedly polling
several NTP servers and maintaining a more accurate time independent
of operating system.

Several occurrences of System.currentTimeMillis() replaced with NTP.getTime()
particularly where block/transaction/networking is involved.

GET /admin/info now includes "currentTimestamp" as reported from NTP.

Added support for block timestamps determined by generator, instead of
supplied by clock. (BlockChain.newBlockTimestampHeight - not yet activated).
Incorrect timestamps will produce a TIMESTAMP_INCORRECT Block.ValidationResult.

Block.calcMinimumTimestamp repurposed as Block.calcTimestamp for above.

Block timestamps are now allowed to be max 2000ms in the future,
was previously max 500ms.

Block generation prohibited until initial NTP sync.

Instead of deleting INVALID unconfirmed transactions in BlockGenerator,
Controller now deletes EXPIRED unconfirmed transactions every so often.
This also fixes persistent expired unconfirmed transactions on nodes
that do not generate blocks, as BlockGenerator.deleteInvalidTransactions()
was never reached.

Abbreviated block sigs added to log entries declaring a new block is generated
in BlockGenerator.

Controller checks for NTP sync much faster during start-up and SysTray's
tooltip text starts as "Synchronizing clock" until NTP sync occurs.
After NTP sync, Controller logs NTP offset every so often (currently every 5 mins).

When considering synchronizing, Controller skips peers that have the same block sig
as last time when synchronization resulted in no action, e.g. INFERIOR_CHAIN,
NOTHING_TO_DO and also OK. OK is included as another sync attempt would result in
NOTHING_TO_DO.
Previously this skipping check only happened after prior INFERIOR_CHAIN.

During inbound peer handshaking, if we receive a peer ID that matches an existing inbound
peer then send peer ID of all zeros, then close connection.
Remote end should detect this and cleanly close connection instead of waiting for handshake timeout.
Randomly generated peer IDs have lowest bit set to avoid all zeros.
Might need further work.

Networking doesn't connect, or accept, until NTP has synced.

Transaction validation can fail with CLOCK_NOT_SYNCED if NTP not synced.
This commit is contained in:
catbref 2019-07-31 16:08:22 +01:00
parent 05e491f65b
commit 63b262a76e
15 changed files with 605 additions and 240 deletions

View File

@ -6,6 +6,7 @@ import javax.xml.bind.annotation.XmlAccessorType;
@XmlAccessorType(XmlAccessType.FIELD)
public class NodeInfo {
public Long currentTimestamp;
public long uptime;
public String buildVersion;
public long buildTimestamp;

View File

@ -58,6 +58,7 @@ import org.qora.network.Network;
import org.qora.network.Peer;
import org.qora.network.PeerAddress;
import org.qora.utils.Base58;
import org.qora.utils.NTP;
import com.google.common.collect.Lists;
@ -112,6 +113,7 @@ public class AdminResource {
public NodeInfo info() {
NodeInfo nodeInfo = new NodeInfo();
nodeInfo.currentTimestamp = NTP.getTime();
nodeInfo.uptime = System.currentTimeMillis() - Controller.startTime;
nodeInfo.buildVersion = Controller.getInstance().getVersionString();
nodeInfo.buildTimestamp = Controller.getInstance().getBuildTimestamp();

View File

@ -40,6 +40,7 @@ import org.qora.transform.TransformationException;
import org.qora.transform.block.BlockTransformer;
import org.qora.transform.transaction.TransactionTransformer;
import org.qora.utils.Base58;
import org.qora.utils.NTP;
import com.google.common.primitives.Bytes;
@ -78,6 +79,7 @@ public class Block {
TIMESTAMP_IN_FUTURE(21),
TIMESTAMP_MS_INCORRECT(22),
TIMESTAMP_TOO_SOON(23),
TIMESTAMP_INCORRECT(24),
VERSION_INCORRECT(30),
FEATURE_NOT_YET_RELEASED(31),
GENERATING_BALANCE_INCORRECT(40),
@ -205,6 +207,10 @@ public class Block {
byte[] reference = parentBlockData.getSignature();
BigDecimal generatingBalance = parentBlock.calcNextBlockGeneratingBalance();
// After a certain height, block timestamps are generated using previous block and generator's public key
if (height >= BlockChain.getInstance().getNewBlockTimestampHeight())
timestamp = calcTimestamp(parentBlockData, generator.getPublicKey());
byte[] generatorSignature;
try {
generatorSignature = generator
@ -258,6 +264,7 @@ public class Block {
Block parentBlock = new Block(repository, parentBlockData);
newBlock.generator = generator;
BlockData parentBlockData = newBlock.getParent();
// Copy AT state data
newBlock.ourAtStates = this.ourAtStates;
@ -269,6 +276,10 @@ public class Block {
byte[] reference = this.blockData.getReference();
BigDecimal generatingBalance = this.blockData.getGeneratingBalance();
// After a certain height, block timestamps are generated using previous block and generator's public key
if (height >= BlockChain.getInstance().getNewBlockTimestampHeight())
timestamp = calcTimestamp(parentBlockData, generator.getPublicKey());
byte[] generatorSignature;
try {
generatorSignature = generator
@ -734,13 +745,15 @@ public class Block {
* <p>
* For qora-core, we'll using the minimum from BlockChain config.
*/
public static long calcMinimumTimestamp(BlockData parentBlockData, byte[] generatorPublicKey) {
public static long calcTimestamp(BlockData parentBlockData, byte[] generatorPublicKey) {
long minBlockTime = BlockChain.getInstance().getMinBlockTime(); // seconds
return parentBlockData.getTimestamp() + (minBlockTime * 1000L);
}
public long calcMinimumTimestamp(BlockData parentBlockData) {
return calcMinimumTimestamp(parentBlockData, this.generator.getPublicKey());
public static long calcMinimumTimestamp(BlockData parentBlockData) {
final int thisHeight = parentBlockData.getHeight() + 1;
BlockTimingByHeight blockTiming = BlockChain.getInstance().getBlockTimingByHeight(thisHeight);
return parentBlockData.getTimestamp() + blockTiming.target - blockTiming.deviation;
}
/**
@ -797,8 +810,9 @@ public class Block {
if (this.blockData.getTimestamp() <= parentBlockData.getTimestamp())
return ValidationResult.TIMESTAMP_OLDER_THAN_PARENT;
// Check timestamp is not in the future (within configurable ~500ms margin)
if (this.blockData.getTimestamp() - BlockChain.getInstance().getBlockTimestampMargin() > System.currentTimeMillis())
// Check timestamp is not in the future (within configurable margin)
// We don't need to check NTP.getTime() for null as we shouldn't reach here if that is already the case
if (this.blockData.getTimestamp() - BlockChain.getInstance().getBlockTimestampMargin() > NTP.getTime())
return ValidationResult.TIMESTAMP_IN_FUTURE;
// Legacy gen1 test: check timestamp milliseconds is the same as parent timestamp milliseconds?
@ -807,9 +821,15 @@ public class Block {
// Too early to forge block?
// XXX DISABLED as it doesn't work - but why?
// if (this.blockData.getTimestamp() < parentBlock.getBlockData().getTimestamp() + BlockChain.getInstance().getMinBlockTime())
// if (this.blockData.getTimestamp() < Block.calcMinimumTimestamp(parentBlockData))
// return ValidationResult.TIMESTAMP_TOO_SOON;
if (this.blockData.getHeight() >= BlockChain.getInstance().getNewBlockTimestampHeight()) {
long expectedTimestamp = calcTimestamp(parentBlockData, this.blockData.getGeneratorPublicKey());
if (this.blockData.getTimestamp() != expectedTimestamp)
return ValidationResult.TIMESTAMP_INCORRECT;
}
return ValidationResult.OK;
}

View File

@ -26,6 +26,7 @@ import org.qora.repository.RepositoryManager;
import org.qora.settings.Settings;
import org.qora.transaction.Transaction;
import org.qora.utils.Base58;
import org.qora.utils.NTP;
// Forging new blocks
@ -83,6 +84,14 @@ public class BlockGenerator extends Thread {
if (!Controller.getInstance().isGenerationAllowed())
continue;
final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp();
if (minLatestBlockTimestamp == null)
continue;
final Long now = NTP.getTime();
if (now == null)
continue;
List<ForgingAccountData> forgingAccountsData = repository.getAccountRepository().getForgingAccounts();
// No forging accounts?
if (forgingAccountsData.isEmpty())
@ -98,8 +107,6 @@ public class BlockGenerator extends Thread {
if (peers.size() < Settings.getInstance().getMinBlockchainPeers())
continue;
final long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp();
// Disregard peers that don't have a recent block
peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp);
@ -172,7 +179,7 @@ public class BlockGenerator extends Thread {
Block newBlock = goodBlocks.get(winningIndex);
// Delete invalid transactions. NOTE: discards repository changes on entry, saves changes on exit.
deleteInvalidTransactions(repository);
// deleteInvalidTransactions(repository);
// Add unconfirmed transactions
addUnconfirmedTransactions(repository, newBlock);
@ -202,12 +209,16 @@ public class BlockGenerator extends Thread {
if (proxyForgerData != null) {
PublicKeyAccount forger = new PublicKeyAccount(repository, proxyForgerData.getForgerPublicKey());
LOGGER.info(String.format("Generated block %d by %s on behalf of %s",
LOGGER.info(String.format("Generated block %d, sig %.8s by %s on behalf of %s",
newBlock.getBlockData().getHeight(),
Base58.encode(newBlock.getBlockData().getSignature()),
forger.getAddress(),
proxyForgerData.getRecipient()));
} else {
LOGGER.info(String.format("Generated block %d by %s", newBlock.getBlockData().getHeight(), newBlock.getGenerator().getAddress()));
LOGGER.info(String.format("Generated block %d, sig %.8s by %s",
newBlock.getBlockData().getHeight(),
Base58.encode(newBlock.getBlockData().getSignature()),
newBlock.getGenerator().getAddress()));
}
repository.saveChanges();
@ -327,7 +338,7 @@ public class BlockGenerator extends Thread {
blockchainLock.lock();
try {
// Delete invalid transactions
deleteInvalidTransactions(repository);
// deleteInvalidTransactions(repository);
// Add unconfirmed transactions
addUnconfirmedTransactions(repository, newBlock);

View File

@ -1,6 +1,5 @@
package org.qora.controller;
import java.awt.TrayIcon.MessageType;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
@ -16,7 +15,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
@ -91,8 +89,9 @@ public class Controller extends Thread {
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s/blockchain;create=true;hsqldb.full_log_replay=true";
private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000; // ms
private static final long REPOSITORY_BACKUP_PERIOD = 123 * 60 * 1000; // ms
private static final long NTP_CHECK_PERIOD = 10 * 60 * 1000; // ms
private static final long MAX_NTP_OFFSET = 30 * 1000; // ms
private static final long NTP_PRE_SYNC_CHECK_PERIOD = 5 * 1000; // ms
private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000; // ms
private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000; // ms
private static volatile boolean isStopping = false;
private static BlockGenerator blockGenerator = null;
@ -103,15 +102,16 @@ public class Controller extends Thread {
private final String buildVersion;
private final long buildTimestamp; // seconds
private long repositoryBackupTimestamp = startTime + REPOSITORY_BACKUP_PERIOD;
private long repositoryBackupTimestamp = startTime + REPOSITORY_BACKUP_PERIOD; // ms
private long ntpCheckTimestamp = startTime; // ms
private long deleteExpiredTimestamp = startTime + DELETE_EXPIRED_INTERVAL; // ms
/** Whether BlockGenerator is allowed to generate blocks. Mostly determined by system clock accuracy. */
private volatile boolean isGenerationAllowed = false;
/** Signature of peer's latest block when we tried to sync but peer had inferior chain. */
private byte[] inferiorChainPeerBlockSignature = null;
/** Signature of our latest block when we tried to sync but peer had inferior chain. */
private byte[] inferiorChainOurBlockSignature = null;
/** Signature of peer's latest block that will result in no sync action needed (e.g. INFERIOR_CHAIN, NOTHING_TO_DO, OK). */
private byte[] noSyncPeerBlockSignature = null;
/** Signature of our latest block that will result in no sync action needed (e.g. INFERIOR_CHAIN, NOTHING_TO_DO, OK). */
private byte[] noSyncOurBlockSignature = null;
/**
* Map of recent requests for ARBITRARY transaction data payloads.
@ -223,6 +223,9 @@ public class Controller extends Thread {
// Load/check settings, which potentially sets up blockchain config, etc.
Settings.getInstance();
LOGGER.info("Starting NTP");
NTP.start();
LOGGER.info("Starting repository");
try {
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getRepositoryUrl());
@ -317,20 +320,31 @@ public class Controller extends Thread {
potentiallySynchronize();
}
final long now = System.currentTimeMillis();
// Clean up arbitrary data request cache
final long requestMinimumTimestamp = System.currentTimeMillis() - ARBITRARY_REQUEST_TIMEOUT;
final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT;
arbitraryDataRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp);
// Give repository a chance to backup
if (System.currentTimeMillis() >= repositoryBackupTimestamp) {
repositoryBackupTimestamp += REPOSITORY_BACKUP_PERIOD;
if (now >= repositoryBackupTimestamp) {
repositoryBackupTimestamp = now + REPOSITORY_BACKUP_PERIOD;
RepositoryManager.backup(true);
}
// Potentially nag end-user about NTP
if (System.currentTimeMillis() >= ntpCheckTimestamp) {
ntpCheckTimestamp += NTP_CHECK_PERIOD;
isGenerationAllowed = ntpCheck();
// Check NTP status
if (now >= ntpCheckTimestamp) {
Long ntpTime = NTP.getTime();
if (ntpTime != null) {
LOGGER.info(String.format("Adjusting system time by NTP offset: %dms", ntpTime - now));
ntpCheckTimestamp = now + NTP_POST_SYNC_CHECK_PERIOD;
} else {
LOGGER.info(String.format("No NTP offset yet"));
ntpCheckTimestamp = now + NTP_PRE_SYNC_CHECK_PERIOD;
}
isGenerationAllowed = ntpTime != null;
requestSysTrayUpdate = true;
}
@ -341,6 +355,12 @@ public class Controller extends Thread {
LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage()));
}
// Delete expired transactions
if (now >= deleteExpiredTimestamp) {
deleteExpiredTimestamp = now + DELETE_EXPIRED_INTERVAL;
deleteExpiredTransactions();
}
// Maybe update SysTray
if (requestSysTrayUpdate) {
requestSysTrayUpdate = false;
@ -353,6 +373,10 @@ public class Controller extends Thread {
}
private void potentiallySynchronize() throws InterruptedException {
final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
if (minLatestBlockTimestamp == null)
return;
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
// Disregard peers that have "misbehaved" recently
@ -363,7 +387,6 @@ public class Controller extends Thread {
return;
// Disregard peers that don't have a recent block
final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
peers.removeIf(peer -> peer.getLastBlockTimestamp() == null || peer.getLastBlockTimestamp() < minLatestBlockTimestamp);
BlockData latestBlockData = getChainTip();
@ -371,17 +394,17 @@ public class Controller extends Thread {
// Disregard peers that have no block signature or the same block signature as us
peers.removeIf(peer -> peer.getLastBlockSignature() == null || Arrays.equals(latestBlockData.getSignature(), peer.getLastBlockSignature()));
// Disregard peer we used last time, if both we and they are still on the same block and we didn't like their chain
if (inferiorChainOurBlockSignature != null && Arrays.equals(inferiorChainOurBlockSignature, latestBlockData.getSignature()))
peers.removeIf(peer -> Arrays.equals(inferiorChainPeerBlockSignature, peer.getLastBlockSignature()));
// Disregard peers that are on the same block as last sync attempt and we didn't like their chain
if (noSyncOurBlockSignature != null && Arrays.equals(noSyncOurBlockSignature, latestBlockData.getSignature()))
peers.removeIf(peer -> Arrays.equals(noSyncPeerBlockSignature, peer.getLastBlockSignature()));
if (!peers.isEmpty()) {
// Pick random peer to sync with
int index = new SecureRandom().nextInt(peers.size());
Peer peer = peers.get(index);
inferiorChainOurBlockSignature = null;
inferiorChainPeerBlockSignature = null;
noSyncOurBlockSignature = null;
noSyncPeerBlockSignature = null;
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(peer, false);
switch (syncResult) {
@ -395,7 +418,7 @@ public class Controller extends Thread {
// Don't use this peer again for a while
PeerData peerData = peer.getPeerData();
peerData.setLastMisbehaved(System.currentTimeMillis());
peerData.setLastMisbehaved(NTP.getTime());
// Only save to repository if outbound peer
if (peer.isOutbound())
@ -408,8 +431,8 @@ public class Controller extends Thread {
break;
case INFERIOR_CHAIN:
inferiorChainOurBlockSignature = latestBlockData.getSignature();
inferiorChainPeerBlockSignature = peer.getLastBlockSignature();
noSyncOurBlockSignature = latestBlockData.getSignature();
noSyncPeerBlockSignature = peer.getLastBlockSignature();
// These are minor failure results so fine to try again
LOGGER.debug(() -> String.format("Refused to synchronize with peer %s (%s)", peer, syncResult.name()));
break;
@ -425,6 +448,8 @@ public class Controller extends Thread {
requestSysTrayUpdate = true;
// fall-through...
case NOTHING_TO_DO:
noSyncOurBlockSignature = latestBlockData.getSignature();
noSyncPeerBlockSignature = peer.getLastBlockSignature();
LOGGER.debug(() -> String.format("Synchronized with peer %s (%s)", peer, syncResult.name()));
break;
}
@ -436,62 +461,12 @@ public class Controller extends Thread {
}
}
/**
* Nag if we detect system clock is too far from internet time.
*
* @return <tt>true</tt> if clock is accurate, <tt>false</tt> if inaccurate or we don't know.
*/
private boolean ntpCheck() {
// Fetch mean offset from internet time (ms).
Long meanOffset = NTP.getOffset();
final boolean isWindows = System.getProperty("os.name").toLowerCase().contains("win");
boolean isNtpActive = false;
if (isWindows) {
// Detecting Windows Time service
String[] detectCmd = new String[] { "net", "start" };
try {
Process process = new ProcessBuilder(Arrays.asList(detectCmd)).start();
try (InputStream in = process.getInputStream(); Scanner scanner = new Scanner(in, "UTF8")) {
scanner.useDelimiter("\\A");
String output = scanner.hasNext() ? scanner.next() : "";
isNtpActive = output.contains("Windows Time");
}
} catch (IOException e) {
// Not important
}
} else {
// Very basic unix-based attempt to check for ntpd
String[] detectCmd = new String[] { "ps", "-agx" };
try {
Process process = new ProcessBuilder(Arrays.asList(detectCmd)).start();
try (InputStream in = process.getInputStream(); Scanner scanner = new Scanner(in, "UTF8")) {
scanner.useDelimiter("\\A");
String output = scanner.hasNext() ? scanner.next() : "";
isNtpActive = output.contains("ntpd");
}
} catch (IOException e) {
// Not important
}
}
LOGGER.info(String.format("NTP mean offset %s, NTP service active: %s", meanOffset, isNtpActive));
final boolean isOffsetGood = meanOffset != null && Math.abs(meanOffset) < MAX_NTP_OFFSET;
// If offset bad or NTP not active then nag
if (!isOffsetGood || !isNtpActive) {
String caption = Translator.INSTANCE.translate("SysTray", "NTP_NAG_CAPTION");
String text = Translator.INSTANCE.translate("SysTray", isWindows ? "NTP_NAG_TEXT_WINDOWS" : "NTP_NAG_TEXT_UNIX");
SysTray.getInstance().showMessage(caption, text, MessageType.WARNING);
}
// Return whether we're accurate (disregarding whether NTP service is active)
return isOffsetGood;
}
private void updateSysTray() {
if (NTP.getTime() == null) {
SysTray.getInstance().setToolTipText(Translator.INSTANCE.translate("SysTray", "SYNCHRONIZING CLOCK"));
return;
}
final int numberOfPeers = Network.getInstance().getUniqueHandshakedPeers().size();
final int height = getChainHeight();
@ -504,6 +479,22 @@ public class Controller extends Thread {
SysTray.getInstance().setToolTipText(tooltip);
}
public void deleteExpiredTransactions() {
try (final Repository repository = RepositoryManager.getRepository()) {
List<TransactionData> transactions = repository.getTransactionRepository().getUnconfirmedTransactions();
for (TransactionData transactionData : transactions)
if (transactionData.getTimestamp() >= Transaction.getDeadline(transactionData)) {
LOGGER.info(String.format("Deleting expired, unconfirmed transaction %s", Base58.encode(transactionData.getSignature())));
repository.getTransactionRepository().delete(transactionData);
}
repository.saveChanges();
} catch (DataException e) {
LOGGER.error("Repository issue while deleting expired unconfirmed transactions", e);
}
}
// Shutdown
public void shutdown() {
@ -552,6 +543,9 @@ public class Controller extends Thread {
LOGGER.error("Error occurred while shutting down repository", e);
}
LOGGER.info("Shutting down NTP");
NTP.shutdownNow();
LOGGER.info("Shutdown complete!");
}
}
@ -960,7 +954,7 @@ public class Controller extends Thread {
byte[] signature = getArbitraryDataMessage.getSignature();
String signature58 = Base58.encode(signature);
Long timestamp = System.currentTimeMillis();
Long timestamp = NTP.getTime();
Triple<String, Peer, Long> newEntry = new Triple<>(signature58, peer, timestamp);
// If we've seen this request recently, then ignore
@ -1070,7 +1064,7 @@ public class Controller extends Thread {
// Save our request into requests map
String signature58 = Base58.encode(signature);
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, System.currentTimeMillis());
Triple<String, Peer, Long> requestEntry = new Triple<>(signature58, null, NTP.getTime());
// Assign random ID to this message
int id;
@ -1111,12 +1105,15 @@ public class Controller extends Thread {
public static final Predicate<Peer> hasPeerMisbehaved = peer -> {
Long lastMisbehaved = peer.getPeerData().getLastMisbehaved();
return lastMisbehaved != null && lastMisbehaved > System.currentTimeMillis() - MISBEHAVIOUR_COOLOFF;
return lastMisbehaved != null && lastMisbehaved > NTP.getTime() - MISBEHAVIOUR_COOLOFF;
};
/** Returns whether we think our node has up-to-date blockchain based on our info about other peers. */
public boolean isUpToDate() {
final long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
final Long minLatestBlockTimestamp = getMinimumLatestBlockTimestamp();
if (minLatestBlockTimestamp == null)
return false;
BlockData latestBlockData = getChainTip();
// Is our blockchain too old?
@ -1139,8 +1136,12 @@ public class Controller extends Thread {
return !peers.isEmpty();
}
public static long getMinimumLatestBlockTimestamp() {
return System.currentTimeMillis() - BlockChain.getInstance().getMaxBlockTime() * 1000L * MAX_BLOCKCHAIN_TIP_AGE;
public static Long getMinimumLatestBlockTimestamp() {
Long now = NTP.getTime();
if (now == null)
return null;
return now - BlockChain.getInstance().getMaxBlockTime() * 1000L * MAX_BLOCKCHAIN_TIP_AGE;
}
}

View File

@ -159,7 +159,10 @@ public class Synchronizer {
int highestMutualHeight = Math.min(peerHeight, ourHeight);
// If our latest block is very old, we're very behind and should ditch our fork.
final long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp();
final Long minLatestBlockTimestamp = Controller.getMinimumLatestBlockTimestamp();
if (minLatestBlockTimestamp == null)
return SynchronizationResult.REPOSITORY_ISSUE;
if (ourInitialHeight > commonBlockHeight && ourLatestBlockData.getTimestamp() < minLatestBlockTimestamp) {
LOGGER.info(String.format("Ditching our chain after height %d as our latest block is very old", commonBlockHeight));
highestMutualHeight = commonBlockHeight;

View File

@ -30,6 +30,18 @@ public enum Handshake {
PeerIdMessage peerIdMessage = (PeerIdMessage) message;
byte[] peerId = peerIdMessage.getPeerId();
if (Arrays.equals(peerId, Network.ZERO_PEER_ID)) {
if (peer.isOutbound()) {
// Peer has indicated they already have an outbound connection to us
LOGGER.trace(String.format("Peer %s already connected to us - discarding this connection", peer));
} else {
// Not sure this should occur so log it
LOGGER.info(String.format("Inbound peer %s claims we also have outbound connection to them?", peer));
}
return null;
}
if (Arrays.equals(peerId, Network.getInstance().getOurPeerId())) {
// Connected to self!
// If outgoing connection then record destination as self so we don't try again
@ -53,6 +65,11 @@ public enum Handshake {
if (otherOutboundPeer == null) {
// We already have an inbound peer with this ID, but no outgoing peer with which to request verification
LOGGER.trace(String.format("Discarding inbound peer %s with existing ID", peer));
// Let peer know by sending special zero peer ID. This avoids peer keeping connection open until timeout.
peerIdMessage = new PeerIdMessage(Network.ZERO_PEER_ID);
peer.sendMessage(peerIdMessage);
return null;
} else {
// Use corresponding outbound peer to verify inbound

View File

@ -54,6 +54,7 @@ import org.qora.repository.Repository;
import org.qora.repository.RepositoryManager;
import org.qora.settings.Settings;
import org.qora.utils.ExecuteProduceConsume;
import org.qora.utils.NTP;
// For managing peers
public class Network extends Thread {
@ -94,6 +95,7 @@ public class Network extends Thread {
public static final int MAX_SIGNATURES_PER_REPLY = 500;
public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500;
public static final int PEER_ID_LENGTH = 128;
public static final byte[] ZERO_PEER_ID = new byte[PEER_ID_LENGTH];
private final byte[] ourPeerId;
private List<Peer> connectedPeers;
@ -109,7 +111,6 @@ public class Network extends Thread {
private long nextConnectTaskTimestamp;
private ExecutorService broadcastExecutor;
/** Timestamp (ms) for next general info broadcast to all connected peers. Based on <tt>System.currentTimeMillis()</tt>. */
private long nextBroadcastTimestamp;
private Lock mergePeersLock;
@ -146,14 +147,16 @@ public class Network extends Thread {
ourPeerId = new byte[PEER_ID_LENGTH];
new SecureRandom().nextBytes(ourPeerId);
// Set bit to make sure our peer ID is not 0
ourPeerId[ourPeerId.length - 1] |= 0x01;
minOutboundPeers = Settings.getInstance().getMinOutboundPeers();
maxPeers = Settings.getInstance().getMaxPeers();
nextConnectTaskTimestamp = System.currentTimeMillis();
nextConnectTaskTimestamp = 0; // First connect once NTP syncs
broadcastExecutor = Executors.newCachedThreadPool();
nextBroadcastTimestamp = System.currentTimeMillis();
nextBroadcastTimestamp = 0; // First broadcast once NTP syncs
mergePeersLock = new ReentrantLock();
@ -420,8 +423,8 @@ public class Network extends Thread {
if (getOutboundHandshakedPeers().size() >= minOutboundPeers)
return null;
final long now = System.currentTimeMillis();
if (now < nextConnectTaskTimestamp)
final Long now = NTP.getTime();
if (now == null || now < nextConnectTaskTimestamp)
return null;
nextConnectTaskTimestamp = now + 1000L;
@ -435,8 +438,8 @@ public class Network extends Thread {
}
private Task maybeProduceBroadcastTask() {
final long now = System.currentTimeMillis();
if (now < nextBroadcastTimestamp)
final Long now = NTP.getTime();
if (now == null || now < nextBroadcastTimestamp)
return null;
nextBroadcastTimestamp = now + BROADCAST_INTERVAL;
@ -457,9 +460,15 @@ public class Network extends Thread {
if (socketChannel == null)
return;
final Long now = NTP.getTime();
Peer newPeer;
try {
if (now == null) {
LOGGER.trace(String.format("Connection discarded from peer %s due to lack of NTP sync", socketChannel.getRemoteAddress()));
return;
}
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= maxPeers) {
// We have enough peers
@ -499,7 +508,9 @@ public class Network extends Thread {
}
public void prunePeers() throws InterruptedException, DataException {
final long now = System.currentTimeMillis();
final Long now = NTP.getTime();
if (now == null)
return;
// Disconnect peers that are stuck during handshake
List<Peer> handshakePeers = this.getConnectedPeers();
@ -551,12 +562,14 @@ public class Network extends Thread {
}
private Peer getConnectablePeer() throws InterruptedException {
final long now = NTP.getTime();
try (final Repository repository = RepositoryManager.getRepository()) {
// Find an address to connect to
List<PeerData> peers = repository.getNetworkRepository().getAllPeers();
// Don't consider peers with recent connection failures
final long lastAttemptedThreshold = System.currentTimeMillis() - CONNECT_FAILURE_BACKOFF;
final long lastAttemptedThreshold = now - CONNECT_FAILURE_BACKOFF;
peers.removeIf(peerData -> peerData.getLastAttempted() != null && peerData.getLastAttempted() > lastAttemptedThreshold);
// Don't consider peers that we know loop back to ourself
@ -607,7 +620,7 @@ public class Network extends Thread {
// Update connection attempt info
repository.discardChanges();
peerData.setLastAttempted(System.currentTimeMillis());
peerData.setLastAttempted(now);
repository.getNetworkRepository().save(peerData);
repository.saveChanges();
@ -834,7 +847,7 @@ public class Network extends Thread {
LOGGER.debug(String.format("Handshake completed with peer %s", peer));
// Make a note that we've successfully completed handshake (and when)
peer.getPeerData().setLastConnected(System.currentTimeMillis());
peer.getPeerData().setLastConnected(NTP.getTime());
// Update connection info for outbound peers only
if (peer.isOutbound())
@ -882,7 +895,7 @@ public class Network extends Thread {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
// Filter out peers that we've not connected to ever or within X milliseconds
final long connectionThreshold = System.currentTimeMillis() - RECENT_CONNECTION_THRESHOLD;
final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
Predicate<PeerData> notRecentlyConnected = peerData -> {
final Long lastAttempted = peerData.getLastAttempted();
final Long lastConnected = peerData.getLastConnected();
@ -1031,12 +1044,14 @@ public class Network extends Thread {
// Network-wide calls
private void mergePeers(String addedBy, List<PeerAddress> peerAddresses) {
final Long addedWhen = NTP.getTime();
if (addedWhen == null)
return;
// Serialize using lock to prevent repository deadlocks
if (!mergePeersLock.tryLock())
return;
final long addedWhen = System.currentTimeMillis();
try {
try (final Repository repository = RepositoryManager.getRepository()) {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();

View File

@ -28,6 +28,7 @@ import org.qora.network.message.Message.MessageException;
import org.qora.network.message.Message.MessageType;
import org.qora.settings.Settings;
import org.qora.utils.ExecuteProduceConsume;
import org.qora.utils.NTP;
import org.qora.network.message.PingMessage;
import org.qora.network.message.VersionMessage;
@ -279,7 +280,7 @@ public class Peer {
}
private void sharedSetup() throws IOException {
this.connectionTimestamp = System.currentTimeMillis();
this.connectionTimestamp = NTP.getTime();
this.socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
this.socketChannel.configureBlocking(false);
this.byteBuffer = ByteBuffer.allocate(Network.MAXIMUM_MESSAGE_SIZE);
@ -510,6 +511,7 @@ public class Peer {
if (this.socketChannel.isOpen()) {
try {
this.socketChannel.shutdownOutput();
this.socketChannel.close();
} catch (IOException e) {
LOGGER.debug(String.format("IOException while trying to close peer %s", this));

View File

@ -30,6 +30,7 @@ import org.qora.repository.Repository;
import org.qora.settings.Settings;
import org.qora.transform.TransformationException;
import org.qora.transform.transaction.TransactionTransformer;
import org.qora.utils.NTP;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toMap;
@ -235,6 +236,7 @@ public abstract class Transaction {
TRANSACTION_ALREADY_EXISTS(85),
NO_BLOCKCHAIN_LOCK(86),
ORDER_ALREADY_CLOSED(87),
CLOCK_NOT_SYNCED(88),
NOT_YET_RELEASED(1000);
public final int value;
@ -301,9 +303,13 @@ public abstract class Transaction {
// More information
public long getDeadline() {
public static long getDeadline(TransactionData transactionData) {
// 24 hour deadline to include transaction in a block
return this.transactionData.getTimestamp() + (24 * 60 * 60 * 1000);
return transactionData.getTimestamp() + (24 * 60 * 60 * 1000);
}
public long getDeadline() {
return Transaction.getDeadline(transactionData);
}
public boolean hasMinimumFee() {
@ -507,7 +513,7 @@ public abstract class Transaction {
* NOTE: temporarily updates accounts' lastReference to check validity.<br>
* To do this, blockchain lock is obtained and pending repository changes are discarded.
*
* @return true if transaction can be added to unconfirmed transactions, false otherwise
* @return transaction validation result, e.g. OK
* @throws DataException
*/
public ValidationResult isValidUnconfirmed() throws DataException {
@ -517,7 +523,11 @@ public abstract class Transaction {
return ValidationResult.TIMESTAMP_TOO_OLD;
// Transactions with a timestamp too far into future are too new
long maxTimestamp = System.currentTimeMillis() + Settings.getInstance().getMaxTransactionTimestampFuture();
final Long now = NTP.getTime();
if (now == null)
return ValidationResult.CLOCK_NOT_SYNCED;
long maxTimestamp = now + Settings.getInstance().getMaxTransactionTimestampFuture();
if (this.transactionData.getTimestamp() > maxTimestamp)
return ValidationResult.TIMESTAMP_TOO_NEW;
@ -734,10 +744,14 @@ public abstract class Transaction {
* @throws DataException
*/
private static boolean isStillValidUnconfirmed(Repository repository, TransactionData transactionData, long blockTimestamp) throws DataException {
final Long now = NTP.getTime();
if (now == null)
return false;
Transaction transaction = Transaction.fromData(repository, transactionData);
// Check transaction has not expired
if (transaction.getDeadline() <= blockTimestamp || transaction.getDeadline() < System.currentTimeMillis())
if (transaction.getDeadline() <= blockTimestamp || transaction.getDeadline() < now)
return false;
// Is transaction is past max approval period?

View File

@ -3,99 +3,260 @@ package org.qora.utils;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.net.ntp.NTPUDPClient;
import org.apache.commons.net.ntp.NtpV3Packet;
import org.apache.commons.net.ntp.TimeInfo;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qora.settings.Settings;
public class NTP {
public class NTP implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(NTP.class);
private static final double MAX_STDDEV = 125; // ms
/**
* Returns aggregated internet time.
*
* @return internet time (ms), or null if unsuccessful.
*/
public static Long getTime() {
Long meanOffset = getOffset();
if (meanOffset == null)
return null;
private static boolean isStarted = false;
private static volatile boolean isStopping = false;
private static ExecutorService instanceExecutor;
private static NTP instance;
private static volatile Long offset = null;
return System.currentTimeMillis() + meanOffset;
}
static class NTPServer {
private static final int MIN_POLL = 64;
/**
* Returns mean offset from internet time.
*
* Positive offset means local clock is behind internet time.
*
* @return offset (ms), or null if unsuccessful.
*/
public static Long getOffset() {
String[] ntpServers = Settings.getInstance().getNtpServers();
public char usage = ' ';
public String remote;
public String refId;
public Integer stratum;
public char type = 'u'; // unicast
public int poll = MIN_POLL;
public byte reach = 0;
public Long delay;
public Double offset;
public Double jitter;
NTPUDPClient client = new NTPUDPClient();
client.setDefaultTimeout(2000);
private Deque<Double> offsets = new LinkedList<>();
private double totalSquareOffsets = 0.0;
private long nextPoll;
private Long lastGood;
List<Double> offsets = new ArrayList<>();
public NTPServer(String remote) {
this.remote = remote;
}
public boolean poll(NTPUDPClient client) {
Thread.currentThread().setName(String.format("NTP: %s", this.remote));
for (String server : ntpServers) {
try {
TimeInfo timeInfo = client.getTime(InetAddress.getByName(server));
final long now = System.currentTimeMillis();
timeInfo.computeDetails();
if (now < this.nextPoll)
return false;
LOGGER.debug(() -> String.format("%c%16.16s %16.16s %2d %c %4d %4d %3o %6dms % 5dms % 5dms",
' ',
server,
timeInfo.getMessage().getReferenceIdString(),
timeInfo.getMessage().getStratum(),
'u',
0,
1 << timeInfo.getMessage().getPoll(),
1,
timeInfo.getDelay(),
timeInfo.getOffset(),
0
));
boolean isUpdated = false;
try {
TimeInfo timeInfo = client.getTime(InetAddress.getByName(remote));
offsets.add((double) timeInfo.getOffset());
} catch (IOException e) {
// Try next server...
timeInfo.computeDetails();
NtpV3Packet ntpMessage = timeInfo.getMessage();
this.refId = ntpMessage.getReferenceIdString();
this.stratum = ntpMessage.getStratum();
this.poll = Math.max(MIN_POLL, 1 << ntpMessage.getPoll());
this.delay = timeInfo.getDelay();
this.offset = (double) timeInfo.getOffset();
if (this.offsets.size() == 8) {
double oldOffset = this.offsets.removeFirst();
this.totalSquareOffsets -= oldOffset * oldOffset;
}
this.offsets.addLast(this.offset);
this.totalSquareOffsets += this.offset * this.offset;
this.jitter = Math.sqrt(this.totalSquareOffsets / this.offsets.size());
this.reach = (byte) ((this.reach << 1) | 1);
this.lastGood = now;
isUpdated = true;
} catch (IOException e) {
this.reach <<= 1;
}
this.nextPoll = now + this.poll * 1000;
return isUpdated;
} finally {
Thread.currentThread().setName("NTP (dormant)");
}
}
if (offsets.size() < ntpServers.length / 2) {
LOGGER.info(String.format("Not enough replies: %d, minimum is %d", offsets.size(), ntpServers.length / 2));
public Integer getWhen() {
if (this.lastGood == null)
return null;
return (int) ((System.currentTimeMillis() - this.lastGood) / 1000);
}
}
private final NTPUDPClient client;
private List<NTPServer> ntpServers = new ArrayList<>();
private final ExecutorService serverExecutor;
private NTP() {
client = new NTPUDPClient();
client.setDefaultTimeout(2000);
for (String serverName : Settings.getInstance().getNtpServers())
ntpServers.add(new NTPServer(serverName));
serverExecutor = Executors.newCachedThreadPool();
}
public static synchronized void start() {
if (isStarted)
return;
instanceExecutor = Executors.newSingleThreadExecutor();
instance = new NTP();
instanceExecutor.execute(instance);
}
public static void shutdownNow() {
instanceExecutor.shutdownNow();
}
/**
* Returns our estimate of internet time.
*
* @return internet time (ms), or null if unsynchronized.
*/
public static Long getTime() {
if (offset == null)
return null;
return System.currentTimeMillis() + offset;
}
public void run() {
Thread.currentThread().setName("NTP instance");
try {
while (!isStopping) {
Thread.sleep(1000);
CompletionService<Boolean> ecs = new ExecutorCompletionService<Boolean>(serverExecutor);
for (NTPServer server : ntpServers)
ecs.submit(() -> server.poll(client));
boolean hasUpdate = false;
for (int i = 0; i < ntpServers.size(); ++i) {
if (isStopping)
return;
try {
hasUpdate = ecs.take().get() || hasUpdate;
} catch (ExecutionException e) {
// skip
}
}
if (hasUpdate) {
double s0 = 0;
double s1 = 0;
double s2 = 0;
for (NTPServer server : ntpServers) {
if (server.offset == null) {
server.usage = ' ';
continue;
}
server.usage = '+';
double value = server.offset * (double) server.stratum;
s0 += 1;
s1 += value;
s2 += value * value;
}
if (s0 < ntpServers.size() / 3 + 1) {
LOGGER.debug(String.format("Not enough replies (%d) to calculate network time", s0));
} else {
double thresholdStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1)));
double mean = s1 / s0;
// Now only consider offsets within 1 stddev?
s0 = 0;
s1 = 0;
s2 = 0;
for (NTPServer server : ntpServers) {
if (server.offset == null || server.reach == 0)
continue;
if (Math.abs(server.offset * (double)server.stratum - mean) > thresholdStddev)
continue;
server.usage = '*';
s0 += 1;
s1 += server.offset;
s2 += server.offset * server.offset;
}
if (s0 <= 1) {
LOGGER.debug(String.format("Not enough useful values (%d) to calculate network time. (stddev: %7.4f)", s0, thresholdStddev));
} else {
double filteredMean = s1 / s0;
double filteredStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1)));
LOGGER.trace(String.format("Threshold stddev: %7.3f, mean: %7.3f, stddev: %7.3f, nValues: %.0f / %d",
thresholdStddev, filteredMean, filteredStddev, s0, ntpServers.size()));
NTP.offset = (long) filteredMean;
LOGGER.debug(String.format("New NTP offset: %d", NTP.offset));
}
}
if (LOGGER.getLevel().isMoreSpecificThan(Level.TRACE)) {
LOGGER.trace(String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s",
' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter"
));
for (NTPServer server : ntpServers)
LOGGER.trace(String.format("%c%16.16s %16.16s %2s %c %4s %4d %3o %7s %7s %7s",
server.usage,
server.remote,
formatNull("%s", server.refId, ""),
formatNull("%2d", server.stratum, ""),
server.type,
formatNull("%4d", server.getWhen(), "-"),
server.poll,
server.reach,
formatNull("%5dms", server.delay, ""),
formatNull("% 5.0fms", server.offset, ""),
formatNull("%5.2fms", server.jitter, "")
));
}
}
}
} catch (InterruptedException e) {
// Exit
}
}
// sₙ represents sum of offsetⁿ
double s0 = 0;
double s1 = 0;
double s2 = 0;
for (Double offset : offsets) {
s0 += 1;
s1 += offset;
s2 += offset * offset;
}
double mean = s1 / s0;
double stddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1)));
// If stddev is excessive then we're not very sure so give up
if (stddev > MAX_STDDEV) {
LOGGER.info(String.format("Excessive standard deviation %.1f, maximum is %.1f", stddev, MAX_STDDEV));
return null;
}
return (long) mean;
private static String formatNull(String format, Object arg, String nullOutput) {
return arg != null ? String.format(format, arg) : nullOutput;
}
}

View File

@ -3,7 +3,7 @@
"blockDifficultyInterval": 10,
"minBlockTime": 60,
"maxBlockTime": 300,
"blockTimestampMargin": 500,
"blockTimestampMargin": 2000,
"maxBytesPerUnitFee": 1024,
"unitFee": "1.0",
"useBrokenMD160ForAddresses": true,

View File

@ -25,3 +25,5 @@ NTP_NAG_TEXT_WINDOWS = Select "Synchronize clock" from menu to fix.
OPEN_NODE_UI = Open Node UI
SYNCHRONIZE_CLOCK = Synchronize clock
SYNCHRONIZING_CLOCK = Synchronizing clock

View File

@ -25,3 +25,5 @@ NTP_NAG_TEXT_WINDOWS = \u4ECE\u83DC\u5355\u4E2D\u9009\u62E9\u201C\u540C\u6B65\u6
OPEN_NODE_UI = \u5F00\u542F\u754C\u9762
SYNCHRONIZE_CLOCK = \u540C\u6B65\u65F6\u949F
SYNCHRONIZING_CLOCK = \u540C\u6B65\u7740\u65F6\u949F

View File

@ -5,7 +5,13 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import org.apache.commons.net.ntp.NTPUDPClient;
import org.apache.commons.net.ntp.NtpV3Packet;
@ -13,74 +19,182 @@ import org.apache.commons.net.ntp.TimeInfo;
public class NTPTests {
private static final List<String> CC_TLDS = Arrays.asList("oceania", "europe", "lat", "asia", "africa");
private static final List<String> CC_TLDS = Arrays.asList("oceania", "europe", "cn", "asia", "africa");
public static void main(String[] args) throws UnknownHostException, IOException {
public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
NTPUDPClient client = new NTPUDPClient();
client.setDefaultTimeout(2000);
System.out.println(String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s",
' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter"
));
class NTPServer {
private static final int MIN_POLL = 8;
List<Double> offsets = new ArrayList<>();
public char usage = ' ';
public String remote;
public String refId;
public Integer stratum;
public char type = 'u'; // unicast
public int poll = MIN_POLL;
public byte reach = 0;
public Long delay;
public Double offset;
public Double jitter;
List<String> ntpServers = new ArrayList<>();
for (String ccTld : CC_TLDS) {
ntpServers.add(ccTld + ".pool.ntp.org");
for (int subpool = 0; subpool <=3; ++subpool)
ntpServers.add(subpool + "." + ccTld + ".pool.ntp.org");
}
private Deque<Double> offsets = new LinkedList<>();
private double totalSquareOffsets = 0.0;
private long nextPoll;
private Long lastGood;
for (String server : ntpServers) {
try {
TimeInfo timeInfo = client.getTime(InetAddress.getByName(server));
public NTPServer(String remote) {
this.remote = remote;
}
timeInfo.computeDetails();
NtpV3Packet ntpMessage = timeInfo.getMessage();
public boolean poll(NTPUDPClient client) {
final long now = System.currentTimeMillis();
System.out.println(String.format("%c%16.16s %16.16s %2d %c %4d %4d %3o %6dms % 5dms % 5dms",
' ',
server,
ntpMessage.getReferenceIdString(),
ntpMessage.getStratum(),
'u',
0,
1 << ntpMessage.getPoll(),
1,
timeInfo.getDelay(),
timeInfo.getOffset(),
0
));
if (now < this.nextPoll)
return false;
offsets.add((double) timeInfo.getOffset());
} catch (IOException e) {
// Try next server...
boolean isUpdated = false;
try {
TimeInfo timeInfo = client.getTime(InetAddress.getByName(remote));
timeInfo.computeDetails();
NtpV3Packet ntpMessage = timeInfo.getMessage();
this.refId = ntpMessage.getReferenceIdString();
this.stratum = ntpMessage.getStratum();
this.poll = Math.max(MIN_POLL, 1 << ntpMessage.getPoll());
this.delay = timeInfo.getDelay();
this.offset = (double) timeInfo.getOffset();
if (this.offsets.size() == 8) {
double oldOffset = this.offsets.removeFirst();
this.totalSquareOffsets -= oldOffset * oldOffset;
}
this.offsets.addLast(this.offset);
this.totalSquareOffsets += this.offset * this.offset;
this.jitter = Math.sqrt(this.totalSquareOffsets / this.offsets.size());
this.reach = (byte) ((this.reach << 1) | 1);
this.lastGood = now;
isUpdated = true;
} catch (IOException e) {
this.reach <<= 1;
}
this.nextPoll = now + this.poll * 1000;
return isUpdated;
}
public Integer getWhen() {
if (this.lastGood == null)
return null;
return (int) ((System.currentTimeMillis() - this.lastGood) / 1000);
}
}
if (offsets.size() < ntpServers.size() / 2) {
System.err.println("Not enough replies");
System.exit(1);
List<NTPServer> ntpServers = new ArrayList<>();
for (String ccTld : CC_TLDS)
for (int subpool = 0; subpool <=3; ++subpool)
ntpServers.add(new NTPServer(subpool + "." + ccTld + ".pool.ntp.org"));
while (true) {
Thread.sleep(1000);
CompletionService<Boolean> ecs = new ExecutorCompletionService<Boolean>(Executors.newCachedThreadPool());
for (NTPServer server : ntpServers)
ecs.submit(() -> server.poll(client));
boolean showReport = false;
for (int i = 0; i < ntpServers.size(); ++i)
try {
showReport = ecs.take().get() || showReport;
} catch (ExecutionException e) {
// skip
}
if (showReport) {
double s0 = 0;
double s1 = 0;
double s2 = 0;
for (NTPServer server : ntpServers) {
if (server.offset == null) {
server.usage = ' ';
continue;
}
server.usage = '+';
double value = server.offset * (double) server.stratum;
s0 += 1;
s1 += value;
s2 += value * value;
}
if (s0 < ntpServers.size() / 3 + 1) {
System.out.println("Not enough replies to calculate network time");
} else {
double filterStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1)));
double filterMean = s1 / s0;
// Now only consider offsets within 1 stddev?
s0 = 0;
s1 = 0;
s2 = 0;
for (NTPServer server : ntpServers) {
if (server.offset == null || server.reach == 0)
continue;
if (Math.abs(server.offset * (double)server.stratum - filterMean) > filterStddev)
continue;
server.usage = '*';
s0 += 1;
s1 += server.offset;
s2 += server.offset * server.offset;
}
if (s0 <= 1) {
System.out.println(String.format("Not enough values to calculate network time. stddev: %7.4f", filterStddev));
} else {
double mean = s1 / s0;
double newStddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1)));
System.out.println(String.format("filtering stddev: %7.3f, mean: %7.3f, new stddev: %7.3f, nValues: %.0f / %d", filterStddev, mean, newStddev, s0, ntpServers.size()));
}
}
System.out.println(String.format("%c%16s %16s %2s %c %4s %4s %3s %7s %7s %7s",
' ', "remote", "refid", "st", 't', "when", "poll", "reach", "delay", "offset", "jitter"
));
for (NTPServer server : ntpServers)
System.out.println(String.format("%c%16.16s %16.16s %2s %c %4s %4d %3o %7s %7s %7s",
server.usage,
server.remote,
formatNull("%s", server.refId, ""),
formatNull("%2d", server.stratum, ""),
server.type,
formatNull("%4d", server.getWhen(), "-"),
server.poll,
server.reach,
formatNull("%5dms", server.delay, ""),
formatNull("% 5.0fms", server.offset, ""),
formatNull("%5.2fms", server.jitter, "")
));
}
}
}
double s0 = 0;
double s1 = 0;
double s2 = 0;
for (Double offset : offsets) {
// Exclude nearby results for more extreme testing
if (offset < 100.0)
continue;
s0 += 1;
s1 += offset;
s2 += offset * offset;
}
double mean = s1 / s0;
double stddev = Math.sqrt(((s0 * s2) - (s1 * s1)) / (s0 * (s0 - 1)));
System.out.println(String.format("mean: %7.3f, stddev: %7.3f", mean, stddev));
private static String formatNull(String format, Object arg, String nullOutput) {
return arg != null ? String.format(format, arg) : nullOutput;
}
}