Now that you understand what Rx is in general, it is time to start creating and manipulating sequences. The original implementation of manipulating sequences was based on C#'s LINQ, which in turn was inspired from functional programming. Knowledge about either isn't necessary, but it would make the learning process a lot easier for the reader. Following the original www.introtorx.com, we too will divide operations into themes that generally go from the simpler to the more advanced. Most Rx operators manipulate existing sequences. But first, we will see how to create an Observable to begin with.
In previous examples we used Subjects and manually pushed values into them to create a sequence. We used that sequence to demonstrate some key concepts and the first and most important Rx method, subscribe. In most cases, subjects are not the best way to create a new Observable. We will now see tidier ways to create observable sequences.
The just method creates an Observable that will emit a predifined sequence of values, supplied on creation, and then terminate.
Observable<String> values = Observable.just("one", "two", "three");
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
); Received: one
Received: two
Received: three
Completed
This observable will emit a single onCompleted and nothing else.
Observable<String> values = Observable.empty();
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Completed
This observable will never emit anything
Observable<String> values = Observable.never();
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);The code above will print nothing. Note that this doesn't mean that the program is blocking. In fact, it will terminate immediately.
This observable will emit a single error event and terminate.
Observable<String> values = Observable.error(new Exception("Oops"));
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Error: java.lang.Exception: Oops
defer doesn't define a new kind of observable, but allows you to declare how an observable should be created every time a subscriber arrives. Consider how you would create an observable that returns the current time and terminates. You are emitting a single value, so it sounds like a case for just.
Observable<Long> now = Observable.just(System.currentTimeMillis());
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);1431443908375
1431443908375
Notice how the two subscribers, 1 second apart, see the same time. That is because the value for the time is aquired once: when execution reaches just. What you want is for the time to be aquired when a subscriber asks for it by subscribing. defer takes a function that will executed to create and return the Observable. The Observable returned by the function is also the Observable returned by defer. The important thing here is that this function will be executed again for every new subscription.
Observable<Long> now = Observable.defer(() ->
Observable.just(System.currentTimeMillis()));
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);1431444107854
1431444108858
create is a very powerful function for creating observables. Let have a look at the signature.
static <T> Observable<T> create(Observable.OnSubscribe<T> f)The Observable.OnSubscribe<T> is simpler than it looks. It is basically a function that takes a Subscriber<T> for type T. Inside it we can manually determine the events that are pushed to the subscriber.
Observable<String> values = Observable.create(o -> {
o.onNext("Hello");
o.onCompleted();
});
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Received: Hello
Completed
When someone subscribes to the observable (here: values), the corresponding Subscriber instance is passed to your function. As the code is executed, values are being pushed to the subscriber. Note that you have to call onCompleted in the end by yourself, if you want the sequence to signal its completion.
This method should be your preferred way of creating a custom observable, when none of the existing shorthands serve your purpose. The code is similar to how we created a Subject and pushed values to it, but there are a few important differences. First of all, the source of the events is neatly encapsulated and separated from unrelated code. Secondly, Subjects carry dangers that are not obvious: with a Subject you are managing state, and anyone with access to the instance can push values into it and alter the sequence. We will see more about this issue later on.
Another key difference to using subjects is that the code is executed lazily, when and if an observer subscribes. In the example above, the code is run not when the observable is created (because there is no Subscriber yet), but each time subscribe is called. This means that every value is generated again for each subscriber, similar to ReplaySubject. The end result is similar to a ReplaySubject, except that no caching takes place. However, if we had used a ReplaySubject, and the creation method was time-consuming, that would block the thread that executes the creation. You'd have to manually create a new thread to push values into the Subject. We're not presenting Rx's methods for concurrency yet, but there are convenient ways to make the execution of the onSubscribe function concurrently.
You may have already noticed that you can trivially implement any of the previous observables using Observable.create. In fact, our example for create is equivalent to Observable.just("hello").
In functional programming it is common to create sequences of unrestricted or infinite length. RxJava has factory methods that create such sequences.
A straight forward and familiar method to any functional programmer. It emits the specified range of integers.
Observable<Integer> values = Observable.range(10, 15);The example emits the values from 10 to 24 in sequence.
This function will create an infinite sequence of ticks, separated by the specified time duration.
Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
System.in.read();Received: 0
Received: 1
Received: 2
Received: 3
...
This sequence will not terminate until we unsubscribe.
We should note why the blocking read at the end is necessary. Without it, the program terminates without printing something. That's because our operations are non-blocking: we create an observable that will emit values over time, then we register the actions to execute if and when values arrive. None of that is blocking and the main thread proceeds to terminate. The timer that produces the ticks runs on its own thread, which does not prevent the JVM from terminating, killing the timer with it.
There are two overloads to Observable.timer. The first example creates an observable that waits a given amount of time, then emits 0L and terminates.
Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Received: 0
Completed
The second one will wait a specified amount of time, then begin emitting like interval with the given frequency.
Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Received: 0
Received: 1
Received: 2
...
The example above waits 2 seconds, then starts counting every 1 second.
There are well established tools for dealing with sequences, collections and asychronous events, which may not be directly compatible with Rx. Here we will discuss ways to turn their output into input for your Rx code.
If you are using an asynchronous tool that uses event handlers, like JavaFX, you can use Observable.create to turn the streams into an observable
Observable<ActionEvent> events = Observable.create(o -> {
button2.setOnAction(new EventHandler<ActionEvent>() {
@Override public void handle(ActionEvent e) {
o.onNext(e)
}
});
})Depending on what the event is, the event type (here ActionEvent) may be meaningful enough to be the type of your sequence. Very often you will want something else, like the contents of a field. The place to get the value is in the handler, while the GUI thread is blocked by the handler and the field value is relevant. There is no guarantee what the value will be by the time the value reaches the final Subscriber. On the other hand, a value moving though an observable should remain unchanged, if the pipeline is properly implemented.
Much like most of the functions we've seen so far, you can turn any kind of input into an Rx observable with create. There are several shorthands for converting common types of input.
Futures are part of the Java framework and you may come across them while using frameworks that use concurrency. They are a less powerful concept for concurrency than Rx, since they only return one value. Naturally, you may like to turn them into observables.
FutureTask<Integer> f = new FutureTask<Integer>(() -> {
Thread.sleep(2000);
return 21;
});
new Thread(f).start();
Observable<Integer> values = Observable.from(f);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Received: 21
Completed
The observable emits the result of the FutureTask when it is available and then terminates. If the task is canceled, the observable will emit a java.util.concurrent.CancellationException error.
If you're interested in the results of the Future for a limited amount of time, you can provide a timeout period like this
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);If the Future has not completed in the specified amount of time, the observable will ignore it and fail with a TimeoutException.
You can also turn any collection into an observable using the overloads of Observable.from that take arrays and iterables. This will result in every item in the collection being emitted and then a final onCompleted event.
Integer[] is = {1,2,3};
Observable<Integer> values = Observable.from(is);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);Received: 1
Received: 2
Received: 3
Completed
Observable is not interchangeable with Iterable or Stream. Observables are push-based, i.e., the call to onNext causes the stack of handlers to execute all the way to the final subscriber method (unless specified otherwise). The other models are pull-based, which means that values are requested as soon as possible and execution blocks until the result is returned.
| Previous | Next |
|---|---|
| Lifetime management | Reducing a sequence |