Skip to content

Commit 64668e5

Browse files
author
Oren (electricessence)
committed
Fixed batching.
1 parent 8e83e5f commit 64668e5

3 files changed

Lines changed: 50 additions & 58 deletions

File tree

Open.ChannelExtensions/BufferingChannelReader.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ public BufferingChannelReader(ChannelReader<TIn> source, bool singleReader)
1818
Buffer = Extensions.CreateChannel<TOut>(-1, singleReader);
1919

2020
if (Source.Completion.IsCompleted)
21+
{
2122
Buffer.Writer.Complete(Source.Completion.Exception);
23+
}
2224
else
2325
{
2426
Source.Completion.ContinueWith(t =>
2527
{
2628
// Need to be sure writing is done before we continue...
2729
lock (Buffer)
2830
{
29-
TryPipeItems();
31+
while (TryPipeItems()) { }
3032
Buffer.Writer.Complete(t.Exception);
3133
}
3234
});

Open.ChannelExtensions/Extensions.Batch.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool single
1818
Contract.EndContractBlock();
1919

2020
_batchSize = batchSize;
21-
_current = new List<T>(batchSize);
21+
_current = source.Completion.IsCompleted ? null : new List<T>(batchSize);
2222
}
2323

2424

@@ -36,6 +36,9 @@ protected override bool TryPipeItems()
3636
if (c == null)
3737
return false;
3838

39+
if (Buffer.Reader.Completion.IsCompleted)
40+
return false;
41+
3942
if (Source.Completion.IsCompleted)
4043
{
4144
c.TrimExcess();
@@ -47,22 +50,19 @@ protected override bool TryPipeItems()
4750
return true;
4851
}
4952

50-
bool piped = false;
5153
while (Source.TryRead(out T item))
5254
{
53-
piped = true;
54-
5555
if (c.Count == _batchSize)
5656
{
57-
_current = new List<T>(_batchSize);
57+
_current = new List<T>(_batchSize) { item };
5858
Buffer.Writer.TryWrite(c);
59-
break;
59+
return true;
6060
}
6161

6262
c.Add(item);
6363
}
6464

65-
return piped;
65+
return false;
6666
}
6767
}
6868
}

Open.ChannelExtensions/docs/Documentation.xml

Lines changed: 40 additions & 50 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)