Skip to content

Commit ec8e492

Browse files
Merge pull request #36 from Open-NET-Libraries/Merge-Extension
ChannelReader.Merge extension with tests.
2 parents 7cb47fd + 70ca097 commit ec8e492

10 files changed

Lines changed: 362 additions & 33 deletions

File tree

.editorconfig

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ csharp_style_prefer_index_operator = true:silent
117117
csharp_style_prefer_null_check_over_type_check = true:warning
118118
csharp_style_prefer_range_operator = true:silent
119119
csharp_style_throw_expression = true:warning
120-
csharp_style_unused_value_assignment_preference = discard_variable
121-
csharp_style_unused_value_expression_statement_preference = discard_variable
120+
csharp_style_unused_value_assignment_preference = discard_variable:suggestion
121+
csharp_style_unused_value_expression_statement_preference = discard_variable:silent
122122

123123
# 'using' directive preferences
124124
csharp_using_directive_placement = outside_namespace:warning
@@ -236,10 +236,11 @@ dotnet_style_null_propagation = true:warning
236236
dotnet_style_prefer_is_null_check_over_reference_equality_method = true:warning
237237
dotnet_style_prefer_auto_properties = true:suggestion
238238
dotnet_style_object_initializer = true:suggestion
239-
dotnet_style_prefer_collection_expression = true:suggestion
239+
dotnet_style_prefer_collection_expression = when_types_exactly_match:suggestion
240240
dotnet_style_collection_initializer = true:suggestion
241241
dotnet_style_prefer_simplified_boolean_expressions = true:warning
242242
dotnet_style_prefer_conditional_expression_over_assignment = true:warning
243243
dotnet_style_prefer_conditional_expression_over_return = true:warning
244244
dotnet_style_explicit_tuple_names = true:suggestion
245-
dotnet_diagnostic.CA1510.severity = none
245+
dotnet_diagnostic.CA1510.severity = none
246+
dotnet_code_quality_unused_parameters = all:suggestion
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
namespace Open.ChannelExtensions.Tests;
2+
3+
public static class MergeTests
4+
{
5+
const int Total = 3000000;
6+
const int Bounds = 100;
7+
const int Count = 5;
8+
9+
private static Channel<int>[] GetChannels()
10+
=> Enumerable.Range(0, Count).Select(_ => Channel.CreateBounded<int>(Bounds)).ToArray();
11+
12+
private static async Task BasicMergeTestCore(ChannelWriter<int>[] writers, ValueTask<List<int>> merging)
13+
{
14+
// Act
15+
await Parallel.ForAsync(0, Total,
16+
(i, token) => writers[i % Count].WriteAsync(i, token));
17+
18+
foreach (var writer in writers)
19+
writer.Complete();
20+
21+
var merged = await merging;
22+
merged.Sort();
23+
24+
// Assert
25+
Assert.Equal(Total, merged.Count);
26+
Assert.True(Enumerable.Range(0, Total).SequenceEqual(merged));
27+
}
28+
29+
[Fact()]
30+
public static async Task BasicMergeTest()
31+
{
32+
// 3 channels
33+
var c = GetChannels();
34+
35+
// 3 writers
36+
var writers = c.Select(e => e.Writer).ToArray();
37+
38+
// 3 readers
39+
var merging = c.Select(e => e.Reader).Merge().ToListAsync(Total);
40+
41+
await BasicMergeTestCore(writers, merging);
42+
}
43+
44+
[Fact()]
45+
public static async Task MergeChainTest()
46+
{
47+
// 3 channels
48+
var c = GetChannels();
49+
50+
// 3 writers
51+
var writers = c.Select(e => e.Writer).ToArray();
52+
53+
var reader = c[0].Reader;
54+
for (int i = 1; i < c.Length; i++)
55+
reader = reader.Merge(c[i].Reader);
56+
57+
// 3 readers
58+
var merging = reader.ToListAsync(Total);
59+
60+
await BasicMergeTestCore(writers, merging);
61+
}
62+
63+
[Fact()]
64+
public static async Task MergeChainTest2()
65+
{
66+
// 3 channels
67+
var c = GetChannels();
68+
69+
// 3 writers
70+
var writers = c.Select(e => e.Writer).ToArray();
71+
72+
var reader = c[0].Reader.Merge(c[1].Reader, c.Skip(2).Select(e => e.Reader).ToArray());
73+
for (int i = 1; i < c.Length; i++)
74+
reader = reader.Merge(c[i].Reader);
75+
76+
// 3 readers
77+
var merging = reader.ToListAsync(Total);
78+
79+
await BasicMergeTestCore(writers, merging);
80+
}
81+
82+
[Fact()]
83+
public static async Task ExceptionPropagationTest()
84+
{
85+
// 3 channels
86+
var c = GetChannels();
87+
88+
// 3 writers
89+
var writers = c.Select(e => e.Writer).ToArray();
90+
91+
// 3 readers
92+
var merging = c.Select(e => e.Reader).Merge();
93+
var list = merging.ToListAsync(Total);
94+
95+
// Act
96+
await Assert.ThrowsAsync<ChannelClosedException>(() => Parallel.ForAsync(0, Total,
97+
async (i, token) =>
98+
{
99+
var w = writers[i % 3];
100+
if (i == Total / 2)
101+
w.Complete(new Exception("Test"));
102+
else
103+
await w.WriteAsync(i, token).ConfigureAwait(false);
104+
}));
105+
106+
// Assert
107+
await Assert.ThrowsAsync<Exception>(list.AsTask);
108+
await Assert.ThrowsAsync<Exception>(() => merging.Completion);
109+
}
110+
}

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
public static partial class Extensions
44
{
5-
sealed class JoiningChannelReader<TList, T> : BufferingChannelReader<TList, T>
5+
sealed class JoiningChannelReader<TList, T>(ChannelReader<TList> source, bool singleReader)
6+
: BufferingChannelReader<TList, T>(source, singleReader)
67
where TList : IEnumerable<T>
78
{
8-
public JoiningChannelReader(ChannelReader<TList> source, bool singleReader) : base(source, singleReader)
9-
{
10-
}
11-
129
protected override bool TryPipeItems(bool _)
1310
{
1411
ChannelReader<TList>? source = Source;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System.Collections.Immutable;
2+
3+
namespace Open.ChannelExtensions;
4+
5+
public static partial class Extensions
6+
{
7+
/// <summary>
8+
/// Creates a <see cref="MergingChannelReader{T}"/>
9+
/// that reads from multiple sources in a round-robin fashion.
10+
/// </summary>
11+
/// <typeparam name="T">The source type.</typeparam>
12+
/// <param name="sources">The channels to read from.</param>
13+
public static MergingChannelReader<T> Merge<T>(this IEnumerable<ChannelReader<T>> sources) => new(sources);
14+
15+
/// <summary>
16+
/// Merges the <paramref name="primary"/> with the <paramref name="secondary"/>
17+
/// as a <see cref="MergingChannelReader{T}"/>
18+
/// that reads from multiple sources in a round-robin fashion.
19+
/// </summary>
20+
/// <exception cref="ArgumentNullException">
21+
/// If the <paramref name="primary"/>
22+
/// or <paramref name="secondary"/> sources are null.
23+
/// </exception>
24+
/// <inheritdoc cref="MergingChannelReader{T}.Merge(ChannelReader{T}, ChannelReader{T}[])"/>/>
25+
public static MergingChannelReader<T> Merge<T>(
26+
this ChannelReader<T> primary,
27+
ChannelReader<T> secondary,
28+
params ChannelReader<T>[] others)
29+
{
30+
if (primary is null) throw new ArgumentNullException(nameof(primary));
31+
if (secondary is null) throw new ArgumentNullException(nameof(secondary));
32+
Contract.EndContractBlock();
33+
34+
// Is this already a merging reader? Then recapture the sources so it flattens the hierarchy.
35+
if (primary is MergingChannelReader<T> mcr)
36+
return mcr.Merge(secondary, others);
37+
38+
if(others is null || others.Length == 0)
39+
return new MergingChannelReader<T>(ImmutableArray.Create(primary, secondary));
40+
41+
var builder = ImmutableArray.CreateBuilder<ChannelReader<T>>(2 + others.Length);
42+
builder.Add(primary);
43+
builder.Add(secondary);
44+
builder.AddRange(others);
45+
return new MergingChannelReader<T>(builder.MoveToImmutable());
46+
}
47+
}

Open.ChannelExtensions/Extensions.Read.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,13 +1058,14 @@ public static ValueTask<long> ReadAllAsLines<T>(this Channel<T, string> channel,
10581058
/// </summary>
10591059
/// <typeparam name="T">The item type.</typeparam>
10601060
/// <param name="reader">The channel reader to read from.</param>
1061+
/// <param name="initialCapacity">An optional capacity to initialze the list with.</param>
10611062
/// <returns>A list containing all the items from the completed channel.</returns>
1062-
public static async ValueTask<List<T>> ToListAsync<T>(this ChannelReader<T> reader)
1063+
public static async ValueTask<List<T>> ToListAsync<T>(this ChannelReader<T> reader, int initialCapacity = -1)
10631064
{
10641065
if (reader is null) throw new ArgumentNullException(nameof(reader));
10651066
Contract.EndContractBlock();
10661067

1067-
var list = new List<T>();
1068+
List<T> list = initialCapacity < 0 ? new() : new(initialCapacity);
10681069
await ReadAll(reader, list.Add).ConfigureAwait(false);
10691070
return list;
10701071
}

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,24 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>8.0.2</Version>
25+
<Version>8.1.0</Version>
2626
<PackageReleaseNotes></PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>
2929
<IncludeSymbols>true</IncludeSymbols>
3030
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
3131
<PackageIcon>logo.png</PackageIcon>
3232
<PackageReadmeFile>README.md</PackageReadmeFile>
33+
<!-- Don't enforce colleciton expressions as they don't work in .NET 6. -->
34+
<NoWarn>IDE0301;IDE0303;IDE0304</NoWarn>
35+
</PropertyGroup>
36+
37+
<PropertyGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
38+
<NoWarn>$(NoWarn);nullable</NoWarn>
3339
</PropertyGroup>
3440

3541
<ItemGroup>
36-
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All" />
42+
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.*" PrivateAssets="All" />
3743
</ItemGroup>
3844

3945
<ItemGroup>
@@ -49,6 +55,7 @@
4955

5056
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' or '$(TargetFramework)' == 'netstandard2.1'">
5157
<PackageReference Include="System.Threading.Channels" Version="8.*" />
58+
<PackageReference Include="System.Collections.Immutable" Version="8.*" />
5259
</ItemGroup>
5360

5461
</Project>

Open.ChannelExtensions/BatchingChannelReader.cs renamed to Open.ChannelExtensions/Readers/BatchingChannelReader.cs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,9 @@ protected override async ValueTask<bool> WaitToReadAsyncCore(
246246
}
247247

248248
/// <inheritdoc />
249-
public class QueueBatchingChannelReader<T> : BatchingChannelReader<T, Queue<T>>
249+
public class QueueBatchingChannelReader<T>(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false)
250+
: BatchingChannelReader<T, Queue<T>>(source, batchSize, singleReader, syncCont)
250251
{
251-
/// <inheritdoc />
252-
public QueueBatchingChannelReader(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false)
253-
: base(source, batchSize, singleReader, syncCont) { }
254-
255252
/// <inheritdoc />
256253
[MethodImpl(MethodImplOptions.AggressiveInlining)]
257254
protected override void AddBatchItem(Queue<T> batch, T item)
@@ -282,12 +279,9 @@ protected override void TrimBatch(Queue<T> batch)
282279
}
283280

284281
/// <inheritdoc />
285-
public class BatchingChannelReader<T> : BatchingChannelReader<T, List<T>>
282+
public class BatchingChannelReader<T>(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false)
283+
: BatchingChannelReader<T, List<T>>(source, batchSize, singleReader, syncCont)
286284
{
287-
/// <inheritdoc />
288-
public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false)
289-
: base(source, batchSize, singleReader, syncCont) { }
290-
291285
/// <inheritdoc />
292286
[MethodImpl(MethodImplOptions.AggressiveInlining)]
293287
protected override void AddBatchItem(List<T> batch, T item)
File renamed without changes.

0 commit comments

Comments
 (0)