Skip to content

Commit f994e18

Browse files
Merge pull request #273 from runpod/refresh-worker-datarace
Refresh worker datarace
2 parents adf9f79 + a096340 commit f994e18

3 files changed

Lines changed: 3 additions & 59 deletions

File tree

runpod/serverless/modules/rp_scale.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,12 @@ async def get_jobs(self, session):
6464
Returns:
6565
List[Any]: A list of job data retrieved from the server.
6666
"""
67-
while True:
68-
if not self.is_alive():
69-
break
70-
67+
while self.is_alive():
7168
self.current_concurrency = self.concurrency_modifier(self.current_concurrency)
7269
log.debug(f"Concurrency set to: {self.current_concurrency}")
7370

7471
log.debug(f"Jobs in progress: {job_list.get_job_count()}")
75-
if job_list.get_job_count() < self.current_concurrency:
72+
if job_list.get_job_count() < self.current_concurrency and self.is_alive():
7673
log.debug("Job list is less than concurrency, getting more jobs.")
7774

7875
tasks = [

runpod/serverless/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async def _process_job(job, session, job_scaler, config):
5454

5555
await stream_result(session, stream_output, job)
5656
else:
57-
is_stream = 0
57+
is_stream = False
5858
job_result = await run_job(config["handler"], job)
5959

6060
# If refresh_worker is set, pod will be reset after job is complete.

tests/test_serverless/test_worker.py

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -459,59 +459,6 @@ def mock_is_alive():
459459
with patch("runpod.serverless.modules.rp_scale.JobScaler.is_alive", wraps=mock_is_alive):
460460
runpod.serverless.start(config)
461461

462-
@patch("runpod.serverless.modules.rp_scale.get_job")
463-
@patch("runpod.serverless.worker.run_job")
464-
@patch("runpod.serverless.worker.send_result")
465-
async def test_run_worker_multi_processing_availability_ratio(
466-
self, mock_send_result, mock_run_job, mock_get_job):
467-
'''
468-
Test run_worker with multi processing enabled, the scale-up and
469-
scale-down behavior with availability ratio.
470-
471-
Args:
472-
mock_send_result (_type_): _description_
473-
mock_stream_result (_type_): _description_
474-
mock_run_job (_type_): _description_
475-
mock_get_job (_type_): _description_
476-
mock_session (_type_): _description_
477-
'''
478-
479-
# Let the test be a long running one so we can capture the scale-up and scale-down.
480-
config = {
481-
"handler": MagicMock(),
482-
"refresh_worker": False,
483-
"rp_args": {
484-
"rp_debugger": True,
485-
"rp_log_level": "DEBUG"
486-
}
487-
}
488-
489-
# Let's stop after the 20th call.
490-
scale_behavior = {
491-
'counter': 0
492-
}
493-
494-
def mock_is_alive():
495-
res = scale_behavior['counter'] < 10
496-
scale_behavior['counter'] += 1
497-
498-
# Let's oscillate between upscaling, downscaling, upscaling, downscaling, ...
499-
if scale_behavior['counter'] % 2 == 0:
500-
mock_get_job.return_value = {
501-
"id": "123", "input": {"number": 1}}
502-
else:
503-
mock_get_job.return_value = None
504-
return res
505-
506-
# Define the mock behaviors
507-
mock_run_job.return_value = {"output": {"result": "odd"}}
508-
with patch("runpod.serverless.modules.rp_scale.JobScaler.is_alive", wraps=mock_is_alive):
509-
runpod.serverless.start(config)
510-
511-
# 5 calls with actual jobs
512-
assert mock_run_job.call_count == 5
513-
assert mock_send_result.call_count == 5
514-
515462
# Test with sls-core
516463
async def test_run_worker_with_sls_core(self):
517464
'''

0 commit comments

Comments
 (0)