9191from mypy .defaults import (
9292 WORKER_CONNECTION_TIMEOUT ,
9393 WORKER_DONE_TIMEOUT ,
94+ WORKER_SHUTDOWN_TIMEOUT ,
9495 WORKER_START_INTERVAL ,
9596 WORKER_START_TIMEOUT ,
9697)
@@ -283,6 +284,7 @@ def __init__(self, status_file: str, options_data: str, env: Mapping[str, str])
283284 ]
284285 # Return early without waiting, caller must call connect() before using the client.
285286 self .proc = subprocess .Popen (command , env = env )
287+ self .connected = False
286288
287289 def connect (self ) -> None :
288290 end_time = time .time () + WORKER_START_TIMEOUT
@@ -303,18 +305,19 @@ def connect(self) -> None:
303305 # verify PIDs reliably.
304306 assert pid == self .proc .pid , f"PID mismatch: { pid } vs { self .proc .pid } "
305307 self .conn = IPCClient (connection_name , WORKER_CONNECTION_TIMEOUT )
308+ self .connected = True
306309 return
307310 except Exception as exc :
308311 last_exception = exc
309312 break
310- print ("Failed to establish connection with worker:" , last_exception )
311- sys .exit (2 )
313+ print (f"Failed to establish connection with worker: { last_exception } " )
312314
313315 def close (self ) -> None :
314- self .conn .close ()
316+ if self .connected :
317+ self .conn .close ()
315318 # Technically we don't need to wait, but otherwise we will get ResourceWarnings.
316319 try :
317- self .proc .wait (timeout = 1 )
320+ self .proc .wait (timeout = WORKER_SHUTDOWN_TIMEOUT )
318321 except subprocess .TimeoutExpired :
319322 pass
320323 if os .path .isfile (self .status_file ):
@@ -346,7 +349,7 @@ def build(
346349
347350 If a flush_errors callback is provided, all error messages will be
348351 passed to it and the errors and messages fields of BuildResult and
349- CompileError (respectively) will be empty. Otherwise those fields will
352+ CompileError (respectively) will be empty. Otherwise, those fields will
350353 report any error messages.
351354
352355 Args:
@@ -356,6 +359,9 @@ def build(
356359 (takes precedence over other directories)
357360 flush_errors: optional function to flush errors after a file is processed
358361 fscache: optionally a file-system cacher
362+ stdout: Output stream to use instead of `sys.stdout`
363+ stderr: Error stream to use instead of `sys.stderr`
364+ extra_plugins: Plugins to use in addition to those loaded from config
359365 worker_env: An environment to start parallel build workers (used for tests)
360366 """
361367 # If we were not given a flush_errors, we use one that will populate those
@@ -376,14 +382,20 @@ def default_flush_errors(
376382 stderr = stderr or sys .stderr
377383 extra_plugins = extra_plugins or []
378384
385+ # Create metastore before workers to avoid race conditions.
386+ metastore = create_metastore (options , parallel_worker = False )
379387 workers = []
380388 connect_threads = []
389+ # A quasi-unique ID for this specific mypy invocation.
390+ build_id = os .urandom (4 ).hex ()
381391 if options .num_workers > 0 :
382392 # TODO: switch to something more efficient than pickle (also in the daemon).
383393 pickled_options = pickle .dumps (options .snapshot ())
384394 options_data = b64encode (pickled_options ).decode ()
385395 workers = [
386- WorkerClient (f".mypy_worker.{ idx } .json" , options_data , worker_env or os .environ )
396+ WorkerClient (
397+ f".mypy_worker.{ build_id } .{ idx } .json" , options_data , worker_env or os .environ
398+ )
387399 for idx in range (options .num_workers )
388400 ]
389401 sources_message = SourcesDataMessage (sources = sources )
@@ -394,6 +406,9 @@ def default_flush_errors(
394406 def connect (wc : WorkerClient , data : bytes ) -> None :
395407 # Start loading sources in each worker as soon as it is up.
396408 wc .connect ()
409+ if not wc .connected :
410+ # Caller should detect this and fail gracefully.
411+ return
397412 wc .conn .write_bytes (data )
398413
399414 # We don't wait for workers to be ready until they are actually needed.
@@ -414,6 +429,7 @@ def connect(wc: WorkerClient, data: bytes) -> None:
414429 extra_plugins ,
415430 workers ,
416431 connect_threads ,
432+ metastore ,
417433 )
418434 result .errors = messages
419435 return result
@@ -432,6 +448,8 @@ def connect(wc: WorkerClient, data: bytes) -> None:
432448 for thread in connect_threads :
433449 thread .join ()
434450 for worker in workers :
451+ if not worker .connected :
452+ continue
435453 try :
436454 send (worker .conn , SccRequestMessage (scc_id = None , import_errors = {}, mod_data = {}))
437455 except (OSError , IPCException ):
@@ -451,6 +469,7 @@ def build_inner(
451469 extra_plugins : Sequence [Plugin ],
452470 workers : list [WorkerClient ],
453471 connect_threads : list [Thread ],
472+ metastore : MetadataStore ,
454473) -> BuildResult :
455474 if platform .python_implementation () == "CPython" :
456475 # Run gc less frequently, as otherwise we can spend a large fraction of
@@ -499,6 +518,7 @@ def build_inner(
499518 fscache = fscache ,
500519 stdout = stdout ,
501520 stderr = stderr ,
521+ metastore = metastore ,
502522 )
503523 manager .workers = workers
504524 if manager .verbosity () >= 2 :
@@ -816,6 +836,7 @@ def __init__(
816836 stderr : TextIO ,
817837 error_formatter : ErrorFormatter | None = None ,
818838 parallel_worker : bool = False ,
839+ metastore : MetadataStore | None = None ,
819840 ) -> None :
820841 self .stats : dict [str , Any ] = {} # Values are ints or floats
821842 # Use in cases where we need to prevent race conditions in stats reporting.
@@ -903,7 +924,9 @@ def __init__(
903924 ]
904925 )
905926
906- self .metastore = create_metastore (options , parallel_worker = parallel_worker )
927+ if metastore is None :
928+ metastore = create_metastore (options , parallel_worker = parallel_worker )
929+ self .metastore = metastore
907930
908931 # a mapping from source files to their corresponding shadow files
909932 # for efficient lookup
@@ -3983,6 +4006,9 @@ def dispatch(
39834006 # Wait for workers since they may be needed at this point.
39844007 for thread in connect_threads :
39854008 thread .join ()
4009+ not_connected = [str (idx ) for idx , wc in enumerate (manager .workers ) if not wc .connected ]
4010+ if not_connected :
4011+ raise OSError (f"Cannot connect to build worker(s): { ', ' .join (not_connected )} " )
39864012 process_graph (graph , manager )
39874013 # Update plugins snapshot.
39884014 write_plugins_snapshot (manager )
0 commit comments