Skip to content

Commit 835508f

Browse files
Finalized tests and move public readers to their own folder.
1 parent ecae6a0 commit 835508f

5 files changed

Lines changed: 74 additions & 36 deletions

File tree

Open.ChannelExtensions.Tests/MergeTests.cs

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,18 @@
22

33
public static class MergeTests
44
{
5-
[Fact()]
6-
public static async Task BasicMergeTest()
7-
{
8-
// Arrange
9-
const int total = 3000000;
10-
const int bound = 100;
11-
12-
// 3 channels
13-
var c1 = Channel.CreateBounded<int>(bound);
14-
var c2 = Channel.CreateBounded<int>(bound);
15-
var c3 = Channel.CreateBounded<int>(bound);
16-
var writers = new[] { c1.Writer, c2.Writer, c3.Writer };
5+
const int Total = 3000000;
6+
const int Bounds = 100;
7+
const int Count = 5;
178

18-
// 3 readers
19-
var merging = new[] { c1.Reader, c2.Reader, c3.Reader }.Merge().ToListAsync(total);
9+
private static Channel<int>[] GetChannels()
10+
=> Enumerable.Range(0, Count).Select(_ => Channel.CreateBounded<int>(Bounds)).ToArray();
2011

12+
private static async Task BasicMergeTestCore(ChannelWriter<int>[] writers, ValueTask<List<int>> merging)
13+
{
2114
// Act
22-
await Parallel.ForAsync(0, total,
23-
(i, token) => writers[i % 3].WriteAsync(i, token));
15+
await Parallel.ForAsync(0, Total,
16+
(i, token) => writers[i % Count].WriteAsync(i, token));
2417

2518
foreach (var writer in writers)
2619
writer.Complete();
@@ -29,33 +22,82 @@ await Parallel.ForAsync(0, total,
2922
merged.Sort();
3023

3124
// Assert
32-
Assert.Equal(total, merged.Count);
33-
Assert.True(Enumerable.Range(0, total).SequenceEqual(merged));
25+
Assert.Equal(Total, merged.Count);
26+
Assert.True(Enumerable.Range(0, Total).SequenceEqual(merged));
3427
}
3528

3629
[Fact()]
37-
public static async Task ExceptionPropagationTest()
30+
public static async Task BasicMergeTest()
3831
{
39-
// Arrange
40-
const int total = 3000000;
41-
const int bound = 100;
32+
// 3 channels
33+
var c = GetChannels();
4234

35+
// 3 writers
36+
var writers = c.Select(e => e.Writer).ToArray();
37+
38+
// 3 readers
39+
var merging = c.Select(e => e.Reader).Merge().ToListAsync(Total);
40+
41+
await BasicMergeTestCore(writers, merging);
42+
}
43+
44+
[Fact()]
45+
public static async Task MergeChainTest()
46+
{
4347
// 3 channels
44-
var c1 = Channel.CreateBounded<int>(bound);
45-
var c2 = Channel.CreateBounded<int>(bound);
46-
var c3 = Channel.CreateBounded<int>(bound);
47-
var writers = new[] { c1.Writer, c2.Writer, c3.Writer };
48+
var c = GetChannels();
49+
50+
// 3 writers
51+
var writers = c.Select(e => e.Writer).ToArray();
52+
53+
var reader = c[0].Reader;
54+
for (int i = 1; i < c.Length; i++)
55+
reader = reader.Merge(c[i].Reader);
56+
57+
// 3 readers
58+
var merging = reader.ToListAsync(Total);
59+
60+
await BasicMergeTestCore(writers, merging);
61+
}
62+
63+
[Fact()]
64+
public static async Task MergeChainTest2()
65+
{
66+
// 3 channels
67+
var c = GetChannels();
68+
69+
// 3 writers
70+
var writers = c.Select(e => e.Writer).ToArray();
71+
72+
var reader = c[0].Reader.Merge(c[1].Reader, c.Skip(2).Select(e => e.Reader).ToArray());
73+
for (int i = 1; i < c.Length; i++)
74+
reader = reader.Merge(c[i].Reader);
75+
76+
// 3 readers
77+
var merging = reader.ToListAsync(Total);
78+
79+
await BasicMergeTestCore(writers, merging);
80+
}
81+
82+
[Fact()]
83+
public static async Task ExceptionPropagationTest()
84+
{
85+
// 3 channels
86+
var c = GetChannels();
87+
88+
// 3 writers
89+
var writers = c.Select(e => e.Writer).ToArray();
4890

4991
// 3 readers
50-
var merging = new[] { c1.Reader, c2.Reader, c3.Reader }.Merge();
51-
var list = merging.ToListAsync(total);
92+
var merging = c.Select(e => e.Reader).Merge();
93+
var list = merging.ToListAsync(Total);
5294

5395
// Act
54-
await Assert.ThrowsAsync<ChannelClosedException>(() => Parallel.ForAsync(0, total,
96+
await Assert.ThrowsAsync<ChannelClosedException>(() => Parallel.ForAsync(0, Total,
5597
async (i, token) =>
5698
{
5799
var w = writers[i % 3];
58-
if (i == total / 2)
100+
if (i == Total / 2)
59101
w.Complete(new Exception("Test"));
60102
else
61103
await w.WriteAsync(i, token).ConfigureAwait(false);

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>8.0.2</Version>
25+
<Version>8.1.0</Version>
2626
<PackageReleaseNotes></PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>
@@ -57,8 +57,4 @@
5757
<PackageReference Include="System.Collections.Immutable" Version="8.*" />
5858
</ItemGroup>
5959

60-
<ItemGroup>
61-
<Folder Include="Readers\" />
62-
</ItemGroup>
63-
6460
</Project>
File renamed without changes.
File renamed without changes.

Open.ChannelExtensions/MergingChannelReader.cs renamed to Open.ChannelExtensions/Readers/MergingChannelReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ public MergingChannelReader<T> Merge(
156156

157157
int count = _sources.Length + (others?.Length ?? 0);
158158

159-
count += _sources.Length;
160159
ImmutableArray<ChannelReader<T>>.Builder builder;
161160
if (other is MergingChannelReader<T> mcr)
162161
{
@@ -174,6 +173,7 @@ public MergingChannelReader<T> Merge(
174173
}
175174

176175
if (others is not null) builder.AddRange(others);
176+
Debug.Assert(builder.Count == builder.Capacity);
177177

178178
return new (builder.MoveToImmutable());
179179
}

0 commit comments

Comments
 (0)