Skip to content

Commit bef90e9

Browse files
committed
improved schedule
1 parent 33a90df commit bef90e9

File tree

1 file changed

+53
-8
lines changed

1 file changed

+53
-8
lines changed

experiments/schedule.py

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@
1212
import time
1313
import platform
1414
import pathlib
15+
import psutil
1516

1617
# Note: here we force flush on ALL prints, otherwise we would end up with messed up logs
1718

18-
if len(sys.argv) != 3:
19-
print("Usage:\nschedule.py <N> <FOLDER>", flush=True)
19+
if len(sys.argv) != 4:
20+
print("Usage:\nschedule.py <N> <FOLDER> <TIMEOUT_MINUTES>", flush=True)
2021
exit(1)
2122

2223
# The number of jobs to run in parallel
@@ -29,10 +30,13 @@
2930
# Location of experiment folder
3031
FOLDER = sys.argv[2]
3132

33+
# Timeout, in minutes, read from the third command-line argument.
# It must be a whole number >= 1.
try:
    TIMEOUT_MINUTES = int(sys.argv[3])
except ValueError:
    # Non-numeric input: report it the same way as an out-of-range value instead of a raw traceback
    print("Invalid value for TIMEOUT_MINUTES: " + sys.argv[3], flush=True)
    exit(1)

if TIMEOUT_MINUTES < 1:
    print("Invalid value for TIMEOUT_MINUTES: " + str(TIMEOUT_MINUTES), flush=True)
    # Abort, as the usage check above does; previously the script kept going with the invalid value
    exit(1)
37+
3238
SHELL = platform.system() == 'Windows'
3339

34-
# Changed, as now we might have few subfolders with scripts, eg for generated tests
35-
# SCRIPTS_FOLDER = os.path.join(FOLDER, "scripts")
3640
SCRIPTS_FOLDER = pathlib.PurePath(FOLDER).as_posix()
3741

3842
def checkDocker():
@@ -80,8 +84,33 @@ def runScript(s):
8084
handler = subprocess.Popen(command, shell=SHELL, cwd=SCRIPTS_FOLDER, start_new_session=True)
8185
buffer.append(handler)
8286

87+
def killProcess(h):
    """Terminate the process behind ``h`` together with all of its descendants.

    First asks every process in the tree to terminate gracefully, waits up to
    10 seconds, then force-kills whatever is still alive. Finally waits on
    ``h`` so that its return code gets collected.

    :param h: the ``subprocess.Popen`` handler of the job to stop
    """
    print("Terminating process.", flush=True)

    # If the handler has already finished there is nothing left to signal.
    # Returning here also avoids looking up a PID that may have been reused.
    if h.poll() is not None:
        return

    try:
        parent = psutil.Process(h.pid)
        # Children first, parent last: same signalling order as before
        procs = parent.children(recursive=True) + [parent]
    except psutil.NoSuchProcess:
        # The process vanished between poll() and the lookup: just collect its return code
        h.wait()
        return

    # Graceful terminate
    for p in procs:
        try:
            p.terminate()
        except psutil.NoSuchProcess:
            # Already gone on its own, which is the outcome we wanted anyway
            pass

    _gone, alive = psutil.wait_procs(procs, timeout=10)

    # Force kill remaining
    for p in alive:
        print(f"Force killing PID {p.pid}", flush=True)
        try:
            p.kill()
        except psutil.NoSuchProcess:
            pass

    h.wait()
105+
106+
107+
########################################################################################################################
108+
109+
last_start = time.time()
110+
83111
for s in scripts:
84112
if len(buffer) < N:
113+
last_start = time.time()
85114
runScript(s)
86115
else:
87116
while len(buffer) == N:
@@ -93,20 +122,36 @@ def runScript(s):
93122
# keep the ones running... those have return code not set yet
94123
buffer = [h for h in buffer if h.returncode is None]
95124
if len(buffer) == N :
125+
# all running in buffer... but has any timed out?
126+
# TODO for simplicity we just check latest added... so timeout is not enforced for ALL jobs.
127+
# however, note that internally the jobs have their own timeouts... these here are just extra checks
128+
elapsed_time = time.time() - last_start
129+
if elapsed_time > TIMEOUT_MINUTES * 60:
130+
killProcess(buffer[0])
131+
# wait before checking again
96132
time.sleep(5)
97133
else:
134+
last_start = time.time()
98135
runScript(s)
99136
break
100137

101138
print("Waiting for last scripts to end", flush=True)
102139

140+
budget = TIMEOUT_MINUTES * 60
141+
103142
for h in buffer:
104-
h.wait()
105-
if h.returncode != 0:
106-
print("Process terminated with code: " + str(h.returncode), flush=True)
143+
start = time.time()
144+
try:
145+
h.wait(budget)
146+
if h.returncode != 0:
147+
print("Process terminated with code: " + str(h.returncode), flush=True)
148+
except subprocess.TimeoutExpired:
149+
print("Timeout reached.", flush=True)
150+
killProcess(h)
151+
elapsed = time.time() - start
152+
budget = max(0, budget - elapsed)
107153

108154
print("All jobs are completed", flush=True)
109155

110-
#TODO how to make sure no subprocess is left hanging?
111156

112157

0 commit comments

Comments
 (0)