Fix repository race condition from using wrong synchronization object

Previous fixes for "transaction rollback: serialization failure" when updating trim heights
in commits 16397852 and 58ed7205 had the right idea but were broken due to being synchronized
on different objects.

this.repository.trimHeightsLock would be a new Object() for each repository connection/session
and so not actually synchronize concurrent updates.

Implicit saveChanges()/COMMIT is still needed.

Fix is to use a repository-wide object for synchronization - in this case the repositoryFactory
object as held by RepositoryManager.

Added test to cover.

Also reduced DB trim height read to one call at start of thread for both trimming threads.
This commit is contained in:
catbref 2020-11-17 14:53:39 +00:00
parent 62ae49b639
commit a12045c19e
5 changed files with 154 additions and 11 deletions

View File

@ -18,6 +18,8 @@ public class AtStatesTrimmer implements Runnable {
Thread.currentThread().setName("AT States trimmer");
try (final Repository repository = RepositoryManager.getRepository()) {
int trimStartHeight = repository.getATRepository().getAtTrimHeight();
repository.getATRepository().prepareForAtStateTrimming();
repository.saveChanges();
@ -41,8 +43,6 @@ public class AtStatesTrimmer implements Runnable {
long upperTrimmableTimestamp = Math.min(currentTrimmableTimestamp, chainTrimmableTimestamp);
int upperTrimmableHeight = repository.getBlockRepository().getHeightFromTimestamp(upperTrimmableTimestamp);
int trimStartHeight = repository.getATRepository().getAtTrimHeight();
int upperBatchHeight = trimStartHeight + Settings.getInstance().getAtStatesTrimBatchSize();
int upperTrimHeight = Math.min(upperBatchHeight, upperTrimmableHeight);
@ -53,17 +53,20 @@ public class AtStatesTrimmer implements Runnable {
repository.saveChanges();
if (numAtStatesTrimmed > 0) {
final int finalTrimStartHeight = trimStartHeight;
LOGGER.debug(() -> String.format("Trimmed %d AT state%s between blocks %d and %d",
numAtStatesTrimmed, (numAtStatesTrimmed != 1 ? "s" : ""),
trimStartHeight, upperTrimHeight));
finalTrimStartHeight, upperTrimHeight));
} else {
// Can we move onto next batch?
if (upperTrimmableHeight > upperBatchHeight) {
repository.getATRepository().setAtTrimHeight(upperBatchHeight);
trimStartHeight = upperBatchHeight;
repository.getATRepository().setAtTrimHeight(trimStartHeight);
repository.getATRepository().prepareForAtStateTrimming();
repository.saveChanges();
LOGGER.debug(() -> String.format("Bumping AT state trim height to %d", upperBatchHeight));
final int finalTrimStartHeight = trimStartHeight;
LOGGER.debug(() -> String.format("Bumping AT state base trim height to %d", finalTrimStartHeight));
}
}
}

View File

@ -23,6 +23,8 @@ public class OnlineAccountsSignaturesTrimmer implements Runnable {
// Don't even start trimming until initial rush has ended
Thread.sleep(INITIAL_SLEEP_PERIOD);
int trimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight();
while (!Controller.isStopping()) {
repository.discardChanges();
@ -40,8 +42,6 @@ public class OnlineAccountsSignaturesTrimmer implements Runnable {
long upperTrimmableTimestamp = NTP.getTime() - BlockChain.getInstance().getOnlineAccountSignaturesMaxLifetime();
int upperTrimmableHeight = repository.getBlockRepository().getHeightFromTimestamp(upperTrimmableTimestamp);
int trimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight();
int upperBatchHeight = trimStartHeight + Settings.getInstance().getOnlineSignaturesTrimBatchSize();
int upperTrimHeight = Math.min(upperBatchHeight, upperTrimmableHeight);
@ -52,16 +52,20 @@ public class OnlineAccountsSignaturesTrimmer implements Runnable {
repository.saveChanges();
if (numSigsTrimmed > 0) {
final int finalTrimStartHeight = trimStartHeight;
LOGGER.debug(() -> String.format("Trimmed %d online accounts signature%s between blocks %d and %d",
numSigsTrimmed, (numSigsTrimmed != 1 ? "s" : ""),
trimStartHeight, upperTrimHeight));
finalTrimStartHeight, upperTrimHeight));
} else {
// Can we move onto next batch?
if (upperTrimmableHeight > upperBatchHeight) {
repository.getBlockRepository().setOnlineAccountsSignaturesTrimHeight(upperBatchHeight);
trimStartHeight = upperBatchHeight;
repository.getBlockRepository().setOnlineAccountsSignaturesTrimHeight(trimStartHeight);
repository.saveChanges();
LOGGER.debug(() -> String.format("Bumping online accounts signatures trim height to %d", upperBatchHeight));
final int finalTrimStartHeight = trimStartHeight;
LOGGER.debug(() -> String.format("Bumping online accounts signatures base trim height to %d", finalTrimStartHeight));
}
}
}

View File

@ -4,6 +4,10 @@ public abstract class RepositoryManager {
private static RepositoryFactory repositoryFactory = null;
public static RepositoryFactory getRepositoryFactory() {
return repositoryFactory;
}
public static void setRepositoryFactory(RepositoryFactory newRepositoryFactory) {
repositoryFactory = newRepositoryFactory;
}

View File

@ -60,7 +60,8 @@ public class HSQLDBRepository implements Repository {
protected List<String> sqlStatements;
protected long sessionId;
protected final Map<String, PreparedStatement> preparedStatementCache = new HashMap<>();
protected final Object trimHeightsLock = new Object();
// We want the same object corresponding to the actual DB
protected final Object trimHeightsLock = RepositoryManager.getRepositoryFactory();
private final ATRepository atRepository = new HSQLDBATRepository(this);
private final AccountRepository accountRepository = new HSQLDBAccountRepository(this);

View File

@ -15,12 +15,18 @@ import org.qortal.test.common.Common;
import static org.junit.Assert.*;
import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -127,6 +133,131 @@ public class RepositoryTests extends Common {
}
}
@Test
public void testTrimDeadlock() {
ExecutorService executor = Executors.newCachedThreadPool();
CountDownLatch readyLatch = new CountDownLatch(1);
CountDownLatch updateLatch = new CountDownLatch(1);
CountDownLatch syncLatch = new CountDownLatch(1);
// Open connection 1
try (final HSQLDBRepository repository1 = (HSQLDBRepository) RepositoryManager.getRepository()) {
// Read AT states trim height
int atTrimHeight = repository1.getATRepository().getAtTrimHeight();
repository1.discardChanges();
// Open connection 2
try (final HSQLDBRepository repository2 = (HSQLDBRepository) RepositoryManager.getRepository()) {
// Read online signatures trim height
int onlineSignaturesTrimHeight = repository2.getBlockRepository().getOnlineAccountsSignaturesTrimHeight();
repository2.discardChanges();
Future<Boolean> f2 = executor.submit(() -> {
Object trimHeightsLock = extractTrimHeightsLock(repository2);
System.out.println(String.format("f2: repository2's trimHeightsLock object: %s", trimHeightsLock));
// Update online signatures trim height (implicit commit)
synchronized (trimHeightsLock) {
try {
System.out.println("f2: updating online signatures trim height...");
// simulate: repository2.getBlockRepository().setOnlineAccountsSignaturesTrimHeight(onlineSignaturesTrimHeight);
String updateSql = "UPDATE DatabaseInfo SET online_signatures_trim_height = ?";
PreparedStatement pstmt = repository2.prepareStatement(updateSql);
pstmt.setInt(1, onlineSignaturesTrimHeight);
pstmt.executeUpdate();
// But no commit/saveChanges yet to force HSQLDB error
System.out.println("f2: readyLatch.countDown()");
readyLatch.countDown();
// wait for other thread to be ready to hit sync block
System.out.println("f2: waiting for f1 syncLatch...");
syncLatch.await();
// hang on to trimHeightsLock to force other thread to wait (if code is correct), or to fail (if code is faulty)
System.out.println("f2: updateLatch.await(<with timeout>)");
if (!updateLatch.await(500L, TimeUnit.MILLISECONDS)) { // long enough for other thread to reach synchronized block
// wait period expired suggesting no concurrent access, i.e. code is correct
System.out.println("f2: updateLatch.await() timed out");
System.out.println("f2: saveChanges()");
repository2.saveChanges();
return Boolean.TRUE;
}
System.out.println("f2: saveChanges()");
repository2.saveChanges();
// Early exit from wait period suggests concurrent access, i.e. code faulty
return Boolean.FALSE;
} catch (InterruptedException | SQLException e) {
System.out.println("f2: exception: " + e.getMessage());
return Boolean.FALSE;
}
}
});
System.out.println("waiting for f2 readyLatch...");
readyLatch.await();
System.out.println("launching f1...");
Future<Boolean> f1 = executor.submit(() -> {
Object trimHeightsLock = extractTrimHeightsLock(repository1);
System.out.println(String.format("f1: repository1's trimHeightsLock object: %s", trimHeightsLock));
System.out.println("f1: syncLatch.countDown()");
syncLatch.countDown();
// Update AT states trim height (implicit commit)
synchronized (trimHeightsLock) {
try {
System.out.println("f1: updating AT trim height...");
// simulate: repository1.getATRepository().setAtTrimHeight(atTrimHeight);
String updateSql = "UPDATE DatabaseInfo SET AT_trim_height = ?";
PreparedStatement pstmt = repository1.prepareStatement(updateSql);
pstmt.setInt(1, atTrimHeight);
pstmt.executeUpdate();
System.out.println("f1: saveChanges()");
repository1.saveChanges();
System.out.println("f1: updateLatch.countDown()");
updateLatch.countDown();
return Boolean.TRUE;
} catch (SQLException e) {
System.out.println("f1: exception: " + e.getMessage());
return Boolean.FALSE;
}
}
});
if (Boolean.TRUE != f1.get())
fail("concurrency bug - simultaneous update of DatabaseInfo table");
if (Boolean.TRUE != f2.get())
fail("concurrency bug - not synchronized on same object?");
} catch (InterruptedException e) {
fail("concurrency bug: " + e.getMessage());
} catch (ExecutionException e) {
fail("concurrency bug: " + e.getMessage());
}
} catch (DataException e) {
fail("database bug");
}
}
private static Object extractTrimHeightsLock(HSQLDBRepository repository) {
try {
Field trimHeightsLockField = repository.getClass().getDeclaredField("trimHeightsLock");
trimHeightsLockField.setAccessible(true);
return trimHeightsLockField.get(repository);
} catch (IllegalArgumentException | NoSuchFieldException | SecurityException | IllegalAccessException e) {
fail();
return null;
}
}
/** Check that the <i>sub-query</i> used to fetch highest block height is optimized by HSQLDB. */
@Test
public void testBlockHeightSpeed() throws DataException, SQLException {