Skip to content

Commit 8287a38

Browse files
Merge pull request #270 from runpod/concurrency-fix
Concurrency fix
2 parents eac280d + a6a2a23 commit 8287a38

6 files changed

Lines changed: 28 additions & 18 deletions

File tree

runpod/serverless/modules/rp_http.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ async def _handle_result(session, job_data, job, url_template, log_message, is_s
5757
log.error(f"Error while returning job result. | {err}", job['id'])
5858

5959
finally:
60+
#job_data status is used for local development with FastAPI
6061
if url_template == JOB_DONE_URL and job_data.get('status', None) != 'IN_PROGRESS':
6162
job_list.remove_job(job["id"])
6263
log.info("Finished.", job['id'])

runpod/serverless/modules/rp_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,4 +198,4 @@ async def run_job_generator(
198198
log.error(err, job["id"])
199199
yield {"error": f"handler: {str(err)} \ntraceback: {traceback.format_exc()}"}
200200
finally:
201-
log.info('Finished', job["id"])
201+
log.info('Finished running generator.', job["id"])

runpod/serverless/modules/rp_scale.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,21 @@ async def get_jobs(self, session):
6969
break
7070

7171
self.current_concurrency = self.concurrency_modifier(self.current_concurrency)
72+
log.debug(f"Concurrency set to: {self.current_concurrency}")
7273

73-
tasks = [
74-
asyncio.create_task(get_job(session, retry=False))
75-
for _ in range(self.current_concurrency if job_list.get_job_list() else 1)
76-
]
74+
log.debug(f"Jobs in progress: {job_list.get_job_count()}")
75+
if job_list.get_job_count() < self.current_concurrency:
76+
log.debug("Job list is less than concurrency, getting more jobs.")
7777

78-
for job_future in asyncio.as_completed(tasks):
79-
job = await job_future
80-
self.job_history.append(1 if job else 0)
81-
if job:
82-
yield job
78+
tasks = [
79+
asyncio.create_task(get_job(session, retry=False))
80+
for _ in range(self.current_concurrency if job_list.get_job_list() else 1)
81+
]
8382

84-
await asyncio.sleep(0)
83+
for job_future in asyncio.as_completed(tasks):
84+
job = await job_future
85+
self.job_history.append(1 if job else 0)
86+
if job:
87+
yield job
8588

86-
log.debug(f"Concurrency set to: {self.current_concurrency}")
89+
await asyncio.sleep(0)

runpod/serverless/modules/worker_state.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,9 @@ def get_job_list(self):
8383
Returns the list of jobs as a string.
8484
'''
8585
return ','.join(str(job) for job in self.jobs) if self.jobs else None
86+
87+
def get_job_count(self):
88+
'''
89+
Returns the number of jobs.
90+
'''
91+
return len(self.jobs)

runpod/serverless/worker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ async def _process_job(job, session, job_scaler, config):
4242
if rp_handler.is_generator(config["handler"]):
4343
is_stream = True
4444
generator_output = run_job_generator(config["handler"], job)
45-
log.debug("Handler is a generator, streaming results.")
45+
log.debug("Handler is a generator, streaming results.", job['id'])
4646

4747
job_result = {'output': []}
4848
async for stream_output in generator_output:
@@ -66,13 +66,13 @@ async def _process_job(job, session, job_scaler, config):
6666
# If rp_debugger is set, debugger output will be returned.
6767
if config["rp_args"].get("rp_debugger", False) and isinstance(job_result, dict):
6868
job_result["output"]["rp_debugger"] = rp_debugger.get_debugger_output()
69-
log.debug("rp_debugger | Flag set, returning debugger output.")
69+
log.debug("rp_debugger | Flag set, returning debugger output.", job['id'])
7070

7171
# Calculate ready delay for the debugger output.
7272
ready_delay = (config["reference_counter_start"] - REF_COUNT_ZERO) * 1000
7373
job_result["output"]["rp_debugger"]["ready_delay_ms"] = ready_delay
7474
else:
75-
log.debug("rp_debugger | Flag not set, skipping debugger output.")
75+
log.debug("rp_debugger | Flag not set, skipping debugger output.", job['id'])
7676
rp_debugger.clear_debugger_output()
7777

7878
# Send the job result to SLS

tests/test_serverless/test_modules/test_job.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ async def test_run_job_generator_success(self):
296296
assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}]
297297
assert mock_log.error.call_count == 0
298298
assert mock_log.info.call_count == 1
299-
mock_log.info.assert_called_with('Finished', '123')
299+
mock_log.info.assert_called_with('Finished running generator.', '123')
300300

301301
async def test_run_job_generator_success_async(self):
302302
'''
@@ -311,7 +311,7 @@ async def test_run_job_generator_success_async(self):
311311
assert result == [{"output": "partial_output_1"}, {"output": "partial_output_2"}]
312312
assert mock_log.error.call_count == 0
313313
assert mock_log.info.call_count == 1
314-
mock_log.info.assert_called_with('Finished', '123')
314+
mock_log.info.assert_called_with('Finished running generator.', '123')
315315

316316
async def test_run_job_generator_exception(self):
317317
'''
@@ -327,4 +327,4 @@ async def test_run_job_generator_exception(self):
327327
assert "error" in result[0]
328328
assert mock_log.error.call_count == 1
329329
assert mock_log.info.call_count == 1
330-
mock_log.info.assert_called_with('Finished', '123')
330+
mock_log.info.assert_called_with('Finished running generator.', '123')

0 commit comments

Comments
 (0)