forked from Qortal/qortal
Added V2 support in the block archive, and added feature to rebuild a V1 block archive using V2 block serialization. Should drastically reduce the archive size once rebuilt.
This commit is contained in:
parent
ab34fae810
commit
64d8353629
@ -48,6 +48,7 @@ import org.qortal.repository.RepositoryManager;
|
|||||||
import org.qortal.transform.TransformationException;
|
import org.qortal.transform.TransformationException;
|
||||||
import org.qortal.transform.block.BlockTransformer;
|
import org.qortal.transform.block.BlockTransformer;
|
||||||
import org.qortal.utils.Base58;
|
import org.qortal.utils.Base58;
|
||||||
|
import org.qortal.utils.Triple;
|
||||||
|
|
||||||
@Path("/blocks")
|
@Path("/blocks")
|
||||||
@Tag(name = "Blocks")
|
@Tag(name = "Blocks")
|
||||||
@ -165,10 +166,13 @@ public class BlocksResource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Not found, so try the block archive
|
// Not found, so try the block archive
|
||||||
byte[] bytes = BlockArchiveReader.getInstance().fetchSerializedBlockBytesForSignature(signature, false, repository);
|
Triple<byte[], Integer, Integer> serializedBlock = BlockArchiveReader.getInstance().fetchSerializedBlockBytesForSignature(signature, false, repository);
|
||||||
if (bytes != null) {
|
if (serializedBlock != null) {
|
||||||
if (version != 1) {
|
byte[] bytes = serializedBlock.getA();
|
||||||
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Archived blocks require version 1");
|
Integer serializationVersion = serializedBlock.getB();
|
||||||
|
if (version != serializationVersion) {
|
||||||
|
// TODO: we could quite easily reserialize the block with the requested version
|
||||||
|
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Block is not stored using requested serialization version.");
|
||||||
}
|
}
|
||||||
return Base58.encode(bytes);
|
return Base58.encode(bytes);
|
||||||
}
|
}
|
||||||
|
@ -657,6 +657,10 @@ public class Block {
|
|||||||
return this.atStates;
|
return this.atStates;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public byte[] getAtStatesHash() {
|
||||||
|
return this.atStatesHash;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return expanded info on block's online accounts.
|
* Return expanded info on block's online accounts.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -1379,9 +1379,24 @@ public class Controller extends Thread {
|
|||||||
// If we have no block data, we should check the archive in case it's there
|
// If we have no block data, we should check the archive in case it's there
|
||||||
if (blockData == null) {
|
if (blockData == null) {
|
||||||
if (Settings.getInstance().isArchiveEnabled()) {
|
if (Settings.getInstance().isArchiveEnabled()) {
|
||||||
byte[] bytes = BlockArchiveReader.getInstance().fetchSerializedBlockBytesForSignature(signature, true, repository);
|
Triple<byte[], Integer, Integer> serializedBlock = BlockArchiveReader.getInstance().fetchSerializedBlockBytesForSignature(signature, true, repository);
|
||||||
if (bytes != null) {
|
if (serializedBlock != null) {
|
||||||
CachedBlockMessage blockMessage = new CachedBlockMessage(bytes);
|
byte[] bytes = serializedBlock.getA();
|
||||||
|
Integer serializationVersion = serializedBlock.getB();
|
||||||
|
|
||||||
|
Message blockMessage;
|
||||||
|
switch (serializationVersion) {
|
||||||
|
case 1:
|
||||||
|
blockMessage = new CachedBlockMessage(bytes);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 2:
|
||||||
|
blockMessage = new CachedBlockV2Message(bytes);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
return;
|
||||||
|
}
|
||||||
blockMessage.setId(message.getId());
|
blockMessage.setId(message.getId());
|
||||||
|
|
||||||
// This call also causes the other needed data to be pulled in from repository
|
// This call also causes the other needed data to be pulled in from repository
|
||||||
|
@ -0,0 +1,119 @@
|
|||||||
|
package org.qortal.controller.repository;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.qortal.controller.Controller;
|
||||||
|
import org.qortal.controller.Synchronizer;
|
||||||
|
import org.qortal.repository.*;
|
||||||
|
import org.qortal.settings.Settings;
|
||||||
|
import org.qortal.transform.TransformationException;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
|
||||||
|
|
||||||
|
public class BlockArchiveRebuilder {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LogManager.getLogger(BlockArchiveRebuilder.class);
|
||||||
|
|
||||||
|
private final int serializationVersion;
|
||||||
|
|
||||||
|
public BlockArchiveRebuilder(int serializationVersion) {
|
||||||
|
this.serializationVersion = serializationVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() throws DataException, IOException {
|
||||||
|
if (!Settings.getInstance().isArchiveEnabled() || Settings.getInstance().isLite()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// New archive path is in a different location from original archive path, to avoid conflicts.
|
||||||
|
// It will be moved later, once the process is complete.
|
||||||
|
final Path newArchivePath = Paths.get(Settings.getInstance().getRepositoryPath(), "archive-rebuild");
|
||||||
|
final Path originalArchivePath = Paths.get(Settings.getInstance().getRepositoryPath(), "archive");
|
||||||
|
|
||||||
|
// Delete archive-rebuild if it exists from a previous attempt
|
||||||
|
FileUtils.deleteDirectory(newArchivePath.toFile());
|
||||||
|
|
||||||
|
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||||
|
int startHeight = 1; // We need to rebuild the entire archive
|
||||||
|
|
||||||
|
LOGGER.info("Rebuilding block archive from height {}...", startHeight);
|
||||||
|
|
||||||
|
while (!Controller.isStopping()) {
|
||||||
|
repository.discardChanges();
|
||||||
|
|
||||||
|
Thread.sleep(1000L);
|
||||||
|
|
||||||
|
// Don't even attempt if we're mid-sync as our repository requests will be delayed for ages
|
||||||
|
if (Synchronizer.getInstance().isSynchronizing()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rebuild archive
|
||||||
|
try {
|
||||||
|
final int maximumArchiveHeight = BlockArchiveReader.getInstance().getHeightOfLastArchivedBlock();
|
||||||
|
if (startHeight >= maximumArchiveHeight) {
|
||||||
|
// We've finished.
|
||||||
|
// Delete existing archive and move the newly built one into its place
|
||||||
|
FileUtils.deleteDirectory(originalArchivePath.toFile());
|
||||||
|
FileUtils.moveDirectory(newArchivePath.toFile(), originalArchivePath.toFile());
|
||||||
|
LOGGER.info("Block archive successfully rebuilt");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockArchiveWriter writer = new BlockArchiveWriter(startHeight, maximumArchiveHeight, serializationVersion, newArchivePath, repository);
|
||||||
|
|
||||||
|
// Set data source to BLOCK_ARCHIVE as we are rebuilding
|
||||||
|
writer.setDataSource(BlockArchiveWriter.BlockArchiveDataSource.BLOCK_ARCHIVE);
|
||||||
|
|
||||||
|
// We can't enforce the 100MB file size target, as the final file needs to contain all blocks
|
||||||
|
// that exist in the current archive. Otherwise, the final blocks in the archive will be lost.
|
||||||
|
writer.setShouldEnforceFileSizeTarget(false);
|
||||||
|
|
||||||
|
// We want to log the rebuild progress
|
||||||
|
writer.setShouldLogProgress(true);
|
||||||
|
|
||||||
|
BlockArchiveWriter.BlockArchiveWriteResult result = writer.write();
|
||||||
|
switch (result) {
|
||||||
|
case OK:
|
||||||
|
// Increment block archive height
|
||||||
|
startHeight += writer.getWrittenCount();
|
||||||
|
repository.saveChanges();
|
||||||
|
break;
|
||||||
|
|
||||||
|
case STOPPING:
|
||||||
|
return;
|
||||||
|
|
||||||
|
// We've reached the limit of the blocks we can archive
|
||||||
|
// Sleep for a while to allow more to become available
|
||||||
|
case NOT_ENOUGH_BLOCKS:
|
||||||
|
// This shouldn't happen, as we're not enforcing minimum file sizes
|
||||||
|
repository.discardChanges();
|
||||||
|
throw new DataException("Unable to rebuild archive due to unexpected NOT_ENOUGH_BLOCKS response.");
|
||||||
|
|
||||||
|
case BLOCK_NOT_FOUND:
|
||||||
|
// We tried to archive a block that didn't exist. This is a major failure and likely means
|
||||||
|
// that a bootstrap or re-sync is needed. Try again every minute until then.
|
||||||
|
LOGGER.info("Error: block not found when rebuilding archive. If this error persists, " +
|
||||||
|
"a bootstrap or re-sync may be needed.");
|
||||||
|
repository.discardChanges();
|
||||||
|
throw new DataException("Unable to rebuild archive because a block is missing.");
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (IOException | TransformationException e) {
|
||||||
|
LOGGER.info("Caught exception when rebuilding block archive", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Do nothing
|
||||||
|
} finally {
|
||||||
|
// Delete archive-rebuild if it still exists, as that means something went wrong
|
||||||
|
FileUtils.deleteDirectory(newArchivePath.toFile());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,43 @@
|
|||||||
|
package org.qortal.network.message;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
import org.qortal.block.Block;
|
||||||
|
import org.qortal.transform.TransformationException;
|
||||||
|
import org.qortal.transform.block.BlockTransformer;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
// This is an OUTGOING-only Message which more readily lends itself to being cached
|
||||||
|
public class CachedBlockV2Message extends Message implements Cloneable {
|
||||||
|
|
||||||
|
public CachedBlockV2Message(Block block) throws TransformationException {
|
||||||
|
super(MessageType.BLOCK_V2);
|
||||||
|
|
||||||
|
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||||
|
|
||||||
|
try {
|
||||||
|
bytes.write(Ints.toByteArray(block.getBlockData().getHeight()));
|
||||||
|
|
||||||
|
bytes.write(BlockTransformer.toBytes(block));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.dataBytes = bytes.toByteArray();
|
||||||
|
this.checksumBytes = Message.generateChecksum(this.dataBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CachedBlockV2Message(byte[] cachedBytes) {
|
||||||
|
super(MessageType.BLOCK_V2);
|
||||||
|
|
||||||
|
this.dataBytes = cachedBytes;
|
||||||
|
this.checksumBytes = Message.generateChecksum(this.dataBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Message fromByteBuffer(int id, ByteBuffer byteBuffer) {
|
||||||
|
throw new UnsupportedOperationException("CachedBlockMessageV2 is for outgoing messages only");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -3,10 +3,7 @@ package org.qortal.repository;
|
|||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.qortal.data.at.ATStateData;
|
|
||||||
import org.qortal.data.block.BlockArchiveData;
|
import org.qortal.data.block.BlockArchiveData;
|
||||||
import org.qortal.data.block.BlockData;
|
|
||||||
import org.qortal.data.transaction.TransactionData;
|
|
||||||
import org.qortal.settings.Settings;
|
import org.qortal.settings.Settings;
|
||||||
import org.qortal.transform.TransformationException;
|
import org.qortal.transform.TransformationException;
|
||||||
import org.qortal.transform.block.BlockTransformation;
|
import org.qortal.transform.block.BlockTransformation;
|
||||||
@ -72,15 +69,30 @@ public class BlockArchiveReader {
|
|||||||
this.fetchFileList();
|
this.fetchFileList();
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] serializedBytes = this.fetchSerializedBlockBytesForHeight(height);
|
Triple<byte[], Integer, Integer> serializedBlock = this.fetchSerializedBlockBytesForHeight(height);
|
||||||
if (serializedBytes == null) {
|
byte[] serializedBytes = serializedBlock.getA();
|
||||||
|
Integer serializationVersion = serializedBlock.getB();
|
||||||
|
if (serializedBytes == null || serializationVersion == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
|
ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
|
||||||
BlockTransformation blockInfo = null;
|
BlockTransformation blockInfo = null;
|
||||||
try {
|
try {
|
||||||
blockInfo = BlockTransformer.fromByteBuffer(byteBuffer);
|
switch (serializationVersion) {
|
||||||
|
case 1:
|
||||||
|
blockInfo = BlockTransformer.fromByteBuffer(byteBuffer);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 2:
|
||||||
|
blockInfo = BlockTransformer.fromByteBufferV2(byteBuffer);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Invalid serialization version
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
if (blockInfo != null && blockInfo.getBlockData() != null) {
|
if (blockInfo != null && blockInfo.getBlockData() != null) {
|
||||||
// Block height is stored outside of the main serialized bytes, so it
|
// Block height is stored outside of the main serialized bytes, so it
|
||||||
// won't be set automatically.
|
// won't be set automatically.
|
||||||
@ -168,15 +180,17 @@ public class BlockArchiveReader {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] fetchSerializedBlockBytesForSignature(byte[] signature, boolean includeHeightPrefix, Repository repository) {
|
public Triple<byte[], Integer, Integer> fetchSerializedBlockBytesForSignature(byte[] signature, boolean includeHeightPrefix, Repository repository) {
|
||||||
if (this.fileListCache == null) {
|
if (this.fileListCache == null) {
|
||||||
this.fetchFileList();
|
this.fetchFileList();
|
||||||
}
|
}
|
||||||
|
|
||||||
Integer height = this.fetchHeightForSignature(signature, repository);
|
Integer height = this.fetchHeightForSignature(signature, repository);
|
||||||
if (height != null) {
|
if (height != null) {
|
||||||
byte[] blockBytes = this.fetchSerializedBlockBytesForHeight(height);
|
Triple<byte[], Integer, Integer> serializedBlock = this.fetchSerializedBlockBytesForHeight(height);
|
||||||
if (blockBytes == null) {
|
byte[] blockBytes = serializedBlock.getA();
|
||||||
|
Integer version = serializedBlock.getB();
|
||||||
|
if (blockBytes == null || version == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -187,18 +201,18 @@ public class BlockArchiveReader {
|
|||||||
try {
|
try {
|
||||||
bytes.write(Ints.toByteArray(height));
|
bytes.write(Ints.toByteArray(height));
|
||||||
bytes.write(blockBytes);
|
bytes.write(blockBytes);
|
||||||
return bytes.toByteArray();
|
return new Triple<>(bytes.toByteArray(), version, height);
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return blockBytes;
|
return new Triple<>(blockBytes, version, height);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] fetchSerializedBlockBytesForHeight(int height) {
|
public Triple<byte[], Integer, Integer> fetchSerializedBlockBytesForHeight(int height) {
|
||||||
String filename = this.getFilenameForHeight(height);
|
String filename = this.getFilenameForHeight(height);
|
||||||
if (filename == null) {
|
if (filename == null) {
|
||||||
// We don't have this block in the archive
|
// We don't have this block in the archive
|
||||||
@ -221,7 +235,7 @@ public class BlockArchiveReader {
|
|||||||
// End of fixed length header
|
// End of fixed length header
|
||||||
|
|
||||||
// Make sure the version is one we recognize
|
// Make sure the version is one we recognize
|
||||||
if (version != 1) {
|
if (version != 1 && version != 2) {
|
||||||
LOGGER.info("Error: unknown version in file {}: {}", filename, version);
|
LOGGER.info("Error: unknown version in file {}: {}", filename, version);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -258,7 +272,7 @@ public class BlockArchiveReader {
|
|||||||
byte[] blockBytes = new byte[blockLength];
|
byte[] blockBytes = new byte[blockLength];
|
||||||
file.read(blockBytes);
|
file.read(blockBytes);
|
||||||
|
|
||||||
return blockBytes;
|
return new Triple<>(blockBytes, version, height);
|
||||||
|
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
LOGGER.info("File {} not found: {}", filename, e.getMessage());
|
LOGGER.info("File {} not found: {}", filename, e.getMessage());
|
||||||
@ -279,6 +293,30 @@ public class BlockArchiveReader {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getHeightOfLastArchivedBlock() {
|
||||||
|
if (this.fileListCache == null) {
|
||||||
|
this.fetchFileList();
|
||||||
|
}
|
||||||
|
|
||||||
|
int maxEndHeight = 0;
|
||||||
|
|
||||||
|
Iterator it = this.fileListCache.entrySet().iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Map.Entry pair = (Map.Entry) it.next();
|
||||||
|
if (pair == null && pair.getKey() == null && pair.getValue() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Triple<Integer, Integer, Integer> heightInfo = (Triple<Integer, Integer, Integer>) pair.getValue();
|
||||||
|
Integer endHeight = heightInfo.getB();
|
||||||
|
|
||||||
|
if (endHeight != null && endHeight > maxEndHeight) {
|
||||||
|
maxEndHeight = endHeight;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return maxEndHeight;
|
||||||
|
}
|
||||||
|
|
||||||
public void invalidateFileListCache() {
|
public void invalidateFileListCache() {
|
||||||
this.fileListCache = null;
|
this.fileListCache = null;
|
||||||
}
|
}
|
||||||
|
@ -6,10 +6,13 @@ import org.apache.logging.log4j.Logger;
|
|||||||
import org.qortal.block.Block;
|
import org.qortal.block.Block;
|
||||||
import org.qortal.controller.Controller;
|
import org.qortal.controller.Controller;
|
||||||
import org.qortal.controller.Synchronizer;
|
import org.qortal.controller.Synchronizer;
|
||||||
|
import org.qortal.data.at.ATStateData;
|
||||||
import org.qortal.data.block.BlockArchiveData;
|
import org.qortal.data.block.BlockArchiveData;
|
||||||
import org.qortal.data.block.BlockData;
|
import org.qortal.data.block.BlockData;
|
||||||
|
import org.qortal.data.transaction.TransactionData;
|
||||||
import org.qortal.settings.Settings;
|
import org.qortal.settings.Settings;
|
||||||
import org.qortal.transform.TransformationException;
|
import org.qortal.transform.TransformationException;
|
||||||
|
import org.qortal.transform.block.BlockTransformation;
|
||||||
import org.qortal.transform.block.BlockTransformer;
|
import org.qortal.transform.block.BlockTransformer;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
@ -18,6 +21,7 @@ import java.io.IOException;
|
|||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class BlockArchiveWriter {
|
public class BlockArchiveWriter {
|
||||||
|
|
||||||
@ -28,27 +32,57 @@ public class BlockArchiveWriter {
|
|||||||
BLOCK_NOT_FOUND
|
BLOCK_NOT_FOUND
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public enum BlockArchiveDataSource {
|
||||||
|
BLOCK_REPOSITORY, // To build an archive from the Blocks table
|
||||||
|
BLOCK_ARCHIVE // To build a new archive from an existing archive
|
||||||
|
}
|
||||||
|
|
||||||
private static final Logger LOGGER = LogManager.getLogger(BlockArchiveWriter.class);
|
private static final Logger LOGGER = LogManager.getLogger(BlockArchiveWriter.class);
|
||||||
|
|
||||||
public static final long DEFAULT_FILE_SIZE_TARGET = 100 * 1024 * 1024; // 100MiB
|
public static final long DEFAULT_FILE_SIZE_TARGET = 100 * 1024 * 1024; // 100MiB
|
||||||
|
|
||||||
private int startHeight;
|
private int startHeight;
|
||||||
private final int endHeight;
|
private final int endHeight;
|
||||||
|
private final int serializationVersion;
|
||||||
|
private final Path archivePath;
|
||||||
private final Repository repository;
|
private final Repository repository;
|
||||||
|
|
||||||
private long fileSizeTarget = DEFAULT_FILE_SIZE_TARGET;
|
private long fileSizeTarget = DEFAULT_FILE_SIZE_TARGET;
|
||||||
private boolean shouldEnforceFileSizeTarget = true;
|
private boolean shouldEnforceFileSizeTarget = true;
|
||||||
|
|
||||||
|
// Default data source to BLOCK_REPOSITORY; can optionally be overridden
|
||||||
|
private BlockArchiveDataSource dataSource = BlockArchiveDataSource.BLOCK_REPOSITORY;
|
||||||
|
|
||||||
|
private boolean shouldLogProgress = false;
|
||||||
|
|
||||||
private int writtenCount;
|
private int writtenCount;
|
||||||
private int lastWrittenHeight;
|
private int lastWrittenHeight;
|
||||||
private Path outputPath;
|
private Path outputPath;
|
||||||
|
|
||||||
public BlockArchiveWriter(int startHeight, int endHeight, Repository repository) {
|
/**
|
||||||
|
* Instantiate a BlockArchiveWriter using a custom archive path
|
||||||
|
* @param startHeight
|
||||||
|
* @param endHeight
|
||||||
|
* @param repository
|
||||||
|
*/
|
||||||
|
public BlockArchiveWriter(int startHeight, int endHeight, int serializationVersion, Path archivePath, Repository repository) {
|
||||||
this.startHeight = startHeight;
|
this.startHeight = startHeight;
|
||||||
this.endHeight = endHeight;
|
this.endHeight = endHeight;
|
||||||
|
this.serializationVersion = serializationVersion;
|
||||||
|
this.archivePath = archivePath.toAbsolutePath();
|
||||||
this.repository = repository;
|
this.repository = repository;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instantiate a BlockArchiveWriter using the default archive path and version
|
||||||
|
* @param startHeight
|
||||||
|
* @param endHeight
|
||||||
|
* @param repository
|
||||||
|
*/
|
||||||
|
public BlockArchiveWriter(int startHeight, int endHeight, Repository repository) {
|
||||||
|
this(startHeight, endHeight, 1, Paths.get(Settings.getInstance().getRepositoryPath(), "archive"), repository);
|
||||||
|
}
|
||||||
|
|
||||||
public static int getMaxArchiveHeight(Repository repository) throws DataException {
|
public static int getMaxArchiveHeight(Repository repository) throws DataException {
|
||||||
// We must only archive trimmed blocks, or the archive will grow far too large
|
// We must only archive trimmed blocks, or the archive will grow far too large
|
||||||
final int accountSignaturesTrimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight();
|
final int accountSignaturesTrimStartHeight = repository.getBlockRepository().getOnlineAccountsSignaturesTrimHeight();
|
||||||
@ -72,8 +106,7 @@ public class BlockArchiveWriter {
|
|||||||
|
|
||||||
public BlockArchiveWriteResult write() throws DataException, IOException, TransformationException, InterruptedException {
|
public BlockArchiveWriteResult write() throws DataException, IOException, TransformationException, InterruptedException {
|
||||||
// Create the archive folder if it doesn't exist
|
// Create the archive folder if it doesn't exist
|
||||||
// This is a subfolder of the db directory, to make bootstrapping easier
|
// This is generally a subfolder of the db directory, to make bootstrapping easier
|
||||||
Path archivePath = Paths.get(Settings.getInstance().getRepositoryPath(), "archive").toAbsolutePath();
|
|
||||||
try {
|
try {
|
||||||
Files.createDirectories(archivePath);
|
Files.createDirectories(archivePath);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -95,8 +128,7 @@ public class BlockArchiveWriter {
|
|||||||
|
|
||||||
LOGGER.info(String.format("Fetching blocks from height %d...", startHeight));
|
LOGGER.info(String.format("Fetching blocks from height %d...", startHeight));
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while (headerBytes.size() + bytes.size() < this.fileSizeTarget
|
while (headerBytes.size() + bytes.size() < this.fileSizeTarget) {
|
||||||
|| this.shouldEnforceFileSizeTarget == false) {
|
|
||||||
|
|
||||||
if (Controller.isStopping()) {
|
if (Controller.isStopping()) {
|
||||||
return BlockArchiveWriteResult.STOPPING;
|
return BlockArchiveWriteResult.STOPPING;
|
||||||
@ -112,7 +144,28 @@ public class BlockArchiveWriter {
|
|||||||
|
|
||||||
//LOGGER.info("Fetching block {}...", currentHeight);
|
//LOGGER.info("Fetching block {}...", currentHeight);
|
||||||
|
|
||||||
BlockData blockData = repository.getBlockRepository().fromHeight(currentHeight);
|
BlockData blockData = null;
|
||||||
|
List<TransactionData> transactions = null;
|
||||||
|
List<ATStateData> atStates = null;
|
||||||
|
byte[] atStatesHash = null;
|
||||||
|
|
||||||
|
switch (this.dataSource) {
|
||||||
|
case BLOCK_ARCHIVE:
|
||||||
|
BlockTransformation archivedBlock = BlockArchiveReader.getInstance().fetchBlockAtHeight(currentHeight);
|
||||||
|
if (archivedBlock != null) {
|
||||||
|
blockData = archivedBlock.getBlockData();
|
||||||
|
transactions = archivedBlock.getTransactions();
|
||||||
|
atStates = archivedBlock.getAtStates();
|
||||||
|
atStatesHash = archivedBlock.getAtStatesHash();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case BLOCK_REPOSITORY:
|
||||||
|
default:
|
||||||
|
blockData = repository.getBlockRepository().fromHeight(currentHeight);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (blockData == null) {
|
if (blockData == null) {
|
||||||
return BlockArchiveWriteResult.BLOCK_NOT_FOUND;
|
return BlockArchiveWriteResult.BLOCK_NOT_FOUND;
|
||||||
}
|
}
|
||||||
@ -122,18 +175,47 @@ public class BlockArchiveWriter {
|
|||||||
repository.getBlockArchiveRepository().save(blockArchiveData);
|
repository.getBlockArchiveRepository().save(blockArchiveData);
|
||||||
repository.saveChanges();
|
repository.saveChanges();
|
||||||
|
|
||||||
|
// Build the block
|
||||||
|
Block block;
|
||||||
|
if (atStatesHash != null) {
|
||||||
|
block = new Block(repository, blockData, transactions, atStatesHash);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
block = new Block(repository, blockData, transactions, atStates);
|
||||||
|
}
|
||||||
|
|
||||||
// Write the block data to some byte buffers
|
// Write the block data to some byte buffers
|
||||||
Block block = new Block(repository, blockData);
|
|
||||||
int blockIndex = bytes.size();
|
int blockIndex = bytes.size();
|
||||||
// Write block index to header
|
// Write block index to header
|
||||||
headerBytes.write(Ints.toByteArray(blockIndex));
|
headerBytes.write(Ints.toByteArray(blockIndex));
|
||||||
// Write block height
|
// Write block height
|
||||||
bytes.write(Ints.toByteArray(block.getBlockData().getHeight()));
|
bytes.write(Ints.toByteArray(block.getBlockData().getHeight()));
|
||||||
byte[] blockBytes = BlockTransformer.toBytes(block);
|
|
||||||
|
// Get serialized block bytes
|
||||||
|
byte[] blockBytes;
|
||||||
|
switch (serializationVersion) {
|
||||||
|
case 1:
|
||||||
|
blockBytes = BlockTransformer.toBytes(block);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case 2:
|
||||||
|
blockBytes = BlockTransformer.toBytesV2(block);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw new DataException("Invalid serialization version");
|
||||||
|
}
|
||||||
|
|
||||||
// Write block length
|
// Write block length
|
||||||
bytes.write(Ints.toByteArray(blockBytes.length));
|
bytes.write(Ints.toByteArray(blockBytes.length));
|
||||||
// Write block bytes
|
// Write block bytes
|
||||||
bytes.write(blockBytes);
|
bytes.write(blockBytes);
|
||||||
|
|
||||||
|
// Log every 1000 blocks
|
||||||
|
if (this.shouldLogProgress && i % 1000 == 0) {
|
||||||
|
LOGGER.info("Archived up to block height {}. Size of current file: {} bytes", currentHeight, (headerBytes.size() + bytes.size()));
|
||||||
|
}
|
||||||
|
|
||||||
i++;
|
i++;
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -147,11 +229,10 @@ public class BlockArchiveWriter {
|
|||||||
|
|
||||||
// We have enough blocks to create a new file
|
// We have enough blocks to create a new file
|
||||||
int endHeight = startHeight + i - 1;
|
int endHeight = startHeight + i - 1;
|
||||||
int version = 1;
|
|
||||||
String filePath = String.format("%s/%d-%d.dat", archivePath.toString(), startHeight, endHeight);
|
String filePath = String.format("%s/%d-%d.dat", archivePath.toString(), startHeight, endHeight);
|
||||||
FileOutputStream fileOutputStream = new FileOutputStream(filePath);
|
FileOutputStream fileOutputStream = new FileOutputStream(filePath);
|
||||||
// Write version number
|
// Write version number
|
||||||
fileOutputStream.write(Ints.toByteArray(version));
|
fileOutputStream.write(Ints.toByteArray(serializationVersion));
|
||||||
// Write start height
|
// Write start height
|
||||||
fileOutputStream.write(Ints.toByteArray(startHeight));
|
fileOutputStream.write(Ints.toByteArray(startHeight));
|
||||||
// Write end height
|
// Write end height
|
||||||
@ -199,4 +280,12 @@ public class BlockArchiveWriter {
|
|||||||
this.shouldEnforceFileSizeTarget = shouldEnforceFileSizeTarget;
|
this.shouldEnforceFileSizeTarget = shouldEnforceFileSizeTarget;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setDataSource(BlockArchiveDataSource dataSource) {
|
||||||
|
this.dataSource = dataSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setShouldLogProgress(boolean shouldLogProgress) {
|
||||||
|
this.shouldLogProgress = shouldLogProgress;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -312,16 +312,24 @@ public class BlockTransformer extends Transformer {
|
|||||||
ByteArrayOutputStream atHashBytes = new ByteArrayOutputStream(atBytesLength);
|
ByteArrayOutputStream atHashBytes = new ByteArrayOutputStream(atBytesLength);
|
||||||
long atFees = 0;
|
long atFees = 0;
|
||||||
|
|
||||||
for (ATStateData atStateData : block.getATStates()) {
|
if (block.getAtStatesHash() != null) {
|
||||||
// Skip initial states generated by DEPLOY_AT transactions in the same block
|
// We already have the AT states hash
|
||||||
if (atStateData.isInitial())
|
atFees = blockData.getATFees();
|
||||||
continue;
|
atHashBytes.write(block.getAtStatesHash());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// We need to build the AT states hash
|
||||||
|
for (ATStateData atStateData : block.getATStates()) {
|
||||||
|
// Skip initial states generated by DEPLOY_AT transactions in the same block
|
||||||
|
if (atStateData.isInitial())
|
||||||
|
continue;
|
||||||
|
|
||||||
atHashBytes.write(atStateData.getATAddress().getBytes(StandardCharsets.UTF_8));
|
atHashBytes.write(atStateData.getATAddress().getBytes(StandardCharsets.UTF_8));
|
||||||
atHashBytes.write(atStateData.getStateHash());
|
atHashBytes.write(atStateData.getStateHash());
|
||||||
atHashBytes.write(Longs.toByteArray(atStateData.getFees()));
|
atHashBytes.write(Longs.toByteArray(atStateData.getFees()));
|
||||||
|
|
||||||
atFees += atStateData.getFees();
|
atFees += atStateData.getFees();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bytes.write(Ints.toByteArray(blockData.getATCount()));
|
bytes.write(Ints.toByteArray(blockData.getATCount()));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user