@@ -585,27 +585,30 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
585585
586586 @staticmethod
587587 def _handle_results (outqueue , get , cache ):
588+ def _handle_results_failure (cache , e ):
589+ exc = RuntimeError ("Result handler failed to get result from worker and " +
590+ "unable to recover. " +
591+ "This is likely due to a worker process return or raise " +
592+ "an unpicklable object." )
593+ exc .__cause__ = e
594+ cache ._disable_cache (exc )
595+ _cache = cache .copy ()
596+ for value in _cache .values ():
597+ if isinstance (value , ApplyResult ):
598+ chunk_number_left = getattr (value , '_number_left' , 1 )
599+ for _ in range (chunk_number_left ):
600+ value ._set (None , (False , exc ))
601+ elif isinstance (value , IMapIterator ):
602+ value ._set_length (value ._index + 1 )
603+ value ._set (value ._index , (False , exc ))
604+
588605 thread = threading .current_thread ()
589606
590607 while 1 :
591608 try :
592609 task = get ()
593610 except Exception as e :
594- exc = RuntimeError ("Result handler failed to get result from worker and " +
595- "unable to recover. " +
596- "This is likely due to a worker process return or raise " +
597- "an unpicklable object." )
598- exc .__cause__ = e
599- cache ._disable_cache (exc )
600- _cache = cache .copy ()
601- for value in _cache .values ():
602- if isinstance (value , ApplyResult ):
603- chunk_number_left = getattr (value , '_number_left' , 1 )
604- for _ in range (chunk_number_left ):
605- value ._set (None , (False , exc ))
606- elif isinstance (value , IMapIterator ):
607- value ._set_length (value ._index + 1 )
608- value ._set (value ._index , (False , exc ))
611+ _handle_results_failure (cache , e )
609612 return
610613
611614 if thread ._state != RUN :
@@ -627,8 +630,8 @@ def _handle_results(outqueue, get, cache):
627630 while cache and thread ._state != TERMINATE :
628631 try :
629632 task = get ()
630- except ( OSError , EOFError ) :
631- util . debug ( 'result handler got EOFError/OSError -- exiting' )
633+ except Exception as e :
634+ _handle_results_failure ( cache , e )
632635 return
633636
634637 if task is None :
0 commit comments