@ -32,7 +32,11 @@ public class ArbitraryDataManager extends Thread {
private static final Logger LOGGER = LogManager . getLogger ( ArbitraryDataManager . class ) ;
private static final List < TransactionType > ARBITRARY_TX_TYPE = Arrays . asList ( TransactionType . ARBITRARY ) ;
private static final long ARBITRARY_REQUEST_TIMEOUT = 5 * 1000L ; // ms
/** Request timeout when transferring arbitrary data */
private static final long ARBITRARY_REQUEST_TIMEOUT = 6 * 1000L ; // ms
/** Maximum time to hold information about an in-progress relay */
private static final long ARBITRARY_RELAY_TIMEOUT = 30 * 1000L ; // ms
private static ArbitraryDataManager instance ;
private final Object peerDataLock = new Object ( ) ;
@ -40,7 +44,7 @@ public class ArbitraryDataManager extends Thread {
private volatile boolean isStopping = false ;
/ * *
* Map of recent requests for ARBITRARY transaction data file lists .
* Map of recent incoming requests for ARBITRARY transaction data file lists .
* < p >
* Key is original request ' s message ID < br >
* Value is Triple & lt ; transaction signature in base58 , first requesting peer , first request ' s timestamp & gt ;
@ -59,10 +63,16 @@ public class ArbitraryDataManager extends Thread {
public Map < Integer , Triple < String , Peer , Long > > arbitraryDataFileListRequests = Collections . synchronizedMap ( new HashMap < > ( ) ) ;
/ * *
* Map to keep track of in progress arbitrary data file requests
* Map to keep track of our in progress ( outgoing ) arbitrary data file requests
* /
private Map < String , Long > arbitraryDataFileRequests = Collections . synchronizedMap ( new HashMap < > ( ) ) ;
/ * *
* Map to keep track of hashes that we might need to relay , keyed by the hash of the file ( base58 encoded ) .
* Value is comprised of the base58 - encoded signature , the peer that is hosting it , and the timestamp that it was added
* /
private Map < String , Triple < String , Peer , Long > > arbitraryRelayMap = Collections . synchronizedMap ( new HashMap < > ( ) ) ;
/ * *
* Map to keep track of in progress arbitrary data signature requests
* Key : string - the signature encoded in base58
@ -527,27 +537,46 @@ public class ArbitraryDataManager extends Thread {
// Fetch data files by hash
private ArbitraryDataFile fetchArbitraryDataFile ( Peer peer , byte [ ] signature , byte [ ] hash ) {
String hash58 = Base58 . encode ( hash ) ;
LOGGER . info ( String . format ( "Fetching data file %.8s from peer %s" , hash58 , peer ) ) ;
arbitraryDataFileRequests . put ( hash58 , NTP . getTime ( ) ) ;
Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage ( signature , hash ) ;
private ArbitraryDataFileMessage fetchArbitraryDataFile ( Peer peer , Peer requestingPeer , byte [ ] signature , byte [ ] hash , Message originalMessage ) throws DataException {
ArbitraryDataFile existingFile = ArbitraryDataFile . fromHash ( hash , signature ) ;
boolean fileAlreadyExists = existingFile . exists ( ) ;
Message message = null ;
try {
message = peer . getResponse ( getArbitraryDataFileMessage ) ;
} catch ( InterruptedException e ) {
// Will return below due to null message
// Fetch the file if it doesn't exist locally
if ( ! fileAlreadyExists ) {
String hash58 = Base58 . encode ( hash ) ;
LOGGER . info ( String . format ( "Fetching data file %.8s from peer %s" , hash58 , peer ) ) ;
arbitraryDataFileRequests . put ( hash58 , NTP . getTime ( ) ) ;
Message getArbitraryDataFileMessage = new GetArbitraryDataFileMessage ( signature , hash ) ;
try {
message = peer . getResponseWithTimeout ( getArbitraryDataFileMessage , ( int ) ARBITRARY_REQUEST_TIMEOUT ) ;
} catch ( InterruptedException e ) {
// Will return below due to null message
}
arbitraryDataFileRequests . remove ( hash58 ) ;
LOGGER . trace ( String . format ( "Removed hash %.8s from arbitraryDataFileRequests" , hash58 ) ) ;
if ( message = = null | | message . getType ( ) ! = Message . MessageType . ARBITRARY_DATA_FILE ) {
return null ;
}
}
arbitraryDataFileRequests . remove ( hash58 ) ;
LOGGER . info ( String . format ( "Removed hash %.8s from arbitraryDataFileRequests" , hash58 ) ) ;
ArbitraryDataFileMessage arbitraryDataFileMessage = ( ArbitraryDataFileMessage ) message ;
if ( message = = null | | message . getType ( ) ! = Message . MessageType . ARBITRARY_DATA_FILE ) {
return null ;
// We might want to forward the request to the peer that originally requested it
this . handleArbitraryDataFileForwarding ( requestingPeer , message , originalMessage ) ;
boolean isRelayRequest = ( requestingPeer ! = null ) ;
if ( isRelayRequest ) {
if ( ! fileAlreadyExists ) {
// File didn't exist locally before the request, and it's a forwarding request, so delete it
LOGGER . info ( "Deleting file {} because it was needed for forwarding only" , Base58 . encode ( hash ) ) ;
ArbitraryDataFile dataFile = arbitraryDataFileMessage . getArbitraryDataFile ( ) ;
dataFile . delete ( ) ;
}
}
ArbitraryDataFileMessage arbitraryDataFileMessage = ( ArbitraryDataFileMessage ) message ;
return arbitraryDataFileMessage . getArbitraryDataFile ( ) ;
return arbitraryDataFileMessage ;
}
@ -560,6 +589,9 @@ public class ArbitraryDataManager extends Thread {
final long requestMinimumTimestamp = now - ARBITRARY_REQUEST_TIMEOUT ;
arbitraryDataFileListRequests . entrySet ( ) . removeIf ( entry - > entry . getValue ( ) . getC ( ) = = null | | entry . getValue ( ) . getC ( ) < requestMinimumTimestamp ) ;
arbitraryDataFileRequests . entrySet ( ) . removeIf ( entry - > entry . getValue ( ) = = null | | entry . getValue ( ) < requestMinimumTimestamp ) ;
final long relayMinimumTimestamp = now - ARBITRARY_RELAY_TIMEOUT ;
arbitraryRelayMap . entrySet ( ) . removeIf ( entry - > entry . getValue ( ) . getC ( ) = = null | | entry . getValue ( ) . getC ( ) < relayMinimumTimestamp ) ;
}
public boolean isResourceCached ( String resourceId ) {
@ -685,9 +717,9 @@ public class ArbitraryDataManager extends Thread {
if ( ! arbitraryDataFile . chunkExists ( hash ) ) {
// Only request the file if we aren't already requesting it from someone else
if ( ! arbitraryDataFileRequests . containsKey ( Base58 . encode ( hash ) ) ) {
ArbitraryDataFile receivedArbitraryDataFile = fetchArbitraryDataFile ( peer , signature , hash ) ;
if ( receivedArbitraryDataFile ! = null ) {
LOGGER . info ( "Received data file {} from peer {}" , receivedArbitraryDataFile , peer ) ;
ArbitraryDataFileMessage receivedArbitraryDataFileMessag e = fetchArbitraryDataFile ( peer , null , signature , hash , null ) ;
if ( receivedArbitraryDataFileMessage ! = null ) {
LOGGER . info ( "Received data file {} from peer {}" , receivedArbitraryDataFileMessage . getArbitraryDataFile ( ) . getHash58 ( ) , peer ) ;
receivedAtLeastOneFile = true ;
}
else {
@ -732,47 +764,39 @@ public class ArbitraryDataManager extends Thread {
return receivedAtLeastOneFile ;
}
public void handleArbitraryDataFileForwarding ( Peer requestingPeer , Message message , Message originalMessage ) {
// Return if there is no originally requesting peer to forward to
if ( requestingPeer = = null ) {
return ;
}
// Network handlers
public void onNetworkGetArbitraryDataMessage ( Peer peer , Message message ) {
GetArbitraryDataMessage getArbitraryDataMessage = ( GetArbitraryDataMessage ) message ;
byte [ ] signature = getArbitraryDataMessage . getSignature ( ) ;
// Do we even have this transaction?
try ( final Repository repository = RepositoryManager . getRepository ( ) ) {
TransactionData transactionData = repository . getTransactionRepository ( ) . fromSignature ( signature ) ;
if ( transactionData = = null | | transactionData . getType ( ) ! = TransactionType . ARBITRARY )
return ;
ArbitraryTransaction transaction = new ArbitraryTransaction ( repository , transactionData ) ;
// If we have the data then send it
if ( transaction . isDataLocal ( ) ) {
byte [ ] data = transaction . fetchData ( ) ;
if ( data = = null )
return ;
// Return if we're not in relay mode or if this request doesn't need forwarding
if ( ! Settings . getInstance ( ) . isRelayModeEnabled ( ) ) {
return ;
}
Message arbitraryDataMessage = new ArbitraryDataMessage ( signature , data ) ;
arbitraryDataMessage . setId ( message . getId ( ) ) ;
if ( ! peer . sendMessage ( arbitraryDataMessage ) )
peer . disconnect ( "failed to send arbitrary data" ) ;
LOGGER . info ( "Received arbitrary data file - forwarding is needed" ) ;
return ;
}
// The ID needs to match that of the original request
message . setId ( originalMessage . getId ( ) ) ;
// Ask our other peers if they have it
Network . getInstance ( ) . broadcast ( broadcastPeer - > broadcastPeer = = peer ? null : message ) ;
} catch ( DataException e ) {
LOGGER . error ( String . format ( "Repository issue while finding arbitrary transaction data for peer %s" , peer ) , e ) ;
if ( ! requestingPeer . sendMessage ( message ) ) {
LOGGER . info ( "Failed to forward arbitrary data file to peer {}" , requestingPeer ) ;
requestingPeer . disconnect ( "failed to forward arbitrary data file" ) ;
}
else {
LOGGER . info ( "Forwarded arbitrary data file to peer {}" , requestingPeer ) ;
}
}
// Network handlers
public void onNetworkArbitraryDataFileListMessage ( Peer peer , Message message ) {
ArbitraryDataFileListMessage arbitraryDataFileListMessage = ( ArbitraryDataFileListMessage ) message ;
LOGGER . info ( "Received hash list from peer {} with {} hashes" , peer , arbitraryDataFileListMessage . getHashes ( ) . size ( ) ) ;
// Do we have a pending request for this data?
// Do we have a pending request for this data? // TODO: might we want to relay all of them anyway?
Triple < String , Peer , Long > request = arbitraryDataFileListRequests . get ( message . getId ( ) ) ;
if ( request = = null | | request . getA ( ) = = null ) {
return ;
@ -832,7 +856,17 @@ public class ArbitraryDataManager extends Thread {
if ( isRelayRequest & & Settings . getInstance ( ) . isRelayModeEnabled ( ) ) {
Peer requestingPeer = request . getB ( ) ;
if ( requestingPeer ! = null ) {
// Forward to requesting peer;
// Add each hash to our local mapping so we know who to ask later
Long now = NTP . getTime ( ) ;
for ( byte [ ] hash : hashes ) {
String hash58 = Base58 . encode ( hash ) ;
Triple < String , Peer , Long > value = new Triple < > ( signature58 , peer , now ) ;
this . arbitraryRelayMap . put ( hash58 , value ) ;
LOGGER . debug ( "Added {} to relay map: {}, {}, {}" , hash58 , signature58 , peer , now ) ;
}
// Forward to requesting peer
LOGGER . info ( "Forwarding file list with {} hashes to requesting peer: {}" , hashes . size ( ) , requestingPeer ) ;
if ( ! requestingPeer . sendMessage ( arbitraryDataFileListMessage ) ) {
requestingPeer . disconnect ( "failed to forward arbitrary data file list" ) ;
}
@ -843,13 +877,20 @@ public class ArbitraryDataManager extends Thread {
public void onNetworkGetArbitraryDataFileMessage ( Peer peer , Message message ) {
GetArbitraryDataFileMessage getArbitraryDataFileMessage = ( GetArbitraryDataFileMessage ) message ;
byte [ ] hash = getArbitraryDataFileMessage . getHash ( ) ;
String hash58 = Base58 . encode ( hash ) ;
byte [ ] signature = getArbitraryDataFileMessage . getSignature ( ) ;
Controller . getInstance ( ) . stats . getArbitraryDataFileMessageStats . requests . incrementAndGet ( ) ;
LOGGER . info ( "Received GetArbitraryDataFileMessage from peer {} for hash {}" , peer , Base58 . encode ( hash ) ) ;
try {
ArbitraryDataFile arbitraryDataFile = ArbitraryDataFile . fromHash ( hash , signature ) ;
Triple < String , Peer , Long > relayInfo = this . arbitraryRelayMap . get ( hash58 ) ;
if ( arbitraryDataFile . exists ( ) ) {
LOGGER . info ( "Hash {} exists" , hash58 ) ;
// We can serve the file directly as we already have it
ArbitraryDataFileMessage arbitraryDataFileMessage = new ArbitraryDataFileMessage ( signature , arbitraryDataFile ) ;
arbitraryDataFileMessage . setId ( message . getId ( ) ) ;
if ( ! peer . sendMessage ( arbitraryDataFileMessage ) ) {
@ -858,7 +899,25 @@ public class ArbitraryDataManager extends Thread {
}
LOGGER . info ( "Sent file {}" , arbitraryDataFile ) ;
}
else if ( relayInfo ! = null ) {
LOGGER . info ( "We have relay info for hash {}" , Base58 . encode ( hash ) ) ;
// We need to ask this peer for the file
Peer peerToAsk = relayInfo . getB ( ) ;
//Peer peerToAsk = Network.getInstance().getConnectedPeerWithAddress(peerAddress);
if ( peerToAsk ! = null ) {
// Forward the message to this peer
LOGGER . info ( "Asking peer {} for hash {}" , peerToAsk , hash58 ) ;
ArbitraryDataFileMessage arbitraryDataFileMessage = this . fetchArbitraryDataFile ( peerToAsk , peer , signature , hash , message ) ;
// Remove from the map regardless of outcome, as the relay attempt is now considered complete
arbitraryRelayMap . remove ( hash58 ) ;
}
else {
LOGGER . info ( "Peer {} not found in relay info" , peer ) ;
}
}
else {
LOGGER . info ( "Hash {} doesn't exist and we don't have relay info" , hash58 ) ;
// We don't have this file
Controller . getInstance ( ) . stats . getArbitraryDataFileMessageStats . unknownFiles . getAndIncrement ( ) ;
@ -874,11 +933,13 @@ public class ArbitraryDataManager extends Thread {
LOGGER . info ( "Couldn't sent file-unknown response" ) ;
peer . disconnect ( "failed to send file-unknown response" ) ;
}
LOGGER . info ( "Sent file-unknown response for file {}" , arbitraryDataFile ) ;
else {
LOGGER . info ( "Sent file-unknown response for file {}" , arbitraryDataFile ) ;
}
}
}
catch ( DataException e ) {
LOGGER . info ( "Unable to handle request for arbitrary data file: {}" , Base58 . encode ( hash ) ) ;
LOGGER . info ( "Unable to handle request for arbitrary data file: {}" , hash58 ) ;
}
}
@ -962,7 +1023,10 @@ public class ArbitraryDataManager extends Thread {
else {
// Ask our other peers if they have it
LOGGER . info ( "Rebroadcasted hash list request from peer {} for signature {} to our other peers" , peer , Base58 . encode ( signature ) ) ;
Network . getInstance ( ) . broadcast ( broadcastPeer - > broadcastPeer = = peer ? null : message ) ;
Network . getInstance ( ) . broadcast (
broadcastPeer - > broadcastPeer = = peer | |
Objects . equals ( broadcastPeer . getPeerData ( ) . getAddress ( ) . getHost ( ) , peer . getPeerData ( ) . getAddress ( ) . getHost ( ) )
? null : message ) ;
}
}