RxJava - Learn from Zero(0x01)
(by @jpwang and original post is here)
Preface
I have spent more than half a year learning RxJava, especially how to use it on the Android platform. One of the most important reasons that pushed me to learn it is our current project, which has a huge amount of code and callbacks to handle asynchronous work; you get lost very quickly when reading that kind of source code. RxJava is one of the technologies that saved me from that hell.
What’s RxJava
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.
The above is the official high-level introduction. For me, though, it comes down to one word: asynchronous. RxJava is an awesome library that helps implement all kinds of asynchronous operations.
Why should I consider RxJava
Reactive Programming raises the level of abstraction of your code so you can focus on the interdependence of events that define the business logic, rather than having to constantly fiddle with a large amount of implementation details. Code in RP will likely be more concise.
The benefit is more evident in modern webapps and mobile apps that are highly interactive with a multitude of UI events related to data events. 10 years ago, interaction with web pages was basically about submitting a long form to the backend and performing simple rendering to the frontend. Apps have evolved to be more real-time: modifying a single form field can automatically trigger a save to the backend, “likes” to some content can be reflected in real time to other connected users, and so forth.
Apps nowadays have an abundancy of real-time events of every kind that enable a highly interactive experience to the user. We need tools for properly dealing with that, and Reactive Programming is an answer.
But to me, it is again one word: concision.
The most important thing for asynchronous code is concision. Why should I need AsyncTask, Handler, ThreadExecutor, and all kinds of other tools to achieve the same goal of running work asynchronously? Why not use one technology to do all of it consistently? That is what RxJava does!
RxJava provides a consistent and concise way of working that is easy to use, and it stays concise even as the logic of the whole app becomes more and more complex.
For example
There is a custom view called imageCollectorView, which displays multiple images and has a method called addImage(Bitmap) to add an image for display. Suppose our app needs to load all the images under some directories, File[] folders, and display all of them in imageCollectorView. There are a few things we need to handle carefully: reading images from the local file system is expensive, so it has to run on a background thread instead of the UI thread, while displaying an image in the view has to run on the UI thread. There are lots of ways to handle this; here is one of them:
But if we are using RxJava, here is the implementation:
Wait! Wait! Hold on! I know you will complain: the RxJava version clearly has more lines than the old one, so why do I say it is more concise? Okay, I get your point, but the concision I am talking about is concision of logic, not just the number of lines. If you read the code above, you will find that the RxJava implementation is a single chain of calls instead of nested calls, which helps simplify the logic, especially when your app’s requirements and design become more complex.
So if you ask me why I use RxJava, I would say concision: it puts all the complex logic into one chain.
How to use RxJava
Before we talk about how to use RxJava, I would like to spend some time going through its API and philosophy.
- Concept: an extension of the observer pattern
- Basic philosophy
- Create Observer
Observer is the role responsible for listening for events and reacting when they are triggered.
Besides Observer, there is another class that implements Observer: Subscriber. Subscriber extends the API of Observer, but the basic usage is exactly the same.
In fact, an Observer is converted into a Subscriber inside RxJava when it is used. So if you only use the basic functions, the two are interchangeable. The only differences are:
- onStart(): This is a new method in Subscriber. It is called when the subscription begins, before any event is sent, and can be used for preparation such as resetting data. It is optional, and its default implementation is empty.
- unsubscribe(): This cancels the subscription. After it is called, the Subscriber no longer receives events. Usually you can check isUnsubscribed() before calling it. unsubscribe() is very important, because the Observable keeps a reference to the Subscriber after subscribe() is called. If that reference is not released in time, it causes a memory leak! So keep in mind: once you are done, call unsubscribe() in the proper place to release the strong reference.
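The relationship between the two roles can be sketched with plain JDK code. This is only an illustration under stated assumptions: SketchObserver, SketchSubscriber, and SubscriberSketchDemo are hypothetical names invented here, not RxJava classes, and the emit() helper stands in for whatever produces the events. It shows onStart() running before the first event and the producer stopping once isUnsubscribed() returns true.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Observer: the three basic callbacks.
interface SketchObserver<T> {
    void onNext(T item);
    void onCompleted();
    void onError(Throwable error);
}

// Hypothetical stand-in for a Subscriber: an Observer plus onStart() and unsubscribe().
abstract class SketchSubscriber<T> implements SketchObserver<T> {
    private volatile boolean unsubscribed = false;

    // Optional hook, called before the first event; empty by default.
    public void onStart() { }

    public void unsubscribe() { unsubscribed = true; }

    public boolean isUnsubscribed() { return unsubscribed; }
}

class SubscriberSketchDemo {
    // Emits the items one by one and stops as soon as the subscriber has unsubscribed.
    static <T> void emit(List<T> items, SketchSubscriber<T> subscriber) {
        subscriber.onStart();
        for (T item : items) {
            if (subscriber.isUnsubscribed()) {
                return; // the subscriber no longer wants events
            }
            subscriber.onNext(item);
        }
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    }

    // Records every callback in a log; the subscriber unsubscribes after `limit` items.
    static List<String> run(List<String> items, final int limit) {
        final List<String> log = new ArrayList<>();
        emit(items, new SketchSubscriber<String>() {
            private int received = 0;

            @Override public void onStart() { log.add("start"); }

            @Override public void onNext(String item) {
                log.add("next:" + item);
                received++;
                if (received == limit && !isUnsubscribed()) {
                    unsubscribe();
                }
            }

            @Override public void onCompleted() { log.add("completed"); }

            @Override public void onError(Throwable error) { log.add("error"); }
        });
        return log;
    }
}
```

With three items and a limit of two, the log ends after the second onNext and never records "completed", because the subscriber has already unsubscribed.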
- Create Observable
Observable decides when and how events are triggered. RxJava uses the create() method to make an Observable and define its event rules. create() accepts an OnSubscribe object as its parameter. When the Observable gets subscribed, the call() method of OnSubscribe is invoked automatically, and the events are emitted in sequence (for example, onNext() three times followed by onCompleted() once). This is what we call the observer pattern.
create() is the most basic way to build an event sequence. There are some shortcuts for creating one more quickly:
- just(T...): emits all the parameters in order.
- from(T[]) / from(Iterable<? extends T>): emits each item of the array or Iterable in order, which is equivalent to calling just(T...) with those items.
- Subscribe
After the Observable and the Observer are created, use subscribe() to connect them: observable.subscribe(observer) or observable.subscribe(subscriber).
You may be confused by the subscribe() method: it reads as if the observable is subscribing to the observer/subscriber, rather than the observer/subscriber subscribing to the observable. Yes, you are right, but if the API were changed to observer.subscribe(observable), it would break the chained, streaming style of the API, which would be bad for the whole design. For convenience, you can read subscribe() as getSubscribedBy().
Let’s see what subscribe() does, in simplified form:
- Call Subscriber.onStart(). This method is optional.
- Call OnSubscribe.call(Subscriber). This is where the whole event logic starts. So in RxJava, an Observable does not deliver events when it is created; instead, delivery starts when subscribe() is called.
- Return the Subscriber, which implements Subscription, so you can unsubscribe at any time.
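The three steps above can be sketched with a tiny JDK-only model. Everything here (MiniObservable, MiniSubscriber, MiniOnSubscribe, MiniSubscription, MiniObservableDemo) is a hypothetical stand-in written for this illustration, not RxJava source code; it assumes Java 8 or later for the lambdas. It shows that create() only stores the rules, that nothing is emitted until subscribe() is called, and that a just()-style shortcut can be built on top of create().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Subscription handle.
interface MiniSubscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// Hypothetical stand-in for a Subscriber.
abstract class MiniSubscriber<T> implements MiniSubscription {
    private volatile boolean unsubscribed;

    public void onStart() { } // optional preparation hook
    public abstract void onNext(T item);
    public abstract void onCompleted();
    public abstract void onError(Throwable error);

    @Override public void unsubscribe() { unsubscribed = true; }
    @Override public boolean isUnsubscribed() { return unsubscribed; }
}

// Hypothetical stand-in for OnSubscribe: the rules for emitting events.
interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<T> subscriber);
}

class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe;

    private MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // create() only stores the emission rules; nothing runs yet.
    static <T> MiniObservable<T> create(MiniOnSubscribe<T> onSubscribe) {
        return new MiniObservable<>(onSubscribe);
    }

    // A just()-style shortcut built on create(): emit every argument, then complete.
    @SafeVarargs
    static <T> MiniObservable<T> just(final T... items) {
        return create(subscriber -> {
            for (T item : items) {
                subscriber.onNext(item);
            }
            subscriber.onCompleted();
        });
    }

    MiniSubscription subscribe(MiniSubscriber<T> subscriber) {
        subscriber.onStart();          // 1. optional preparation
        onSubscribe.call(subscriber);  // 2. the event logic starts here
        return subscriber;             // 3. the caller can unsubscribe later
    }
}

class MiniObservableDemo {
    // Subscribes with a logging subscriber and returns the log.
    static List<String> subscribeAndLog(MiniObservable<String> source, final List<String> log) {
        source.subscribe(new MiniSubscriber<String>() {
            @Override public void onStart() { log.add("start"); }
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onCompleted() { log.add("completed"); }
            @Override public void onError(Throwable error) { log.add("error"); }
        });
        return log;
    }

    static List<String> run() {
        final List<String> log = new ArrayList<>();
        MiniObservable<String> source = MiniObservable.create(subscriber -> {
            log.add("call");
            subscriber.onNext("Hello");
            subscriber.onNext("Hi");
            subscriber.onNext("Aloha");
            subscriber.onCompleted();
        });
        log.add("created"); // nothing has been emitted at this point
        return subscribeAndLog(source, log);
    }
}
```

In run(), the "created" entry is logged before "start" and "call", which is the laziness described above: the events belong to the subscription, not to the creation of the Observable.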
You can also pass in actions instead of a full Subscriber, and RxJava will automatically generate a Subscriber that wraps them. Action0 has only one method, call(), which takes no parameters and returns nothing; it maps to the onCompleted() method of Subscriber. Action1 is similar: it has only one method, void call(T param), and can be mapped to onNext(T obj) and onError(Throwable error). RxJava provides a family of such ActionX interfaces to handle different cases.
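The wrapping idea can be sketched in a few lines of JDK-only Java. The interface and class names below (SketchAction0, SketchAction1, SketchCallbacks, ActionWrapperSketch) are hypothetical and only mirror the method shapes described above; how RxJava wraps actions internally may differ. The sketch assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins with the same method shapes as the actions described above.
interface SketchAction0 {
    void call();
}

interface SketchAction1<T> {
    void call(T param);
}

// Hypothetical subscriber-like type with the three basic callbacks.
interface SketchCallbacks<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

class ActionWrapperSketch {
    // Wraps three separate actions into one object that forwards each callback.
    static <T> SketchCallbacks<T> wrap(final SketchAction1<T> nextAction,
                                       final SketchAction1<Throwable> errorAction,
                                       final SketchAction0 completedAction) {
        return new SketchCallbacks<T>() {
            @Override public void onNext(T item) { nextAction.call(item); }
            @Override public void onError(Throwable error) { errorAction.call(error); }
            @Override public void onCompleted() { completedAction.call(); }
        };
    }

    // Drives the wrapped callbacks by hand and returns what the actions recorded.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SketchCallbacks<String> callbacks = wrap(
                item -> log.add("next:" + item),
                error -> log.add("error:" + error.getMessage()),
                () -> log.add("completed"));
        callbacks.onNext("a");
        callbacks.onNext("b");
        callbacks.onCompleted();
        return log;
    }
}
```

The design point is that each action carries exactly one callback, so the caller only supplies the callbacks it cares about and the wrapper fills in the rest of the Subscriber shape.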
- Usage
Here are a few simple scenarios.
Print array
Get drawable and display it
But, this is still useless!!!
Yes! You are right! In the way we have used RxJava so far, events are triggered and consumed on the same thread! But in most uses of the observer pattern, the work should be handled in the background and the notification received in the foreground. So let’s introduce another important part of RxJava: Scheduler.
- Scheduler
By default, RxJava does everything on the same thread: events are triggered on the thread where subscribe() is called, and they are consumed on the thread where they are triggered. Fortunately, RxJava provides a very cool tool, Scheduler, to help us switch between threads.
Basically, a Scheduler is like a thread controller. RxJava uses it to decide which kind of thread a piece of code runs on. For example:
- Schedulers.immediate(): the default Scheduler, which keeps running on the current thread.
- Schedulers.newThread(): always launches a new thread to run the work.
- Schedulers.io(): I/O-related operations should be put on this Scheduler. It is similar to newThread(); the only difference is that io() uses a thread pool to maintain the threads and tries to reuse them.
- Schedulers.computation(): CPU-intensive computing tasks should run on this Scheduler. It uses a fixed-size thread pool whose size equals the number of CPU cores. DON’T put I/O operations on computation(): waiting on I/O there wastes CPU time.
- AndroidSchedulers.mainThread(): Android-specific; runs tasks on the Android main thread.
With these Schedulers, you can now use subscribeOn() and observeOn() to control the threads.
- subscribeOn(): specifies the thread on which subscribe() happens, that is, the thread on which Observable.OnSubscribe runs. We usually call it the thread where events are triggered.
- observeOn(): specifies the thread on which the Subscriber runs. We usually call it the thread where events are consumed.
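The split between the triggering thread and the consuming thread can be imitated with two plain JDK executors. This is a rough sketch of the idea only, not how RxJava schedulers are implemented: SchedulerSketch, namedExecutor, and the thread names "sketch-io" and "sketch-main" are all made up for this example, and the "sketch-main" executor merely plays the role that the Android main thread would play in a real app.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SchedulerSketch {
    // Creates a single-thread executor whose only thread carries the given name.
    static ExecutorService namedExecutor(final String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    // Emits 1..4 on the "sketch-io" thread and consumes each value on the "sketch-main" thread.
    // Returns log lines of the form "<event> <value> on <thread name>".
    static List<String> run() {
        final ExecutorService io = namedExecutor("sketch-io");     // role of the subscribeOn() thread
        final ExecutorService main = namedExecutor("sketch-main"); // role of the observeOn() thread
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch done = new CountDownLatch(1);

        io.execute(() -> {
            for (int i = 1; i <= 4; i++) {
                final int value = i;
                // The event is triggered on the background thread...
                log.add("emit " + value + " on " + Thread.currentThread().getName());
                // ...and handed over to the other thread to be consumed.
                main.execute(() ->
                        log.add("consume " + value + " on " + Thread.currentThread().getName()));
            }
            // Queued last, so it runs after every consume task has finished.
            main.execute(done::countDown);
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for events");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return log;
    }
}
```

The emit and consume entries may interleave differently from run to run, but every emit line names the "sketch-io" thread and every consume line names the "sketch-main" thread, which is the guarantee that subscribeOn() and observeOn() are meant to give.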
The events 1, 2, 3, 4 will be triggered on an I/O thread, and the printing to the log will be executed on the main thread. The same change applies to the image display example: