Inspection

In the previous chapter we saw ways to filter out data that we don't care about. Sometimes what we want is information about the sequence as a whole rather than the values themselves. We will now introduce some methods that allow us to reason about a sequence.

all

The all method establishes whether every value emitted by an observable meets a criterion. Here's the signature and an example:

public final Observable<java.lang.Boolean> all(Func1<? super T,java.lang.Boolean> predicate)

Observable<Integer> values = Observable.create(o -> {
	o.onNext(0);
	o.onNext(10);
	o.onNext(10);
	o.onNext(2);
	o.onCompleted();
});


Subscription evenNumbers = values
	.all(i -> i % 2 == 0)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

true
Completed

An interesting fact about this method is that it returns an observable with a single value, rather than the boolean value directly. This is because it is unknown how long it will take to establish whether the result should be true or false. Even though the resulting observable completes as soon as the answer is known, that may take as long as the source sequence itself. As soon as an item fails the predicate, false is emitted. A value of true, on the other hand, cannot be emitted until the source sequence has completed and all of the items have been checked. Returning the decision inside an observable is a convenient way of making the operation non-blocking. We can see all failing as soon as possible in the next example:

Observable<Long> values = Observable.interval(150, TimeUnit.MILLISECONDS).take(5);
		
Subscription subscription = values
	.all(i -> i<3) // Will fail eventually
	.subscribe(
	    v -> System.out.println("All: " + v),
	    e -> System.out.println("All: Error: " + e),
	    () -> System.out.println("All: Completed")
	);
Subscription subscription2 = values
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

0
1
2
All: false
All: Completed
3
4
Completed

If the source observable emits an error, then all becomes irrelevant and the error passes through, terminating the sequence.

Observable<Integer> values = Observable.create(o -> {
	o.onNext(0);
	o.onNext(2);
	o.onError(new Exception());
});

Subscription subscription = values
	.all(i -> i % 2 == 0)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

Error: java.lang.Exception

If, however, the predicate fails, then false is emitted and the sequence terminates. Even if the source observable fails after that, the event is ignored, as required by the Rx contract (no events after a termination event).

Observable<Integer> values = Observable.create(o -> {
	o.onNext(1);
	o.onNext(2);
	o.onError(new Exception());
});

Subscription subscription = values
	.all(i -> i % 2 == 0)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

false
Completed
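
The three outcomes described above (an early false, a true only on completion, and an error that replaces the verdict) can be modelled in plain Java. This is only a sketch of the decision logic, not RxJava's implementation: the names AllSketch, all and failingAfter are made up for illustration, an Iterator stands in for the source observable, and a RuntimeException thrown by the iterator stands in for onError.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Predicate;

// Hypothetical plain-Java model of the decision logic of `all`; not RxJava code.
class AllSketch {

    // Reads items one at a time and describes what `all` would emit.
    // A RuntimeException thrown by the iterator stands in for the source's onError.
    static <T> String all(Iterator<T> source, Predicate<? super T> predicate) {
        try {
            while (source.hasNext()) {
                if (!predicate.test(source.next())) {
                    return "false"; // decided early; the rest of the source is never read
                }
            }
            return "true"; // only known once the source has completed
        } catch (RuntimeException e) {
            return "Error: " + e; // a source error replaces the verdict
        }
    }

    // Yields the given items and then fails, like the onError examples above.
    static Iterator<Integer> failingAfter(Integer... items) {
        Iterator<Integer> inner = Arrays.asList(items).iterator();
        return new Iterator<Integer>() {
            public boolean hasNext() { return true; }
            public Integer next() {
                if (inner.hasNext()) return inner.next();
                throw new IllegalStateException("source failed");
            }
        };
    }

    public static void main(String[] args) {
        Predicate<Integer> even = i -> i % 2 == 0;
        System.out.println(all(Arrays.asList(0, 10, 10, 2).iterator(), even)); // true
        System.out.println(all(failingAfter(0, 2), even)); // Error: java.lang.IllegalStateException: source failed
        System.out.println(all(failingAfter(1, 2), even)); // false
    }
}
```

In the last call the failure is never reached, because the first item already settles the result.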

exists

The exists method returns an observable that will emit true if any of the values emitted by the observable make the predicate true.

Observable<Integer> values = Observable.range(0, 2);
		
Subscription subscription = values
	.exists(i -> i > 2)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

false
Completed

Here our range didn't go high enough for the i > 2 condition to succeed: range(0, 2) emits only 0 and 1. If we extend our range in the same example with

Observable<Integer> values = Observable.range(0, 4);

we get a successful result:

true
Completed
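
For readers who know java.util.stream, the same question can be asked of a finite stream with anyMatch. The sketch below is an analogy only, not RxJava code, and the class and method names are invented for illustration. Note that Observable.range takes a start and a count, while IntStream.range takes a start and an exclusive end, so the helper converts between the two.

```java
import java.util.stream.IntStream;

// Hypothetical analogy using java.util.stream; not RxJava code.
class ExistsSketch {

    // Mirrors Observable.range(start, count).exists(i -> i > 2) for a finite range.
    static boolean existsGreaterThanTwo(int start, int count) {
        // IntStream.range takes an exclusive end, so a count must be converted.
        return IntStream.range(start, start + count).anyMatch(i -> i > 2);
    }

    // "Some item matches" is the same as "not every item fails to match".
    static boolean existsViaAll(int start, int count) {
        return !IntStream.range(start, start + count).allMatch(i -> !(i > 2));
    }

    public static void main(String[] args) {
        System.out.println(existsGreaterThanTwo(0, 2)); // false: only 0 and 1
        System.out.println(existsGreaterThanTwo(0, 4)); // true: 3 is greater than 2
        System.out.println(existsViaAll(0, 4));         // true
    }
}
```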

isEmpty

This operator emits a single boolean value, indicating whether the observable completed without emitting any values.

Observable<Long> values = Observable.timer(1000, TimeUnit.MILLISECONDS);
		
Subscription subscription = values
	.isEmpty()
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

false
Completed

Falsehood is established as soon as the first value is emitted. true is emitted only once the source observable has completed without emitting anything.

contains

The contains method establishes whether a particular element is emitted by an observable. contains uses the Object.equals method to establish equality. Just like the previous operators, it emits its decision as soon as it can be established and immediately completes.

Observable<Long> values = Observable.interval(100, TimeUnit.MILLISECONDS);
		
Subscription subscription = values
	.contains(4L)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

true
Completed

If we had used contains(4) where we used contains(4L), nothing would be printed. That's because the argument is boxed to an Integer, and an Integer is never equal to a Long under Object.equals, even when both hold the value 4. Our code would wait for the observable to complete before emitting false, but the observable we used is infinite.
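
The boxing behaviour behind this is easy to check in plain Java. The following sketch uses only the standard library; the class and method names are made up for illustration.

```java
// Demonstrates why a boxed 4 never matches a boxed 4L under Object.equals.
class BoxedEquality {

    // The kind of comparison an equals-based operator performs on boxed values.
    static boolean sameBoxed(Object a, Object b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        System.out.println(sameBoxed(4, 4L));  // false: Integer vs Long
        System.out.println(sameBoxed(4L, 4L)); // true: Long vs Long
        System.out.println(4 == 4L);           // true: primitives are widened before comparing
    }
}
```

The primitive comparison succeeds because the int is widened to a long, but no such conversion happens between the boxed types.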

defaultIfEmpty

If an empty sequence would cause you problems, rather than checking with isEmpty and handling the case, you can force an observable to emit a value on completion if it didn't emit anything before completing.

Observable<Integer> values = Observable.empty();
		
Subscription subscription = values
	.defaultIfEmpty(2)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

2
Completed

The default value is emitted only if no other values appeared and only on successful completion. If the source is not empty, the result is just the source observable. In the case of an error, the default value will not be emitted before the error.

Observable<Integer> values = Observable.error(new Exception());
		
Subscription subscription = values
	.defaultIfEmpty(2)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

Error: java.lang.Exception

elementAt

You can select exactly one element out of an observable, by its zero-based index, using the elementAt method.

Observable<Integer> values = Observable.range(100, 10);
		
Subscription subscription = values
	.elementAt(2)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

102
Completed

If the sequence doesn't have enough items, a java.lang.IndexOutOfBoundsException will be emitted. To avoid that specific case, we can use elementAtOrDefault and provide a default value that will be emitted instead of the IndexOutOfBoundsException.

Observable<Integer> values = Observable.range(100, 10);
		
Subscription subscription = values
	.elementAtOrDefault(22, 0)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

0
Completed
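
The indexing rules can be modelled in plain Java with an Iterator standing in for the source. This is a sketch under that assumption, not RxJava's implementation; the names ElementAtSketch, elementAt and elementAtOrDefault below are ordinary static helpers invented for illustration, and a thrown exception stands in for an emitted error.

```java
import java.util.Iterator;
import java.util.stream.IntStream;

// Hypothetical plain-Java model of elementAt and elementAtOrDefault; not RxJava code.
class ElementAtSketch {

    // Skips items until the zero-based index is reached.
    static <T> T elementAt(Iterator<T> source, int index) {
        for (int i = 0; source.hasNext(); i++) {
            T item = source.next();
            if (i == index) {
                return item;
            }
        }
        // The source ended before the requested index was reached.
        throw new IndexOutOfBoundsException("index " + index + " is out of bounds");
    }

    // Same lookup, but a too-short source yields the default instead of an error.
    static <T> T elementAtOrDefault(Iterator<T> source, int index, T defaultValue) {
        try {
            return elementAt(source, index);
        } catch (IndexOutOfBoundsException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        // IntStream.range(100, 110) yields the same ten values as Observable.range(100, 10).
        System.out.println(elementAt(IntStream.range(100, 110).iterator(), 2));              // 102
        System.out.println(elementAtOrDefault(IntStream.range(100, 110).iterator(), 22, 0)); // 0
    }
}
```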

sequenceEqual

The last operator for this chapter establishes whether two sequences are equal by comparing the values at the same index. Both the size of the sequences and the values must be equal. The operator uses either Object.equals or a function that you supply to compare values.

Observable<String> strings = Observable.just("1", "2", "3");
Observable<Integer> ints = Observable.just(1, 2, 3);

Observable.sequenceEqual(strings, ints, (s,i) -> s.equals(i.toString()))
//Observable.sequenceEqual(strings, ints)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

true
Completed

If we swap the operator for the one that is commented out, i.e., the one using the standard Object.equals, the result would be false, because a String is never equal to an Integer.
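
The pairwise comparison can be modelled in plain Java as follows. This is a rough sketch rather than RxJava's implementation: two Iterators stand in for the two observables, and the class and method names are hypothetical. It shows both requirements, equal values at each index and equal length.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of sequenceEqual; not RxJava code.
class SequenceEqualSketch {

    static <A, B> boolean sequenceEqual(Iterator<A> first, Iterator<B> second,
                                        BiPredicate<? super A, ? super B> equality) {
        // Compare items at the same position until either side runs out.
        while (first.hasNext() && second.hasNext()) {
            if (!equality.test(first.next(), second.next())) {
                return false;
            }
        }
        // Equal only if both sequences ended at the same time.
        return !first.hasNext() && !second.hasNext();
    }

    public static void main(String[] args) {
        List<String> strings = Arrays.asList("1", "2", "3");
        List<Integer> ints = Arrays.asList(1, 2, 3);

        // Custom comparison, as in the example above.
        System.out.println(sequenceEqual(strings.iterator(), ints.iterator(),
                (s, i) -> s.equals(i.toString()))); // true
        // Standard equality: a String never equals an Integer.
        System.out.println(sequenceEqual(strings.iterator(), ints.iterator(),
                Objects::equals)); // false
        // Same leading values but different lengths.
        System.out.println(sequenceEqual(strings.iterator(), Arrays.asList(1, 2).iterator(),
                (s, i) -> s.equals(i.toString()))); // false
    }
}
```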

Failing is not part of the comparison. As soon as either sequence fails, the resulting observable forwards the error.

Observable<Integer> values = Observable.create(o -> {
	o.onNext(1);
	o.onNext(2);
	o.onError(new Exception());
});

Observable.sequenceEqual(values, values)
	.subscribe(
	    v -> System.out.println(v),
	    e -> System.out.println("Error: " + e),
	    () -> System.out.println("Completed")
	);

Output

Error: java.lang.Exception
