@@ -3113,42 +3113,30 @@ void processTimedOutExcessBlocks() {
31133113 if (excessRedundancyMap .size () == 0 ) {
31143114 return ;
31153115 }
3116+
3117+ long now = Time .monotonicNow ();
3118+ Map <String , List <ExcessBlockInfo >> pendingScanBlocks = getPendingScanBlocks (now );
3119+ if (pendingScanBlocks .isEmpty ()) {
3120+ return ;
3121+ }
3122+
31163123 // TODO: Change to readLock(FSNamesysteLockMode.BM) since invalidateBlocks is thread safe.
31173124 namesystem .writeLock (RwLockMode .BM );
3118- long now = Time . monotonicNow () ;
3119- int processed = 0 ;
3125+ String datanodeUuid = null ;
3126+ BlockInfo blockInfo = null ;
31203127 try {
3121- Iterator <Map .Entry <String , LightWeightHashSet <Block >>> iter =
3122- excessRedundancyMap .getExcessRedundancyMap ().entrySet ().iterator ();
3123- while (iter .hasNext () && processed < excessRedundancyTimeoutCheckLimit ) {
3124- Map .Entry <String , LightWeightHashSet <Block >> entry = iter .next ();
3125- String datanodeUuid = entry .getKey ();
3126- LightWeightHashSet <Block > blocks = entry .getValue ();
3127- // Sort blocks by timestamp in descending order.
3128- List <ExcessBlockInfo > sortedBlocks = blocks .stream ()
3129- .filter (block -> block instanceof ExcessBlockInfo )
3130- .map (block -> (ExcessBlockInfo ) block )
3131- .sorted (Comparator .comparingLong (ExcessBlockInfo ::getTimeStamp ))
3132- .collect (Collectors .toList ());
3133-
3134- for (ExcessBlockInfo excessBlockInfo : sortedBlocks ) {
3135- if (processed >= excessRedundancyTimeoutCheckLimit ) {
3136- break ;
3137- }
3138-
3139- processed ++;
3140- // If the datanode doesn't have any excess block that has exceeded the timeout,
3141- // can exit this loop.
3142- if (now <= excessBlockInfo .getTimeStamp () + excessRedundancyTimeout ) {
3143- break ;
3144- }
3145-
3146- BlockInfo blockInfo = excessBlockInfo .getBlockInfo ();
3147- BlockInfo bi = blocksMap .getStoredBlock (blockInfo );
3148- if (bi == null || bi .isDeleted ()) {
3128+ for (Map .Entry <String , List <ExcessBlockInfo >> entry : pendingScanBlocks .entrySet ()) {
3129+ for (ExcessBlockInfo excessBlockInfo : entry .getValue ()) {
3130+ datanodeUuid = entry .getKey ();
3131+ blockInfo = excessBlockInfo .getBlockInfo ();
3132+ BlockInfo storedBlock = blocksMap .getStoredBlock (blockInfo );
3133+ Block storedExcessBlock =
3134+ excessRedundancyMap .getExcessBlockInfo (datanodeUuid , blockInfo );
3135+ if (storedBlock == null || storedBlock .isDeleted ()
3136+ || !(storedExcessBlock instanceof ExcessBlockInfo )) {
31493137 continue ;
31503138 }
3151-
3139+ blockInfo = (( ExcessBlockInfo ) storedExcessBlock ). getBlockInfo ();
31523140 Iterator <DatanodeStorageInfo > iterator = blockInfo .getStorageInfos ();
31533141 while (iterator .hasNext ()) {
31543142 DatanodeStorageInfo datanodeStorageInfo = iterator .next ();
@@ -3167,11 +3155,44 @@ void processTimedOutExcessBlocks() {
31673155 }
31683156 }
31693157 }
3158+ } catch (Throwable e ) {
3159+ LOG .error ("Fail to process excess block {} for {}." , blockInfo , datanodeUuid , e );
31703160 } finally {
31713161 namesystem .writeUnlock (RwLockMode .BM , "processTimedOutExcessBlocks" );
31723162 LOG .info ("processTimedOutExcessBlocks {} msecs." , (Time .monotonicNow () - now ));
31733163 }
31743164 }
3165+
3166+ private Map <String , List <ExcessBlockInfo >> getPendingScanBlocks (long now ) {
3167+ int processed = 0 ;
3168+ Map <String , List <ExcessBlockInfo >> pendingScanBlocks = new HashMap <>();
3169+ for (String dnUUID : excessRedundancyMap .getExcessDNs ()) {
3170+ ArrayList <Block > excessBlocks = excessRedundancyMap .getExcessBlocks (dnUUID );
3171+ if (excessBlocks == null || excessBlocks .isEmpty ()) {
3172+ continue ;
3173+ }
3174+ List <ExcessBlockInfo > sortedBlocks = excessBlocks .stream ()
3175+ .filter (block -> block instanceof ExcessBlockInfo )
3176+ .map (block -> (ExcessBlockInfo ) block )
3177+ .sorted (Comparator .comparingLong (ExcessBlockInfo ::getTimeStamp ))
3178+ .collect (Collectors .toList ());
3179+ for (ExcessBlockInfo block : sortedBlocks ) {
3180+ if (processed >= excessRedundancyTimeoutCheckLimit ) {
3181+ return pendingScanBlocks ; // or break outer loop if you need to continue elsewhere
3182+ }
3183+
3184+ processed ++;
3185+ if (now <= block .getTimeStamp () + excessRedundancyTimeout ) {
3186+ // Since the list is sorted, no further block will be expired
3187+ break ;
3188+ }
3189+ pendingScanBlocks
3190+ .computeIfAbsent (dnUUID , k -> new ArrayList <>())
3191+ .add (block );
3192+ }
3193+ }
3194+ return pendingScanBlocks ;
3195+ }
31753196
31763197 Collection <Block > processReport (
31773198 final DatanodeStorageInfo storageInfo ,
0 commit comments