Skip to content

Commit eaf242f

Browse files
Added PipeFilter extensions.
1 parent 00828ef commit eaf242f

6 files changed

Lines changed: 196 additions & 2 deletions

File tree

Open.ChannelExtensions.Tests/BasicTests.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,4 +461,56 @@ public static async Task Filter(int testSize)
461461
Assert.Equal(count, result.Count);
462462
Assert.True(result.SequenceEqual(range.Where(i => i % 2 == 1)));
463463
}
464+
465+
[Fact]
466+
public static async Task PipeFilterTest()
467+
{
468+
const int each = 5_000;
469+
const int total = 2 * each;
470+
471+
var source = Enumerable.Range(0, total).ToChannel(300);
472+
473+
var evenFilter = source
474+
.PipeFilter(out var unmatched, 100, 10,
475+
static e => e % 2 == 0)
476+
.ToListAsync(each);
477+
478+
var oddFilter = unmatched
479+
.ToListAsync(each);
480+
481+
var even = await evenFilter;
482+
var odd = await oddFilter;
483+
484+
even.Count.Should().Be(each);
485+
odd.Count.Should().Be(each);
486+
even.Should().OnlyContain(e => e % 2 == 0);
487+
odd.Should().OnlyContain(e => e % 2 != 0);
488+
}
489+
490+
[Fact]
491+
public static async Task PipeFilterAsyncTest()
492+
{
493+
const int each = 5_000;
494+
const int total = 2 * each;
495+
496+
var source = Enumerable.Range(0, total).ToChannel(300);
497+
498+
var evenFilter = source
499+
.PipeFilterAsync(out var unmatched, 10, 100, static async e => {
500+
await Task.Yield();
501+
return e % 2 == 0;
502+
})
503+
.ToListAsync(each);
504+
505+
var oddFilter = unmatched
506+
.ToListAsync(each);
507+
508+
var even = await evenFilter;
509+
var odd = await oddFilter;
510+
511+
even.Count.Should().Be(each);
512+
odd.Count.Should().Be(each);
513+
even.Should().OnlyContain(e => e % 2 == 0);
514+
odd.Should().OnlyContain(e => e % 2 != 0);
515+
}
464516
}

Open.ChannelExtensions.Tests/Open.ChannelExtensions.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
</ItemGroup>
1313

1414
<ItemGroup>
15+
<PackageReference Include="FluentAssertions" Version="6.12.0" />
1516
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
1617
<PackageReference Include="xunit" Version="2.6.6" />
1718
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">

Open.ChannelExtensions.Tests/_Global.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
global using System;
1+
global using FluentAssertions;
2+
global using System;
23
global using System.Collections.Generic;
34
global using System.Diagnostics;
45
global using System.Diagnostics.CodeAnalysis;

Open.ChannelExtensions/Extensions.Pipe.cs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,4 +325,128 @@ public static ChannelReader<TOut> Pipe<TWrite, TRead, TOut>(this Channel<TWrite,
325325

326326
return Pipe(source.Reader, transform, capacity, singleReader, cancellationToken);
327327
}
328+
329+
/// <summary>
330+
/// <para>
331+
/// Reads all entries and filters the values using the <paramref name="predicate"/>
332+
/// function before buffering the results into another channel for consumption.
333+
/// </para>
334+
/// <para>
335+
/// If you do no need the unmatched items,
336+
/// use the <see cref="Filter{T}(ChannelReader{T}, Func{T, bool})"/> extension.
337+
/// </para>
338+
/// </summary>
339+
/// <typeparam name="T">The input type of the channel.</typeparam>
340+
/// <param name="source">The asynchronous source data to use.</param>
341+
/// <param name="unmatched">Channel containing the unmatched items</param>
342+
/// <param name="options">The settings to use for the created channels.</param>
343+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
344+
/// <param name="predicate">Predicate to test against</param>
345+
/// <param name="cancellationToken">An optional cancellation token.</param>
346+
/// <returns>The <see cref="ChannelReader{T}"/> containing only the items that match the <paramref name="predicate"/>.</returns>
347+
/// <remarks>
348+
/// All items not matching the predicate are written to the <paramref name="unmatched"/> channel.
349+
/// </remarks>
350+
public static ChannelReader<T> PipeFilter<T>(this ChannelReader<T> source,
351+
out ChannelReader<T> unmatched,
352+
ChannelOptions options,
353+
int maxConcurrency,
354+
Func<T, bool> predicate,
355+
CancellationToken cancellationToken = default)
356+
{
357+
var singleWriter = maxConcurrency == 1;
358+
359+
var matchedChannel = CreateChannel<T>(options);
360+
var matchedWriter = matchedChannel.Writer;
361+
362+
var unmatchedChannel = CreateChannel<T>(options);
363+
var unmatchedWriter = unmatchedChannel.Writer;
364+
365+
source
366+
.ReadAllConcurrentlyAsync(maxConcurrency, e =>
367+
{
368+
var writer = predicate(e) ? matchedWriter : unmatchedWriter;
369+
return writer.WriteAsync(e, cancellationToken);
370+
}, cancellationToken)
371+
.ContinueWith(t =>
372+
{
373+
unmatchedWriter.Complete(t.Exception);
374+
matchedWriter.Complete(t.Exception);
375+
},
376+
CancellationToken.None,
377+
TaskContinuationOptions.ExecuteSynchronously,
378+
TaskScheduler.Current);
379+
380+
unmatched = unmatchedChannel.Reader;
381+
return matchedChannel.Reader;
382+
}
383+
384+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, ChannelOptions, int, Func{T, bool}, CancellationToken)"/>
385+
public static ChannelReader<T> PipeFilterAsync<T>(this ChannelReader<T> source,
386+
out ChannelReader<T> unmatched,
387+
ChannelOptions options,
388+
int maxConcurrency,
389+
Func<T, ValueTask<bool>> predicate,
390+
CancellationToken cancellationToken = default)
391+
{
392+
var singleWriter = maxConcurrency == 1;
393+
394+
var matchedChannel = CreateChannel<T>(options);
395+
var matchedWriter = matchedChannel.Writer;
396+
397+
var unmatchedChannel = CreateChannel<T>(options);
398+
var unmatchedWriter = unmatchedChannel.Writer;
399+
400+
source
401+
.ReadAllConcurrentlyAsync(maxConcurrency, async e =>
402+
{
403+
var writer = await predicate(e).ConfigureAwait(false) ? matchedWriter : unmatchedWriter;
404+
await writer.WriteAsync(e, cancellationToken).ConfigureAwait(false);
405+
}, cancellationToken)
406+
.ContinueWith(t =>
407+
{
408+
unmatchedWriter.Complete(t.Exception);
409+
matchedWriter.Complete(t.Exception);
410+
},
411+
CancellationToken.None,
412+
TaskContinuationOptions.ExecuteSynchronously,
413+
TaskScheduler.Current);
414+
415+
unmatched = unmatchedChannel.Reader;
416+
return matchedChannel.Reader;
417+
}
418+
419+
/// <param name="source">The asynchronous source data to use.</param>
420+
/// <param name="unmatched">Channel containing the unmatched items</param>
421+
/// <param name="capacity">
422+
/// <para>The width of the pipe: how many entries to buffer while waiting to be read from.</para>
423+
/// <para>Applies to both the matched (return) and <paramref name="unmatched"/> (out) channels.</para>
424+
/// <para>A value less that 1 will produce unbound channels.</para>
425+
/// </param>
426+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
427+
/// <param name="predicate">Predicate to test against</param>
428+
/// <param name="cancellationToken">An optional cancellation token.</param>
429+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, ChannelOptions, int, Func{T, bool}, CancellationToken)"/>
430+
public static ChannelReader<T> PipeFilter<T>(this ChannelReader<T> source,
431+
out ChannelReader<T> unmatched,
432+
int capacity,
433+
int maxConcurrency,
434+
Func<T, bool> predicate,
435+
CancellationToken cancellationToken = default)
436+
{
437+
var options = CreateOptions(capacity, false, false, maxConcurrency == 1);
438+
return PipeFilter(source, out unmatched, options, maxConcurrency, predicate, cancellationToken);
439+
}
440+
441+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, int, int, Func{T, bool}, CancellationToken)"/>
442+
public static ChannelReader<T> PipeFilterAsync<T>(this ChannelReader<T> source,
443+
out ChannelReader<T> unmatched,
444+
int capacity,
445+
int maxConcurrency,
446+
Func<T, ValueTask<bool>> predicate,
447+
CancellationToken cancellationToken = default)
448+
{
449+
var options = CreateOptions(capacity, false, false, maxConcurrency == 1);
450+
return PipeFilterAsync(source, out unmatched, options, maxConcurrency, predicate, cancellationToken);
451+
}
328452
}

Open.ChannelExtensions/Extensions._.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,22 @@ internal static Channel<T> CreateChannel<T>(ChannelOptions channelOptions)
1818
? Channel.CreateUnbounded<T>(ubco)
1919
: throw new ArgumentException("Unsupported channel option type.", nameof(channelOptions));
2020

21+
internal static ChannelOptions CreateOptions(int capacity = -1, bool singleReader = false, bool syncCont = false, bool singleWriter = true)
22+
=> capacity > 0
23+
? new BoundedChannelOptions(capacity)
24+
{
25+
SingleWriter = singleWriter,
26+
SingleReader = singleReader,
27+
AllowSynchronousContinuations = syncCont,
28+
FullMode = BoundedChannelFullMode.Wait
29+
}
30+
: new UnboundedChannelOptions
31+
{
32+
SingleWriter = singleWriter,
33+
SingleReader = singleReader,
34+
AllowSynchronousContinuations = syncCont
35+
};
36+
2137
internal static Channel<T> CreateChannel<T>(int capacity = -1, bool singleReader = false, bool syncCont = false, bool singleWriter = true)
2238
=> capacity > 0
2339
? Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>8.2.0</Version>
25+
<Version>8.3.0</Version>
2626
<PackageReleaseNotes>Added .Merge and .PropagateCompletion extensions.</PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>

0 commit comments

Comments
 (0)