diff --git a/src/main/java/org/qora/api/resource/AdminResource.java b/src/main/java/org/qora/api/resource/AdminResource.java index d2e5b114..6295da95 100644 --- a/src/main/java/org/qora/api/resource/AdminResource.java +++ b/src/main/java/org/qora/api/resource/AdminResource.java @@ -20,6 +20,8 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; @@ -443,7 +445,19 @@ public class AdminResource { if (targetPeer == null) throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); - SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(targetPeer, true); + // Try to grab blockchain lock + ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); + if (!blockchainLock.tryLock(5000, TimeUnit.MILLISECONDS)) + return SynchronizationResult.NO_BLOCKCHAIN_LOCK.name(); + + SynchronizationResult syncResult; + try { + do { + syncResult = Synchronizer.getInstance().synchronize(targetPeer, true); + } while (syncResult == SynchronizationResult.OK); + } finally { + blockchainLock.unlock(); + } return syncResult.name(); } catch (IllegalArgumentException e) { @@ -452,6 +466,8 @@ public class AdminResource { throw e; } catch (UnknownHostException e) { throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA); + } catch (InterruptedException e) { + return SynchronizationResult.NO_BLOCKCHAIN_LOCK.name(); } } diff --git a/src/main/java/org/qora/controller/Controller.java b/src/main/java/org/qora/controller/Controller.java index 35737a83..7bdc5d6f 100644 --- a/src/main/java/org/qora/controller/Controller.java +++ b/src/main/java/org/qora/controller/Controller.java @@ -217,18 +217,8 @@ public class Controller extends Thread { System.exit(2); } - LOGGER.info("Starting block generator"); - blockGenerator = new BlockGenerator(); - blockGenerator.start(); - - LOGGER.info("Starting API on port " + Settings.getInstance().getApiPort()); - try { - ApiService apiService = ApiService.getInstance(); - apiService.start(); - } catch (Exception e) { - LOGGER.error("Unable to start API", e); - System.exit(1); - } + LOGGER.info("Starting controller"); + Controller.getInstance().start(); LOGGER.info("Starting networking"); try { @@ -248,8 +238,9 @@ public class Controller extends Thread { } }); - LOGGER.info("Starting controller"); - Controller.getInstance().start(); + LOGGER.info("Starting block generator"); + blockGenerator = new BlockGenerator(); + blockGenerator.start(); // Arbitrary transaction data manager // LOGGER.info("Starting arbitrary-transaction data manager"); @@ -259,6 +250,15 @@ public class Controller extends Thread { LOGGER.info("Starting auto-update"); AutoUpdate.getInstance().start(); + LOGGER.info("Starting API on port " + Settings.getInstance().getApiPort()); + try { + ApiService apiService = ApiService.getInstance(); + apiService.start(); + } catch (Exception e) { + LOGGER.error("Unable to start API", e); + System.exit(1); + } + LOGGER.info("Starting node management UI on port " + Settings.getInstance().getUiPort()); try { UiService uiService = UiService.getInstance(); @@ -284,25 +284,25 @@ public class Controller extends Thread { public void run() { Thread.currentThread().setName("Controller"); - while (!isStopping) { - try { + try { + while (!isStopping) { Thread.sleep(1000); - } catch (InterruptedException e) { - return; - } - if (requestSync) { - requestSync = false; - potentiallySynchronize(); - } + if (requestSync) { + requestSync = false; + potentiallySynchronize(); + } - // Clean up arbitrary data request cache - final long requestMinimumTimestamp = NTP.getTime() - ARBITRARY_REQUEST_TIMEOUT; - arbitraryDataRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp); + // Clean up arbitrary data request cache + final long requestMinimumTimestamp = NTP.getTime() - ARBITRARY_REQUEST_TIMEOUT; + arbitraryDataRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp); + } + } catch (InterruptedException e) { + // Fall-through to exit } } - private void potentiallySynchronize() { + private void potentiallySynchronize() throws InterruptedException { List peers = Network.getInstance().getUniqueHandshakedPeers(); // Disregard peers that have "misbehaved" recently @@ -391,6 +391,9 @@ public class Controller extends Thread { LOGGER.info("Shutting down node management UI"); UiService.getInstance().stop(); + LOGGER.info("Shutting down API"); + ApiService.getInstance().stop(); + LOGGER.info("Shutting down auto-update"); AutoUpdate.getInstance().shutdown(); @@ -398,20 +401,6 @@ public class Controller extends Thread { // LOGGER.info("Shutting down arbitrary-transaction data manager"); // ArbitraryDataManager.getInstance().shutdown(); - LOGGER.info("Shutting down controller"); - this.interrupt(); - try { - this.join(); - } catch (InterruptedException e) { - // We were interrupted while waiting for thread to join - } - - LOGGER.info("Shutting down networking"); - Network.getInstance().shutdown(); - - LOGGER.info("Shutting down API"); - ApiService.getInstance().stop(); - if (blockGenerator != null) { LOGGER.info("Shutting down block generator"); blockGenerator.shutdown(); @@ -422,6 +411,17 @@ public class Controller extends Thread { } } + LOGGER.info("Shutting down networking"); + Network.getInstance().shutdown(); + + LOGGER.info("Shutting down controller"); + this.interrupt(); + try { + this.join(); + } catch (InterruptedException e) { + // We were interrupted while waiting for thread to join + } + try { LOGGER.info("Shutting down repository"); RepositoryManager.closeRepositoryFactory(); @@ -793,6 +793,9 @@ public class Controller extends Thread { } } catch (DataException e) { LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e); + } catch (InterruptedException e) { + // Shutdown + return; } if (newSignatures.isEmpty()) diff --git a/src/main/java/org/qora/controller/Synchronizer.java b/src/main/java/org/qora/controller/Synchronizer.java index e220a6eb..62d980b0 100644 --- a/src/main/java/org/qora/controller/Synchronizer.java +++ b/src/main/java/org/qora/controller/Synchronizer.java @@ -66,8 +66,9 @@ public class Synchronizer { *

* @param peer * @return false if something went wrong, true otherwise. + * @throws InterruptedException */ - public SynchronizationResult synchronize(Peer peer, boolean force) { + public SynchronizationResult synchronize(Peer peer, boolean force) throws InterruptedException { // Make sure we're the only thread modifying the blockchain // If we're already synchronizing with another peer then this will also return fast ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock(); @@ -277,10 +278,10 @@ public class Synchronizer { repository.discardChanges(); // Free repository locks, if any, also in case anything went wrong this.repository = null; } + } catch (DataException e) { + LOGGER.error("Repository issue during synchronization with peer", e); + return SynchronizationResult.REPOSITORY_ISSUE; } - } catch (DataException e) { - LOGGER.error("Repository issue during synchronization with peer", e); - return SynchronizationResult.REPOSITORY_ISSUE; } finally { blockchainLock.unlock(); } @@ -292,8 +293,9 @@ public class Synchronizer { * @param peer * @return block signatures, or empty list if no common block, or null if there was an issue * @throws DataException + * @throws InterruptedException */ - private List findSignaturesFromCommonBlock(Peer peer, int ourHeight) throws DataException { + private List findSignaturesFromCommonBlock(Peer peer, int ourHeight) throws DataException, InterruptedException { // Start by asking for a few recent block hashes as this will cover a majority of reorgs // Failing that, back off exponentially int step = INITIAL_BLOCK_STEP; @@ -362,7 +364,7 @@ public class Synchronizer { return blockSignatures; } - private List getBlockSummaries(Peer peer, byte[] parentSignature, int numberRequested) { + private List getBlockSummaries(Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException { Message getBlockSummariesMessage = new GetBlockSummariesMessage(parentSignature, numberRequested); Message message = peer.getResponse(getBlockSummariesMessage); @@ -374,7 +376,7 @@ public class Synchronizer { return blockSummariesMessage.getBlockSummaries(); } - private List getBlockSignatures(Peer peer, byte[] parentSignature, int numberRequested) { + private List getBlockSignatures(Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException { // numberRequested is v2+ feature Message getSignaturesMessage = peer.getVersion() >= 2 ? new GetSignaturesV2Message(parentSignature, numberRequested) : new GetSignaturesMessage(parentSignature); @@ -387,7 +389,7 @@ public class Synchronizer { return signaturesMessage.getSignatures(); } - private Block fetchBlock(Repository repository, Peer peer, byte[] signature) { + private Block fetchBlock(Repository repository, Peer peer, byte[] signature) throws InterruptedException { Message getBlockMessage = new GetBlockMessage(signature); Message message = peer.getResponse(getBlockMessage); diff --git a/src/main/java/org/qora/network/Network.java b/src/main/java/org/qora/network/Network.java index ab870af6..1cf21c24 100644 --- a/src/main/java/org/qora/network/Network.java +++ b/src/main/java/org/qora/network/Network.java @@ -17,6 +17,7 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -84,6 +85,7 @@ public class Network extends Thread { public static final int PEER_ID_LENGTH = 128; private final byte[] ourPeerId; + private volatile boolean isStopping = false; private List connectedPeers; private List selfPeers; private ServerSocket listenSocket; @@ -91,6 +93,7 @@ public class Network extends Thread { private int maxPeers; private ExecutorService peerExecutor; private ExecutorService mergePeersExecutor; + private ExecutorService broadcastExecutor; private long nextBroadcast; private Lock mergePeersLock; @@ -128,6 +131,8 @@ public class Network extends Thread { maxPeers = Settings.getInstance().getMaxPeers(); peerExecutor = Executors.newCachedThreadPool(); + + broadcastExecutor = Executors.newCachedThreadPool(); nextBroadcast = System.currentTimeMillis(); mergePeersLock = new ReentrantLock(); @@ -192,7 +197,7 @@ public class Network extends Thread { // Maintain long-term connections to various peers' API applications try { - while (true) { + while (!isStopping) { acceptConnections(); pruneOldPeers(); @@ -215,14 +220,6 @@ public class Network extends Thread { LOGGER.warn("Repository issue while running network", e); // Fall-through to shutdown } - - // Shutdown - if (!this.listenSocket.isClosed()) - try { - this.listenSocket.close(); - } catch (IOException e) { - // Not important - } } @SuppressWarnings("resource") @@ -240,6 +237,7 @@ public class Network extends Thread { return; } + Peer newPeer = null; synchronized (this.connectedPeers) { if (connectedPeers.size() >= maxPeers) { // We have enough peers @@ -255,9 +253,14 @@ public class Network extends Thread { } LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress())); - Peer newPeer = new Peer(socket); + newPeer = new Peer(socket); this.connectedPeers.add(newPeer); + } + + try { peerExecutor.execute(newPeer); + } catch (RejectedExecutionException e) { + // Can't execute - probably because we're shutting down, so ignore } } while (true); } @@ -369,7 +372,11 @@ public class Network extends Thread { this.connectedPeers.add(newPeer); } - peerExecutor.execute(newPeer); + try { + peerExecutor.execute(newPeer); + } catch (RejectedExecutionException e) { + // Can't execute - probably because we're shutting down, so ignore + } } // Peer callbacks @@ -586,8 +593,23 @@ public class Network extends Thread { List knownPeers = repository.getNetworkRepository().getAllPeers(); // Filter out peers that we've not connected to ever or within X milliseconds - long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; - knownPeers.removeIf(peerData -> peerData.getLastConnected() == null || peerData.getLastConnected() < connectionThreshold); + final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD; + Predicate notRecentlyConnected = peerData -> { + final Long lastAttempted = peerData.getLastAttempted(); + final Long lastConnected = peerData.getLastConnected(); + + if (lastAttempted == null || lastConnected == null) + return true; + + if (lastConnected < lastAttempted) + return true; + + if (lastConnected < connectionThreshold) + return true; + + return false; + }; + knownPeers.removeIf(notRecentlyConnected); if (peer.getVersion() >= 2) { List peerAddresses = new ArrayList<>(); @@ -778,7 +800,11 @@ public class Network extends Thread { } } - mergePeersExecutor.execute(new PeersMerger(peerAddresses)); + try { + mergePeersExecutor.execute(new PeersMerger(peerAddresses)); + } catch (RejectedExecutionException e) { + // Can't execute - probably because we're shutting down, so ignore + } } public void broadcast(Function peerMessageBuilder) { @@ -817,21 +843,54 @@ public class Network extends Thread { } try { - peerExecutor.execute(new Broadcaster(this.getUniqueHandshakedPeers(), peerMessageBuilder)); + broadcastExecutor.execute(new Broadcaster(this.getUniqueHandshakedPeers(), peerMessageBuilder)); } catch (RejectedExecutionException e) { // Can't execute - probably because we're shutting down, so ignore } } public void shutdown() { - peerExecutor.shutdownNow(); + this.isStopping = true; + + // Close listen socket to prevent more incoming connections + if (!this.listenSocket.isClosed()) + try { + this.listenSocket.close(); + } catch (IOException e) { + // Not important + } + // Stop our run() thread this.interrupt(); try { this.join(); } catch (InterruptedException e) { - // We were interrupted while waiting for thread to join + LOGGER.debug("Interrupted while waiting for networking thread to terminate"); } - } + + // Give up merging peer lists + this.mergePeersExecutor.shutdownNow(); + try { + if (!this.mergePeersExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) + LOGGER.debug("Peer-list merging threads failed to terminate"); + } catch (InterruptedException e) { + LOGGER.debug("Interrupted while waiting for peer-list merging threads failed to terminate"); + } + + // Stop broadcasts + this.broadcastExecutor.shutdownNow(); + try { + if (!this.broadcastExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) + LOGGER.debug("Broadcast threads failed to terminate"); + } catch (InterruptedException e) { + LOGGER.debug("Interrupted while waiting for broadcast threads failed to terminate"); + } + + // Close all peer connections + synchronized (this.connectedPeers) { + for (Peer peer : this.connectedPeers) + peer.shutdown(); + } +} } diff --git a/src/main/java/org/qora/network/Peer.java b/src/main/java/org/qora/network/Peer.java index 3bf5a239..d1212874 100644 --- a/src/main/java/org/qora/network/Peer.java +++ b/src/main/java/org/qora/network/Peer.java @@ -37,17 +37,18 @@ import com.google.common.net.HostAndPort; import com.google.common.net.InetAddresses; // For managing one peer -public class Peer implements Runnable { +public class Peer extends Thread { private static final Logger LOGGER = LogManager.getLogger(Peer.class); private static final int CONNECT_TIMEOUT = 1000; // ms private static final int RESPONSE_TIMEOUT = 5000; // ms private static final int PING_INTERVAL = 20000; // ms - just under every 30s is usually ideal to keep NAT mappings refreshed - private static final int INACTIVITY_TIMEOUT = 30000; // ms + private static final int SOCKET_TIMEOUT = 10000; // ms private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10; private final boolean isOutbound; + private volatile boolean isStopping = false; private Socket socket = null; private PeerData peerData = null; private final ReentrantLock peerDataLock = new ReentrantLock(); @@ -227,7 +228,7 @@ public class Peer implements Runnable { } private void setup() throws IOException { - this.socket.setSoTimeout(INACTIVITY_TIMEOUT); + this.socket.setSoTimeout(SOCKET_TIMEOUT); this.out = this.socket.getOutputStream(); this.connectionTimestamp = NTP.getTime(); this.replyQueues = Collections.synchronizedMap(new HashMap>()); @@ -272,7 +273,7 @@ public class Peer implements Runnable { Network.getInstance().onPeerReady(this); - while (true) { + while (!isStopping) { // Wait (up to INACTIVITY_TIMEOUT) for, and parse, incoming message Message message = Message.fromStream(in); if (message == null) { @@ -305,7 +306,10 @@ public class Peer implements Runnable { } catch (SocketTimeoutException e) { this.disconnect("timeout"); } catch (IOException e) { - this.disconnect("I/O error"); + if (isStopping) + LOGGER.debug(String.format("Peer %s stopping...", this)); + else + this.disconnect("I/O error"); } finally { Thread.currentThread().setName("disconnected peer"); } @@ -345,13 +349,14 @@ public class Peer implements Runnable { *

* Message is assigned a random ID and sent. If a response with matching ID is received then it is returned to caller. *

- * If no response with matching ID within timeout, or some other error/exception occurs, then return null. (Assume peer will be rapidly - * disconnected after this). + * If no response with matching ID within timeout, or some other error/exception occurs, then return null.
+ * (Assume peer will be rapidly disconnected after this). * * @param message * @return Message if valid response received; null if not or error/exception occurs + * @throws InterruptedException */ - public Message getResponse(Message message) { + public Message getResponse(Message message) throws InterruptedException { BlockingQueue blockingQueue = new ArrayBlockingQueue(1); // Assign random ID to this message @@ -373,9 +378,6 @@ public class Peer implements Runnable { try { Message response = blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); return response; - } catch (InterruptedException e) { - // Our thread was interrupted. Probably in shutdown scenario. - return null; } finally { this.replyQueues.remove(id); } @@ -395,14 +397,18 @@ public class Peer implements Runnable { PingMessage pingMessage = new PingMessage(); - long before = System.currentTimeMillis(); - Message message = peer.getResponse(pingMessage); - long after = System.currentTimeMillis(); + try { + long before = System.currentTimeMillis(); + Message message = peer.getResponse(pingMessage); + long after = System.currentTimeMillis(); - if (message == null || message.getType() != MessageType.PING) - peer.disconnect("no ping received"); + if (message == null || message.getType() != MessageType.PING) + peer.disconnect("no ping received"); - peer.setLastPing(after - before); + peer.setLastPing(after - before); + } catch (InterruptedException e) { + // Shutdown + } } } @@ -413,31 +419,54 @@ public class Peer implements Runnable { } public void disconnect(String reason) { - LOGGER.trace(String.format("Disconnecting peer %s: %s", this, reason)); + LOGGER.debug(String.format("Disconnecting peer %s: %s", this, reason)); + + this.shutdown(); + + Network.getInstance().onDisconnect(this); + } + + public void shutdown() { + LOGGER.debug(String.format("Shutting down peer %s", this)); + this.isStopping = true; // Shut down pinger if (this.pingExecutor != null) { this.pingExecutor.shutdownNow(); - this.pingExecutor = null; + try { + if (!this.pingExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) + LOGGER.debug(String.format("Pinger for peer %s failed to terminate", this)); + } catch (InterruptedException e) { + LOGGER.debug(String.format("Interrupted while terminating pinger for peer %s", this)); + } } // Shut down unsolicited message processor if (this.messageExecutor != null) { this.messageExecutor.shutdownNow(); - this.messageExecutor = null; + try { + if (!this.messageExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS)) + LOGGER.debug(String.format("Message processor for peer %s failed to terminate", this)); + } catch (InterruptedException e) { + LOGGER.debug(String.format("Interrupted while terminating message processor for peer %s", this)); + } } - // Close socket - if (!this.socket.isClosed()) { - LOGGER.debug(String.format("Closing socket with peer %s: %s", this, reason)); + this.interrupt(); + // Close socket, which should trigger run() to exit + if (!this.socket.isClosed()) { try { this.socket.close(); } catch (IOException e) { } } - Network.getInstance().onDisconnect(this); + try { + this.join(); + } catch (InterruptedException e) { + LOGGER.debug(String.format("Interrupted while waiting for peer %s to shutdown", this)); + } } // Utility methods