From 64d835362993ef8a7615257bac178c05c139c555 Mon Sep 17 00:00:00 2001 From: CalDescent Date: Thu, 2 Feb 2023 15:54:03 +0100 Subject: [PATCH] Added V2 support in the block archive, and added feature to rebuild a V1 block archive using V2 block serialization. Should drastically reduce the archive size once rebuilt. --- .../qortal/api/resource/BlocksResource.java | 12 +- src/main/java/org/qortal/block/Block.java | 4 + .../org/qortal/controller/Controller.java | 21 +++- .../repository/BlockArchiveRebuilder.java | 119 ++++++++++++++++++ .../network/message/CachedBlockV2Message.java | 43 +++++++ .../qortal/repository/BlockArchiveReader.java | 66 +++++++--- .../qortal/repository/BlockArchiveWriter.java | 109 ++++++++++++++-- .../transform/block/BlockTransformer.java | 24 ++-- 8 files changed, 359 insertions(+), 39 deletions(-) create mode 100644 src/main/java/org/qortal/controller/repository/BlockArchiveRebuilder.java create mode 100644 src/main/java/org/qortal/network/message/CachedBlockV2Message.java diff --git a/src/main/java/org/qortal/api/resource/BlocksResource.java b/src/main/java/org/qortal/api/resource/BlocksResource.java index 15541802..20207c70 100644 --- a/src/main/java/org/qortal/api/resource/BlocksResource.java +++ b/src/main/java/org/qortal/api/resource/BlocksResource.java @@ -48,6 +48,7 @@ import org.qortal.repository.RepositoryManager; import org.qortal.transform.TransformationException; import org.qortal.transform.block.BlockTransformer; import org.qortal.utils.Base58; +import org.qortal.utils.Triple; @Path("/blocks") @Tag(name = "Blocks") @@ -165,10 +166,13 @@ public class BlocksResource { } // Not found, so try the block archive - byte[] bytes = BlockArchiveReader.getInstance().fetchSerializedBlockBytesForSignature(signature, false, repository); - if (bytes != null) { - if (version != 1) { - throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Archived blocks require version 1"); + Triple serializedBlock = BlockArchiveReader.getInstance().fetchSerializedBlockBytesForSignature(signature, false, repository); + if (serializedBlock != null) { + byte[] bytes = serializedBlock.getA(); + Integer serializationVersion = serializedBlock.getB(); + if (version != serializationVersion) { + // TODO: we could quite easily reserialize the block with the requested version + throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Block is not stored using requested serialization version."); } return Base58.encode(bytes); } diff --git a/src/main/java/org/qortal/block/Block.java b/src/main/java/org/qortal/block/Block.java index 3f306b93..540f8cf7 100644 --- a/src/main/java/org/qortal/block/Block.java +++ b/src/main/java/org/qortal/block/Block.java @@ -657,6 +657,10 @@ public class Block { return this.atStates; } + public byte[] getAtStatesHash() { + return this.atStatesHash; + } + /** * Return expanded info on block's online accounts. *

diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index e9e1fcc2..ed1d2d07 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -1379,9 +1379,24 @@ public class Controller extends Thread { // If we have no block data, we should check the archive in case it's there if (blockData == null) { if (Settings.getInstance().isArchiveEnabled()) { - byte[] bytes = BlockArchiveReader.getInstance().fetchSerializedBlockBytesForSignature(signature, true, repository); - if (bytes != null) { - CachedBlockMessage blockMessage = new CachedBlockMessage(bytes); + Triple serializedBlock = BlockArchiveReader.getInstance().fetchSerializedBlockBytesForSignature(signature, true, repository); + if (serializedBlock != null) { + byte[] bytes = serializedBlock.getA(); + Integer serializationVersion = serializedBlock.getB(); + + Message blockMessage; + switch (serializationVersion) { + case 1: + blockMessage = new CachedBlockMessage(bytes); + break; + + case 2: + blockMessage = new CachedBlockV2Message(bytes); + break; + + default: + return; + } blockMessage.setId(message.getId()); // This call also causes the other needed data to be pulled in from repository diff --git a/src/main/java/org/qortal/controller/repository/BlockArchiveRebuilder.java b/src/main/java/org/qortal/controller/repository/BlockArchiveRebuilder.java new file mode 100644 index 00000000..74201251 --- /dev/null +++ b/src/main/java/org/qortal/controller/repository/BlockArchiveRebuilder.java @@ -0,0 +1,119 @@ +package org.qortal.controller.repository; + +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.qortal.controller.Controller; +import org.qortal.controller.Synchronizer; +import org.qortal.repository.*; +import org.qortal.settings.Settings; +import org.qortal.transform.TransformationException; + +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; + + +public class BlockArchiveRebuilder { + + private static final Logger LOGGER = LogManager.getLogger(BlockArchiveRebuilder.class); + + private final int serializationVersion; + + public BlockArchiveRebuilder(int serializationVersion) { + this.serializationVersion = serializationVersion; + } + + public void start() throws DataException, IOException { + if (!Settings.getInstance().isArchiveEnabled() || Settings.getInstance().isLite()) { + return; + } + + // New archive path is in a different location from original archive path, to avoid conflicts. + // It will be moved later, once the process is complete. + final Path newArchivePath = Paths.get(Settings.getInstance().getRepositoryPath(), "archive-rebuild"); + final Path originalArchivePath = Paths.get(Settings.getInstance().getRepositoryPath(), "archive"); + + // Delete archive-rebuild if it exists from a previous attempt + FileUtils.deleteDirectory(newArchivePath.toFile()); + + try (final Repository repository = RepositoryManager.getRepository()) { + int startHeight = 1; // We need to rebuild the entire archive + + LOGGER.info("Rebuilding block archive from height {}...", startHeight); + + while (!Controller.isStopping()) { + repository.discardChanges(); + + Thread.sleep(1000L); + + // Don't even attempt if we're mid-sync as our repository requests will be delayed for ages + if (Synchronizer.getInstance().isSynchronizing()) { + continue; + } + + // Rebuild archive + try { + final int maximumArchiveHeight = BlockArchiveReader.getInstance().getHeightOfLastArchivedBlock(); + if (startHeight >= maximumArchiveHeight) { + // We've finished. + // Delete existing archive and move the newly built one into its place + FileUtils.deleteDirectory(originalArchivePath.toFile()); + FileUtils.moveDirectory(newArchivePath.toFile(), originalArchivePath.toFile()); + LOGGER.info("Block archive successfully rebuilt"); + return; + } + + BlockArchiveWriter writer = new BlockArchiveWriter(startHeight, maximumArchiveHeight, serializationVersion, newArchivePath, repository); + + // Set data source to BLOCK_ARCHIVE as we are rebuilding + writer.setDataSource(BlockArchiveWriter.BlockArchiveDataSource.BLOCK_ARCHIVE); + + // We can't enforce the 100MB file size target, as the final file needs to contain all blocks + // that exist in the current archive. Otherwise, the final blocks in the archive will be lost. + writer.setShouldEnforceFileSizeTarget(false); + + // We want to log the rebuild progress + writer.setShouldLogProgress(true); + + BlockArchiveWriter.BlockArchiveWriteResult result = writer.write(); + switch (result) { + case OK: + // Increment block archive height + startHeight += writer.getWrittenCount(); + repository.saveChanges(); + break; + + case STOPPING: + return; + + // We've reached the limit of the blocks we can archive + // Sleep for a while to allow more to become available + case NOT_ENOUGH_BLOCKS: + // This shouldn't happen, as we're not enforcing minimum file sizes + repository.discardChanges(); + throw new DataException("Unable to rebuild archive due to unexpected NOT_ENOUGH_BLOCKS response."); + + case BLOCK_NOT_FOUND: + // We tried to archive a block that didn't exist. This is a major failure and likely means + // that a bootstrap or re-sync is needed. Try again every minute until then. + LOGGER.info("Error: block not found when rebuilding archive. If this error persists, " + + "a bootstrap or re-sync may be needed."); + repository.discardChanges(); + throw new DataException("Unable to rebuild archive because a block is missing."); + } + + } catch (IOException | TransformationException e) { + LOGGER.info("Caught exception when rebuilding block archive", e); + } + + } + } catch (InterruptedException e) { + // Do nothing + } finally { + // Delete archive-rebuild if it still exists, as that means something went wrong + FileUtils.deleteDirectory(newArchivePath.toFile()); + } + } + +} diff --git a/src/main/java/org/qortal/network/message/CachedBlockV2Message.java b/src/main/java/org/qortal/network/message/CachedBlockV2Message.java new file mode 100644 index 00000000..c981293d --- /dev/null +++ b/src/main/java/org/qortal/network/message/CachedBlockV2Message.java @@ -0,0 +1,43 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import org.qortal.block.Block; +import org.qortal.transform.TransformationException; +import org.qortal.transform.block.BlockTransformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +// This is an OUTGOING-only Message which more readily lends itself to being cached +public class CachedBlockV2Message extends Message implements Cloneable { + + public CachedBlockV2Message(Block block) throws TransformationException { + super(MessageType.BLOCK_V2); + + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + try { + bytes.write(Ints.toByteArray(block.getBlockData().getHeight())); + + bytes.write(BlockTransformer.toBytes(block)); + } catch (IOException e) { + throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream"); + } + + this.dataBytes = bytes.toByteArray(); + this.checksumBytes = Message.generateChecksum(this.dataBytes); + } + + public CachedBlockV2Message(byte[] cachedBytes) { + super(MessageType.BLOCK_V2); + + this.dataBytes = cachedBytes; + this.checksumBytes = Message.generateChecksum(this.dataBytes); + } + + public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) { + throw new UnsupportedOperationException("CachedBlockMessageV2 is for outgoing messages only"); + } + +} diff --git a/src/main/java/org/qortal/repository/BlockArchiveReader.java b/src/main/java/org/qortal/repository/BlockArchiveReader.java index 311d21c7..c5878563 100644 --- a/src/main/java/org/qortal/repository/BlockArchiveReader.java +++ b/src/main/java/org/qortal/repository/BlockArchiveReader.java @@ -3,10 +3,7 @@ package org.qortal.repository; import com.google.common.primitives.Ints; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.qortal.data.at.ATStateData; import org.qortal.data.block.BlockArchiveData; -import org.qortal.data.block.BlockData; -import org.qortal.data.transaction.TransactionData; import org.qortal.settings.Settings; import org.qortal.transform.TransformationException; import org.qortal.transform.block.BlockTransformation; @@ -72,15 +69,30 @@ public class BlockArchiveReader { this.fetchFileList(); } - byte[] serializedBytes = this.fetchSerializedBlockBytesForHeight(height); - if (serializedBytes == null) { + Triple serializedBlock = this.fetchSerializedBlockBytesForHeight(height); + byte[] serializedBytes = serializedBlock.getA(); + Integer serializationVersion = serializedBlock.getB(); + if (serializedBytes == null || serializationVersion == null) { return null; } ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes); BlockTransformation blockInfo = null; try { - blockInfo = BlockTransformer.fromByteBuffer(byteBuffer); + switch (serializationVersion) { + case 1: + blockInfo = BlockTransformer.fromByteBuffer(byteBuffer); + break; + + case 2: + blockInfo = BlockTransformer.fromByteBufferV2(byteBuffer); + break; + + default: + // Invalid serialization version + return null; + } + if (blockInfo != null && blockInfo.getBlockData() != null) { // Block height is stored outside of the main serialized bytes, so it // won't be set automatically. @@ -168,15 +180,17 @@ public class BlockArchiveReader { return null; } - public byte[] fetchSerializedBlockBytesForSignature(byte[] signature, boolean includeHeightPrefix, Repository repository) { + public Triple fetchSerializedBlockBytesForSignature(byte[] signature, boolean includeHeightPrefix, Repository repository) { if (this.fileListCache == null) { this.fetchFileList(); } Integer height = this.fetchHeightForSignature(signature, repository); if (height != null) { - byte[] blockBytes = this.fetchSerializedBlockBytesForHeight(height); - if (blockBytes == null) { + Triple serializedBlock = this.fetchSerializedBlockBytesForHeight(height); + byte[] blockBytes = serializedBlock.getA(); + Integer version = serializedBlock.getB(); + if (blockBytes == null || version == null) { return null; } @@ -187,18 +201,18 @@ public class BlockArchiveReader { try { bytes.write(Ints.toByteArray(height)); bytes.write(blockBytes); - return bytes.toByteArray(); + return new Triple<>(bytes.toByteArray(), version, height); } catch (IOException e) { return null; } } - return blockBytes; + return new Triple<>(blockBytes, version, height); } return null; } - public byte[] fetchSerializedBlockBytesForHeight(int height) { + public Triple fetchSerializedBlockBytesForHeight(int height) { String filename = this.getFilenameForHeight(height); if (filename == null) { // We don't have this block in the archive @@ -221,7 +235,7 @@ public class BlockArchiveReader { // End of fixed length header // Make sure the version is one we recognize - if (version != 1) { + if (version != 1 && version != 2) { LOGGER.info("Error: unknown version in file {}: {}", filename, version); return null; } @@ -258,7 +272,7 @@ public class BlockArchiveReader { byte[] blockBytes = new byte[blockLength]; file.read(blockBytes); - return blockBytes; + return new Triple<>(blockBytes, version, height); } catch (FileNotFoundException e) { LOGGER.info("File {} not found: {}", filename, e.getMessage()); @@ -279,6 +293,30 @@ public class BlockArchiveReader { } } + public int getHeightOfLastArchivedBlock() { + if (this.fileListCache == null) { + this.fetchFileList(); + } + + int maxEndHeight = 0; + + Iterator it = this.fileListCache.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry pair = (Map.Entry) it.next(); + if (pair == null && pair.getKey() == null && pair.getValue() == null) { + continue; + } + Triple heightInfo = (Triple) pair.getValue(); + Integer endHeight = heightInfo.getB(); + + if (endHeight != null && endHeight > maxEndHeight) { + maxEndHeight = endHeight; + } + } + + return maxEndHeight; + } + public void invalidateFileListCache() { this.fileListCache = null; } diff --git a/src/main/java/org/qortal/repository/BlockArchiveWriter.java b/src/main/java/org/qortal/repository/BlockArchiveWriter.java index 5127bf9b..c2eb17c9 100644 --- a/src/main/java/org/qortal/repository/BlockArchiveWriter.java +++ b/src/main/java/org/qortal/repository/BlockArchiveWriter.java @@ -6,10 +6,13 @@ import org.apache.logging.log4j.Logger; import org.qortal.block.Block; import org.qortal.controller.Controller; import org.qortal.controller.Synchronizer; +import org.qortal.data.at.ATStateData; import org.qortal.data.block.BlockArchiveData; import org.qortal.data.block.BlockData; +import org.qortal.data.transaction.TransactionData; import org.qortal.settings.Settings; import org.qortal.transform.TransformationException; +import org.qortal.transform.block.BlockTransformation; import org.qortal.transform.block.BlockTransformer; import java.io.ByteArrayOutputStream; @@ -18,6 +21,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; public class BlockArchiveWriter { @@ -28,27 +32,57 @@ public class BlockArchiveWriter { BLOCK_NOT_FOUND } + public enum BlockArchiveDataSource { + BLOCK_REPOSITORY, // To build an archive from the Blocks table + BLOCK_ARCHIVE // To build a new archive from an existing archive + } + private static final Logger LOGGER = LogManager.getLogger(BlockArchiveWriter.class); public static final long DEFAULT_FILE_SIZE_TARGET = 100 * 1024 * 1024; // 100MiB private int startHeight; private final int endHeight; + private final int serializationVersion; + private final Path archivePath; private final Repository repository; private long fileSizeTarget = DEFAULT_FILE_SIZE_TARGET; private boolean shouldEnforceFileSizeTarget = true; + // Default data source to BLOCK_REPOSITORY; can optionally be overridden + private BlockArchiveDataSource dataSource = BlockArchiveDataSource.BLOCK_REPOSITORY; + + private boolean shouldLogProgress = false; + private int writtenCount; private int lastWrittenHeight; private Path outputPath; - public BlockArchiveWriter(int startHeight, int endHeight, Repository repository) { + /** + * Instantiate a BlockArchiveWriter using a custom archive path + * @param startHeight + * @param endHeight + * @param repository + */ + public BlockArchiveWriter(int startHeight, int endHeight, int serializationVersion, Path archivePath, Repository repository) { this.startHeight = startHeight; this.endHeight = endHeight; + this.serializationVersion = serializationVersion; + this.archivePath = archivePath.toAbsolutePath(); this.repository = repository; } + /** + * Instantiate a BlockArchiveWriter using the default archive path and version + * @param startHeight + * @param endHeight + * @param repository + */ + public BlockArchiveWriter(int startHeight, int endHeight, Repository repository) { + this(startHeight, endHeight, 1, Paths.get(Settings.getInstance().getRepositoryPath(), "archive"), repository); + } + public static int getMaxArchiveHeight(Repository repository) throws DataException { // We must only archive trimmed blocks, or the archive will grow far too large final int accountSignaturesTrimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight(); @@ -72,8 +106,7 @@ public class BlockArchiveWriter { public BlockArchiveWriteResult write() throws DataException, IOException, TransformationException, InterruptedException { // Create the archive folder if it doesn't exist - // This is a subfolder of the db directory, to make bootstrapping easier - Path archivePath = Paths.get(Settings.getInstance().getRepositoryPath(), "archive").toAbsolutePath(); + // This is generally a subfolder of the db directory, to make bootstrapping easier try { Files.createDirectories(archivePath); } catch (IOException e) { @@ -95,8 +128,7 @@ public class BlockArchiveWriter { LOGGER.info(String.format("Fetching blocks from height %d...", startHeight)); int i = 0; - while (headerBytes.size() + bytes.size() < this.fileSizeTarget - || this.shouldEnforceFileSizeTarget == false) { + while (headerBytes.size() + bytes.size() < this.fileSizeTarget) { if (Controller.isStopping()) { return BlockArchiveWriteResult.STOPPING; @@ -112,7 +144,28 @@ public class BlockArchiveWriter { //LOGGER.info("Fetching block {}...", currentHeight); - BlockData blockData = repository.getBlockRepository().fromHeight(currentHeight); + BlockData blockData = null; + List transactions = null; + List atStates = null; + byte[] atStatesHash = null; + + switch (this.dataSource) { + case BLOCK_ARCHIVE: + BlockTransformation archivedBlock = BlockArchiveReader.getInstance().fetchBlockAtHeight(currentHeight); + if (archivedBlock != null) { + blockData = archivedBlock.getBlockData(); + transactions = archivedBlock.getTransactions(); + atStates = archivedBlock.getAtStates(); + atStatesHash = archivedBlock.getAtStatesHash(); + } + break; + + case BLOCK_REPOSITORY: + default: + blockData = repository.getBlockRepository().fromHeight(currentHeight); + break; + } + if (blockData == null) { return BlockArchiveWriteResult.BLOCK_NOT_FOUND; } @@ -122,18 +175,47 @@ public class BlockArchiveWriter { repository.getBlockArchiveRepository().save(blockArchiveData); repository.saveChanges(); + // Build the block + Block block; + if (atStatesHash != null) { + block = new Block(repository, blockData, transactions, atStatesHash); + } + else { + block = new Block(repository, blockData, transactions, atStates); + } + // Write the block data to some byte buffers - Block block = new Block(repository, blockData); int blockIndex = bytes.size(); // Write block index to header headerBytes.write(Ints.toByteArray(blockIndex)); // Write block height bytes.write(Ints.toByteArray(block.getBlockData().getHeight())); - byte[] blockBytes = BlockTransformer.toBytes(block); + + // Get serialized block bytes + byte[] blockBytes; + switch (serializationVersion) { + case 1: + blockBytes = BlockTransformer.toBytes(block); + break; + + case 2: + blockBytes = BlockTransformer.toBytesV2(block); + break; + + default: + throw new DataException("Invalid serialization version"); + } + // Write block length bytes.write(Ints.toByteArray(blockBytes.length)); // Write block bytes bytes.write(blockBytes); + + // Log every 1000 blocks + if (this.shouldLogProgress && i % 1000 == 0) { + LOGGER.info("Archived up to block height {}. Size of current file: {} bytes", currentHeight, (headerBytes.size() + bytes.size())); + } + i++; } @@ -147,11 +229,10 @@ public class BlockArchiveWriter { // We have enough blocks to create a new file int endHeight = startHeight + i - 1; - int version = 1; String filePath = String.format("%s/%d-%d.dat", archivePath.toString(), startHeight, endHeight); FileOutputStream fileOutputStream = new FileOutputStream(filePath); // Write version number - fileOutputStream.write(Ints.toByteArray(version)); + fileOutputStream.write(Ints.toByteArray(serializationVersion)); // Write start height fileOutputStream.write(Ints.toByteArray(startHeight)); // Write end height @@ -199,4 +280,12 @@ public class BlockArchiveWriter { this.shouldEnforceFileSizeTarget = shouldEnforceFileSizeTarget; } + public void setDataSource(BlockArchiveDataSource dataSource) { + this.dataSource = dataSource; + } + + public void setShouldLogProgress(boolean shouldLogProgress) { + this.shouldLogProgress = shouldLogProgress; + } + } diff --git a/src/main/java/org/qortal/transform/block/BlockTransformer.java b/src/main/java/org/qortal/transform/block/BlockTransformer.java index c97aa090..15445327 100644 --- a/src/main/java/org/qortal/transform/block/BlockTransformer.java +++ b/src/main/java/org/qortal/transform/block/BlockTransformer.java @@ -312,16 +312,24 @@ public class BlockTransformer extends Transformer { ByteArrayOutputStream atHashBytes = new ByteArrayOutputStream(atBytesLength); long atFees = 0; - for (ATStateData atStateData : block.getATStates()) { - // Skip initial states generated by DEPLOY_AT transactions in the same block - if (atStateData.isInitial()) - continue; + if (block.getAtStatesHash() != null) { + // We already have the AT states hash + atFees = blockData.getATFees(); + atHashBytes.write(block.getAtStatesHash()); + } + else { + // We need to build the AT states hash + for (ATStateData atStateData : block.getATStates()) { + // Skip initial states generated by DEPLOY_AT transactions in the same block + if (atStateData.isInitial()) + continue; - atHashBytes.write(atStateData.getATAddress().getBytes(StandardCharsets.UTF_8)); - atHashBytes.write(atStateData.getStateHash()); - atHashBytes.write(Longs.toByteArray(atStateData.getFees())); + atHashBytes.write(atStateData.getATAddress().getBytes(StandardCharsets.UTF_8)); + atHashBytes.write(atStateData.getStateHash()); + atHashBytes.write(Longs.toByteArray(atStateData.getFees())); - atFees += atStateData.getFees(); + atFees += atStateData.getFees(); + } } bytes.write(Ints.toByteArray(blockData.getATCount()));