@ -84,36 +84,58 @@ public class ArbitraryDataFileManager extends Thread {
private void processFileHashes ( Long now ) {
private void processFileHashes ( Long now ) {
try ( final Repository repository = RepositoryManager . getRepository ( ) ) {
try ( final Repository repository = RepositoryManager . getRepository ( ) ) {
for ( String hash58 : arbitraryDataFileHashResponses . keySet ( ) ) {
ArbitraryTransactionData arbitraryTransactionData = null ;
if ( isStopping ) {
byte [ ] signature = null ;
return ;
byte [ ] hash = null ;
}
Peer peer = null ;
boolean shouldProcess = false ;
synchronized ( arbitraryDataFileHashResponses ) {
for ( String hash58 : arbitraryDataFileHashResponses . keySet ( ) ) {
if ( isStopping ) {
return ;
}
Triple < Peer , String , Long > value = arbitraryDataFileHashResponses . get ( hash58 ) ;
Triple < Peer , String , Long > value = arbitraryDataFileHashResponses . get ( hash58 ) ;
if ( value ! = null ) {
if ( value ! = null ) {
Peer peer = value . getA ( ) ;
peer = value . getA ( ) ;
String signature58 = value . getB ( ) ;
String signature58 = value . getB ( ) ;
Long timestamp = value . getC ( ) ;
Long timestamp = value . getC ( ) ;
if ( now - timestamp > = ArbitraryDataManager . ARBITRARY_RELAY_TIMEOUT | | signature58 = = null | | peer = = null ) {
if ( now - timestamp > = ArbitraryDataManager . ARBITRARY_RELAY_TIMEOUT | | signature58 = = null | | peer = = null ) {
// Ignore - to be deleted
// Ignore - to be deleted
continue ;
continue ;
}
}
byte [ ] hash = Base58 . decode ( hash58 ) ;
hash = Base58 . decode ( hash58 ) ;
byte [ ] signature = Base58 . decode ( signature58 ) ;
signature = Base58 . decode ( signature58 ) ;
// Fetch the transaction data
// Fetch the transaction data
ArbitraryTransactionData arbitraryTransactionData = ArbitraryTransactionUtils . fetchTransactionData ( repository , signature ) ;
arbitraryTransactionData = ArbitraryTransactionUtils . fetchTransactionData ( repository , signature ) ;
if ( arbitraryTransactionData = = null ) {
if ( arbitraryTransactionData = = null ) {
continue ;
continue ;
}
}
LOGGER . debug ( "Fetching file {} from peer {} via response queue..." , hash58 , peer ) ;
// We want to process this file
this . fetchArbitraryDataFiles ( repository , peer , signature , arbitraryTransactionData , Arrays . asList ( hash ) ) ;
shouldProcess = true ;
break ;
}
}
}
}
}
if ( ! shouldProcess ) {
// Nothing to do
return ;
}
if ( signature = = null | | hash = = null | | peer = = null | | arbitraryTransactionData = = null ) {
return ;
}
String hash58 = Base58 . encode ( hash ) ;
LOGGER . debug ( "Fetching file {} from peer {} via response queue..." , hash58 , peer ) ;
this . fetchArbitraryDataFiles ( repository , peer , signature , arbitraryTransactionData , Arrays . asList ( hash ) ) ;
} catch ( DataException e ) {
} catch ( DataException e ) {
LOGGER . info ( "Unable to process file hashes: {}" , e . getMessage ( ) ) ;
LOGGER . info ( "Unable to process file hashes: {}" , e . getMessage ( ) ) ;
}
}