Skip to content

Commit 474bd76

Browse files
author
Oren (electricessence)
committed
Added PipeTo extensions.
1 parent c66483e commit 474bd76

8 files changed

Lines changed: 250 additions & 61 deletions

File tree

Open.ChannelExtensions.ComparisonTests/Program.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ static async Task Main()
1515
const int concurrency = 4;
1616
const int testSize = 30000001;
1717

18-
1918
{
2019
Console.WriteLine("Standard DataFlow operation test...");
2120
var block = new ActionBlock<int>(async i => await Delay(i));

Open.ChannelExtensions.Tests/BasicTests.cs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Diagnostics;
44
using System.Linq;
5+
using System.Threading.Channels;
56
using System.Threading.Tasks;
67
using Xunit;
78

@@ -85,6 +86,74 @@ public static async Task ReadAllAsync(int testSize)
8586
result.Clear();
8687
}
8788

89+
[Theory]
90+
[InlineData(testSize1)]
91+
[InlineData(testSize2)]
92+
public static async Task PipeToBounded(int testSize)
93+
{
94+
var range = Enumerable.Range(0, testSize);
95+
var result = new List<int>(testSize);
96+
97+
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
98+
{
99+
SingleReader = true,
100+
SingleWriter = true,
101+
AllowSynchronousContinuations = true
102+
});
103+
104+
var sw = Stopwatch.StartNew();
105+
var total = await range
106+
.ToChannel(singleReader: true)
107+
.PipeTo(channel, true)
108+
.ReadAllAsync(i =>
109+
{
110+
result.Add(i);
111+
return new ValueTask();
112+
});
113+
sw.Stop();
114+
115+
Console.WriteLine("Channel.ReadAllAsync(): {0}", sw.Elapsed);
116+
Console.WriteLine();
117+
118+
Assert.Equal(testSize, result.Count);
119+
Assert.True(result.SequenceEqual(range));
120+
result.Clear();
121+
}
122+
123+
[Theory]
124+
[InlineData(testSize1)]
125+
[InlineData(testSize2)]
126+
public static async Task PipeToUnbound(int testSize)
127+
{
128+
var range = Enumerable.Range(0, testSize);
129+
var result = new List<int>(testSize);
130+
131+
var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions()
132+
{
133+
SingleReader = true,
134+
SingleWriter = true,
135+
AllowSynchronousContinuations = true
136+
});
137+
138+
var sw = Stopwatch.StartNew();
139+
var total = await range
140+
.ToChannel(singleReader: true)
141+
.PipeTo(channel, true)
142+
.ReadAllAsync(i =>
143+
{
144+
result.Add(i);
145+
return new ValueTask();
146+
});
147+
sw.Stop();
148+
149+
Console.WriteLine("Channel.ReadAllAsync(): {0}", sw.Elapsed);
150+
Console.WriteLine();
151+
152+
Assert.Equal(testSize, result.Count);
153+
Assert.True(result.SequenceEqual(range));
154+
result.Clear();
155+
}
156+
88157
[Theory]
89158
[InlineData(1000, 51)]
90159
[InlineData(50, 1000)]

Open.ChannelExtensions/.editorconfig

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22

33
# CA1303: Do not pass literals as localized parameters
44
dotnet_diagnostic.CA1303.severity = silent
5+
6+
# CA1031: Do not catch general exception types
7+
dotnet_diagnostic.CA1031.severity = silent

Open.ChannelExtensions/Documentation.xml

Lines changed: 100 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Channels;
45
using System.Threading.Tasks;
@@ -97,22 +98,31 @@ public static ChannelReader<T> Join<T>(this ChannelReader<T[]> source, bool sing
9798
public static ChannelReader<T> Join<T>(this ChannelReader<IAsyncEnumerable<T>> source, bool singleReader = false)
9899
{
99100
var buffer = CreateChannel<T>(1, singleReader);
101+
var writer = buffer.Writer;
100102

101-
source
102-
.ReadAllAsync(
103-
async (batch, i) =>
104-
{
105-
await foreach (var e in batch)
106-
await buffer.Writer.WriteAsync(e).ConfigureAwait(false);
107-
})
108-
.AsTask()
109-
.ContinueWith(
110-
t => buffer.CompleteAsync(t.Exception),
111-
CancellationToken.None,
112-
TaskContinuationOptions.ExecuteSynchronously,
113-
TaskScheduler.Current);
103+
_ = JoinCore();
114104

115105
return buffer.Reader;
106+
107+
async ValueTask JoinCore()
108+
{
109+
try
110+
{
111+
await source
112+
.ReadAllAsync(
113+
async (batch, i) =>
114+
{
115+
await foreach (var e in batch)
116+
await writer.WriteAsync(e).ConfigureAwait(false);
117+
});
118+
writer.Complete();
119+
}
120+
catch (Exception ex)
121+
{
122+
writer.Complete(ex);
123+
}
124+
125+
}
116126
}
117127
#endif
118128
}

Open.ChannelExtensions/Extensions.Pipe.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,58 @@ namespace Open.ChannelExtensions
88
{
99
public static partial class Extensions
1010
{
11+
/// <summary>
12+
/// Reads all entries from the source channel and writes them to the target.
13+
/// This is useful for managing different buffers sizes, especially if the source reader comes from a .Transform function.
14+
/// </summary>
15+
/// <typeparam name="T">The type contained by the source channel and written to the target..</typeparam>
16+
/// <param name="source">The source channel.</param>
17+
/// <param name="target">The target channel.</param>
18+
/// <param name="cancellationToken">An optional cancellation token.</param>
19+
public static async ValueTask PipeTo<T>(this ChannelReader<T> source,
20+
ChannelWriter<T> target,
21+
CancellationToken cancellationToken = default)
22+
=> await source.ReadAllAsync(e => target.WriteAsync(e, cancellationToken), cancellationToken);
23+
24+
/// <summary>
25+
/// Reads all entries from the source channel and writes them to the target.
26+
/// This is useful for managing different buffers sizes, especially if the source reader comes from a .Transform function.
27+
/// </summary>
28+
/// <typeparam name="T">The type contained by the source channel and written to the target..</typeparam>
29+
/// <param name="source">The source channel.</param>
30+
/// <param name="target">The target channel.</param>
31+
/// <param name="complete">Indicates to call complete on the target when the source is complete.</param>
32+
/// <param name="cancellationToken">An optional cancellation token.</param>
33+
/// <returns>The channel reader of the target.</returns>
34+
public static ChannelReader<T> PipeTo<T>(this ChannelReader<T> source,
35+
Channel<T> target,
36+
bool complete,
37+
CancellationToken cancellationToken = default)
38+
{
39+
if (source is null) throw new ArgumentNullException(nameof(source));
40+
if (target is null) throw new ArgumentNullException(nameof(target));
41+
Contract.EndContractBlock();
42+
43+
_ = PipeToCore();
44+
45+
return target.Reader;
46+
47+
async ValueTask PipeToCore()
48+
{
49+
try
50+
{
51+
await PipeTo(source, target.Writer, cancellationToken);
52+
if (complete)
53+
target.Writer.Complete();
54+
}
55+
catch (Exception ex)
56+
{
57+
if (complete)
58+
target.Writer.Complete(ex);
59+
}
60+
}
61+
}
62+
1163
/// <summary>
1264
/// Reads all entries concurrently and applies the values to the provided transform function before buffering the results into another channel for consumption.
1365
/// </summary>

Open.ChannelExtensions/Extensions.ReadConcurrently.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public static Task<long> ReadAllConcurrentlyAsync<T>(this ChannelReader<T> reade
2323
Func<T, ValueTask> receiver,
2424
CancellationToken cancellationToken = default)
2525
{
26+
if (reader is null) throw new ArgumentNullException(nameof(reader));
2627
if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency), maxConcurrency, "Must be at least 1.");
2728
Contract.EndContractBlock();
2829

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ Part of the "Open" set of libraries.
1919
<RepositoryType>git</RepositoryType>
2020
<PackageTags>channels dotnet threading tasks extensions</PackageTags>
2121
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
22-
<Version>3.1.1</Version>
22+
<Version>3.2.0</Version>
2323
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2424
<PackageReleaseNotes>Added TransformChannel class which facilitates providing a channel with different .</PackageReleaseNotes>
2525
<RepositoryUrl>https://github.com/electricessence/Open.ChannelExtensions</RepositoryUrl>

0 commit comments

Comments
 (0)