Skip to content

Commit f5924f7

Browse files
committed
_internal(feat[async_subprocess]): Add async subprocess wrapper
why: Foundation for asyncio support - async equivalent of SubprocessCommand what: - Add AsyncSubprocessCommand dataclass with run(), check_output(), wait() methods - Add AsyncCompletedProcess result type mirroring subprocess.CompletedProcess - Use asyncio.create_subprocess_exec for secure non-shell execution - Use asyncio.wait_for for timeout handling (Python 3.10+ compatible) - Configure pytest-asyncio strict mode in pyproject.toml - Add comprehensive tests with 100% coverage (22 tests, all passing)
1 parent 7807878 commit f5924f7

3 files changed

Lines changed: 626 additions & 0 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,8 @@ testpaths = [
236236
filterwarnings = [
237237
"ignore:The frontend.Option(Parser)? class.*:DeprecationWarning::",
238238
]
239+
asyncio_mode = "strict"
240+
asyncio_default_fixture_loop_scope = "function"
239241

240242
[tool.pytest-watcher]
241243
now = true
Lines changed: 387 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,387 @@
1+
# ruff: noqa: A002
2+
r"""Async invocable :mod:`subprocess` wrapper.
3+
4+
Async equivalent of :mod:`libvcs._internal.subprocess`.
5+
6+
Note
7+
----
8+
This is an internal API not covered by versioning policy.
9+
10+
Examples
11+
--------
12+
- :class:`~AsyncSubprocessCommand`: Async wrapper for subprocess execution.
13+
14+
Before (sync):
15+
16+
>>> import subprocess
17+
>>> subprocess.run(
18+
... ['echo', 'hi'],
19+
... capture_output=True, text=True
20+
... ).stdout
21+
'hi\n'
22+
23+
With this (async):
24+
25+
>>> import asyncio
26+
>>> async def example():
27+
... cmd = AsyncSubprocessCommand(['echo', 'hi'])
28+
... result = await cmd.run()
29+
... return result.stdout
30+
>>> # asyncio.run(example()) # Returns 'hi\n'
31+
"""
32+
33+
from __future__ import annotations
34+
35+
import asyncio
36+
import asyncio.subprocess
37+
import dataclasses
38+
import subprocess
39+
import typing as t
40+
from collections.abc import Mapping, Sequence
41+
42+
from libvcs._internal.types import StrOrBytesPath
43+
44+
from .dataclasses import SkipDefaultFieldsReprMixin
45+
46+
#: Command type alias
47+
_CMD: t.TypeAlias = StrOrBytesPath | Sequence[StrOrBytesPath]
48+
49+
#: Environment type alias
50+
_ENV: t.TypeAlias = Mapping[str, str]
51+
52+
53+
@dataclasses.dataclass
54+
class AsyncCompletedProcess(t.Generic[t.AnyStr]):
55+
"""Result of an async subprocess execution.
56+
57+
Mirrors :class:`subprocess.CompletedProcess` for async context.
58+
59+
Parameters
60+
----------
61+
args : list[str]
62+
The command arguments
63+
returncode : int
64+
Exit code of the process
65+
stdout : str | bytes | None
66+
Captured stdout, if any
67+
stderr : str | bytes | None
68+
Captured stderr, if any
69+
"""
70+
71+
args: list[str]
72+
returncode: int
73+
stdout: t.AnyStr | None = None
74+
stderr: t.AnyStr | None = None
75+
76+
def check_returncode(self) -> None:
77+
"""Raise CalledProcessError if returncode is non-zero.
78+
79+
Raises
80+
------
81+
subprocess.CalledProcessError
82+
If the process exited with a non-zero code
83+
"""
84+
if self.returncode != 0:
85+
raise subprocess.CalledProcessError(
86+
self.returncode,
87+
self.args,
88+
self.stdout,
89+
self.stderr,
90+
)
91+
92+
93+
@dataclasses.dataclass(repr=False)
94+
class AsyncSubprocessCommand(SkipDefaultFieldsReprMixin):
95+
r"""Async subprocess command wrapper.
96+
97+
Wraps asyncio subprocess execution in a dataclass for inspection
98+
and mutation before invocation.
99+
100+
Parameters
101+
----------
102+
args : list[str]
103+
Command and arguments to run
104+
cwd : str | Path, optional
105+
Working directory for the command
106+
env : dict[str, str], optional
107+
Environment variables for the command
108+
109+
Examples
110+
--------
111+
>>> import asyncio
112+
>>> async def example():
113+
... cmd = AsyncSubprocessCommand(['echo', 'hello'])
114+
... result = await cmd.run()
115+
... return result.stdout
116+
>>> # asyncio.run(example()) # Returns 'hello\n'
117+
118+
Modify before running:
119+
120+
>>> cmd = AsyncSubprocessCommand(['echo', 'hi'])
121+
>>> cmd.args
122+
['echo', 'hi']
123+
>>> cmd.args[1] = 'hello'
124+
>>> cmd.args
125+
['echo', 'hello']
126+
"""
127+
128+
args: _CMD
129+
cwd: StrOrBytesPath | None = None
130+
env: _ENV | None = None
131+
132+
# Limits for stdout/stderr
133+
limit: int = 2**16 # 64 KiB default buffer
134+
135+
def _args_as_list(self) -> list[str]:
136+
"""Convert args to list of strings for asyncio."""
137+
from os import PathLike
138+
139+
args = self.args
140+
if isinstance(args, (str, bytes, PathLike)):
141+
# Single command (str, bytes, or PathLike)
142+
return [str(args) if not isinstance(args, bytes) else args.decode()]
143+
# At this point, args is Sequence[StrOrBytesPath]
144+
return [str(arg) if not isinstance(arg, bytes) else arg.decode() for arg in args]
145+
146+
async def _create_process(
147+
self,
148+
*,
149+
stdin: int | None = None,
150+
stdout: int | None = None,
151+
stderr: int | None = None,
152+
) -> asyncio.subprocess.Process:
153+
"""Create an async subprocess.
154+
155+
Uses asyncio.create_subprocess_exec for secure, non-shell execution.
156+
"""
157+
args_list = self._args_as_list()
158+
# Use asyncio's subprocess creation (non-shell variant for security)
159+
return await asyncio.subprocess.create_subprocess_exec(
160+
*args_list,
161+
stdin=stdin,
162+
stdout=stdout,
163+
stderr=stderr,
164+
cwd=self.cwd,
165+
env=self.env,
166+
limit=self.limit,
167+
)
168+
169+
@t.overload
170+
async def run(
171+
self,
172+
*,
173+
check: bool = ...,
174+
timeout: float | None = ...,
175+
input: bytes | None = ...,
176+
text: t.Literal[False] = ...,
177+
) -> AsyncCompletedProcess[bytes]: ...
178+
179+
@t.overload
180+
async def run(
181+
self,
182+
*,
183+
check: bool = ...,
184+
timeout: float | None = ...,
185+
input: str | None = ...,
186+
text: t.Literal[True],
187+
) -> AsyncCompletedProcess[str]: ...
188+
189+
@t.overload
190+
async def run(
191+
self,
192+
*,
193+
check: bool = ...,
194+
timeout: float | None = ...,
195+
input: str | bytes | None = ...,
196+
text: bool = ...,
197+
) -> AsyncCompletedProcess[t.Any]: ...
198+
199+
async def run(
200+
self,
201+
*,
202+
check: bool = False,
203+
timeout: float | None = None,
204+
input: str | bytes | None = None,
205+
text: bool = False,
206+
) -> AsyncCompletedProcess[t.Any]:
207+
r"""Run command asynchronously and return completed process.
208+
209+
Uses asyncio subprocess APIs for non-blocking operation.
210+
211+
Parameters
212+
----------
213+
check : bool, default False
214+
If True, raise CalledProcessError on non-zero exit
215+
timeout : float, optional
216+
Timeout in seconds. Raises asyncio.TimeoutError if exceeded.
217+
input : str | bytes, optional
218+
Data to send to stdin
219+
text : bool, default False
220+
If True, decode stdout/stderr as text
221+
222+
Returns
223+
-------
224+
AsyncCompletedProcess
225+
Result with args, returncode, stdout, stderr
226+
227+
Raises
228+
------
229+
subprocess.CalledProcessError
230+
If check=True and process exits with non-zero code
231+
asyncio.TimeoutError
232+
If timeout is exceeded
233+
234+
Examples
235+
--------
236+
>>> import asyncio
237+
>>> async def example():
238+
... cmd = AsyncSubprocessCommand(['echo', 'hello'])
239+
... result = await cmd.run(text=True)
240+
... return result.stdout.strip()
241+
>>> # asyncio.run(example()) # Returns 'hello'
242+
"""
243+
args_list = self._args_as_list()
244+
245+
# Prepare input as bytes
246+
input_bytes: bytes | None = None
247+
if input is not None:
248+
input_bytes = input.encode() if isinstance(input, str) else input
249+
250+
# Create subprocess
251+
proc = await self._create_process(
252+
stdin=asyncio.subprocess.PIPE if input_bytes else None,
253+
stdout=asyncio.subprocess.PIPE,
254+
stderr=asyncio.subprocess.PIPE,
255+
)
256+
257+
try:
258+
# Use communicate() with optional timeout via wait_for
259+
if timeout is not None:
260+
stdout_bytes, stderr_bytes = await asyncio.wait_for(
261+
proc.communicate(input_bytes),
262+
timeout=timeout,
263+
)
264+
else:
265+
stdout_bytes, stderr_bytes = await proc.communicate(input_bytes)
266+
except asyncio.TimeoutError:
267+
# Kill process on timeout
268+
proc.kill()
269+
await proc.wait()
270+
raise
271+
272+
# Get return code (should be set after communicate)
273+
returncode = proc.returncode
274+
assert returncode is not None, "returncode should be set after communicate()"
275+
276+
# Decode if text mode
277+
stdout: str | bytes | None = stdout_bytes
278+
stderr: str | bytes | None = stderr_bytes
279+
if text:
280+
stdout = stdout_bytes.decode() if stdout_bytes else ""
281+
stderr = stderr_bytes.decode() if stderr_bytes else ""
282+
283+
result: AsyncCompletedProcess[t.Any] = AsyncCompletedProcess(
284+
args=args_list,
285+
returncode=returncode,
286+
stdout=stdout,
287+
stderr=stderr,
288+
)
289+
290+
if check:
291+
result.check_returncode()
292+
293+
return result
294+
295+
async def check_output(
296+
self,
297+
*,
298+
timeout: float | None = None,
299+
input: str | bytes | None = None,
300+
text: bool = False,
301+
) -> str | bytes:
302+
r"""Run command and return stdout, raising on non-zero exit.
303+
304+
Parameters
305+
----------
306+
timeout : float, optional
307+
Timeout in seconds
308+
input : str | bytes, optional
309+
Data to send to stdin
310+
text : bool, default False
311+
If True, return stdout as text
312+
313+
Returns
314+
-------
315+
str | bytes
316+
Command stdout
317+
318+
Raises
319+
------
320+
subprocess.CalledProcessError
321+
If process exits with non-zero code
322+
asyncio.TimeoutError
323+
If timeout is exceeded
324+
325+
Examples
326+
--------
327+
>>> import asyncio
328+
>>> async def example():
329+
... cmd = AsyncSubprocessCommand(['echo', 'hello'])
330+
... return await cmd.check_output(text=True)
331+
>>> # asyncio.run(example()) # Returns 'hello\n'
332+
"""
333+
result = await self.run(check=True, timeout=timeout, input=input, text=text)
334+
return result.stdout if result.stdout is not None else (b"" if not text else "")
335+
336+
async def wait(
337+
self,
338+
*,
339+
timeout: float | None = None,
340+
) -> int:
341+
"""Run command and return exit code.
342+
343+
Discards stdout/stderr.
344+
345+
Parameters
346+
----------
347+
timeout : float, optional
348+
Timeout in seconds
349+
350+
Returns
351+
-------
352+
int
353+
Process exit code
354+
355+
Raises
356+
------
357+
asyncio.TimeoutError
358+
If timeout is exceeded
359+
360+
Examples
361+
--------
362+
>>> import asyncio
363+
>>> async def example():
364+
... cmd = AsyncSubprocessCommand(['true'])
365+
... return await cmd.wait()
366+
>>> # asyncio.run(example()) # Returns 0
367+
"""
368+
proc = await self._create_process(
369+
stdin=asyncio.subprocess.DEVNULL,
370+
stdout=asyncio.subprocess.DEVNULL,
371+
stderr=asyncio.subprocess.DEVNULL,
372+
)
373+
374+
try:
375+
if timeout is not None:
376+
returncode = await asyncio.wait_for(
377+
proc.wait(),
378+
timeout=timeout,
379+
)
380+
else:
381+
returncode = await proc.wait()
382+
except asyncio.TimeoutError:
383+
proc.kill()
384+
await proc.wait()
385+
raise
386+
387+
return returncode

0 commit comments

Comments
 (0)