@ -291,7 +291,20 @@ public class ArbitraryDataFileListManager {
return true ;
}
public void deleteFileListRequestsForSignature ( byte [ ] signature ) {
String signature58 = Base58 . encode ( signature ) ;
for ( Iterator < Map . Entry < Integer , Triple < String , Peer , Long > > > it = arbitraryDataFileListRequests . entrySet ( ) . iterator ( ) ; it . hasNext ( ) ; ) {
Map . Entry < Integer , Triple < String , Peer , Long > > entry = it . next ( ) ;
if ( entry = = null | | entry . getKey ( ) = = null | | entry . getValue ( ) ! = null ) {
continue ;
}
if ( Objects . equals ( entry . getValue ( ) . getA ( ) , signature58 ) ) {
// Update requests map to reflect that we've received all chunks
Triple < String , Peer , Long > newEntry = new Triple < > ( null , null , entry . getValue ( ) . getC ( ) ) ;
arbitraryDataFileListRequests . put ( entry . getKey ( ) , newEntry ) ;
}
}
}
// Network handlers
@ -304,7 +317,7 @@ public class ArbitraryDataFileListManager {
ArbitraryDataFileListMessage arbitraryDataFileListMessage = ( ArbitraryDataFileListMessage ) message ;
LOGGER . debug ( "Received hash list from peer {} with {} hashes" , peer , arbitraryDataFileListMessage . getHashes ( ) . size ( ) ) ;
// Do we have a pending request for this data? // TODO: might we want to relay all of them anyway?
// Do we have a pending request for this data?
Triple < String , Peer , Long > request = arbitraryDataFileListRequests . get ( message . getId ( ) ) ;
if ( request = = null | | request . getA ( ) = = null ) {
return ;
@ -350,10 +363,6 @@ public class ArbitraryDataFileListManager {
// }
// }
// Update requests map to reflect that we've received it
Triple < String , Peer , Long > newEntry = new Triple < > ( null , null , request . getC ( ) ) ;
arbitraryDataFileListRequests . put ( message . getId ( ) , newEntry ) ;
if ( ! isRelayRequest | | ! Settings . getInstance ( ) . isRelayModeEnabled ( ) ) {
// Go and fetch the actual data, since this isn't a relay request
arbitraryDataFileManager . fetchArbitraryDataFiles ( repository , peer , signature , arbitraryTransactionData , hashes ) ;
@ -412,6 +421,7 @@ public class ArbitraryDataFileListManager {
List < byte [ ] > hashes = new ArrayList < > ( ) ;
ArbitraryTransactionData transactionData = null ;
boolean allChunksExist = false ;
try ( final Repository repository = RepositoryManager . getRepository ( ) ) {
@ -435,6 +445,9 @@ public class ArbitraryDataFileListManager {
hashes . add ( arbitraryDataFile . getMetadataHash ( ) ) ;
}
// Check if we have all the chunks (and the metadata file)
allChunksExist = arbitraryDataFile . allChunksExist ( ) ;
for ( ArbitraryDataFileChunk chunk : arbitraryDataFile . getChunks ( ) ) {
if ( chunk . exists ( ) ) {
hashes . add ( chunk . getHash ( ) ) ;
@ -459,49 +472,59 @@ public class ArbitraryDataFileListManager {
// We should only respond if we have at least one hash
if ( hashes . size ( ) > 0 ) {
// Update requests map to reflect that we've sent it
newEntry = new Triple < > ( signature58 , null , now ) ;
arbitraryDataFileListRequests . put ( message . getId ( ) , newEntry ) ;
// We have all the chunks, so update requests map to reflect that we've sent it
// There is no need to keep track of the request, as we can serve all the chunks
if ( allChunksExist ) {
newEntry = new Triple < > ( null , null , now ) ;
arbitraryDataFileListRequests . put ( message . getId ( ) , newEntry ) ;
}
ArbitraryDataFileListMessage arbitraryDataFileListMessage = new ArbitraryDataFileListMessage ( signature , hashes ) ;
arbitraryDataFileListMessage . setId ( message . getId ( ) ) ;
if ( ! peer . sendMessage ( arbitraryDataFileListMessage ) ) {
LOGGER . debug ( "Couldn't send list of hashes" ) ;
peer . disconnect ( "failed to send list of hashes" ) ;
return ;
}
LOGGER . debug ( "Sent list of hashes (count: {})" , hashes . size ( ) ) ;
if ( allChunksExist ) {
// Nothing left to do, so return to prevent any unnecessary forwarding from occurring
LOGGER . debug ( "No need for any forwarding because file list request is fully served" ) ;
return ;
}
}
else {
boolean isBlocked = ( transactionData = = null | | ArbitraryDataStorageManager . getInstance ( ) . isNameBlocked ( transactionData . getName ( ) ) ) ;
if ( Settings . getInstance ( ) . isRelayModeEnabled ( ) & & ! isBlocked ) {
// In relay mode - so ask our other peers if they have it
long requestTime = getArbitraryDataFileListMessage . getRequestTime ( ) ;
int requestHops = getArbitraryDataFileListMessage . getRequestHops ( ) ;
getArbitraryDataFileListMessage . setRequestHops ( + + requestHops ) ;
long totalRequestTime = now - requestTime ;
if ( totalRequestTime < RELAY_REQUEST_MAX_DURATION ) {
// Relay request hasn't timed out yet, so can potentially be rebroadcast
if ( requestHops < RELAY_REQUEST_MAX_HOPS ) {
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
LOGGER . info ( "Rebroadcasting hash list request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}" , peer , Base58 . encode ( signature ) , totalRequestTime , requestHops ) ;
Network . getInstance ( ) . broadcast (
broadcastPeer - > broadcastPeer = = peer | |
Objects . equals ( broadcastPeer . getPeerData ( ) . getAddress ( ) . getHost ( ) , peer . getPeerData ( ) . getAddress ( ) . getHost ( ) )
? null : getArbitraryDataFileListMessage ) ;
}
else {
// This relay request has reached the maximum number of allowed hops
}
// We may need to forward this request on
boolean isBlocked = ( transactionData = = null | | ArbitraryDataStorageManager . getInstance ( ) . isNameBlocked ( transactionData . getName ( ) ) ) ;
if ( Settings . getInstance ( ) . isRelayModeEnabled ( ) & & ! isBlocked ) {
// In relay mode - so ask our other peers if they have it
long requestTime = getArbitraryDataFileListMessage . getRequestTime ( ) ;
int requestHops = getArbitraryDataFileListMessage . getRequestHops ( ) ;
getArbitraryDataFileListMessage . setRequestHops ( + + requestHops ) ;
long totalRequestTime = now - requestTime ;
if ( totalRequestTime < RELAY_REQUEST_MAX_DURATION ) {
// Relay request hasn't timed out yet, so can potentially be rebroadcast
if ( requestHops < RELAY_REQUEST_MAX_HOPS ) {
// Relay request hasn't reached the maximum number of hops yet, so can be rebroadcast
LOGGER . info ( "Rebroadcasting hash list request from peer {} for signature {} to our other peers... totalRequestTime: {}, requestHops: {}" , peer , Base58 . encode ( signature ) , totalRequestTime , requestHops ) ;
Network . getInstance ( ) . broadcast (
broadcastPeer - > broadcastPeer = = peer | |
Objects . equals ( broadcastPeer . getPeerData ( ) . getAddress ( ) . getHost ( ) , peer . getPeerData ( ) . getAddress ( ) . getHost ( ) )
? null : getArbitraryDataFileListMessage ) ;
}
else {
// This relay request has timed out
// This relay request has reached the maximum number of allowed hops
}
}
else {
// This relay request has timed out
}
}
}