Skip to content

Commit e54bc53

Browse files
Merge pull request #39 from Open-NET-Libraries/PipeFilter
Pipe filter
2 parents cb511ee + 1b7c546 commit e54bc53

6 files changed

Lines changed: 228 additions & 2 deletions

File tree

Open.ChannelExtensions.Tests/BasicTests.cs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,4 +461,56 @@ public static async Task Filter(int testSize)
461461
Assert.Equal(count, result.Count);
462462
Assert.True(result.SequenceEqual(range.Where(i => i % 2 == 1)));
463463
}
464+
465+
[Fact]
466+
public static async Task PipeFilterTest()
467+
{
468+
const int each = 5_000;
469+
const int total = 2 * each;
470+
471+
var source = Enumerable.Range(0, total).ToChannel(300);
472+
473+
var evenFilter = source
474+
.PipeFilter(out var unmatched, 100, 10,
475+
static e => e % 2 == 0)
476+
.ToListAsync(each);
477+
478+
var oddFilter = unmatched
479+
.ToListAsync(each);
480+
481+
var even = await evenFilter;
482+
var odd = await oddFilter;
483+
484+
even.Count.Should().Be(each);
485+
odd.Count.Should().Be(each);
486+
even.Should().OnlyContain(e => e % 2 == 0);
487+
odd.Should().OnlyContain(e => e % 2 != 0);
488+
}
489+
490+
[Fact]
491+
public static async Task PipeFilterAsyncTest()
492+
{
493+
const int each = 5_000;
494+
const int total = 2 * each;
495+
496+
var source = Enumerable.Range(0, total).ToChannel(300);
497+
498+
var evenFilter = source
499+
.PipeFilterAsync(out var unmatched, 10, 100, static async e => {
500+
await Task.Yield();
501+
return e % 2 == 0;
502+
})
503+
.ToListAsync(each);
504+
505+
var oddFilter = unmatched
506+
.ToListAsync(each);
507+
508+
var even = await evenFilter;
509+
var odd = await oddFilter;
510+
511+
even.Count.Should().Be(each);
512+
odd.Count.Should().Be(each);
513+
even.Should().OnlyContain(e => e % 2 == 0);
514+
odd.Should().OnlyContain(e => e % 2 != 0);
515+
}
464516
}

Open.ChannelExtensions.Tests/Open.ChannelExtensions.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
</ItemGroup>
1313

1414
<ItemGroup>
15+
<PackageReference Include="FluentAssertions" Version="6.12.0" />
1516
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
1617
<PackageReference Include="xunit" Version="2.6.6" />
1718
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">

Open.ChannelExtensions.Tests/_Global.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
global using System;
1+
global using FluentAssertions;
2+
global using System;
23
global using System.Collections.Generic;
34
global using System.Diagnostics;
45
global using System.Diagnostics.CodeAnalysis;

Open.ChannelExtensions/Extensions.Pipe.cs

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,4 +325,160 @@ public static ChannelReader<TOut> Pipe<TWrite, TRead, TOut>(this Channel<TWrite,
325325

326326
return Pipe(source.Reader, transform, capacity, singleReader, cancellationToken);
327327
}
328+
329+
/// <summary>
330+
/// <para>
331+
/// Reads all entries and filters the values using the <paramref name="predicate"/>
332+
/// function before buffering the results into another channel for consumption.
333+
/// </para>
334+
/// <para>
335+
/// If you do not need the unmatched items,
336+
/// use the <see cref="Filter{T}(ChannelReader{T}, Func{T, bool})"/> extension.
337+
/// </para>
338+
/// </summary>
339+
/// <typeparam name="T">The input type of the channel.</typeparam>
340+
/// <param name="source">The asynchronous source data to use.</param>
341+
/// <param name="unmatched">Channel containing the unmatched items</param>
342+
/// <param name="options">The settings to use for the created channels.</param>
343+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
344+
/// <param name="predicate">Predicate to test against</param>
345+
/// <param name="cancellationToken">An optional cancellation token.</param>
346+
/// <returns>The <see cref="ChannelReader{T}"/> containing only the items that match the <paramref name="predicate"/>.</returns>
347+
/// <remarks>
348+
/// All items not matching the <paramref name="predicate"/> are written to the <paramref name="unmatched"/> channel.
349+
/// </remarks>
350+
public static ChannelReader<T> PipeFilter<T>(this ChannelReader<T> source,
351+
out ChannelReader<T> unmatched,
352+
ChannelOptions options,
353+
int maxConcurrency,
354+
Func<T, bool> predicate,
355+
CancellationToken cancellationToken = default)
356+
{
357+
var singleWriter = maxConcurrency == 1;
358+
359+
var matchedChannel = CreateChannel<T>(options);
360+
var matchedWriter = matchedChannel.Writer;
361+
362+
var unmatchedChannel = CreateChannel<T>(options);
363+
var unmatchedWriter = unmatchedChannel.Writer;
364+
365+
source
366+
.ReadAllConcurrentlyAsync(maxConcurrency, e =>
367+
{
368+
var writer = predicate(e) ? matchedWriter : unmatchedWriter;
369+
return writer.WriteAsync(e, cancellationToken);
370+
}, cancellationToken)
371+
.ContinueWith(t =>
372+
{
373+
unmatchedWriter.Complete(t.Exception);
374+
matchedWriter.Complete(t.Exception);
375+
},
376+
CancellationToken.None,
377+
TaskContinuationOptions.ExecuteSynchronously,
378+
TaskScheduler.Current);
379+
380+
unmatched = unmatchedChannel.Reader;
381+
return matchedChannel.Reader;
382+
}
383+
384+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, ChannelOptions, int, Func{T, bool}, CancellationToken)"/>
385+
public static ChannelReader<T> PipeFilter<T>(this ChannelReader<T> source,
386+
out ChannelReader<T> unmatched,
387+
ChannelOptions options,
388+
Func<T, bool> predicate,
389+
CancellationToken cancellationToken = default)
390+
=> PipeFilter(source, out unmatched, options, 1, predicate, cancellationToken);
391+
392+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, ChannelOptions, int, Func{T, bool}, CancellationToken)"/>
393+
public static ChannelReader<T> PipeFilterAsync<T>(this ChannelReader<T> source,
394+
out ChannelReader<T> unmatched,
395+
ChannelOptions options,
396+
int maxConcurrency,
397+
Func<T, ValueTask<bool>> predicate,
398+
CancellationToken cancellationToken = default)
399+
{
400+
var singleWriter = maxConcurrency == 1;
401+
402+
var matchedChannel = CreateChannel<T>(options);
403+
var matchedWriter = matchedChannel.Writer;
404+
405+
var unmatchedChannel = CreateChannel<T>(options);
406+
var unmatchedWriter = unmatchedChannel.Writer;
407+
408+
source
409+
.ReadAllConcurrentlyAsync(maxConcurrency, async e =>
410+
{
411+
var writer = await predicate(e).ConfigureAwait(false) ? matchedWriter : unmatchedWriter;
412+
await writer.WriteAsync(e, cancellationToken).ConfigureAwait(false);
413+
}, cancellationToken)
414+
.ContinueWith(t =>
415+
{
416+
unmatchedWriter.Complete(t.Exception);
417+
matchedWriter.Complete(t.Exception);
418+
},
419+
CancellationToken.None,
420+
TaskContinuationOptions.ExecuteSynchronously,
421+
TaskScheduler.Current);
422+
423+
unmatched = unmatchedChannel.Reader;
424+
return matchedChannel.Reader;
425+
}
426+
427+
/// <inheritdoc cref="PipeFilterAsync{T}(ChannelReader{T}, out ChannelReader{T}, ChannelOptions, int, Func{T, ValueTask{bool}}, CancellationToken)"/>
428+
public static ChannelReader<T> PipeFilterAsync<T>(this ChannelReader<T> source,
429+
out ChannelReader<T> unmatched,
430+
ChannelOptions options,
431+
Func<T, ValueTask<bool>> predicate,
432+
CancellationToken cancellationToken = default)
433+
=> PipeFilterAsync(source, out unmatched, options, 1, predicate, cancellationToken);
434+
435+
/// <param name="source">The asynchronous source data to use.</param>
436+
/// <param name="unmatched">Channel containing the unmatched items</param>
437+
/// <param name="capacity">
438+
/// <para>The width of the pipe: how many entries to buffer while waiting to be read from.</para>
439+
/// <para>Applies to both the matched (return) and <paramref name="unmatched"/> (out) channels.</para>
440+
/// <para>A value less that 1 will produce unbound channels.</para>
441+
/// </param>
442+
/// <param name="maxConcurrency">The maximum number of concurrent operations. Greater than 1 may likely cause results to be out of order.</param>
443+
/// <param name="predicate">Predicate to test against</param>
444+
/// <param name="cancellationToken">An optional cancellation token.</param>
445+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, ChannelOptions, int, Func{T, bool}, CancellationToken)"/>
446+
public static ChannelReader<T> PipeFilter<T>(this ChannelReader<T> source,
447+
out ChannelReader<T> unmatched,
448+
int capacity,
449+
int maxConcurrency,
450+
Func<T, bool> predicate,
451+
CancellationToken cancellationToken = default)
452+
{
453+
var options = CreateOptions(capacity, false, false, maxConcurrency == 1);
454+
return PipeFilter(source, out unmatched, options, maxConcurrency, predicate, cancellationToken);
455+
}
456+
457+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, int, int, Func{T, bool}, CancellationToken)"/>
458+
public static ChannelReader<T> PipeFilter<T>(this ChannelReader<T> source,
459+
out ChannelReader<T> unmatched,
460+
int capacity,
461+
Func<T, bool> predicate,
462+
CancellationToken cancellationToken = default)
463+
=> PipeFilter(source, out unmatched, capacity, 1, predicate, cancellationToken);
464+
465+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, int, int, Func{T, bool}, CancellationToken)"/>
466+
public static ChannelReader<T> PipeFilterAsync<T>(this ChannelReader<T> source,
467+
out ChannelReader<T> unmatched,
468+
int capacity,
469+
int maxConcurrency,
470+
Func<T, ValueTask<bool>> predicate,
471+
CancellationToken cancellationToken = default)
472+
{
473+
var options = CreateOptions(capacity, false, false, maxConcurrency == 1);
474+
return PipeFilterAsync(source, out unmatched, options, maxConcurrency, predicate, cancellationToken);
475+
}
476+
477+
/// <inheritdoc cref="PipeFilter{T}(ChannelReader{T}, out ChannelReader{T}, int, int, Func{T, bool}, CancellationToken)"/>
478+
public static ChannelReader<T> PipeFilterAsync<T>(this ChannelReader<T> source,
479+
out ChannelReader<T> unmatched,
480+
int capacity,
481+
Func<T, ValueTask<bool>> predicate,
482+
CancellationToken cancellationToken = default)
483+
=> PipeFilterAsync(source, out unmatched, capacity, 1, predicate, cancellationToken);
328484
}

Open.ChannelExtensions/Extensions._.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,22 @@ internal static Channel<T> CreateChannel<T>(ChannelOptions channelOptions)
1818
? Channel.CreateUnbounded<T>(ubco)
1919
: throw new ArgumentException("Unsupported channel option type.", nameof(channelOptions));
2020

21+
internal static ChannelOptions CreateOptions(int capacity = -1, bool singleReader = false, bool syncCont = false, bool singleWriter = true)
22+
=> capacity > 0
23+
? new BoundedChannelOptions(capacity)
24+
{
25+
SingleWriter = singleWriter,
26+
SingleReader = singleReader,
27+
AllowSynchronousContinuations = syncCont,
28+
FullMode = BoundedChannelFullMode.Wait
29+
}
30+
: new UnboundedChannelOptions
31+
{
32+
SingleWriter = singleWriter,
33+
SingleReader = singleReader,
34+
AllowSynchronousContinuations = syncCont
35+
};
36+
2137
internal static Channel<T> CreateChannel<T>(int capacity = -1, bool singleReader = false, bool syncCont = false, bool singleWriter = true)
2238
=> capacity > 0
2339
? Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<RepositoryType>git</RepositoryType>
2323
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2424
<GenerateDocumentationFile>true</GenerateDocumentationFile>
25-
<Version>8.2.0</Version>
25+
<Version>8.3.0</Version>
2626
<PackageReleaseNotes>Added .Merge and .PropagateCompletion extensions.</PackageReleaseNotes>
2727
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2828
<PublishRepositoryUrl>true</PublishRepositoryUrl>

0 commit comments

Comments
 (0)