Skip to content

Commit cd7c454

Browse files
kovanclaude
andcommitted
Add asyncio memory object channels with split send/receive endpoints
Introduces open_channel() returning (SendChannel, ReceiveChannel) — a bounded, split-ownership channel primitive for asyncio. Features include ref-counted clone() for fan-in/fan-out patterns, backpressure via max_buffer_size, deterministic close signaling (EndOfChannel when all senders close, BrokenResourceError when all receivers close), and async iteration support. New exceptions: EndOfChannel, ClosedResourceError, BrokenResourceError, WouldBlock. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 72eca2a commit cd7c454

4 files changed

Lines changed: 919 additions & 3 deletions

File tree

Lib/asyncio/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
# This relies on each of the submodules having an __all__ variable.
88
from .base_events import *
9+
from .channels import *
910
from .coroutines import *
1011
from .events import *
1112
from .exceptions import *
@@ -24,6 +25,7 @@
2425
from .transports import *
2526

2627
__all__ = (base_events.__all__ +
28+
channels.__all__ +
2729
coroutines.__all__ +
2830
events.__all__ +
2931
exceptions.__all__ +

Lib/asyncio/channels.py

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
"""Memory object channels for asyncio."""
2+
3+
__all__ = ('open_channel', 'SendChannel', 'ReceiveChannel',
4+
'ChannelStatistics')
5+
6+
import collections
7+
import dataclasses
8+
import math
9+
from types import GenericAlias
10+
11+
from . import exceptions
12+
from . import mixins
13+
14+
15+
@dataclasses.dataclass(frozen=True)
16+
class ChannelStatistics:
17+
"""Statistics for a memory channel pair."""
18+
current_buffer_used: int
19+
max_buffer_size: int | float
20+
open_send_channels: int
21+
open_receive_channels: int
22+
tasks_waiting_send: int
23+
tasks_waiting_receive: int
24+
25+
26+
class _ChannelState:
27+
"""Shared internal state between SendChannel and ReceiveChannel."""
28+
29+
__slots__ = ('max_buffer_size', 'buffer', 'open_send_channels',
30+
'open_receive_channels', 'waiting_senders',
31+
'waiting_receivers')
32+
33+
def __init__(self, max_buffer_size):
34+
self.max_buffer_size = max_buffer_size
35+
self.buffer = collections.deque()
36+
self.open_send_channels = 0
37+
self.open_receive_channels = 0
38+
# OrderedDict preserves insertion order for FIFO wakeup.
39+
# waiting_senders: Future -> item
40+
self.waiting_senders = collections.OrderedDict()
41+
# waiting_receivers: Future -> None
42+
self.waiting_receivers = collections.OrderedDict()
43+
44+
def statistics(self):
45+
return ChannelStatistics(
46+
current_buffer_used=len(self.buffer),
47+
max_buffer_size=self.max_buffer_size,
48+
open_send_channels=self.open_send_channels,
49+
open_receive_channels=self.open_receive_channels,
50+
tasks_waiting_send=len(self.waiting_senders),
51+
tasks_waiting_receive=len(self.waiting_receivers),
52+
)
53+
54+
55+
class SendChannel(mixins._LoopBoundMixin):
56+
"""The sending end of a memory object channel.
57+
58+
Items sent through this channel will be received by the corresponding
59+
ReceiveChannel. Multiple clones can exist for fan-in patterns.
60+
"""
61+
62+
__slots__ = ('_state', '_closed')
63+
64+
def __init__(self, state):
65+
self._state = state
66+
self._closed = False
67+
state.open_send_channels += 1
68+
69+
def send_nowait(self, item):
70+
"""Send an item without blocking.
71+
72+
Raises ClosedResourceError if the channel is closed.
73+
Raises BrokenResourceError if all receivers are closed.
74+
Raises WouldBlock if the buffer is full and no receiver is waiting.
75+
"""
76+
if self._closed:
77+
raise exceptions.ClosedResourceError(
78+
"this send channel is closed")
79+
state = self._state
80+
if state.open_receive_channels == 0:
81+
raise exceptions.BrokenResourceError(
82+
"all receive channels are closed")
83+
# Try to deliver directly to a waiting receiver.
84+
while state.waiting_receivers:
85+
fut, _ = state.waiting_receivers.popitem(last=False)
86+
if not fut.done():
87+
fut.set_result(item)
88+
return
89+
# Try to buffer the item.
90+
if len(state.buffer) < state.max_buffer_size:
91+
state.buffer.append(item)
92+
return
93+
raise exceptions.WouldBlock
94+
95+
async def send(self, item):
96+
"""Send an item, blocking if the buffer is full.
97+
98+
Raises ClosedResourceError if the channel is closed.
99+
Raises BrokenResourceError if all receivers are closed.
100+
"""
101+
try:
102+
self.send_nowait(item)
103+
return
104+
except exceptions.WouldBlock:
105+
pass
106+
loop = self._get_loop()
107+
fut = loop.create_future()
108+
state = self._state
109+
state.waiting_senders[fut] = item
110+
try:
111+
await fut
112+
except BaseException:
113+
state.waiting_senders.pop(fut, None)
114+
raise
115+
116+
def clone(self):
117+
"""Create a clone of this send channel sharing the same state.
118+
119+
Raises ClosedResourceError if this channel is closed.
120+
"""
121+
if self._closed:
122+
raise exceptions.ClosedResourceError(
123+
"this send channel is closed")
124+
return SendChannel(self._state)
125+
126+
def close(self):
127+
"""Close this send channel.
128+
129+
When the last send channel clone is closed, all waiting receivers
130+
will receive EndOfChannel.
131+
"""
132+
if self._closed:
133+
return
134+
self._closed = True
135+
state = self._state
136+
state.open_send_channels -= 1
137+
if state.open_send_channels == 0:
138+
# Last sender closed — wake all waiting receivers.
139+
while state.waiting_receivers:
140+
fut, _ = state.waiting_receivers.popitem(last=False)
141+
if not fut.done():
142+
fut.set_exception(exceptions.EndOfChannel())
143+
# Don't clear buffer — receivers may still drain it.
144+
145+
async def aclose(self):
146+
"""Async close (for async with support)."""
147+
self.close()
148+
149+
def statistics(self):
150+
"""Return channel statistics."""
151+
return self._state.statistics()
152+
153+
def __enter__(self):
154+
return self
155+
156+
def __exit__(self, *exc_info):
157+
self.close()
158+
159+
async def __aenter__(self):
160+
return self
161+
162+
async def __aexit__(self, *exc_info):
163+
self.close()
164+
165+
def __repr__(self):
166+
state = self._state
167+
info = []
168+
if self._closed:
169+
info.append('closed')
170+
info.append(f'max_buffer_size={state.max_buffer_size!r}')
171+
info.append(f'current_buffer_used={len(state.buffer)}')
172+
return f'<{type(self).__name__} {" ".join(info)}>'
173+
174+
__class_getitem__ = classmethod(GenericAlias)
175+
176+
177+
class ReceiveChannel(mixins._LoopBoundMixin):
178+
"""The receiving end of a memory object channel.
179+
180+
Items can be received one at a time or via async iteration.
181+
Multiple clones can exist for fan-out patterns.
182+
"""
183+
184+
__slots__ = ('_state', '_closed')
185+
186+
def __init__(self, state):
187+
self._state = state
188+
self._closed = False
189+
state.open_receive_channels += 1
190+
191+
def receive_nowait(self):
192+
"""Receive an item without blocking.
193+
194+
Raises ClosedResourceError if the channel is closed.
195+
Raises EndOfChannel if all senders are closed and the buffer is empty.
196+
Raises WouldBlock if no item is available.
197+
"""
198+
if self._closed:
199+
raise exceptions.ClosedResourceError(
200+
"this receive channel is closed")
201+
state = self._state
202+
# Try to accept an item from a waiting sender to refill the buffer.
203+
while state.waiting_senders:
204+
fut, item = state.waiting_senders.popitem(last=False)
205+
if not fut.done():
206+
state.buffer.append(item)
207+
fut.set_result(None)
208+
break
209+
# Try to return from the buffer.
210+
if state.buffer:
211+
return state.buffer.popleft()
212+
if state.open_send_channels == 0:
213+
raise exceptions.EndOfChannel
214+
raise exceptions.WouldBlock
215+
216+
async def receive(self):
217+
"""Receive an item, blocking if the buffer is empty.
218+
219+
Raises ClosedResourceError if the channel is closed.
220+
Raises EndOfChannel if all senders are closed and the buffer is empty.
221+
"""
222+
try:
223+
return self.receive_nowait()
224+
except exceptions.WouldBlock:
225+
pass
226+
loop = self._get_loop()
227+
fut = loop.create_future()
228+
state = self._state
229+
state.waiting_receivers[fut] = None
230+
try:
231+
return await fut
232+
except BaseException:
233+
state.waiting_receivers.pop(fut, None)
234+
raise
235+
236+
def clone(self):
237+
"""Create a clone of this receive channel sharing the same state.
238+
239+
Raises ClosedResourceError if this channel is closed.
240+
"""
241+
if self._closed:
242+
raise exceptions.ClosedResourceError(
243+
"this receive channel is closed")
244+
return ReceiveChannel(self._state)
245+
246+
def close(self):
247+
"""Close this receive channel.
248+
249+
When the last receive channel clone is closed, all waiting senders
250+
will receive BrokenResourceError and the buffer will be cleared.
251+
"""
252+
if self._closed:
253+
return
254+
self._closed = True
255+
state = self._state
256+
state.open_receive_channels -= 1
257+
if state.open_receive_channels == 0:
258+
# Last receiver closed — wake all waiting senders.
259+
while state.waiting_senders:
260+
fut, _ = state.waiting_senders.popitem(last=False)
261+
if not fut.done():
262+
fut.set_exception(exceptions.BrokenResourceError(
263+
"all receive channels are closed"))
264+
state.buffer.clear()
265+
266+
async def aclose(self):
267+
"""Async close (for async with support)."""
268+
self.close()
269+
270+
def statistics(self):
271+
"""Return channel statistics."""
272+
return self._state.statistics()
273+
274+
def __enter__(self):
275+
return self
276+
277+
def __exit__(self, *exc_info):
278+
self.close()
279+
280+
async def __aenter__(self):
281+
return self
282+
283+
async def __aexit__(self, *exc_info):
284+
self.close()
285+
286+
def __aiter__(self):
287+
return self
288+
289+
async def __anext__(self):
290+
try:
291+
return await self.receive()
292+
except exceptions.EndOfChannel:
293+
raise StopAsyncIteration from None
294+
295+
def __repr__(self):
296+
state = self._state
297+
info = []
298+
if self._closed:
299+
info.append('closed')
300+
info.append(f'max_buffer_size={state.max_buffer_size!r}')
301+
info.append(f'current_buffer_used={len(state.buffer)}')
302+
return f'<{type(self).__name__} {" ".join(info)}>'
303+
304+
__class_getitem__ = classmethod(GenericAlias)
305+
306+
307+
def open_channel(max_buffer_size):
308+
"""Create a new memory object channel pair.
309+
310+
Returns a (SendChannel, ReceiveChannel) tuple.
311+
312+
max_buffer_size is the maximum number of items that can be buffered.
313+
Use 0 for an unbuffered (rendezvous) channel. Use math.inf for an
314+
unbounded buffer.
315+
316+
Raises ValueError if max_buffer_size is negative or not a valid number.
317+
"""
318+
if not isinstance(max_buffer_size, (int, float)):
319+
raise TypeError(
320+
f"max_buffer_size must be int or float, "
321+
f"got {type(max_buffer_size).__name__}")
322+
if max_buffer_size < 0:
323+
raise ValueError("max_buffer_size must be >= 0")
324+
if isinstance(max_buffer_size, float) and max_buffer_size != math.inf:
325+
raise ValueError(
326+
"float max_buffer_size only accepts math.inf")
327+
328+
state = _ChannelState(max_buffer_size)
329+
send_channel = SendChannel(state)
330+
receive_channel = ReceiveChannel(state)
331+
return send_channel, receive_channel

Lib/asyncio/exceptions.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
"""asyncio exceptions."""
22

33

4-
__all__ = ('BrokenBarrierError',
5-
'CancelledError', 'InvalidStateError', 'TimeoutError',
4+
__all__ = ('BrokenBarrierError', 'BrokenResourceError',
5+
'CancelledError', 'ClosedResourceError',
6+
'EndOfChannel',
7+
'InvalidStateError', 'TimeoutError',
68
'IncompleteReadError', 'LimitOverrunError',
7-
'SendfileNotAvailableError')
9+
'SendfileNotAvailableError',
10+
'WouldBlock')
811

912

1013
class CancelledError(BaseException):
@@ -60,3 +63,19 @@ def __reduce__(self):
6063

6164
class BrokenBarrierError(RuntimeError):
6265
"""Barrier is broken by barrier.abort() call."""
66+
67+
68+
class WouldBlock(Exception):
69+
"""Raised by nowait functions when the operation would block."""
70+
71+
72+
class EndOfChannel(Exception):
73+
"""Raised when all send channels have been closed."""
74+
75+
76+
class ClosedResourceError(Exception):
77+
"""Raised when trying to use a channel that has been closed."""
78+
79+
80+
class BrokenResourceError(Exception):
81+
"""Raised when trying to send on a channel with no open receivers."""

0 commit comments

Comments
 (0)