11using System ;
22using System . Collections . Generic ;
33using System . Diagnostics . Contracts ;
4+ using System . Threading ;
45using System . Threading . Channels ;
6+ using System . Threading . Tasks ;
57
68namespace Open . ChannelExtensions
79{
@@ -12,7 +14,7 @@ namespace Open.ChannelExtensions
1214 public class BatchingChannelReader < T > : BufferingChannelReader < T , List < T > >
1315 {
1416 private readonly int _batchSize ;
15- private List < T > ? _current ;
17+ private List < T > ? _batch ;
1618
1719 /// <summary>
1820 /// Constructs a BatchingChannelReader.
@@ -24,7 +26,6 @@ public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool single
2426 Contract . EndContractBlock ( ) ;
2527
2628 _batchSize = batchSize ;
27- _current = source . Completion . IsCompleted ? null : new List < T > ( batchSize ) ;
2829 }
2930
3031 /// <summary>
@@ -35,54 +36,53 @@ public bool ForceBatch()
3536 {
3637 if ( Buffer == null || Buffer . Reader . Completion . IsCompleted ) return false ;
3738 if ( TryPipeItems ( ) ) return true ;
39+ if ( _batch == null ) return false ;
3840
3941 lock ( Buffer )
4042 {
4143 if ( Buffer . Reader . Completion . IsCompleted ) return false ;
4244 if ( TryPipeItems ( ) ) return true ;
43- var c = _current ;
44- if ( c == null || c . Count == 0 || Buffer . Reader . Completion . IsCompleted )
45+ var c = _batch ;
46+ if ( c == null || Buffer . Reader . Completion . IsCompleted )
4547 return false ;
4648 c . TrimExcess ( ) ;
47- _current = new List < T > ( _batchSize ) ;
48- Buffer . Writer . TryWrite ( c ) ;
49+ _batch = null ;
50+ return Buffer . Writer . TryWrite ( c ) ; // Should always be true at this point.
4951 }
50-
51- return true ;
5252 }
5353
5454 /// <inheritdoc />
5555 protected override bool TryPipeItems ( )
5656 {
57- if ( _current == null || Buffer == null || Buffer . Reader . Completion . IsCompleted )
57+ if ( Buffer == null || Buffer . Reader . Completion . IsCompleted )
5858 return false ;
5959
6060 lock ( Buffer )
6161 {
62- var c = _current ;
63- if ( c == null || Buffer . Reader . Completion . IsCompleted )
64- return false ;
62+ if ( Buffer . Reader . Completion . IsCompleted ) return false ;
6563
64+ var c = _batch ;
6665 var source = Source ;
6766 if ( source == null || source . Completion . IsCompleted )
6867 {
6968 // All finished, release the last batch to the buffer.
69+ if ( c == null ) return false ;
70+
7071 c . TrimExcess ( ) ;
71- _current = null ;
72- if ( c . Count == 0 )
73- return false ;
72+ _batch = null ;
7473
7574 Buffer . Writer . TryWrite ( c ) ;
7675 return true ;
7776 }
7877
7978 while ( source . TryRead ( out T item ) )
8079 {
81- c . Add ( item ) ;
80+ if ( c == null ) _batch = c = new List < T > ( _batchSize ) { item } ;
81+ else c . Add ( item ) ;
8282
8383 if ( c . Count == _batchSize )
8484 {
85- _current = new List < T > ( _batchSize ) ;
85+ _batch = null ;
8686 Buffer . Writer . TryWrite ( c ) ;
8787 return true ;
8888 }
@@ -91,5 +91,39 @@ protected override bool TryPipeItems()
9191 return false ;
9292 }
9393 }
94+
95+ /// <inheritdoc />
96+ protected override async ValueTask < bool > WaitToReadAsyncCore ( ValueTask < bool > bufferWait , CancellationToken cancellationToken )
97+ {
98+
99+ var source = Source ;
100+ if ( source == null ) return await bufferWait ;
101+
102+ var b = bufferWait . AsTask ( ) ;
103+ using var tokenSource = CancellationTokenSource . CreateLinkedTokenSource ( cancellationToken ) ;
104+ var token = tokenSource . Token ;
105+
106+ start :
107+
108+ if ( b . IsCompleted ) return await b . ConfigureAwait ( false ) ;
109+
110+ var s = source . WaitToReadAsync ( token ) ;
111+ if ( s . IsCompleted && ! b . IsCompleted ) TryPipeItems ( ) ;
112+
113+ if ( b . IsCompleted )
114+ {
115+ tokenSource . Cancel ( ) ;
116+ return await b . ConfigureAwait ( false ) ;
117+ }
118+ await Task . WhenAny ( s . AsTask ( ) , b ) . ConfigureAwait ( false ) ;
119+ if ( b . IsCompleted )
120+ {
121+ tokenSource . Cancel ( ) ;
122+ return await b . ConfigureAwait ( false ) ;
123+ }
124+
125+ TryPipeItems ( ) ;
126+ goto start ;
127+ }
94128 }
95129}
0 commit comments