Skip to content

Commit a2dd674

Browse files
authored
Find operator
1 parent f3c50c1 commit a2dd674

5 files changed

Lines changed: 87 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ How to use the [assert API](doc/assert.md) to write unit tests while using RxGo.
450450
* [Distinct](doc/distinct.md)/[DistinctUntilChanged](doc/distinctuntilchanged.md) — suppress duplicate items emitted by an Observable
451451
* [ElementAt](doc/elementat.md) — emit only item n emitted by an Observable
452452
* [Filter](doc/filter.md) — emit only those items from an Observable that pass a predicate test
453+
* [Find](doc/find.md) — emit the first item passing a predicate then complete
453454
* [First](doc/first.md)/[FirstOrDefault](doc/firstordefault.md) — emit only the first item or the first item that meets a condition, from an Observable
454455
* [IgnoreElements](doc/ignoreelements.md) — do not emit any items from an Observable but mirror its termination notification
455456
* [Last](doc/last.md)/[LastOrDefault](doc/lastordefault.md) — emit only the last item emitted by an Observable

doc/find.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Find Operator
2+
3+
## Overview
4+
5+
Emit the first item passing a predicate then complete.
6+
7+
## Example
8+
9+
```go
10+
observable := rxgo.Just(1, 2, 3)().Find(func(i interface{}) bool {
11+
return i == 2
12+
})
13+
```
14+
15+
Output:
16+
17+
```
18+
2
19+
```
20+
21+
## Options
22+
23+
* [WithBufferedChannel](options.md#withbufferedchannel)
24+
25+
* [WithContext](options.md#withcontext)
26+
27+
* [WithPool](options.md#withpool)
28+
29+
* [WithCPUPool](options.md#withcpupool)
30+
31+
* [WithObservationStrategy](options.md#withobservationstrategy)
32+
33+
* [WithErrorStrategy](options.md#witherrorstrategy)
34+
35+
* [WithPublishStrategy](options.md#withpublishstrategy)

observable.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Observable interface {
4040
Error(opts ...Option) error
4141
Errors(opts ...Option) []error
4242
Filter(apply Predicate, opts ...Option) Observable
43+
Find(find Predicate, opts ...Option) OptionalSingle
4344
First(opts ...Option) OptionalSingle
4445
FirstOrDefault(defaultValue interface{}, opts ...Option) Single
4546
FlatMap(apply ItemToObservable, opts ...Option) Observable

observable_operator.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,36 @@ func (op *filterOperator) end(_ context.Context, _ chan<- Item) {
10201020
func (op *filterOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
10211021
}
10221022

1023+
// Find emits the first item passing a predicate then complete.
1024+
func (o *ObservableImpl) Find(find Predicate, opts ...Option) OptionalSingle {
1025+
return optionalSingle(o, func() operator {
1026+
return &findOperator{
1027+
find: find,
1028+
}
1029+
}, true, true, opts...)
1030+
}
1031+
1032+
type findOperator struct {
1033+
find Predicate
1034+
}
1035+
1036+
func (op *findOperator) next(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
1037+
if op.find(item.V) {
1038+
item.SendContext(ctx, dst)
1039+
operatorOptions.stop()
1040+
}
1041+
}
1042+
1043+
func (op *findOperator) err(ctx context.Context, item Item, dst chan<- Item, operatorOptions operatorOptions) {
1044+
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
1045+
}
1046+
1047+
func (op *findOperator) end(_ context.Context, _ chan<- Item) {
1048+
}
1049+
1050+
func (op *findOperator) gatherNext(_ context.Context, _ Item, _ chan<- Item, _ operatorOptions) {
1051+
}
1052+
10231053
// First returns new Observable which emit only first item.
10241054
// Cannot be run in parallel.
10251055
func (o *ObservableImpl) First(opts ...Option) OptionalSingle {

observable_operator_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,26 @@ func Test_Observable_Filter_Parallel(t *testing.T) {
676676
Assert(ctx, t, obs, HasItemsNoOrder(2, 4), HasNoError())
677677
}
678678

679+
func Test_Observable_Find_NotEmpty(t *testing.T) {
680+
defer goleak.VerifyNone(t)
681+
ctx, cancel := context.WithCancel(context.Background())
682+
defer cancel()
683+
obs := testObservable(ctx, 1, 2, 3).Find(func(i interface{}) bool {
684+
return i == 2
685+
})
686+
Assert(ctx, t, obs, HasItem(2))
687+
}
688+
689+
func Test_Observable_Find_Empty(t *testing.T) {
690+
defer goleak.VerifyNone(t)
691+
ctx, cancel := context.WithCancel(context.Background())
692+
defer cancel()
693+
obs := Empty().Find(func(_ interface{}) bool {
694+
return true
695+
})
696+
Assert(ctx, t, obs, IsEmpty())
697+
}
698+
679699
func Test_Observable_First_NotEmpty(t *testing.T) {
680700
defer goleak.VerifyNone(t)
681701
ctx, cancel := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)