The examples we've seen so far were all very small. Nothing should stop you from using Rx on a huge stream of realtime data, but what good would Rx be if it dumped the whole bulk of the data onto you, and force you handle it like you would otherwise? Here we will explore operators that can filter out irrelevant data, or reduce the data to the single value that you want.
Most of the operators here will be familiar to anyone who has worked with Java's Streams or functional programming in general. All the operators here return a new observable and do not affect the original observable. This principle is present throughout Rx. Transformations of observables create a new observable every time and leave the original unaffected. Subscribers to the original observable should notice no change, but we will see in later chapters that guaranteeing this may require caution from the developer as well.
This is an appropriate time to introduce to concept of marble diagrams. It is a popular way of explaining the operators in Rx, because of their intuitive and graphical nature. They are present a lot in the documentation of RxJava and it only makes sense that we take advantage of their explanatory nature. The format is mostly self-explanatory: time flows left to right, shapes represent values, a slash is an onCompletion, an X is an error. The operator is applied to the top sequence and the result is the sequence below.
filter takes a predicate function that makes a boolean decision for each value emitted. If the decision is false, the item is omitted from the filtered sequence.
public final Observable<T> filter(Func1<? super T,java.lang.Boolean> predicate)We will use filter to create a sequence of numbers and filter out all the even ones, keeping only odd values.
Observable<Integer> values = Observable.range(0,10);
Subscription oddNumbers = values
.filter(v -> v % 2 == 0)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);0
2
4
6
8
Completed
distinct filters out any element that has already appeared in the sequence.
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onNext(1);
o.onNext(2);
o.onNext(3);
o.onNext(2);
o.onCompleted();
});
Subscription subscription = values
.distinct()
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);1
2
3
Completed
An overload of distinct takes a key selector. For each item, the function generates a key and the key is then used to determine distinctiveness.
public final <U> Observable<T> distinct(Func1<? super T,? extends U> keySelector)In this example, we use the first character as a key.
Observable<String> values = Observable.create(o -> {
o.onNext("First");
o.onNext("Second");
o.onNext("Third");
o.onNext("Fourth");
o.onNext("Fifth");
o.onCompleted();
});
Subscription subscription = values
.distinct(v -> v.charAt(0))
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);First
Second
Third
Completed
"Fourth" and "Fifth" were filtered out because their first character is 'F' and that has already appeared in "First".
An experienced programmer already knows that this operator maintains a set internally with every unique value that passes through the observable and checks every new value against it. While Rx operators neatly hide these things, you should still be aware that an Rx operator can have a significant cost and consider what you are using it on.
A variant of distinct is distinctUntilChanged. The difference is that only consecutive non-distinct values are filtered out.
public final Observable<T> distinctUntilChanged()
public final <U> Observable<T> distinctUntilChanged(Func1<? super T,? extends U> keySelector)Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onNext(1);
o.onNext(2);
o.onNext(3);
o.onNext(2);
o.onCompleted();
});
Subscription subscription = values
.distinctUntilChanged()
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);1
2
3
2
Completed
You can you use a key selector with distinctUntilChanged, as well.
Observable<String> values = Observable.create(o -> {
o.onNext("First");
o.onNext("Second");
o.onNext("Third");
o.onNext("Fourth");
o.onNext("Fifth");
o.onCompleted();
});
Subscription subscription = values
.distinctUntilChanged(v -> v.charAt(0))
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);First
Second
Third
Fourth
Completed
ignoreElements will ignore every value, but lets pass through onCompleted and onError.
Observable<Integer> values = Observable.range(0, 10);
Subscription subscription = values
.ignoreElements()
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Completed
ignoreElements() produces the same result as filter(v -> false)
The next group of methods serve to cut the sequence at a specific point based on the item's index, and either take the first part or the second part. take takes the first n elements, while skip skips them. Note that neither function considers it an error if there are fewer items in the sequence than the specified index.
Observable<T> take(int num)Observable<Integer> values = Observable.range(0, 5);
Subscription first2 = values
.take(2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);0
1
Completed
Users of Java 8 streams should know the take operator as limit. The limit operator exists in Rx too, for symmetry purposes. It is an alias of take, but it lacks the richer overloads that we will soon see.
take completes as soon as the n-th item is available. If an error occurs, the error will be forwarded, but not if it occurs after the cutting point. take doesn't care what happens in the observable after the n-th item.
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
Subscription subscription = values
.take(1)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);1
Completed
skip returns the other half of a take.
Observable<T> skip(int num)Observable<Integer> values = Observable.range(0, 5);
Subscription subscription = values
.skip(2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);2
3
4
Completed
There are overloads where the cutoff is a moment in time rather than place in the sequence.
Observable<T> take(long time, java.util.concurrent.TimeUnit unit)
Observable<T> skip(long time, java.util.concurrent.TimeUnit unit)Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.take(250, TimeUnit.MILLISECONDS)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);0
1
Completed
take and skip work with predefined indices. If you want to "discover" the cutoff point as the values come, takeWhile and skipWhile will use a predicate instead. takeWhile takes items while a predicate function returns true
Observable<T> takeWhile(Func1<? super T,java.lang.Boolean> predicate)Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.takeWhile(v -> v < 2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);0
1
Completed
As you would expect, skipWhile returns the other half of the sequence
Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
Subscription subscription = values
.skipWhile(v -> v < 2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);2
3
4
...
skipLast and takeLast work just like take and skip, with the difference that the point of reference is from the end.
Observable<Integer> values = Observable.range(0,5);
Subscription subscription = values
.skipLast(2)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);0
1
2
Completed
By now you should be able to guess how takeLast is related to skipLast. There are overloads for both indices and time.
There are also two methods named takeUntil and skipUntil. takeUntil works exactly like takeWhile except that it takes items while the predictate is false. The same is true of skipUntil.
Along with that, takeUntil and skipUntil each have a very interesting overload. The cutoff point is defined as the moment when another observable emits an item.
public final <E> Observable<T> takeUntil(Observable<? extends E> other)Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
Subscription subscription = values
.takeUntil(cutoff)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);0
1
Completed
As you may remember, timer here will wait 250ms and emit one event. This signals takeUntil to stop the sequence. Note that the signal can be of any type, since the actual value is not used.
Once again skipUntil works by the same rules and returns the other half of the observable. Values are ignored until the signal comes to start letting values pass through.
Observable<Long> values = Observable.interval(100,TimeUnit.MILLISECONDS);
Observable<Long> cutoff = Observable.timer(250, TimeUnit.MILLISECONDS);
Subscription subscription = values
.skipUntil(cutoff)
.subscribe(
v -> System.out.println(v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);2
3
4
...
| Previous | Next |
|---|---|
| Creating a sequence | Inspection |







