Browse Source

Improve startup & shutdown. Improve API call POST /admin/forcesync.

Previous version was prone to throwing exceptions during shutdown
sequence, especially in Peer/Network-related threads.

Shutdown now tries to wait for Peer/Network threads to cleanly exit.

API call POST /admin/forcesync now tries to grab blockchain lock,
with timeout, and then repeatedly sync with requested peer until
either fully synced or something unexpected happens.
pull/67/head
catbref 5 years ago
parent
commit
8b135eb447
  1. 18
      src/main/java/org/qora/api/resource/AdminResource.java
  2. 63
      src/main/java/org/qora/controller/Controller.java
  3. 14
      src/main/java/org/qora/controller/Synchronizer.java
  4. 89
      src/main/java/org/qora/network/Network.java
  5. 63
      src/main/java/org/qora/network/Peer.java

18
src/main/java/org/qora/api/resource/AdminResource.java

@ -20,6 +20,8 @@ import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
@ -443,7 +445,19 @@ public class AdminResource {
if (targetPeer == null)
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
SynchronizationResult syncResult = Synchronizer.getInstance().synchronize(targetPeer, true);
// Try to grab blockchain lock
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (!blockchainLock.tryLock(5000, TimeUnit.MILLISECONDS))
return SynchronizationResult.NO_BLOCKCHAIN_LOCK.name();
SynchronizationResult syncResult;
try {
do {
syncResult = Synchronizer.getInstance().synchronize(targetPeer, true);
} while (syncResult == SynchronizationResult.OK);
} finally {
blockchainLock.unlock();
}
return syncResult.name();
} catch (IllegalArgumentException e) {
@ -452,6 +466,8 @@ public class AdminResource {
throw e;
} catch (UnknownHostException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA);
} catch (InterruptedException e) {
return SynchronizationResult.NO_BLOCKCHAIN_LOCK.name();
}
}

63
src/main/java/org/qora/controller/Controller.java

@ -217,18 +217,8 @@ public class Controller extends Thread {
System.exit(2);
}
LOGGER.info("Starting block generator");
blockGenerator = new BlockGenerator();
blockGenerator.start();
LOGGER.info("Starting API on port " + Settings.getInstance().getApiPort());
try {
ApiService apiService = ApiService.getInstance();
apiService.start();
} catch (Exception e) {
LOGGER.error("Unable to start API", e);
System.exit(1);
}
LOGGER.info("Starting controller");
Controller.getInstance().start();
LOGGER.info("Starting networking");
try {
@ -248,8 +238,9 @@ public class Controller extends Thread {
}
});
LOGGER.info("Starting controller");
Controller.getInstance().start();
LOGGER.info("Starting block generator");
blockGenerator = new BlockGenerator();
blockGenerator.start();
// Arbitrary transaction data manager
// LOGGER.info("Starting arbitrary-transaction data manager");
@ -259,6 +250,15 @@ public class Controller extends Thread {
LOGGER.info("Starting auto-update");
AutoUpdate.getInstance().start();
LOGGER.info("Starting API on port " + Settings.getInstance().getApiPort());
try {
ApiService apiService = ApiService.getInstance();
apiService.start();
} catch (Exception e) {
LOGGER.error("Unable to start API", e);
System.exit(1);
}
LOGGER.info("Starting node management UI on port " + Settings.getInstance().getUiPort());
try {
UiService uiService = UiService.getInstance();
@ -284,12 +284,9 @@ public class Controller extends Thread {
public void run() {
Thread.currentThread().setName("Controller");
while (!isStopping) {
try {
while (!isStopping) {
Thread.sleep(1000);
} catch (InterruptedException e) {
return;
}
if (requestSync) {
requestSync = false;
@ -300,9 +297,12 @@ public class Controller extends Thread {
final long requestMinimumTimestamp = NTP.getTime() - ARBITRARY_REQUEST_TIMEOUT;
arbitraryDataRequests.entrySet().removeIf(entry -> entry.getValue().getC() < requestMinimumTimestamp);
}
} catch (InterruptedException e) {
// Fall-through to exit
}
}
private void potentiallySynchronize() {
private void potentiallySynchronize() throws InterruptedException {
List<Peer> peers = Network.getInstance().getUniqueHandshakedPeers();
// Disregard peers that have "misbehaved" recently
@ -391,6 +391,9 @@ public class Controller extends Thread {
LOGGER.info("Shutting down node management UI");
UiService.getInstance().stop();
LOGGER.info("Shutting down API");
ApiService.getInstance().stop();
LOGGER.info("Shutting down auto-update");
AutoUpdate.getInstance().shutdown();
@ -398,29 +401,26 @@ public class Controller extends Thread {
// LOGGER.info("Shutting down arbitrary-transaction data manager");
// ArbitraryDataManager.getInstance().shutdown();
LOGGER.info("Shutting down controller");
this.interrupt();
if (blockGenerator != null) {
LOGGER.info("Shutting down block generator");
blockGenerator.shutdown();
try {
this.join();
blockGenerator.join();
} catch (InterruptedException e) {
// We were interrupted while waiting for thread to join
}
}
LOGGER.info("Shutting down networking");
Network.getInstance().shutdown();
LOGGER.info("Shutting down API");
ApiService.getInstance().stop();
if (blockGenerator != null) {
LOGGER.info("Shutting down block generator");
blockGenerator.shutdown();
LOGGER.info("Shutting down controller");
this.interrupt();
try {
blockGenerator.join();
this.join();
} catch (InterruptedException e) {
// We were interrupted while waiting for thread to join
}
}
try {
LOGGER.info("Shutting down repository");
@ -793,6 +793,9 @@ public class Controller extends Thread {
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e);
} catch (InterruptedException e) {
// Shutdown
return;
}
if (newSignatures.isEmpty())

14
src/main/java/org/qora/controller/Synchronizer.java

@ -66,8 +66,9 @@ public class Synchronizer {
* <p>
* @param peer
* @return false if something went wrong, true otherwise.
* @throws InterruptedException
*/
public SynchronizationResult synchronize(Peer peer, boolean force) {
public SynchronizationResult synchronize(Peer peer, boolean force) throws InterruptedException {
// Make sure we're the only thread modifying the blockchain
// If we're already synchronizing with another peer then this will also return fast
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
@ -277,10 +278,10 @@ public class Synchronizer {
repository.discardChanges(); // Free repository locks, if any, also in case anything went wrong
this.repository = null;
}
}
} catch (DataException e) {
LOGGER.error("Repository issue during synchronization with peer", e);
return SynchronizationResult.REPOSITORY_ISSUE;
}
} finally {
blockchainLock.unlock();
}
@ -292,8 +293,9 @@ public class Synchronizer {
* @param peer
* @return block signatures, or empty list if no common block, or null if there was an issue
* @throws DataException
* @throws InterruptedException
*/
private List<byte[]> findSignaturesFromCommonBlock(Peer peer, int ourHeight) throws DataException {
private List<byte[]> findSignaturesFromCommonBlock(Peer peer, int ourHeight) throws DataException, InterruptedException {
// Start by asking for a few recent block hashes as this will cover a majority of reorgs
// Failing that, back off exponentially
int step = INITIAL_BLOCK_STEP;
@ -362,7 +364,7 @@ public class Synchronizer {
return blockSignatures;
}
private List<BlockSummaryData> getBlockSummaries(Peer peer, byte[] parentSignature, int numberRequested) {
private List<BlockSummaryData> getBlockSummaries(Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException {
Message getBlockSummariesMessage = new GetBlockSummariesMessage(parentSignature, numberRequested);
Message message = peer.getResponse(getBlockSummariesMessage);
@ -374,7 +376,7 @@ public class Synchronizer {
return blockSummariesMessage.getBlockSummaries();
}
private List<byte[]> getBlockSignatures(Peer peer, byte[] parentSignature, int numberRequested) {
private List<byte[]> getBlockSignatures(Peer peer, byte[] parentSignature, int numberRequested) throws InterruptedException {
// numberRequested is v2+ feature
Message getSignaturesMessage = peer.getVersion() >= 2 ? new GetSignaturesV2Message(parentSignature, numberRequested) : new GetSignaturesMessage(parentSignature);
@ -387,7 +389,7 @@ public class Synchronizer {
return signaturesMessage.getSignatures();
}
private Block fetchBlock(Repository repository, Peer peer, byte[] signature) {
private Block fetchBlock(Repository repository, Peer peer, byte[] signature) throws InterruptedException {
Message getBlockMessage = new GetBlockMessage(signature);
Message message = peer.getResponse(getBlockMessage);

89
src/main/java/org/qora/network/Network.java

@ -17,6 +17,7 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@ -84,6 +85,7 @@ public class Network extends Thread {
public static final int PEER_ID_LENGTH = 128;
private final byte[] ourPeerId;
private volatile boolean isStopping = false;
private List<Peer> connectedPeers;
private List<PeerAddress> selfPeers;
private ServerSocket listenSocket;
@ -91,6 +93,7 @@ public class Network extends Thread {
private int maxPeers;
private ExecutorService peerExecutor;
private ExecutorService mergePeersExecutor;
private ExecutorService broadcastExecutor;
private long nextBroadcast;
private Lock mergePeersLock;
@ -128,6 +131,8 @@ public class Network extends Thread {
maxPeers = Settings.getInstance().getMaxPeers();
peerExecutor = Executors.newCachedThreadPool();
broadcastExecutor = Executors.newCachedThreadPool();
nextBroadcast = System.currentTimeMillis();
mergePeersLock = new ReentrantLock();
@ -192,7 +197,7 @@ public class Network extends Thread {
// Maintain long-term connections to various peers' API applications
try {
while (true) {
while (!isStopping) {
acceptConnections();
pruneOldPeers();
@ -215,14 +220,6 @@ public class Network extends Thread {
LOGGER.warn("Repository issue while running network", e);
// Fall-through to shutdown
}
// Shutdown
if (!this.listenSocket.isClosed())
try {
this.listenSocket.close();
} catch (IOException e) {
// Not important
}
}
@SuppressWarnings("resource")
@ -240,6 +237,7 @@ public class Network extends Thread {
return;
}
Peer newPeer = null;
synchronized (this.connectedPeers) {
if (connectedPeers.size() >= maxPeers) {
// We have enough peers
@ -255,9 +253,14 @@ public class Network extends Thread {
}
LOGGER.debug(String.format("Connection accepted from peer %s", socket.getRemoteSocketAddress()));
Peer newPeer = new Peer(socket);
newPeer = new Peer(socket);
this.connectedPeers.add(newPeer);
}
try {
peerExecutor.execute(newPeer);
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}
} while (true);
}
@ -369,7 +372,11 @@ public class Network extends Thread {
this.connectedPeers.add(newPeer);
}
try {
peerExecutor.execute(newPeer);
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}
}
// Peer callbacks
@ -586,8 +593,23 @@ public class Network extends Thread {
List<PeerData> knownPeers = repository.getNetworkRepository().getAllPeers();
// Filter out peers that we've not connected to ever or within X milliseconds
long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
knownPeers.removeIf(peerData -> peerData.getLastConnected() == null || peerData.getLastConnected() < connectionThreshold);
final long connectionThreshold = NTP.getTime() - RECENT_CONNECTION_THRESHOLD;
Predicate<PeerData> notRecentlyConnected = peerData -> {
final Long lastAttempted = peerData.getLastAttempted();
final Long lastConnected = peerData.getLastConnected();
if (lastAttempted == null || lastConnected == null)
return true;
if (lastConnected < lastAttempted)
return true;
if (lastConnected < connectionThreshold)
return true;
return false;
};
knownPeers.removeIf(notRecentlyConnected);
if (peer.getVersion() >= 2) {
List<PeerAddress> peerAddresses = new ArrayList<>();
@ -778,7 +800,11 @@ public class Network extends Thread {
}
}
try {
mergePeersExecutor.execute(new PeersMerger(peerAddresses));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}
}
public void broadcast(Function<Peer, Message> peerMessageBuilder) {
@ -817,21 +843,54 @@ public class Network extends Thread {
}
try {
peerExecutor.execute(new Broadcaster(this.getUniqueHandshakedPeers(), peerMessageBuilder));
broadcastExecutor.execute(new Broadcaster(this.getUniqueHandshakedPeers(), peerMessageBuilder));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}
}
public void shutdown() {
peerExecutor.shutdownNow();
this.isStopping = true;
// Close listen socket to prevent more incoming connections
if (!this.listenSocket.isClosed())
try {
this.listenSocket.close();
} catch (IOException e) {
// Not important
}
// Stop our run() thread
this.interrupt();
try {
this.join();
} catch (InterruptedException e) {
// We were interrupted while waiting for thread to join
LOGGER.debug("Interrupted while waiting for networking thread to terminate");
}
// Give up merging peer lists
this.mergePeersExecutor.shutdownNow();
try {
if (!this.mergePeersExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
LOGGER.debug("Peer-list merging threads failed to terminate");
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for peer-list merging threads failed to terminate");
}
// Stop broadcasts
this.broadcastExecutor.shutdownNow();
try {
if (!this.broadcastExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
LOGGER.debug("Broadcast threads failed to terminate");
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for broadcast threads failed to terminate");
}
// Close all peer connections
synchronized (this.connectedPeers) {
for (Peer peer : this.connectedPeers)
peer.shutdown();
}
}
}

63
src/main/java/org/qora/network/Peer.java

@ -37,17 +37,18 @@ import com.google.common.net.HostAndPort;
import com.google.common.net.InetAddresses;
// For managing one peer
public class Peer implements Runnable {
public class Peer extends Thread {
private static final Logger LOGGER = LogManager.getLogger(Peer.class);
private static final int CONNECT_TIMEOUT = 1000; // ms
private static final int RESPONSE_TIMEOUT = 5000; // ms
private static final int PING_INTERVAL = 20000; // ms - just under every 30s is usually ideal to keep NAT mappings refreshed
private static final int INACTIVITY_TIMEOUT = 30000; // ms
private static final int SOCKET_TIMEOUT = 10000; // ms
private static final int UNSOLICITED_MESSAGE_QUEUE_CAPACITY = 10;
private final boolean isOutbound;
private volatile boolean isStopping = false;
private Socket socket = null;
private PeerData peerData = null;
private final ReentrantLock peerDataLock = new ReentrantLock();
@ -227,7 +228,7 @@ public class Peer implements Runnable {
}
private void setup() throws IOException {
this.socket.setSoTimeout(INACTIVITY_TIMEOUT);
this.socket.setSoTimeout(SOCKET_TIMEOUT);
this.out = this.socket.getOutputStream();
this.connectionTimestamp = NTP.getTime();
this.replyQueues = Collections.synchronizedMap(new HashMap<Integer, BlockingQueue<Message>>());
@ -272,7 +273,7 @@ public class Peer implements Runnable {
Network.getInstance().onPeerReady(this);
while (true) {
while (!isStopping) {
// Wait (up to INACTIVITY_TIMEOUT) for, and parse, incoming message
Message message = Message.fromStream(in);
if (message == null) {
@ -305,6 +306,9 @@ public class Peer implements Runnable {
} catch (SocketTimeoutException e) {
this.disconnect("timeout");
} catch (IOException e) {
if (isStopping)
LOGGER.debug(String.format("Peer %s stopping...", this));
else
this.disconnect("I/O error");
} finally {
Thread.currentThread().setName("disconnected peer");
@ -345,13 +349,14 @@ public class Peer implements Runnable {
* <p>
* Message is assigned a random ID and sent. If a response with matching ID is received then it is returned to caller.
* <p>
* If no response with matching ID within timeout, or some other error/exception occurs, then return <code>null</code>. (Assume peer will be rapidly
* disconnected after this).
* If no response with matching ID within timeout, or some other error/exception occurs, then return <code>null</code>.<br>
* (Assume peer will be rapidly disconnected after this).
*
* @param message
* @return <code>Message</code> if valid response received; <code>null</code> if not or error/exception occurs
* @throws InterruptedException
*/
public Message getResponse(Message message) {
public Message getResponse(Message message) throws InterruptedException {
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(1);
// Assign random ID to this message
@ -373,9 +378,6 @@ public class Peer implements Runnable {
try {
Message response = blockingQueue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
return response;
} catch (InterruptedException e) {
// Our thread was interrupted. Probably in shutdown scenario.
return null;
} finally {
this.replyQueues.remove(id);
}
@ -395,6 +397,7 @@ public class Peer implements Runnable {
PingMessage pingMessage = new PingMessage();
try {
long before = System.currentTimeMillis();
Message message = peer.getResponse(pingMessage);
long after = System.currentTimeMillis();
@ -403,6 +406,9 @@ public class Peer implements Runnable {
peer.disconnect("no ping received");
peer.setLastPing(after - before);
} catch (InterruptedException e) {
// Shutdown
}
}
}
@ -413,31 +419,54 @@ public class Peer implements Runnable {
}
public void disconnect(String reason) {
LOGGER.trace(String.format("Disconnecting peer %s: %s", this, reason));
LOGGER.debug(String.format("Disconnecting peer %s: %s", this, reason));
this.shutdown();
Network.getInstance().onDisconnect(this);
}
public void shutdown() {
LOGGER.debug(String.format("Shutting down peer %s", this));
this.isStopping = true;
// Shut down pinger
if (this.pingExecutor != null) {
this.pingExecutor.shutdownNow();
this.pingExecutor = null;
try {
if (!this.pingExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
LOGGER.debug(String.format("Pinger for peer %s failed to terminate", this));
} catch (InterruptedException e) {
LOGGER.debug(String.format("Interrupted while terminating pinger for peer %s", this));
}
}
// Shut down unsolicited message processor
if (this.messageExecutor != null) {
this.messageExecutor.shutdownNow();
this.messageExecutor = null;
try {
if (!this.messageExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS))
LOGGER.debug(String.format("Message processor for peer %s failed to terminate", this));
} catch (InterruptedException e) {
LOGGER.debug(String.format("Interrupted while terminating message processor for peer %s", this));
}
}
// Close socket
if (!this.socket.isClosed()) {
LOGGER.debug(String.format("Closing socket with peer %s: %s", this, reason));
this.interrupt();
// Close socket, which should trigger run() to exit
if (!this.socket.isClosed()) {
try {
this.socket.close();
} catch (IOException e) {
}
}
Network.getInstance().onDisconnect(this);
try {
this.join();
} catch (InterruptedException e) {
LOGGER.debug(String.format("Interrupted while waiting for peer %s to shutdown", this));
}
}
// Utility methods

Loading…
Cancel
Save