RxJava offers a very large operator set. Counting all the overloads, the number of operators on Rx is over 300. A smaller number of those is essential, meaning that you cannot achieve an Rx implementation without them. Many are there just for convenience and a self-descriptive name. For example, if source.First(user -> user.isOnline()) didn't exist, we would still be able to do source.filter(user -> user.isOnline()).First().
Despite many convenience operators, the operator set of RxJava is still very basic. Rx offers the building blocks that you can combine into anything, but eventually you will want to define reuseable code for repeated cases. In standard Java, this would be done with custom classes and methods. In Rx, you would like the ability to design custom operators. For example, calculating a running average from a sequence of numbers may be very common in your financial application. That doesn't already exist in Observable, but you can make it yourself:
class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);That does it, but it's not reusable. In a real financial application, you will probably want to do the same kind of processing over many different sequences. Even if you don't, it would still be nice to hide all this code behind a single phrase: "running average". Understandably, a Java developer's first instinct would be to make a function out of it:
public static Observable<Double> runningAverage(Observable<Integer> source) {
return source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}And you can easily use it:
runningAverage(Observable.just(3, 5, 6, 4, 4))
.subscribe(System.out::println);Output
3.0
4.0
4.666666666666667
4.5
4.4
The above example looks fine because it's simple. Let's do something a little more complicated with our custom operator. Let's take a phrase, turn it into a sequence of word lengths and calculate the running average for that.
runningAverage(
Observable.just("The brown fox jumped and I forget the rest")
.flatMap(phrase -> Observable.from(phrase.split(" ")))
.map(word -> word.length()))
.subscribe(System.out::println);Once again, this works, but it doesn't look 100% Rx. Imagine if everything in Rx was done like the method which we designed (including all the existing operators).
subscribe(
lastOperator(
middleOperator(
firstOperator(source))))We're reading our pipeline in reverse! Yikes!
Rx has a particular style for applying operators, by chaining them, rather than nesting them. This style is not uncommon for objects whose methods return instances of the same type. This makes even more sense for immutable objects and can be found even in standard Java features, such as strings:
String s = new String("Hi").toLowerCase().replace('a', 'c');
This style allows you to see modifications in the order that they are applied, and it also looks neater when a lot of operators are used.
Ideally, you would want your Rx operators to fit into the chain just like any other operator:
Observable.range(0,10)
.map(i -> i*2)
.myOperator()
.subscribe();Many languages have ways of supporting this. Inconveniently, Java doesn't. You'd have to edit Observable itself to add your own methods. There's no point asking the RxJava team to add your idea to the operator set, since there are so many already and the RxJava team are conservative about adding yet another operator. You could extend Observable and add your own operators there. In that case, you'd no longer be able to share and combine libraries of operators.
There is a way of fitting a custom operator into the chain, with the compose method.
public <R> Observable<R> compose(Observable.Transformer<? super T,? extends R> transformer)Aha! A Transformer interface! Transformer<T,R> actually just an alias for the Func1<Observable<T>,Observable<R>> interface. It is a method that takes an Observable<T> and returns an Observable<R>, just like the one we made for calculating a running average.
Observable.just(3, 5, 6, 4, 4)
.compose(Main::runningAverage)
.subscribe(System.out::println);Java doesn't let you reference a function by its name alone, so here we assumed the custom operator is in our Main class. We can see that now our operator fits perfecty into the chain, albeit with the boilerplate of calling compose first. For even better encapsulation, you should implement Observable.Transformer in a new class and move the whole implementation out of sight, along with its helper class(es).
public class RunningAverage implements Observable.Transformer<Integer, Double> {
private static class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}
@Override
public Observable<Double> call(Observable<Integer> source) {
return source
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}
}And we use it like this
source.compose(new RunningAverage())Most Rx operators are parameterisable. We can do this too. Let's extend the functionality of our operator with the possiblity to ignore values above a certain threshold.
public class RunningAverage implements Observable.Transformer<Integer, Double> {
private static class AverageAcc {
public final int sum;
public final int count;
public AverageAcc(int sum, int count) {
this.sum = sum;
this.count = count;
}
}
final int threshold;
public RunningAverage() {
this.threshold = Integer.MAX_VALUE;
}
public RunningAverage(int threshold) {
this.threshold = threshold;
}
@Override
public Observable<Double> call(Observable<Integer> source) {
return source
.filter(i -> i< this.threshold)
.scan(
new AverageAcc(0,0),
(acc, v) -> new AverageAcc(acc.sum + v, acc.count + 1))
.filter(acc -> acc.count > 0)
.map(acc -> acc.sum/(double)acc.count);
}
}We just added the parameter as a field in the operator, added constructors for the uses that we cover and used the parameter in our Rx operations. Now we can do source.compose(new RunningAverage(5)), where, ideally, we would be calling source.runningAverage(5). Java only lets us go this far. Rx is a functional paradigm, but Java is still primarily an object oriented language and quite conservative at that.
You can a complete example of for this example operator here.
Internally, every Rx operator does 3 things
- It subscribes to the source and observes the values.
- It transforms the observed sequence according to the operator's purpose.
- It pushes the modified sequence to its own subscribers, by calling
onNext,onErrorandonCompleted.
The compose operator works with a method that makes an observable out of another. In doing so, it spares you the trouble of doing the 3 steps above manually: the intermediate subscribing and pushing is implicit within an Rx chain. That presumes that you can do the transformation by using existing operators. If the operators don't already exist, you need to do the processing in the traditional Java OOP way. This means extracting the values from the pipeline and re-pushing when processed. An Observable.Transformer that does this would include an explicit subscription to the source Observable and/or the explicit creation of a new Observable to be returned.
You'll find that this is often just boilerplate, and that you can avoid some of it by going to a lower level. The lift operator is similar to compose, with the difference of transforming a Subscriber, instead of an Observable.
public final <R> Observable<R> lift(Observable.Operator<? extends R,? super T> lift)And Observable.Operator<R,T> is an alias for Func1<Subscriber<? super R>,Subscriber<? super T>>: a function that will transform a Subscriber<R> into Subscriber<T>. By dealing directly with Subscriber we avoid involving Observable. The boilerplate of subscribing to and creating Observable types will be handled by lift.
If you studied the signature, there's something which seems backwards at first: to turn an Observable<T> into Observable<R>, we need a function that turns Subscriber<R> into Subscriber<T>. To understand why that is the case, remember that a subscription begins at the end of the chain and is propagated to the source. In other words, a subscription goes backwards through the chain of operators. Each operator receives a subscription (i.e. is subscribed to) and uses that subscription to create a subscription to the preceeding operator.
In the next example, we will reimplement map, without using the existing implementation or any other existing operator.
class MyMap<T,R> implements Observable.Operator<R, T> {
private Func1<T,R> transformer;
public MyMap(Func1<T,R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
return new Subscriber<T>() {
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed())
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
if (!subscriber.isUnsubscribed())
subscriber.onError(e);
}
@Override
public void onNext(T t) {
if (!subscriber.isUnsubscribed())
subscriber.onNext(transformer.call(t));
}
};
}
}A map operator requires a function that transforms items from T to R. In our implementation, that's the transformer field. The key part is the call method. We receive a Subscriber<R> that wants to receive items of type R. For that subscriber we create a new Subscriber<T> that takes items of type T, transforms them to type R and pushes them to the Subscriber<R>. lift handles the boilerplate of receiving the Subscriber<R>, as well as using the created Subscriber<T> to subscribe on the source observable.
Using an Observable.Operator is as simple as using Observable.Transformer:
Observable.range(0, 5)
.lift(new MyMap<Integer, String>(i -> i + "!"))
.subscribe(System.out::println);0!
1!
2!
3!
4!
A class constructor in Java can't have its type parameters infered. A logical last step would be to make a method that can infer the types for us
public static <T,R> MyMap<T,R> create(Func1<T,R> transformer) {
return new MyMap<T,R>(transformer);
}And use like this
Observable.range(0, 5)
.lift(MyMap.create(i -> i + "!"))
.subscribe(System.out::println);When doing manual pushes to subscribers, like we do when implementing Observable.Operator, there are a few things to consider
- A subscriber is free to unsubscribe. Don't push without checking first:
!subscriber.isUnsubscribed(). - You are responsible for complying with the Rx contract: any number of
onNextnotifications, optionally followed by a singleonCompletedoronError. - If you need to perform asynchronous operations and scheduling, use the Schedulers of Rx. That will allow your operator to become testable.
If you can't guarantee that your operator will obey the Rx contract, for example because you push asynchronously from multiple sources, you can use the serialize operator. The serialize operator will turn an unreliable observable into a lawful, sequential observable.
Let's first create an observable that breaks the contract and subscribe to it.
Observable<Integer> source = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
o.onNext(3);
o.onCompleted();
});
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
.subscribe(
System.out::println,
System.out::println,
() -> System.out.println("Completed"));1
2
Completed
Unsubscribed
Despite what our observable tried to emit, the end result obeyed the Rx contract. That happened because subscribe terminated the subscription when it (very reasonably) thought that the sequence ended. This doesn't mean that the problem will always be taken care for us. There is also a method called unsafeSubscribe, which won't unsubscribe automatically.
Observable<Integer> source = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
o.onNext(3);
o.onCompleted();
});
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
});1
2
Completed
3
Completed
Our subscriber's intended behaviour was identical to the previous example (we created an instance of Subscriber because unsafeSubscribe doesn't have overloads that take lambdas). However, we can see here we weren't unsubscribed and we kept receiving notifications.
unsafeSubscribe is unsafe in other regards as well, such as error handling. It's usefulness is limited. The documentation says that it should only be used for custom operators that use nested subscriptions. To protect such operators from receiving and illegal sequence, we can apply the serialize operator
Observable<Integer> source = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
o.onNext(3);
o.onCompleted();
})
.cast(Integer.class)
.serialize();;
source.doOnUnsubscribe(() -> System.out.println("Unsubscribed"))
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("Completed");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
});1
2
Completed
We see that, despite the fact that we did not unsubscribe, the illegal notifications were filtered out.
If the ability to use your custom operator in the chain like a standard operator is not convincing enough, using lift has one more unexpected advantage. Standard operators are also implemented using lift, which makes lift a hot method at runtime. JVM optimises for lift and operators that use lift receive a performance boost. That can include your operator, if you use lift.
Both lift and compose are meta-operators, used for injecting a custom operator into the chain. In both cases, the custom operator can be implemented as a function or a class.
compose:Observable.TransformerorFunc<Observable<TSource>, Observable<TReturn>>lift:Observable.OperatororFunc<Subscriber<TReturn>, Subscriber<TSource>>
Theoretically, any operator can be implemented as both Observable.Operator and Observable.Transformer. The choice between the two is a question of convenience, and what kind of boilerplate you want to avoid.
- If the custom operator is a composite of existing operators,
composeis a natural fit. - If the custom operator needs to extract values from the pipeline to process them and then push them back,
liftis a better fit.
| Previous | Next |
|---|---|
| Hot and cold observables | Chapter 4 - Concurrency |
