diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 0026be41..974567f4 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -105,6 +105,8 @@ public class Controller extends Thread { private static final long LAST_SEEN_EXPIRY_PERIOD = (ONLINE_TIMESTAMP_MODULUS * 2) + (1 * 60 * 1000L); /** How many (latest) blocks' worth of online accounts we cache */ private static final int MAX_BLOCKS_CACHED_ONLINE_ACCOUNTS = 2; + private static final long ONLINE_ACCOUNTS_V2_PEER_VERSION = 0x0300010002L; + private static volatile boolean isStopping = false; private static BlockMinter blockMinter = null; @@ -1282,6 +1284,14 @@ public class Controller extends Thread { onNetworkOnlineAccountsMessage(peer, message); break; + case GET_ONLINE_ACCOUNTS_V2: + onNetworkGetOnlineAccountsV2Message(peer, message); + break; + + case ONLINE_ACCOUNTS_V2: + onNetworkOnlineAccountsV2Message(peer, message); + break; + case GET_ARBITRARY_DATA: // Not currently supported break; @@ -1687,6 +1697,53 @@ public class Controller extends Thread { } } + private void onNetworkGetOnlineAccountsV2Message(Peer peer, Message message) { + GetOnlineAccountsV2Message getOnlineAccountsMessage = (GetOnlineAccountsV2Message) message; + + List excludeAccounts = getOnlineAccountsMessage.getOnlineAccounts(); + + // Send online accounts info, excluding entries with matching timestamp & public key from excludeAccounts + List accountsToSend; + synchronized (this.onlineAccounts) { + accountsToSend = new ArrayList<>(this.onlineAccounts); + } + + Iterator iterator = accountsToSend.iterator(); + + SEND_ITERATOR: + while (iterator.hasNext()) { + OnlineAccountData onlineAccountData = iterator.next(); + + for (int i = 0; i < excludeAccounts.size(); ++i) { + OnlineAccountData excludeAccountData = excludeAccounts.get(i); + + if (onlineAccountData.getTimestamp() == excludeAccountData.getTimestamp() && Arrays.equals(onlineAccountData.getPublicKey(), excludeAccountData.getPublicKey())) { + iterator.remove(); + continue SEND_ITERATOR; + } + } + } + + Message onlineAccountsMessage = new OnlineAccountsV2Message(accountsToSend); + peer.sendMessage(onlineAccountsMessage); + + LOGGER.trace(() -> String.format("Sent %d of our %d online accounts to %s", accountsToSend.size(), this.onlineAccounts.size(), peer)); + } + + private void onNetworkOnlineAccountsV2Message(Peer peer, Message message) { + OnlineAccountsV2Message onlineAccountsMessage = (OnlineAccountsV2Message) message; + + List peersOnlineAccounts = onlineAccountsMessage.getOnlineAccounts(); + LOGGER.trace(() -> String.format("Received %d online accounts from %s", peersOnlineAccounts.size(), peer)); + + try (final Repository repository = RepositoryManager.getRepository()) { + for (OnlineAccountData onlineAccountData : peersOnlineAccounts) + this.verifyAndAddAccount(repository, onlineAccountData); + } catch (DataException e) { + LOGGER.error(String.format("Repository issue while verifying online accounts from peer %s", peer), e); + } + } + // Utilities private void verifyAndAddAccount(Repository repository, OnlineAccountData onlineAccountData) throws DataException { @@ -1798,11 +1855,17 @@ public class Controller extends Thread { // Request data from other peers? if ((this.onlineAccountsTasksTimestamp % ONLINE_ACCOUNTS_BROADCAST_INTERVAL) < ONLINE_ACCOUNTS_TASKS_INTERVAL) { - Message message; + List safeOnlineAccounts; synchronized (this.onlineAccounts) { - message = new GetOnlineAccountsMessage(this.onlineAccounts); + safeOnlineAccounts = new ArrayList<>(this.onlineAccounts); } - Network.getInstance().broadcast(peer -> message); + + Message messageV1 = new GetOnlineAccountsMessage(safeOnlineAccounts); + Message messageV2 = new GetOnlineAccountsV2Message(safeOnlineAccounts); + + Network.getInstance().broadcast(peer -> + peer.getPeersVersion() >= ONLINE_ACCOUNTS_V2_PEER_VERSION ? messageV2 : messageV1 + ); } // Refresh our online accounts signatures? @@ -1894,8 +1957,12 @@ public class Controller extends Thread { if (!hasInfoChanged) return; - Message message = new OnlineAccountsMessage(ourOnlineAccounts); - Network.getInstance().broadcast(peer -> message); + Message messageV1 = new OnlineAccountsMessage(ourOnlineAccounts); + Message messageV2 = new OnlineAccountsV2Message(ourOnlineAccounts); + + Network.getInstance().broadcast(peer -> + peer.getPeersVersion() >= ONLINE_ACCOUNTS_V2_PEER_VERSION ? messageV2 : messageV1 + ); LOGGER.trace(() -> String.format("Broadcasted %d online account%s with timestamp %d", ourOnlineAccounts.size(), (ourOnlineAccounts.size() != 1 ? "s" : ""), onlineAccountsTimestamp)); } diff --git a/src/main/java/org/qortal/network/message/GetOnlineAccountsV2Message.java b/src/main/java/org/qortal/network/message/GetOnlineAccountsV2Message.java new file mode 100644 index 00000000..709f9782 --- /dev/null +++ b/src/main/java/org/qortal/network/message/GetOnlineAccountsV2Message.java @@ -0,0 +1,117 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.data.network.OnlineAccountData; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * For requesting online accounts info from remote peer, given our list of online accounts. + * + * Different format to V1: + * V1 is: number of entries, then timestamp + pubkey for each entry + * V2 is: groups of: number of entries, timestamp, then pubkey for each entry + * + * Also V2 only builds online accounts message once! + */ +public class GetOnlineAccountsV2Message extends Message { + private List onlineAccounts; + private byte[] cachedData; + + public GetOnlineAccountsV2Message(List onlineAccounts) { + this(-1, onlineAccounts); + } + + private GetOnlineAccountsV2Message(int id, List onlineAccounts) { + super(id, MessageType.GET_ONLINE_ACCOUNTS_V2); + + this.onlineAccounts = onlineAccounts; + } + + public List getOnlineAccounts() { + return this.onlineAccounts; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int accountCount = bytes.getInt(); + + List onlineAccounts = new ArrayList<>(accountCount); + + while (accountCount > 0) { + long timestamp = bytes.getLong(); + + for (int i = 0; i < accountCount; ++i) { + byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKey); + + onlineAccounts.add(new OnlineAccountData(timestamp, null, publicKey)); + } + + if (bytes.hasRemaining()) { + accountCount = bytes.getInt(); + } else { + // we've finished + accountCount = 0; + } + } + + return new GetOnlineAccountsV2Message(id, onlineAccounts); + } + + @Override + protected synchronized byte[] toData() { + if (this.cachedData != null) + return this.cachedData; + + // Shortcut in case we have no online accounts + if (this.onlineAccounts.isEmpty()) { + this.cachedData = Ints.toByteArray(0); + return this.cachedData; + } + + // How many of each timestamp + Map countByTimestamp = new HashMap<>(); + + for (int i = 0; i < this.onlineAccounts.size(); ++i) { + OnlineAccountData onlineAccountData = this.onlineAccounts.get(i); + Long timestamp = onlineAccountData.getTimestamp(); + countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); + } + + // We should know exactly how many bytes to allocate now + int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) + + this.onlineAccounts.size() * Transformer.PUBLIC_KEY_LENGTH; + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + for (long timestamp : countByTimestamp.keySet()) { + bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp))); + + bytes.write(Longs.toByteArray(timestamp)); + + for (int i = 0; i < this.onlineAccounts.size(); ++i) { + OnlineAccountData onlineAccountData = this.onlineAccounts.get(i); + + if (onlineAccountData.getTimestamp() == timestamp) + bytes.write(onlineAccountData.getPublicKey()); + } + } + + this.cachedData = bytes.toByteArray(); + return this.cachedData; + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/main/java/org/qortal/network/message/Message.java b/src/main/java/org/qortal/network/message/Message.java index c7657493..6c89a0dd 100644 --- a/src/main/java/org/qortal/network/message/Message.java +++ b/src/main/java/org/qortal/network/message/Message.java @@ -78,6 +78,8 @@ public abstract class Message { ONLINE_ACCOUNTS(80), GET_ONLINE_ACCOUNTS(81), + ONLINE_ACCOUNTS_V2(82), + GET_ONLINE_ACCOUNTS_V2(83), ARBITRARY_DATA(90), GET_ARBITRARY_DATA(91), diff --git a/src/main/java/org/qortal/network/message/OnlineAccountsV2Message.java b/src/main/java/org/qortal/network/message/OnlineAccountsV2Message.java new file mode 100644 index 00000000..f0fce81e --- /dev/null +++ b/src/main/java/org/qortal/network/message/OnlineAccountsV2Message.java @@ -0,0 +1,124 @@ +package org.qortal.network.message; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import org.qortal.data.network.OnlineAccountData; +import org.qortal.transform.Transformer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * For sending online accounts info to remote peer. + * + * Different format to V1: + * V1 is: number of entries, then timestamp + sig + pubkey for each entry + * V2 is: groups of: number of entries, timestamp, then sig + pubkey for each entry + * + * Also V2 only builds online accounts message once! + */ +public class OnlineAccountsV2Message extends Message { + private List onlineAccounts; + private byte[] cachedData; + + public OnlineAccountsV2Message(List onlineAccounts) { + this(-1, onlineAccounts); + } + + private OnlineAccountsV2Message(int id, List onlineAccounts) { + super(id, MessageType.ONLINE_ACCOUNTS_V2); + + this.onlineAccounts = onlineAccounts; + } + + public List getOnlineAccounts() { + return this.onlineAccounts; + } + + public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException { + int accountCount = bytes.getInt(); + + List onlineAccounts = new ArrayList<>(accountCount); + + while (accountCount > 0) { + long timestamp = bytes.getLong(); + + for (int i = 0; i < accountCount; ++i) { + byte[] signature = new byte[Transformer.SIGNATURE_LENGTH]; + bytes.get(signature); + + byte[] publicKey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + bytes.get(publicKey); + + onlineAccounts.add(new OnlineAccountData(timestamp, signature, publicKey)); + } + + if (bytes.hasRemaining()) { + accountCount = bytes.getInt(); + } else { + // we've finished + accountCount = 0; + } + } + + return new OnlineAccountsV2Message(id, onlineAccounts); + } + + @Override + protected synchronized byte[] toData() { + if (this.cachedData != null) + return this.cachedData; + + // Shortcut in case we have no online accounts + if (this.onlineAccounts.isEmpty()) { + this.cachedData = Ints.toByteArray(0); + return this.cachedData; + } + + // How many of each timestamp + Map countByTimestamp = new HashMap<>(); + + for (int i = 0; i < this.onlineAccounts.size(); ++i) { + OnlineAccountData onlineAccountData = this.onlineAccounts.get(i); + Long timestamp = onlineAccountData.getTimestamp(); + countByTimestamp.compute(timestamp, (k, v) -> v == null ? 1 : ++v); + } + + // We should know exactly how many bytes to allocate now + int byteSize = countByTimestamp.size() * (Transformer.INT_LENGTH + Transformer.TIMESTAMP_LENGTH) + + this.onlineAccounts.size() * (Transformer.SIGNATURE_LENGTH + Transformer.PUBLIC_KEY_LENGTH); + + try { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(byteSize); + + for (long timestamp : countByTimestamp.keySet()) { + bytes.write(Ints.toByteArray(countByTimestamp.get(timestamp))); + + bytes.write(Longs.toByteArray(timestamp)); + + for (int i = 0; i < this.onlineAccounts.size(); ++i) { + OnlineAccountData onlineAccountData = this.onlineAccounts.get(i); + + if (onlineAccountData.getTimestamp() == timestamp) { + bytes.write(onlineAccountData.getSignature()); + + bytes.write(onlineAccountData.getPublicKey()); + } + } + } + + this.cachedData = bytes.toByteArray(); + return this.cachedData; + } catch (IOException e) { + return null; + } + } + +} diff --git a/src/test/java/org/qortal/test/network/OnlineAccountsTests.java b/src/test/java/org/qortal/test/network/OnlineAccountsTests.java new file mode 100644 index 00000000..b1c5ec4f --- /dev/null +++ b/src/test/java/org/qortal/test/network/OnlineAccountsTests.java @@ -0,0 +1,114 @@ +package org.qortal.test.network; + +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.jsse.provider.BouncyCastleJsseProvider; +import org.junit.Test; +import org.qortal.data.network.OnlineAccountData; +import org.qortal.network.message.*; +import org.qortal.transform.Transformer; + +import java.nio.ByteBuffer; +import java.security.Security; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class OnlineAccountsTests { + + private static final Random RANDOM = new Random(); + static { + // This must go before any calls to LogManager/Logger + System.setProperty("java.util.logging.manager", "org.apache.logging.log4j.jul.LogManager"); + + Security.insertProviderAt(new BouncyCastleProvider(), 0); + Security.insertProviderAt(new BouncyCastleJsseProvider(), 1); + } + + + @Test + public void testGetOnlineAccountsV2() throws Message.MessageException { + List onlineAccountsOut = generateOnlineAccounts(false); + + Message messageOut = new GetOnlineAccountsV2Message(onlineAccountsOut); + + byte[] messageBytes = messageOut.toBytes(); + ByteBuffer byteBuffer = ByteBuffer.wrap(messageBytes); + + GetOnlineAccountsV2Message messageIn = (GetOnlineAccountsV2Message) Message.fromByteBuffer(byteBuffer); + + List onlineAccountsIn = messageIn.getOnlineAccounts(); + + assertEquals("size mismatch", onlineAccountsOut.size(), onlineAccountsIn.size()); + assertTrue("accounts mismatch", onlineAccountsIn.containsAll(onlineAccountsOut)); + + Message oldMessageOut = new GetOnlineAccountsMessage(onlineAccountsOut); + byte[] oldMessageBytes = oldMessageOut.toBytes(); + + long numTimestamps = onlineAccountsOut.stream().mapToLong(OnlineAccountData::getTimestamp).sorted().distinct().count(); + + System.out.println(String.format("For %d accounts split across %d timestamp%s: old size %d vs new size %d", + onlineAccountsOut.size(), + numTimestamps, + numTimestamps != 1 ? "s" : "", + oldMessageBytes.length, + messageBytes.length)); + } + + @Test + public void testOnlineAccountsV2() throws Message.MessageException { + List onlineAccountsOut = generateOnlineAccounts(true); + + Message messageOut = new OnlineAccountsV2Message(onlineAccountsOut); + + byte[] messageBytes = messageOut.toBytes(); + ByteBuffer byteBuffer = ByteBuffer.wrap(messageBytes); + + OnlineAccountsV2Message messageIn = (OnlineAccountsV2Message) Message.fromByteBuffer(byteBuffer); + + List onlineAccountsIn = messageIn.getOnlineAccounts(); + + assertEquals("size mismatch", onlineAccountsOut.size(), onlineAccountsIn.size()); + assertTrue("accounts mismatch", onlineAccountsIn.containsAll(onlineAccountsOut)); + + Message oldMessageOut = new OnlineAccountsMessage(onlineAccountsOut); + byte[] oldMessageBytes = oldMessageOut.toBytes(); + + long numTimestamps = onlineAccountsOut.stream().mapToLong(OnlineAccountData::getTimestamp).sorted().distinct().count(); + + System.out.println(String.format("For %d accounts split across %d timestamp%s: old size %d vs new size %d", + onlineAccountsOut.size(), + numTimestamps, + numTimestamps != 1 ? "s" : "", + oldMessageBytes.length, + messageBytes.length)); + } + + private List generateOnlineAccounts(boolean withSignatures) { + List onlineAccounts = new ArrayList<>(); + + int numTimestamps = RANDOM.nextInt(2) + 1; // 1 or 2 + + for (int t = 0; t < numTimestamps; ++t) { + int numAccounts = RANDOM.nextInt(3000); + + for (int a = 0; a < numAccounts; ++a) { + byte[] sig = null; + if (withSignatures) { + sig = new byte[Transformer.SIGNATURE_LENGTH]; + RANDOM.nextBytes(sig); + } + + byte[] pubkey = new byte[Transformer.PUBLIC_KEY_LENGTH]; + RANDOM.nextBytes(pubkey); + + onlineAccounts.add(new OnlineAccountData(t << 32, sig, pubkey)); + } + } + + return onlineAccounts; + } + +}