Skip to content

Commit 99efa39

Browse files
Added PropagateCompletion helper extensions.
1 parent ec8e492 commit 99efa39

3 files changed

Lines changed: 74 additions & 58 deletions

File tree

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
namespace Open.ChannelExtensions.Tests;
2+
public static class PropagationTests
3+
{
4+
[Fact]
5+
public static async Task PropagateCompleteionTest()
6+
{
7+
var inputChannel = Channel.CreateBounded<int>(100);
8+
var outputChannel = Channel.CreateBounded<int>(100);
9+
10+
inputChannel.PropagateCompletion(outputChannel);
11+
inputChannel.Writer.Complete();
12+
await outputChannel.Reader.Completion;
13+
}
14+
}

Open.ChannelExtensions/Extensions.Pipe.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@ public static async ValueTask<long> PipeTo<T>(this ChannelReader<T> source,
3434
{
3535
if (complete)
3636
{
37-
target.Complete(ex);
37+
target.TryComplete(ex);
3838
complete = false;
3939
}
4040
throw;
4141
}
4242
finally
4343
{
4444
if (complete)
45-
target.Complete();
45+
target.TryComplete();
4646
}
4747
}
4848

Open.ChannelExtensions/Extensions._.cs

Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ namespace Open.ChannelExtensions;
66
/// <summary>
77
/// Extensions for operating with System.Threading.Channels.
88
/// </summary>
9+
[SuppressMessage("RCS1047FadeOut",
10+
"RCS1047FadeOut: Async Naming",
11+
Justification = "Some methods use the Async suffix to distinguish between the contents of their parameters.")]
912
public static partial class Extensions
1013
{
1114
internal static Channel<T> CreateChannel<T>(ChannelOptions channelOptions)
@@ -69,6 +72,52 @@ internal static IEnumerable<ValueTask<T>> WrapValueTask<T>(this IEnumerable<Func
6972
yield return new ValueTask<T>(e());
7073
}
7174

75+
/// <summary>
76+
/// Will propagate the completion of the <paramref name="source"/> channel reader to the <paramref name="target"/> channel writer.
77+
/// </summary>
78+
/// <remarks>If the <paramref name="cancellationToken"/> is cancelled, the propagation will not occur.</remarks>
79+
/// <returns>The source reader.</returns>
80+
public static ChannelReader<TSource> PropagateCompletion<TSource, TTarget>(
81+
this ChannelReader<TSource> source, ChannelWriter<TTarget> target, CancellationToken cancellationToken = default)
82+
{
83+
if (source is null) throw new ArgumentNullException(nameof(source));
84+
if (target is null) throw new ArgumentNullException(nameof(target));
85+
Contract.EndContractBlock();
86+
87+
if (cancellationToken.IsCancellationRequested)
88+
return source;
89+
90+
source.Completion.ContinueWith(t => {
91+
if (t.IsFaulted)
92+
target.TryComplete(t.Exception);
93+
else
94+
target.TryComplete();
95+
}, cancellationToken);
96+
97+
return source;
98+
}
99+
100+
/// <inheritdoc cref="PropagateCompletion{TSource, TTarget}(ChannelReader{TSource}, ChannelWriter{TTarget}, CancellationToken)"/>
101+
[ExcludeFromCodeCoverage]
102+
public static ChannelReader<TSource> PropagateCompletion<TSource, TTargetIn, TTargetOut>(
103+
this ChannelReader<TSource> source, Channel<TTargetIn, TTargetOut> target, CancellationToken cancellationToken = default)
104+
=> PropagateCompletion(source, target.Writer, cancellationToken);
105+
106+
/// <inheritdoc cref="PropagateCompletion{TSource, TTarget}(ChannelReader{TSource}, ChannelWriter{TTarget}, CancellationToken)"/>
107+
[ExcludeFromCodeCoverage]
108+
public static Channel<TSourceIn, TSourceOut> PropagateCompletion<TSourceIn, TSourceOut, TTarget>(
109+
this Channel<TSourceIn, TSourceOut> source, ChannelWriter<TTarget> target, CancellationToken cancellationToken = default)
110+
{
111+
_ = source.Reader.PropagateCompletion(target, cancellationToken);
112+
return source;
113+
}
114+
115+
/// <inheritdoc cref="PropagateCompletion{TSource, TTarget}(ChannelReader{TSource}, ChannelWriter{TTarget}, CancellationToken)"/>/>
116+
[ExcludeFromCodeCoverage]
117+
public static Channel<TSourceIn, TSourceOut> PropagateCompletion<TSourceIn, TSourceOut, TTargetIn, TTargetOut>(
118+
this Channel<TSourceIn, TSourceOut> source, Channel<TTargetIn, TTargetOut> target, CancellationToken cancellationToken = default)
119+
=> PropagateCompletion(source, target.Writer, cancellationToken);
120+
72121
/// <summary>
73122
/// Uses <see cref="ChannelWriter{T}.WaitToWriteAsync(CancellationToken)"/> to peek and see if the channel can still be written to.
74123
/// </summary>
@@ -271,13 +320,7 @@ public static ChannelReader<T> ToChannel<T>(this IEnumerable<T> source,
271320
/// <summary>
272321
/// Writes all entries from the source to a channel and calls complete when finished.
273322
/// </summary>
274-
/// <typeparam name="T">The input type of the channel.</typeparam>
275-
/// <param name="source">The source data to use.</param>
276-
/// <param name="capacity">The optional bounded capacity of the channel. Default is unbound.</param>
277-
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
278-
/// <param name="deferredExecution">If true, calls await Task.Yield() before writing to the channel.</param>
279-
/// <param name="cancellationToken">An optional cancellation token.</param>
280-
/// <returns>The channel reader containing the results.</returns>
323+
/// <inheritdoc cref="ToChannelAsync{T}(IEnumerable{ValueTask{T}}, int, bool, int, CancellationToken)"/>
281324
public static ChannelReader<T> ToChannel<T>(this IAsyncEnumerable<T> source,
282325
int capacity = -1, bool singleReader = false,
283326
bool deferredExecution = false,
@@ -288,29 +331,20 @@ public static ChannelReader<T> ToChannel<T>(this IAsyncEnumerable<T> source,
288331
/// <summary>
289332
/// Writes all entries from the source to a channel and calls complete when finished.
290333
/// </summary>
291-
/// <typeparam name="T">The input type of the channel.</typeparam>
292-
/// <param name="source">The source data to use.</param>
293-
/// <param name="channelOptions">The options for configuring the new channel.</param>
294-
/// <param name="deferredExecution">If true, calls await Task.Yield() before writing to the channel.</param>
295-
/// <param name="cancellationToken">An optional cancellation token.</param>
296-
/// <returns>The channel reader containing the results.</returns>
334+
/// <inheritdoc cref="ToChannelAsync{T}(IEnumerable{ValueTask{T}}, ChannelOptions, int, CancellationToken)"/>
297335
public static ChannelReader<T> ToChannel<T>(this IAsyncEnumerable<T> source,
298336
ChannelOptions channelOptions,
299337
bool deferredExecution = false,
300338
CancellationToken cancellationToken = default)
301339
=> CreateChannel<T>(channelOptions)
302340
.Source(source, deferredExecution, cancellationToken);
303341

304-
/// <summary>
305-
/// Writes all entries from the source to a channel and calls complete when finished.
306-
/// </summary>
307-
/// <typeparam name="T">The input type of the channel.</typeparam>
308342
/// <param name="source">The source data to use.</param>
309-
/// <param name="capacity">The optional bounded capacity of the channel. Default is unbound.</param>
343+
/// <param name="capacity">The optional bounded capacity of the channel.</param>
310344
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
311345
/// <param name="cancellationToken">An optional cancellation token.</param>
312346
/// <param name="deferredExecution">If true, calls await Task.Yield() before writing to the channel.</param>
313-
/// <returns>The channel reader containing the results.</returns>
347+
/// <inheritdoc cref="ToChannelAsync{T}(IEnumerable{ValueTask{T}}, int, bool, int, CancellationToken)"/>
314348
public static ChannelReader<T> ToChannel<T>(this IAsyncEnumerable<T> source,
315349
int capacity, bool singleReader,
316350
CancellationToken cancellationToken,
@@ -380,12 +414,7 @@ static async IAsyncEnumerable<TOut> AsAsyncEnumerableCore(Channel<TIn, TOut> cha
380414
/// <summary>
381415
/// Asynchronously executes all entries and writes their results to a channel.
382416
/// </summary>
383-
/// <typeparam name="T">The input type of the channel.</typeparam>
384-
/// <param name="source">The asynchronous source data to use.</param>
385-
/// <param name="channelOptions">The options for configuring the new channel.</param>
386-
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
387-
/// <param name="cancellationToken">An optional cancellation token.</param>
388-
/// <returns>The channel reader containing the results.</returns>
417+
/// <inheritdoc cref="ToChannelAsync{T}(IEnumerable{ValueTask{T}}, ChannelOptions, int, CancellationToken)"/>
389418
public static ChannelReader<T> ToChannelAsync<T>(this IEnumerable<Func<T>> source,
390419
ChannelOptions channelOptions, int maxConcurrency = 1,
391420
CancellationToken cancellationToken = default)
@@ -395,28 +424,18 @@ public static ChannelReader<T> ToChannelAsync<T>(this IEnumerable<Func<T>> sourc
395424
/// <summary>
396425
/// Asynchronously executes all entries and writes their results to a channel.
397426
/// </summary>
398-
/// <typeparam name="T">The input type of the channel.</typeparam>
399-
/// <param name="source">The asynchronous source data to use.</param>
400-
/// <param name="capacity">The optional bounded capacity of the channel. Default is unbound.</param>
401-
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
402-
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
403-
/// <param name="cancellationToken">An optional cancellation token.</param>
404-
/// <returns>The channel reader containing the results.</returns>
427+
/// <inheritdoc cref="ToChannelAsync{T}(IEnumerable{ValueTask{T}}, int, bool, int, CancellationToken)"/>
405428
public static ChannelReader<T> ToChannelAsync<T>(this IEnumerable<Func<T>> source,
406429
int capacity = -1, bool singleReader = false, int maxConcurrency = 1,
407430
CancellationToken cancellationToken = default)
408431
=> CreateChannel<T>(capacity, singleReader)
409432
.SourceAsync(maxConcurrency, source, cancellationToken);
410433

411-
/// <summary>
412-
/// Writes all entries from the source to a channel and calls complete when finished.
413-
/// </summary>
414-
/// <typeparam name="T">The input type of the channel.</typeparam>
415434
/// <param name="source">The asynchronous source data to use.</param>
416435
/// <param name="channelOptions">The options for configuring the new channel.</param>
417436
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
418437
/// <param name="cancellationToken">An optional cancellation token.</param>
419-
/// <returns>The channel reader containing the results.</returns>
438+
/// <inheritdoc cref="ToChannelAsync{T}(IEnumerable{ValueTask{T}}, int, bool, int, CancellationToken)"/>
420439
public static ChannelReader<T> ToChannelAsync<T>(this IEnumerable<ValueTask<T>> source,
421440
ChannelOptions channelOptions, int maxConcurrency = 1,
422441
CancellationToken cancellationToken = default)
@@ -439,31 +458,14 @@ public static ChannelReader<T> ToChannelAsync<T>(this IEnumerable<ValueTask<T>>
439458
=> CreateChannel<T>(capacity, singleReader)
440459
.SourceAsync(maxConcurrency, source, cancellationToken);
441460

442-
/// <summary>
443-
/// Writes all entries from the source to a channel and calls complete when finished.
444-
/// </summary>
445-
/// <typeparam name="T">The input type of the channel.</typeparam>
446-
/// <param name="source">The asynchronous source data to use.</param>
447-
/// <param name="channelOptions">The options for configuring the new channel.</param>
448-
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
449-
/// <param name="cancellationToken">An optional cancellation token.</param>
450-
/// <returns>The channel reader containing the results.</returns>
461+
/// <inheritdoc cref="ToChannelAsync{T}(IEnumerable{ValueTask{T}}, ChannelOptions, int, CancellationToken)"/>
451462
public static ChannelReader<T> ToChannelAsync<T>(this IEnumerable<Task<T>> source,
452463
ChannelOptions channelOptions, int maxConcurrency = 1,
453464
CancellationToken cancellationToken = default)
454465
=> CreateChannel<T>(channelOptions)
455466
.SourceAsync(maxConcurrency, source, cancellationToken);
456467

457-
/// <summary>
458-
/// Writes all entries from the source to a channel and calls complete when finished.
459-
/// </summary>
460-
/// <typeparam name="T">The input type of the channel.</typeparam>
461-
/// <param name="source">The asynchronous source data to use.</param>
462-
/// <param name="capacity">The optional bounded capacity of the channel. Default is unbound.</param>
463-
/// <param name="singleReader">True will cause the resultant reader to optimize for the assumption that no concurrent read operations will occur.</param>
464-
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
465-
/// <param name="cancellationToken">An optional cancellation token.</param>
466-
/// <returns>The channel reader containing the results.</returns>
468+
/// <inheritdoc cref="ToChannelAsync{T}(IEnumerable{ValueTask{T}}, int, bool, int, CancellationToken)"/>
467469
public static ChannelReader<T> ToChannelAsync<T>(this IEnumerable<Task<T>> source,
468470
int capacity = -1, bool singleReader = false, int maxConcurrency = 1,
469471
CancellationToken cancellationToken = default)

0 commit comments

Comments
 (0)