Because Rx is targeted at asynchronous systems and because Rx can naturally support multithreading, new Rx developers sometimes assume that Rx is multithreaded by default. It is important to clarify before anything else that Rx is single-threaded by default.
Unless you specify otherwise, every call to onNext/onError/onCompleted executes the entire chain of operators synchronously, including the actions of the final subscriber. We can see that in the following example:
final BehaviorSubject<Integer> subject = BehaviorSubject.create();
subject.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
int[] i = {1}; // naughty side-effects for examples only ;)
Runnable r = () -> {
synchronized(i) {
System.out.println("onNext(" + i[0] + ") on " + Thread.currentThread().getId());
subject.onNext(i[0]++);
}
};
r.run(); // Execute on main thread
new Thread(r).start();
new Thread(r).start();

onNext(1) on 1
Received 1 on 1
onNext(2) on 11
Received 2 on 11
onNext(3) on 12
Received 3 on 12
We see here that we called onNext on our subject from 3 different threads. Every time, the subscriber's action was executed on the same thread that made the onNext call. The same would be true no matter how many operators we had chained together. The value goes through the chain synchronously, unless we request otherwise.
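The reason becomes obvious once the mechanism is modelled without RxJava. The following is a minimal plain-Java sketch, not RxJava's implementation: the hypothetical TinySubject class simply loops over its listeners inside onNext, so a listener can only ever run on whichever thread made the call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a subject; it is not part of RxJava.
class TinySubject<T> {
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
    }

    // No thread hand-off: listeners run inside this call, on the caller's thread.
    void onNext(T value) {
        for (Consumer<T> listener : listeners) {
            listener.accept(value);
        }
    }
}

class SynchronousByDefault {
    // Calls onNext from a fresh thread and reports whether the listener
    // ran on that same thread.
    static boolean receivedOnCallingThread() {
        TinySubject<Integer> subject = new TinySubject<>();
        Thread[] receiver = new Thread[1];
        subject.subscribe(i -> receiver[0] = Thread.currentThread());

        Thread caller = new Thread(() -> subject.onNext(1));
        caller.start();
        try {
            caller.join(); // join() also makes the write to receiver[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return receiver[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println(receivedOnCallingThread()); // prints true
    }
}
```

Moving work to another thread therefore always requires an explicit hand-off somewhere in the chain.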
subscribeOn and observeOn allow you to control the invocation of the subscription and the reception of notifications (what thread will call onNext/onError/onCompleted on your observer).
public final Observable<T> observeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler)

In Rx you don't juggle threads directly. Instead you wrap them in policies called Scheduler. We will see more on that later.
With subscribeOn you decide on what Scheduler the Observable.create is executed. Even if you're not calling create yourself, there is an internal equivalent to it. Consider the following example:
System.out.println("Main: " + Thread.currentThread().getId());
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
//.subscribeOn(Schedulers.newThread())
.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
System.out.println("Finished main: " + Thread.currentThread().getId());

Main: 1
Created on 1
Received 1 on 1
Received 2 on 1
Finished main: 1
We see here that, not only is everything executed on the same thread, it is actually sequential: subscribe does not unblock until it has completed subscribing to (and thus creating) the observable, which includes executing the body of create's lambda parameter. The calls to onNext within that lambda execute the entire chain of operators, all the way to the println. Effectively, subscribing on a created observable is blocking.
If you uncomment .subscribeOn(Schedulers.newThread()), the output now is
Main: 1
Finished main: 1
Created on 11
Received 1 on 11
Received 2 on 11
Schedulers.newThread() provided a new thread for our lambda function to run on. subscribe no longer blocks until create's lambda is executed and the main thread is free to proceed.
Some observables create their own threads regardless of what you requested. For example, Observable.interval is asynchronous regardless. In such cases, subscribeOn will dictate on what thread to run the function which creates the resources, which typically won't be helpful. It gives you no control over what resources will be leased.
System.out.println("Main: " + Thread.currentThread().getId());
Observable.interval(100, TimeUnit.MILLISECONDS)
.subscribe(i -> {
System.out.println("Received " + i + " on " + Thread.currentThread().getId());
});
System.out.println("Finished main: " + Thread.currentThread().getId());

Main: 1
Finished main: 1
Received 0 on 11
Received 1 on 11
Received 2 on 11
observeOn controls the other side of the pipeline. The creation and emission of values will work like normal, but the actions of your observer will be invoked on a different thread, as specified by the Scheduler policy.
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
.observeOn(Schedulers.newThread())
.subscribe(i ->
System.out.println("Received " + i + " on " + Thread.currentThread().getId()));

Created on 1
Received 1 on 13
Received 2 on 13
Unlike subscribeOn, observeOn's effect doesn't jump to the start of the pipeline. It just changes the thread for the operators that come after it. You can think of it as intercepting events and changing the thread for the rest of the chain. Here's an example for this:
Observable.create(o -> {
System.out.println("Created on " + Thread.currentThread().getId());
o.onNext(1);
o.onNext(2);
o.onCompleted();
})
.doOnNext(i ->
System.out.println("Before " + i + " on " + Thread.currentThread().getId()))
.observeOn(Schedulers.newThread())
.doOnNext(i ->
System.out.println("After " + i + " on " + Thread.currentThread().getId()))
.subscribe();

Created on 1
Before 1 on 1
Before 2 on 1
After 1 on 13
After 2 on 13
We can see here that events start on the thread that calls onNext and stay on that thread until they encounter the observeOn operator. After that, they continue on the new thread. This way you can assign different threading policies to different parts of an Rx pipeline.
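The difference between the two operators can be modelled in a few lines of plain Java. This is a deliberately simplified sketch, not how RxJava implements them: the Source interface, the helper names and the named executors are assumptions made for illustration, and errors, completion, unsubscription and backpressure are ignored. The subscribeOn helper moves the act of subscribing (and so the producer body) to an executor, while the observeOn helper leaves the producer alone and re-dispatches each value.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ThreadHops {
    // Hypothetical model: a source pushes values into whatever consumer subscribes.
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Like subscribeOn: subscribing, and therefore producing, happens on the executor.
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    // Like observeOn: only what comes after this point moves to the executor.
    static <T> Source<T> observeOn(Source<T> upstream, ExecutorService executor) {
        return downstream ->
            upstream.subscribe(value -> executor.execute(() -> downstream.accept(value)));
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static void shutdownAndWait(ExecutorService executor) {
        executor.shutdown(); // queued tasks still run before termination
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService producer = named("producer");
        ExecutorService consumer = named("consumer");

        Source<Integer> source = downstream -> {
            log.add("Created on " + Thread.currentThread().getName());
            downstream.accept(1);
            downstream.accept(2);
        };

        observeOn(subscribeOn(source, producer), consumer)
            .subscribe(i -> log.add("Received " + i + " on " + Thread.currentThread().getName()));

        shutdownAndWait(producer); // once this returns, every value has been handed over
        shutdownAndWait(consumer);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
        // Created on producer
        // Received 1 on consumer
        // Received 2 on consumer
    }
}
```

In this model, wrapping a source in subscribeOn affects where its body runs no matter how far down the chain the wrapper is applied, whereas observeOn only affects the consumers attached after it.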
This is very useful if you, as the consumer of an observable, know that processing is time-consuming and you don't want to block the producing thread. A typical case for this is applications with a GUI. GUI libraries have a special thread that has the exclusive right to access GUI components (buttons etc). While the GUI thread is busy, everything becomes non-responsive. Handlers to GUI events are invoked on the GUI thread and everything the handler does will block the GUI thread. In order not to have your application freeze for the duration of the processing, you must move heavy processing to another thread. You can use observeOn to move away and again to return to the GUI thread, when the data is ready to be displayed.
As we have seen, some observables depend on resources which are leased on subscription and released when subscriptions end. Typically, releasing resources is cheap. In exceptional cases, where you need the unsubscription actions to not block or to specifically take place on a special thread, you can specify the scheduler that will execute those actions with unsubscribeOn.
Observable<Object> source = Observable.using(
() -> {
System.out.println("Subscribed on " + Thread.currentThread().getId());
return Arrays.asList(1,2);
},
(ints) -> {
System.out.println("Producing on " + Thread.currentThread().getId());
return Observable.from(ints);
},
(ints) -> {
System.out.println("Unsubscribed on " + Thread.currentThread().getId());
}
);
source
.unsubscribeOn(Schedulers.newThread())
.subscribe(System.out::println);

Subscribed on 1
Producing on 1
1
2
Unsubscribed on 11
The using method executes 3 functions: one that leases a resource, one that uses it and one that releases it. With unsubscribeOn we only affected the function that releases the resource.
The observeOn and subscribeOn methods take as an argument a Scheduler. A Scheduler, as the name suggests, is a tool that can schedule individual actions to be performed. The specifics of how the action will be invoked depend on the implementation of the scheduler used. You can create your own implementation of scheduler, but most of the time you'll find that RxJava already has you covered with a set of Schedulers for the common cases. You can get the existing implementations from the factory methods on Schedulers.
The existing schedulers are as follows:
* immediate executes the scheduled action synchronously. No actual scheduling takes place.
* trampoline queues work on the current thread to be executed after the current work completes.
* newThread creates a new thread for each scheduled unit of work.
* computation is intended for CPU work.
* io is intended for IO work.
* test is useful for testing and debugging.
In the current implementation, computation and io schedulers aren't actually unique implementations. The point of this separation is to have unique instances, while also documenting your intent.
Many of the Rx operators use schedulers internally. If you revisit the Observable operators that you've seen so far, you'll see that all the asynchronous operators have overloads that take a Scheduler. You can dictate exactly what scheduler each operator uses. You can also find in the documentation which scheduler they use when you don't provide one.
The approaches and implementations for scheduling used in Rx schedulers aren't specific to Rx. In fact, they are quite standard and could be used without any Rx code. You typically won't have to use schedulers directly, except for when you are designing a custom asynchronous operator. Using schedulers in custom operators isn't only convenient, but it also allows asynchronous operators to become testable, as we will see in the next chapter.
An implementation of Scheduler has two parts. One is the notion of time, which you can get through the now() method. Implementing time through the scheduler is going to prove useful when virtualising time for testing, but for now this feature isn't interesting.
The interesting part is createWorker(), which returns a Scheduler.Worker. A worker accepts actions and executes them sequentially on a single thread. In a way, a worker is a scheduler itself, but we will not refer to it as a scheduler to avoid confusion.
The way to schedule a job on any Scheduler is to create a new worker for that scheduler and schedule actions on that worker.
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> System.out.println("Action"));

The action is then queued to be executed on the thread that the worker is assigned to.
As you would expect from a scheduler, you can also schedule actions to be executed in the future once or repeatedly with the following methods:
Subscription schedule(
Action0 action,
long delayTime,
java.util.concurrent.TimeUnit unit)
Subscription schedulePeriodically(
Action0 action,
long initialDelay,
long period,
java.util.concurrent.TimeUnit unit)

Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);

5033
5035
We can see here that the delay for the execution is measured from the moment of scheduling. The specified time is not a mandatory sleep period in between tasks. The worker can do work in the meantime, if there is work ready for execution.
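The same behaviour can be observed with the standard library alone. The sketch below is an analogy built on java.util.concurrent, not RxJava code: a single-threaded ScheduledExecutorService stands in for the worker, and the class and method names are made up for illustration. Both tasks get the same delay, and both fire roughly one delay after scheduling rather than one after the other.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayFromScheduling {
    // Schedules two tasks with the same delay on one thread and returns
    // how long after the start each of them ran, in milliseconds.
    static List<Long> elapsedMillis(long delayMillis) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        List<Long> elapsed = new CopyOnWriteArrayList<>();
        long start = System.nanoTime();
        Runnable record =
            () -> elapsed.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));

        worker.schedule(record, delayMillis, TimeUnit.MILLISECONDS);
        worker.schedule(record, delayMillis, TimeUnit.MILLISECONDS);

        worker.shutdown(); // by default, already scheduled delayed tasks still run
        try {
            worker.awaitTermination(delayMillis + 5000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // Two values, each slightly above 500; the exact numbers vary per run.
        System.out.println(elapsedMillis(500));
    }
}
```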
Scheduler.Worker extends Subscription. Calling the unsubscribe method on a worker will result in the queue being emptied and all pending tasks being cancelled. We can see that by modifying our previous example:
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(
() -> {
System.out.println(System.currentTimeMillis()-start);
worker.unsubscribe();
},
5, TimeUnit.SECONDS);
worker.schedule(
() -> System.out.println(System.currentTimeMillis()-start),
5, TimeUnit.SECONDS);

5032
The second task is never executed because the one before it cancelled everything. Actions that were in the process of being executed will be interrupted. In the next example, we will create a task that sleeps for 2000ms. 500ms after it has begun executing we cancel all work on the worker. This results in an InterruptedException.
Scheduler scheduler = Schedulers.newThread();
long start = System.currentTimeMillis();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
try {
Thread.sleep(2000);
System.out.println("Action completed");
} catch (InterruptedException e) {
System.out.println("Action interrupted");
}
});
Thread.sleep(500);
worker.unsubscribe();

Action interrupted
As we saw earlier in the signature of schedule, scheduling returns a Subscription. Rather than canceling all work, you can cancel individual tasks via the Subscription that was created while scheduling.
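Per-task cancellation is a standard idea, and it can be illustrated without Rx. In the plain-Java analogy below, the ScheduledFuture returned by ScheduledExecutorService.schedule plays the role that the returned Subscription plays for an Rx worker. The class name is hypothetical and none of this is RxJava API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class CancelOneTask {
    // Schedules two delayed tasks, cancels only the first and returns
    // the names of the tasks that actually ran.
    static List<String> run() {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        List<String> log = new CopyOnWriteArrayList<>();

        ScheduledFuture<?> first =
            worker.schedule(() -> { log.add("first"); }, 200, TimeUnit.MILLISECONDS);
        worker.schedule(() -> { log.add("second"); }, 200, TimeUnit.MILLISECONDS);

        first.cancel(false); // cancels this one task; "second" stays scheduled

        worker.shutdown(); // lets the remaining delayed task run, then stops the thread
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [second]
    }
}
```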
ImmediateScheduler doesn't do any scheduling at all. Upon request for scheduling, the scheduler instead just executes the action synchronously and returns when the action is completed. Nested requests for scheduling will result in the actions being executed recursively.
Scheduler scheduler = Schedulers.immediate();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
System.out.println("Start");
worker.schedule(() -> System.out.println("Inner"));
System.out.println("End");
});

Start
Inner
End
The TrampolineScheduler's worker is also synchronous but does not nest tasks. Instead, it begins with the initial task and any tasks scheduled while executing will be queued for after the current task has completed.
Scheduler scheduler = Schedulers.trampoline();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
System.out.println("Start");
worker.schedule(() -> System.out.println("Inner"));
System.out.println("End");
});
Start
End
Inner
The TrampolineScheduler's worker executes every task on the thread that scheduled the first task. In this implementation, the first call to schedule is blocking until the queue is emptied. Any calls to schedule while executing will be non-blocking and the task will be executed by the thread that is blocked.
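To make the contrast with immediate execution concrete, here is a minimal plain-Java model of both behaviours. It is a sketch of the idea, not RxJava's TrampolineScheduler or ImmediateScheduler: the worker classes are hypothetical, single-threaded and not thread-safe. The outermost call to schedule drains the queue and therefore blocks, while nested calls only enqueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TrampolineModel {
    // Hypothetical worker that queues nested work instead of running it recursively.
    static class TrampolineWorker {
        private final Deque<Runnable> queue = new ArrayDeque<>();
        private boolean draining = false;

        void schedule(Runnable action) {
            queue.addLast(action);
            if (draining) {
                return; // nested call: the loop of the outermost call will run it later
            }
            draining = true;
            try {
                Runnable next;
                while ((next = queue.pollFirst()) != null) {
                    next.run();
                }
            } finally {
                draining = false;
            }
        }
    }

    // Hypothetical worker that runs every action right away, nesting included.
    static class ImmediateWorker {
        void schedule(Runnable action) {
            action.run();
        }
    }

    static List<String> trampolineOrder() {
        List<String> log = new ArrayList<>();
        TrampolineWorker worker = new TrampolineWorker();
        worker.schedule(() -> {
            log.add("Start");
            worker.schedule(() -> log.add("Inner"));
            log.add("End");
        });
        return log;
    }

    static List<String> immediateOrder() {
        List<String> log = new ArrayList<>();
        ImmediateWorker worker = new ImmediateWorker();
        worker.schedule(() -> {
            log.add("Start");
            worker.schedule(() -> log.add("Inner"));
            log.add("End");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trampolineOrder()); // prints [Start, End, Inner]
        System.out.println(immediateOrder());  // prints [Start, Inner, End]
    }
}
```

Queueing keeps the call stack flat, which is why a trampoline is the safer choice when actions reschedule themselves many times.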
The NewThreadScheduler creates workers that each have their own thread. Every scheduled task will be executed on the thread corresponding to that particular worker.
Let us first define a convenience method that will make the following examples more readable.
public static void printThread(String message) {
System.out.println(message + " on " + Thread.currentThread().getId());
}

Now we schedule work on a NewThreadScheduler worker and demonstrate that the worker is bound to a specific thread.
printThread("Main");
Scheduler scheduler = Schedulers.newThread();
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(() -> {
printThread("Start");
worker.schedule(() -> printThread("Inner"));
printThread("End");
});
Thread.sleep(500);
worker.schedule(() -> printThread("Again"));

Main on 1
Start on 11
End on 11
Inner on 11
Again on 11