Skip to content

Commit f7dd3f4

Browse files
author
Oren (electricessence)
committed
Added TransformChannel. Similar to a TransformBlock in Dataflow, but the transformation only occurs on read.
1 parent 139c3ec commit f7dd3f4

3 files changed

Lines changed: 41 additions & 12 deletions

File tree

Open.ChannelExtensions/Extensions.Transform.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ namespace Open.ChannelExtensions
88
{
99
public static partial class Extensions
1010
{
11-
class TransformingChannelReader<TIn, TOut> : ChannelReader<TOut>
11+
class TransformingChannelReader<T, TResult> : ChannelReader<TResult>
1212
{
13-
public TransformingChannelReader(ChannelReader<TIn> source, Func<TIn, TOut> transform)
13+
public TransformingChannelReader(ChannelReader<T> source, Func<T, TResult> transform)
1414
{
1515
_source = source ?? throw new ArgumentNullException(nameof(source));
1616
_transform = transform ?? throw new ArgumentNullException(nameof(transform));
1717
Contract.EndContractBlock();
1818
}
1919

20-
private readonly ChannelReader<TIn> _source;
21-
private readonly Func<TIn, TOut> _transform;
20+
private readonly ChannelReader<T> _source;
21+
private readonly Func<T, TResult> _transform;
2222
public override Task Completion => _source.Completion;
2323

24-
public override bool TryRead(out TOut item)
24+
public override bool TryRead(out TResult item)
2525
{
2626
if (_source.TryRead(out var e))
2727
{
@@ -33,7 +33,7 @@ public override bool TryRead(out TOut item)
3333
return false;
3434
}
3535

36-
public override async ValueTask<TOut> ReadAsync(CancellationToken cancellationToken = default)
36+
public override async ValueTask<TResult> ReadAsync(CancellationToken cancellationToken = default)
3737
=> _transform(await _source.ReadAsync(cancellationToken));
3838

3939
public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
@@ -43,12 +43,12 @@ public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationTo
4343
/// <summary>
4444
/// Transforms the
4545
/// </summary>
46-
/// <typeparam name="TIn">The output type of the provided source reader and input type of the transform.</typeparam>
47-
/// <typeparam name="TOut">The output type of the transform.</typeparam>
46+
/// <typeparam name="T">The output type of the provided source reader and input type of the transform.</typeparam>
47+
/// <typeparam name="TResult">The output type of the transform.</typeparam>
4848
/// <param name="source">The source channel reader.</param>
4949
/// <param name="transform">The transform function.</param>
5050
/// <returns>A channel reader representing the tranformed results.</returns>
51-
public static ChannelReader<TOut> Transform<TIn, TOut>(this ChannelReader<TIn> source, Func<TIn, TOut> transform)
52-
=> new TransformingChannelReader<TIn, TOut>(source, transform);
51+
public static ChannelReader<TResult> Transform<T, TResult>(this ChannelReader<T> source, Func<T, TResult> transform)
52+
=> new TransformingChannelReader<T, TResult>(source, transform);
5353
}
5454
}

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ Part of the "Open" set of libraries.
1919
<RepositoryType>git</RepositoryType>
2020
<PackageTags>channels dotnet threading tasks extensions</PackageTags>
2121
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
22-
<Version>3.1.0</Version>
22+
<Version>3.1.1</Version>
2323
<PackageLicenseExpression>MIT</PackageLicenseExpression>
24-
<PackageReleaseNotes>Improved overloads and method standards. FxCop inspection pass. Added ChannelReader.ToListAsync().</PackageReleaseNotes>
24+
<PackageReleaseNotes>Added TransformChannel class which facilitates providing a channel with different .</PackageReleaseNotes>
2525
<RepositoryUrl>https://github.com/electricessence/Open.ChannelExtensions</RepositoryUrl>
2626
</PropertyGroup>
2727

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using System;
2+
using System.Diagnostics.Contracts;
3+
using System.Threading.Channels;
4+
5+
namespace Open.ChannelExtensions
6+
{
7+
/// <summary>
8+
/// A channel wrapper that takes the provided channel and transforms them on demand when being read.
9+
/// </summary>
10+
/// <typeparam name="TWrite">Specifies the type of data that may be written to the channel.</typeparam>
11+
/// <typeparam name="TRead">Specifies the type of data that may be read from the channel.</typeparam>
12+
public class TransformChannel<TWrite, TRead> : Channel<TWrite, TRead>
13+
{
14+
/// <summary>
15+
/// Creates a channel wrapper that takes the provided channel and transforms them on demand when being read.
16+
/// </summary>
17+
/// <param name="source">The channel containing the source data.</param>
18+
/// <param name="transform">The transform function to be applied to the results when being read.</param>
19+
public TransformChannel(Channel<TWrite, TWrite> source, Func<TWrite, TRead> transform)
20+
{
21+
if (source is null) throw new ArgumentNullException(nameof(source));
22+
if (transform is null) throw new ArgumentNullException(nameof(transform));
23+
Contract.EndContractBlock();
24+
25+
Writer = source.Writer;
26+
Reader = source.Reader.Transform(transform);
27+
}
28+
}
29+
}

0 commit comments

Comments
 (0)