Skip to content

Commit 4656613

Browse files
committed
_internal(feat[async_run]): Add async subprocess execution with callbacks
why: Phase 2 of asyncio support - async_run() enables non-blocking command execution with real-time progress callbacks for VCS operations. what: - Add AsyncProgressCallbackProtocol for type-safe async callbacks - Add wrap_sync_callback() helper for users with existing sync callbacks - Add async_run() function matching sync run() API with: - StreamReader.readline() for line-by-line stderr streaming - Timeout support via asyncio.wait_for() - CommandError/CommandTimeoutError on failures - Add comprehensive tests (19 tests) with NamedTuple parametrization
1 parent 95fe8c3 commit 4656613

2 files changed

Lines changed: 560 additions & 0 deletions

File tree

src/libvcs/_internal/async_run.py

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
"""Async subprocess execution with progress callbacks.
2+
3+
Async equivalent of :mod:`libvcs._internal.run`.
4+
5+
Note
6+
----
7+
This is an internal API not covered by versioning policy.
8+
9+
Examples
10+
--------
11+
- :func:`~async_run`: Async command execution with progress callback.
12+
13+
Before (sync):
14+
15+
>>> from libvcs._internal.run import run
16+
>>> output = run(['echo', 'hello'], check_returncode=True)
17+
18+
With this (async):
19+
20+
>>> import asyncio
21+
>>> from libvcs._internal.async_run import async_run
22+
>>> async def example():
23+
... output = await async_run(['echo', 'hello'])
24+
... return output
25+
>>> # asyncio.run(example()) # Returns 'hello'
26+
"""
27+
28+
from __future__ import annotations
29+
30+
import asyncio
31+
import asyncio.subprocess
32+
import datetime
33+
import logging
34+
import sys
35+
import typing as t
36+
from collections.abc import Mapping, Sequence
37+
38+
from libvcs import exc
39+
from libvcs._internal.types import StrOrBytesPath
40+
41+
from .run import console_to_str
42+
43+
logger = logging.getLogger(__name__)
44+
45+
46+
class AsyncProgressCallbackProtocol(t.Protocol):
47+
"""Async callback to report subprocess communication.
48+
49+
Async equivalent of :class:`~libvcs._internal.run.ProgressCallbackProtocol`.
50+
51+
Examples
52+
--------
53+
>>> async def my_progress(output: str, timestamp: datetime.datetime) -> None:
54+
... print(f"[{timestamp}] {output}", end="")
55+
56+
See Also
57+
--------
58+
libvcs._internal.run.ProgressCallbackProtocol : Sync equivalent
59+
wrap_sync_callback : Helper to wrap sync callbacks for async use
60+
"""
61+
62+
async def __call__(self, output: str, timestamp: datetime.datetime) -> None:
63+
"""Process progress for subprocess communication."""
64+
...
65+
66+
67+
def wrap_sync_callback(
68+
sync_cb: t.Callable[[str, datetime.datetime], None],
69+
) -> AsyncProgressCallbackProtocol:
70+
"""Wrap a sync callback for use with async APIs.
71+
72+
This helper allows users with existing sync callbacks to use them
73+
with async APIs without modification.
74+
75+
Parameters
76+
----------
77+
sync_cb : Callable[[str, datetime.datetime], None]
78+
Synchronous callback function
79+
80+
Returns
81+
-------
82+
AsyncProgressCallbackProtocol
83+
Async wrapper that calls the sync callback
84+
85+
Examples
86+
--------
87+
>>> def my_sync_progress(output: str, timestamp: datetime.datetime) -> None:
88+
... print(output, end="")
89+
>>> async_cb = wrap_sync_callback(my_sync_progress)
90+
>>> # Now use async_cb with async_run()
91+
"""
92+
93+
async def wrapper(output: str, timestamp: datetime.datetime) -> None:
94+
sync_cb(output, timestamp)
95+
96+
return wrapper
97+
98+
99+
if sys.platform == "win32":
100+
_ENV: t.TypeAlias = Mapping[str, str]
101+
else:
102+
_ENV: t.TypeAlias = Mapping[bytes, StrOrBytesPath] | Mapping[str, StrOrBytesPath]
103+
104+
_CMD: t.TypeAlias = StrOrBytesPath | Sequence[StrOrBytesPath]
105+
106+
107+
def _args_to_list(args: _CMD) -> list[str]:
108+
"""Convert command args to list of strings.
109+
110+
Parameters
111+
----------
112+
args : str | bytes | Path | Sequence[str | bytes | Path]
113+
Command arguments in various forms
114+
115+
Returns
116+
-------
117+
list[str]
118+
Normalized list of string arguments
119+
"""
120+
from os import PathLike
121+
122+
if isinstance(args, (str, bytes, PathLike)):
123+
if isinstance(args, bytes):
124+
return [args.decode()]
125+
return [str(args)]
126+
return [arg.decode() if isinstance(arg, bytes) else str(arg) for arg in args]
127+
128+
129+
async def async_run(
130+
args: _CMD,
131+
*,
132+
cwd: StrOrBytesPath | None = None,
133+
env: _ENV | None = None,
134+
check_returncode: bool = True,
135+
callback: AsyncProgressCallbackProtocol | None = None,
136+
timeout: float | None = None,
137+
) -> str:
138+
"""Run a command asynchronously.
139+
140+
Run 'args' and return stdout content (non-blocking). Optionally stream
141+
stderr to a callback for progress reporting. Raises an exception if
142+
the command exits non-zero (when check_returncode=True).
143+
144+
This is the async equivalent of :func:`~libvcs._internal.run.run`.
145+
146+
Parameters
147+
----------
148+
args : list[str] | str
149+
The command to run
150+
cwd : str | Path, optional
151+
Working directory for the command
152+
env : Mapping[str, str], optional
153+
Environment variables for the command
154+
check_returncode : bool, default True
155+
If True, raise :class:`~libvcs.exc.CommandError` on non-zero exit
156+
callback : AsyncProgressCallbackProtocol, optional
157+
Async callback to receive stderr output in real-time.
158+
Signature: ``async def callback(output: str, timestamp: datetime) -> None``
159+
timeout : float, optional
160+
Timeout in seconds. Raises :class:`~libvcs.exc.CommandTimeoutError`
161+
if exceeded.
162+
163+
Returns
164+
-------
165+
str
166+
Combined stdout output
167+
168+
Raises
169+
------
170+
libvcs.exc.CommandError
171+
If check_returncode=True and process exits with non-zero code
172+
libvcs.exc.CommandTimeoutError
173+
If timeout is exceeded
174+
175+
Examples
176+
--------
177+
Basic usage:
178+
179+
>>> import asyncio
180+
>>> async def example():
181+
... output = await async_run(['echo', 'hello'])
182+
... return output.strip()
183+
>>> # asyncio.run(example()) # Returns 'hello'
184+
185+
With progress callback:
186+
187+
>>> async def progress(output: str, timestamp: datetime.datetime) -> None:
188+
... print(f"Progress: {output}", end="")
189+
>>> async def example():
190+
... output = await async_run(['git', 'clone', url], callback=progress)
191+
... return output
192+
>>> # asyncio.run(example())
193+
194+
See Also
195+
--------
196+
libvcs._internal.run.run : Synchronous equivalent
197+
AsyncSubprocessCommand : Lower-level async subprocess wrapper
198+
"""
199+
args_list = _args_to_list(args)
200+
201+
# Create subprocess with pipes (using non-shell exec for security)
202+
proc = await asyncio.subprocess.create_subprocess_exec(
203+
*args_list,
204+
stdout=asyncio.subprocess.PIPE,
205+
stderr=asyncio.subprocess.PIPE,
206+
cwd=cwd,
207+
env=env,
208+
)
209+
210+
async def _run_with_callback() -> tuple[bytes, bytes, int]:
211+
"""Run subprocess, streaming stderr to callback."""
212+
stdout_data = b""
213+
stderr_data = b""
214+
215+
assert proc.stdout is not None
216+
assert proc.stderr is not None
217+
218+
# Read stderr line-by-line for progress callback
219+
if callback is not None:
220+
# Stream stderr to callback while collecting stdout
221+
async def read_stderr() -> bytes:
222+
collected = b""
223+
assert proc.stderr is not None
224+
while True:
225+
line = await proc.stderr.readline()
226+
if not line:
227+
break
228+
collected += line
229+
# Call progress callback with decoded line
230+
await callback(
231+
output=console_to_str(line),
232+
timestamp=datetime.datetime.now(),
233+
)
234+
return collected
235+
236+
# Run stdout collection and stderr streaming concurrently
237+
stdout_task = asyncio.create_task(proc.stdout.read())
238+
stderr_task = asyncio.create_task(read_stderr())
239+
240+
stdout_data, stderr_data = await asyncio.gather(stdout_task, stderr_task)
241+
242+
# Send final carriage return (matching sync behavior)
243+
await callback(output="\r", timestamp=datetime.datetime.now())
244+
else:
245+
# No callback - just collect both streams
246+
stdout_data, stderr_data = await proc.communicate()
247+
248+
# Wait for process to complete
249+
await proc.wait()
250+
returncode = proc.returncode
251+
assert returncode is not None
252+
253+
return stdout_data, stderr_data, returncode
254+
255+
try:
256+
if timeout is not None:
257+
stdout_bytes, stderr_bytes, returncode = await asyncio.wait_for(
258+
_run_with_callback(),
259+
timeout=timeout,
260+
)
261+
else:
262+
stdout_bytes, stderr_bytes, returncode = await _run_with_callback()
263+
except asyncio.TimeoutError:
264+
# Kill process on timeout
265+
proc.kill()
266+
await proc.wait()
267+
raise exc.CommandTimeoutError(
268+
output="Command timed out",
269+
returncode=-1,
270+
cmd=args_list,
271+
) from None
272+
273+
# Process stdout: strip and join lines (matching sync behavior)
274+
if stdout_bytes:
275+
lines = filter(
276+
None,
277+
(line.strip() for line in stdout_bytes.splitlines()),
278+
)
279+
output = console_to_str(b"\n".join(lines))
280+
else:
281+
output = ""
282+
283+
# On error, use stderr content
284+
if returncode != 0 and stderr_bytes:
285+
stderr_lines = filter(
286+
None,
287+
(line.strip() for line in stderr_bytes.splitlines()),
288+
)
289+
output = console_to_str(b"".join(stderr_lines))
290+
291+
if returncode != 0 and check_returncode:
292+
raise exc.CommandError(
293+
output=output,
294+
returncode=returncode,
295+
cmd=args_list,
296+
)
297+
298+
return output

0 commit comments

Comments
 (0)