From c22a64b589153e71f195fd2f0c9ad59430ee39a2 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 10 Jun 2026 10:55:21 +0500 Subject: [PATCH 1/2] Fix jpd.hostname AssertionError on container stop --- .../background/pipeline_tasks/jobs_terminating.py | 4 +++- .../_internal/server/services/jobs/__init__.py | 12 ++++++------ src/dstack/_internal/server/services/runner/ssh.py | 5 +++++ 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index fe2e64ca4..3ae30c3ef 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -660,10 +660,12 @@ async def _process_terminating_job( jrd = get_job_runtime_data(job_model) jpd = get_job_provisioning_data(job_model) - if jpd is not None: + if jpd is not None and jpd.hostname is not None and jpd.ssh_port is not None: logger.debug("%s: stopping container", fmt(job_model)) ssh_private_keys = get_instance_ssh_private_keys(instance_model) if not await _stop_container(job_model, jpd, ssh_private_keys): + # Dangling containers (tasks) are cleared periodically on instance checks by + # `remove_dangling_tasks_from_instance()` logger.warning( ( "%s: could not stop container, possibly due to a communication error." diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 2d149ab77..e0c99221b 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -338,13 +338,13 @@ async def stop_runner(job_model: JobModel, instance_model: InstanceModel): `instance_model.project` must be loaded because SSH key resolution uses the project keys. """ ssh_private_keys = get_instance_ssh_private_keys(instance_model) - try: - jpd = get_job_provisioning_data(job_model) - if jpd is not None: - jrd = get_job_runtime_data(job_model) + jpd = get_job_provisioning_data(job_model) + if jpd is not None: + jrd = get_job_runtime_data(job_model) + try: await run_async(_stop_runner, ssh_private_keys, jpd, jrd, job_model) - except SSHError: - logger.debug("%s: failed to stop runner", fmt(job_model)) + except SSHError: + logger.debug("%s: failed to stop runner", fmt(job_model)) @runner_ssh_tunnel diff --git a/src/dstack/_internal/server/services/runner/ssh.py b/src/dstack/_internal/server/services/runner/ssh.py index 19dfb05a7..a7009df46 100644 --- a/src/dstack/_internal/server/services/runner/ssh.py +++ b/src/dstack/_internal/server/services/runner/ssh.py @@ -52,6 +52,11 @@ def wrapper( Returns: is successful """ + if job_provisioning_data.hostname is None or job_provisioning_data.ssh_port is None: + # The callers may try to establish tunnels even if hostname/ssh_port is missing + # and rely on `False` being returned in this case. + return False + if not settings.SERVER_SSH_POOL_ENABLED or not job_provisioning_data.dockerized: # Connections from dstack-server to runner's sshd are expected to be short # as the `inactivity_duration` feature distinguishes user and server connections based on duration. From 8220a52030d70d521259d7e2dc21165755899c29 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 10 Jun 2026 11:07:25 +0500 Subject: [PATCH 2/2] Add regression tests --- .../pipeline_tasks/test_terminating_jobs.py | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py index 5ec769519..af34b1913 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py @@ -902,3 +902,86 @@ async def test_keeps_related_instance_locked_on_processing_exception( assert job.lock_owner == JobTerminatingPipeline.__name__ assert instance.lock_token == job_lock_token assert instance.lock_owner == _get_related_instance_lock_owner(job.id) + + async def test_stops_job_gracefully_without_provisioning_data_hostname( + self, test_db, session: AsyncSession, worker: JobTerminatingWorker + ): + # Regression test for https://github.com/dstackai/dstack/issues/3950. + # Stopping a job that is still provisioning (no hostname/ssh_port yet) must not raise + # when the graceful stop tries to open an SSH tunnel to the runner. + project = await create_project(session=session) + user = await create_user(session=session) + instance = await create_instance( + session=session, + project=project, + status=InstanceStatus.BUSY, + ) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run(session=session, project=project, repo=repo, user=user) + jpd = get_job_provisioning_data(dockerized=True) + jpd.hostname = None + jpd.ssh_port = None + job = await create_job( + session=session, + run=run, + status=JobStatus.TERMINATING, + termination_reason=JobTerminationReason.TERMINATED_BY_USER, + job_provisioning_data=jpd, + instance=instance, + ) + job.graceful_termination_attempts = 0 + _lock_job(job) + await session.commit() + + await worker.process(_job_to_pipeline_item(job)) + + await session.refresh(job) + assert job.status == JobStatus.TERMINATING + assert job.graceful_termination_attempts == 1 + assert job.remove_at is not None + assert job.instance_id == instance.id + + async def test_terminates_job_without_provisioning_data_hostname( + self, test_db, session: AsyncSession, worker: JobTerminatingWorker + ): + # Regression test for https://github.com/dstackai/dstack/issues/3950. + # The container stop is skipped (and must not raise) when the job has no hostname/ssh_port. + # Dangling containers are cleared later on instance checks by `remove_dangling_tasks_from_instance()`. + project = await create_project(session=session) + user = await create_user(session=session) + instance = await create_instance( + session=session, + project=project, + status=InstanceStatus.BUSY, + ) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run(session=session, project=project, repo=repo, user=user) + jpd = get_job_provisioning_data(dockerized=True) + jpd.hostname = None + jpd.ssh_port = None + job = await create_job( + session=session, + run=run, + status=JobStatus.TERMINATING, + termination_reason=JobTerminationReason.TERMINATED_BY_USER, + job_provisioning_data=jpd, + instance=instance, + ) + job.graceful_termination_attempts = 1 + job.remove_at = get_current_datetime() - timedelta(minutes=1) + _lock_job(job) + await session.commit() + + with patch( + "dstack._internal.server.background.pipeline_tasks.jobs_terminating._stop_container", + new=AsyncMock(return_value=True), + ) as stop_container: + await worker.process(_job_to_pipeline_item(job)) + + stop_container.assert_not_awaited() + + await session.refresh(job) + await session.refresh(instance) + assert job.status == JobStatus.TERMINATED + assert job.instance_id is None + assert instance.status == InstanceStatus.IDLE