Skip to content

Commit ecae6a0

Browse files
Updates. (checkpoint)
1 parent 1751f8f commit ecae6a0

8 files changed

Lines changed: 245 additions & 152 deletions

.editorconfig

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ csharp_style_prefer_index_operator = true:silent
117117
csharp_style_prefer_null_check_over_type_check = true:warning
118118
csharp_style_prefer_range_operator = true:silent
119119
csharp_style_throw_expression = true:warning
120-
csharp_style_unused_value_assignment_preference = discard_variable
121-
csharp_style_unused_value_expression_statement_preference = discard_variable
120+
csharp_style_unused_value_assignment_preference = discard_variable:suggestion
121+
csharp_style_unused_value_expression_statement_preference = discard_variable:silent
122122

123123
# 'using' directive preferences
124124
csharp_using_directive_placement = outside_namespace:warning
@@ -236,10 +236,11 @@ dotnet_style_null_propagation = true:warning
236236
dotnet_style_prefer_is_null_check_over_reference_equality_method = true:warning
237237
dotnet_style_prefer_auto_properties = true:suggestion
238238
dotnet_style_object_initializer = true:suggestion
239-
dotnet_style_prefer_collection_expression = true:suggestion
239+
dotnet_style_prefer_collection_expression = when_types_exactly_match:suggestion
240240
dotnet_style_collection_initializer = true:suggestion
241241
dotnet_style_prefer_simplified_boolean_expressions = true:warning
242242
dotnet_style_prefer_conditional_expression_over_assignment = true:warning
243243
dotnet_style_prefer_conditional_expression_over_return = true:warning
244244
dotnet_style_explicit_tuple_names = true:suggestion
245-
dotnet_diagnostic.CA1510.severity = none
245+
dotnet_diagnostic.CA1510.severity = none
246+
dotnet_code_quality_unused_parameters = all:suggestion

Open.ChannelExtensions/BatchingChannelReader.cs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,9 @@ protected override async ValueTask<bool> WaitToReadAsyncCore(
246246
}
247247

248248
/// <inheritdoc />
249-
public class QueueBatchingChannelReader<T> : BatchingChannelReader<T, Queue<T>>
249+
public class QueueBatchingChannelReader<T>(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false)
250+
: BatchingChannelReader<T, Queue<T>>(source, batchSize, singleReader, syncCont)
250251
{
251-
/// <inheritdoc />
252-
public QueueBatchingChannelReader(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false)
253-
: base(source, batchSize, singleReader, syncCont) { }
254-
255252
/// <inheritdoc />
256253
[MethodImpl(MethodImplOptions.AggressiveInlining)]
257254
protected override void AddBatchItem(Queue<T> batch, T item)
@@ -282,12 +279,9 @@ protected override void TrimBatch(Queue<T> batch)
282279
}
283280

284281
/// <inheritdoc />
285-
public class BatchingChannelReader<T> : BatchingChannelReader<T, List<T>>
282+
public class BatchingChannelReader<T>(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false)
283+
: BatchingChannelReader<T, List<T>>(source, batchSize, singleReader, syncCont)
286284
{
287-
/// <inheritdoc />
288-
public BatchingChannelReader(ChannelReader<T> source, int batchSize, bool singleReader, bool syncCont = false)
289-
: base(source, batchSize, singleReader, syncCont) { }
290-
291285
/// <inheritdoc />
292286
[MethodImpl(MethodImplOptions.AggressiveInlining)]
293287
protected override void AddBatchItem(List<T> batch, T item)

Open.ChannelExtensions/Extensions.Join.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,10 @@
22

33
public static partial class Extensions
44
{
5-
sealed class JoiningChannelReader<TList, T> : BufferingChannelReader<TList, T>
5+
sealed class JoiningChannelReader<TList, T>(ChannelReader<TList> source, bool singleReader)
6+
: BufferingChannelReader<TList, T>(source, singleReader)
67
where TList : IEnumerable<T>
78
{
8-
public JoiningChannelReader(ChannelReader<TList> source, bool singleReader) : base(source, singleReader)
9-
{
10-
}
11-
129
protected override bool TryPipeItems(bool _)
1310
{
1411
ChannelReader<TList>? source = Source;
Lines changed: 39 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,128 +1,47 @@
1-
namespace Open.ChannelExtensions;
1+
using System.Collections.Immutable;
2+
3+
namespace Open.ChannelExtensions;
24

35
public static partial class Extensions
46
{
5-
sealed class MergingChannelReader<T> : ChannelReader<T>
6-
{
7-
public MergingChannelReader(IEnumerable<ChannelReader<T>> sources)
8-
{
9-
if (sources is null) throw new ArgumentNullException(nameof(sources));
10-
Contract.EndContractBlock();
11-
12-
var active = sources.Where(s =>
13-
{
14-
Debug.Assert(s is not null);
15-
return s.Completion.Status != TaskStatus.RanToCompletion;
16-
}).ToArray();
17-
18-
_count = active.Length;
19-
20-
if (_count == 0)
21-
{
22-
_sources = [];
23-
Completion = Task.CompletedTask;
24-
return;
25-
}
26-
27-
_sources = active;
28-
29-
// Capture the initial list of completions.
30-
var completions = active.Select(e => e.Completion).ToList();
31-
32-
// Create a task that completes when any of the sources are faulted:
33-
Completion = Task.Run(async () =>
34-
{
35-
// Wait for any of the tasks to complete and let it throw if it faults.
36-
while (completions.Count != 0)
37-
{
38-
var completed = await Task.WhenAny(completions).ConfigureAwait(false);
39-
// Propagate the exception.
40-
await completed.ConfigureAwait(false);
41-
completions.Remove(completed);
42-
}
43-
});
44-
}
45-
46-
private readonly ChannelReader<T>[] _sources;
47-
public override Task Completion { get; }
48-
49-
readonly int _count;
50-
int _next = -1;
51-
52-
public override bool TryRead(out T item)
53-
{
54-
int previous = -1;
55-
// Try as many times as there are sources before giving up.
56-
for (var attempt = 0; attempt < _count; attempt++)
57-
{
58-
// If the value overflows, it will be negative, which is fine, we'll adapt.
59-
var i = Interlocked.Increment(ref _next) % _count;
60-
if (i < 0) i += _count;
61-
62-
var source = _sources[i];
63-
64-
if (source.TryRead(out T? s))
65-
{
66-
item = s;
67-
return true;
68-
}
69-
70-
// Help the round-robin to try each source at least once.
71-
// If previous is not -1 and i is not the next in the sequence,
72-
// then another thread has already tried that source.
73-
if (previous != -1 && (previous + 1) % _count != i)
74-
attempt--; // Allow for an extra attempt.
75-
76-
previous = i;
77-
}
78-
79-
item = default!;
80-
return false;
81-
}
82-
83-
public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
84-
{
85-
var completion = Completion;
86-
if (Completion.IsCompleted)
87-
{
88-
return completion.IsFaulted
89-
? new ValueTask<bool>(Task.FromException<bool>(completion.Exception!))
90-
: new ValueTask<bool>(false);
91-
}
92-
93-
if (cancellationToken.IsCancellationRequested)
94-
return new ValueTask<bool>(Task.FromCanceled<bool>(cancellationToken));
95-
96-
// Not complete or cancelled? Wait on the sources.
97-
return WaitToReadAsyncCore(cancellationToken);
98-
}
99-
100-
private async ValueTask<bool> WaitToReadAsyncCore(CancellationToken cancellationToken)
101-
{
102-
retry:
103-
// We don't care about ones that have already completed.
104-
var active = _sources.Where(s => s.Completion.Status != TaskStatus.RanToCompletion).ToArray();
105-
if (active.Length == 0) return false;
106-
107-
var next = await Task.WhenAny(active.Select(s => s.WaitToReadAsync(cancellationToken).AsTask())).ConfigureAwait(false);
108-
109-
// Allow for possible exception to be thrown.
110-
var result = await next.ConfigureAwait(false);
111-
if (result) return true;
112-
113-
// If result was false, then there's one less and we should try again.
114-
goto retry;
115-
}
116-
}
117-
1187
/// <summary>
119-
/// Reads from multiple sources in a round-robin fashion.
8+
/// Creates a <see cref="MergingChannelReader{T}"/>
9+
/// that reads from multiple sources in a round-robin fashion.
12010
/// </summary>
12111
/// <typeparam name="T">The source type.</typeparam>
12212
/// <param name="sources">The channels to read from.</param>
123-
/// <returns>
124-
/// A <see cref="ChannelReader{T}"/> that reads from all sources in a round-robin fashion.
125-
/// </returns>
126-
public static ChannelReader<T> Merge<T>(this IEnumerable<ChannelReader<T>> sources)
127-
=> new MergingChannelReader<T>(sources);
13+
public static MergingChannelReader<T> Merge<T>(this IEnumerable<ChannelReader<T>> sources) => new(sources);
14+
15+
/// <summary>
16+
/// Merges the <paramref name="primary"/> with the <paramref name="secondary"/>
17+
/// as a <see cref="MergingChannelReader{T}"/>
18+
/// that reads from multiple sources in a round-robin fashion.
19+
/// </summary>
20+
/// <exception cref="ArgumentNullException">
21+
/// If the <paramref name="primary"/>
22+
/// or <paramref name="secondary"/> sources are null.
23+
/// </exception>
24+
/// <inheritdoc cref="MergingChannelReader{T}.Merge(ChannelReader{T}, ChannelReader{T}[])"/>/>
25+
public static MergingChannelReader<T> Merge<T>(
26+
this ChannelReader<T> primary,
27+
ChannelReader<T> secondary,
28+
params ChannelReader<T>[] others)
29+
{
30+
if (primary is null) throw new ArgumentNullException(nameof(primary));
31+
if (secondary is null) throw new ArgumentNullException(nameof(secondary));
32+
Contract.EndContractBlock();
33+
34+
// Is this already a merging reader? Then recapture the sources so it flattens the hierarchy.
35+
if (primary is MergingChannelReader<T> mcr)
36+
return mcr.Merge(secondary, others);
37+
38+
if(others is null || others.Length == 0)
39+
return new MergingChannelReader<T>(ImmutableArray.Create(primary, secondary));
40+
41+
var builder = ImmutableArray.CreateBuilder<ChannelReader<T>>(2 + others.Length);
42+
builder.Add(primary);
43+
builder.Add(secondary);
44+
builder.AddRange(others);
45+
return new MergingChannelReader<T>(builder.MoveToImmutable());
46+
}
12847
}

Open.ChannelExtensions/Extensions.Read.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,12 +1060,12 @@ public static ValueTask<long> ReadAllAsLines<T>(this Channel<T, string> channel,
10601060
/// <param name="reader">The channel reader to read from.</param>
10611061
/// <param name="initialCapacity">An optional capacity to initialze the list with.</param>
10621062
/// <returns>A list containing all the items from the completed channel.</returns>
1063-
public static async ValueTask<List<T>> ToListAsync<T>(this ChannelReader<T> reader, int initialCapacity = 0)
1063+
public static async ValueTask<List<T>> ToListAsync<T>(this ChannelReader<T> reader, int initialCapacity = -1)
10641064
{
10651065
if (reader is null) throw new ArgumentNullException(nameof(reader));
10661066
Contract.EndContractBlock();
10671067

1068-
List<T> list = initialCapacity == 0 ? new() : new(initialCapacity);
1068+
List<T> list = initialCapacity < 0 ? new() : new(initialCapacity);
10691069
await ReadAll(reader, list.Add).ConfigureAwait(false);
10701070
return list;
10711071
}

0 commit comments

Comments
 (0)