RxJava - Learn from Zero(0x01)

(by @jpwang and original post is here)

Preface

I have spent more than half a year learning RxJava, especially how to use it on the Android platform. One of the main reasons that pushed me to learn it is our current project, which has thousands of lines of code and callbacks to handle asynchronous stuff, and you get lost very quickly when reading that kind of source code. RxJava is one of the technologies that saved me from that hell.

What’s RxJava

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

The above is the official high-level introduction. But for me, it is actually just one word: asynchronous. RxJava is an awesome library that helps implement all kinds of asynchronous operations.

Why should I consider RxJava

Reactive Programming raises the level of abstraction of your code so you can focus on the interdependence of events that define the business logic, rather than having to constantly fiddle with a large amount of implementation details. Code in RP will likely be more concise.

The benefit is more evident in modern webapps and mobile apps that are highly interactive with a multitude of UI events related to data events. 10 years ago, interaction with web pages was basically about submitting a long form to the backend and performing simple rendering to the frontend. Apps have evolved to be more real-time: modifying a single form field can automatically trigger a save to the backend, “likes” to some content can be reflected in real time to other connected users, and so forth.

Apps nowadays have an abundancy of real-time events of every kind that enable a highly interactive experience to the user. We need tools for properly dealing with that, and Reactive Programming is an answer.

But to me, it is also one word: concision.

One of the most important points of asynchronous operations is concision. Why do I need AsyncTask, Handler, ThreadExecutor and all kinds of other tools to reach the same goal of running work asynchronously? Why can't I use one technology for all of them, consistently? That's what RxJava does!

RxJava provides a consistent and concise way of working that is easy to use and stays concise even as the logic of the whole app becomes more and more complex.

For example

There is a custom view called imageCollectorView which is used to display multiple images, and it has a method called addImage(Bitmap) to add an image for display. Assume our app needs to load all the images under some directories, File[] folders, and display all of them in imageCollectorView. There are a few things we need to handle carefully: reading images from the local file system is expensive, so it has to be done on a background thread instead of the UI thread, while displaying an image in the view has to run on the UI thread. There are lots of ways to handle this. A typical hand-written one starts a background thread and, from inside it, posts each loaded image back to the UI thread.
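To make the nesting visible without Android on the classpath, here is a minimal plain-Java sketch of that hand-written approach. Everything in it is a stand-in that I made up for illustration: a single-thread executor plays the UI thread, a list plays imageCollectorView, strings play the files and the decoded Bitmaps, and NestedLoader is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NestedLoader {
    /** Loads every file of every folder on a worker thread and "displays" it on a fake UI thread. */
    static List<String> loadAll(final List<List<String>> folders) {
        // Stand-ins (assumptions): executor = UI thread, list = imageCollectorView.
        final ExecutorService uiThread = Executors.newSingleThreadExecutor();
        final List<String> view = Collections.synchronizedList(new ArrayList<String>());

        Thread worker = new Thread(new Runnable() {
            @Override public void run() {
                for (List<String> folder : folders) {
                    for (String file : folder) {
                        final String bitmap = "bitmap:" + file; // pretend this is the slow decode
                        uiThread.execute(new Runnable() {
                            @Override public void run() {
                                view.add(bitmap); // stands in for imageCollectorView.addImage(bitmap)
                            }
                        });
                    }
                }
            }
        });
        worker.start();
        try {
            worker.join();
            uiThread.shutdown(); // tasks that are already queued still run
            uiThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            uiThread.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<String>(view);
    }
}
```

Note how the actual work sits four levels deep: a Runnable inside a loop inside a loop inside another Runnable.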

With RxJava, the same work is written as a single chain of calls.

Wait! Wait! Hold on! I know you will complain: there are obviously more lines of code than in the old version, so why do I say it is more concise? Okay, I get your point, but the concision I'm talking about is concision of logic, not just of line count. If you read the RxJava implementation, you will find it is one chain of calls instead of nested calls, which simplifies the logic, especially as your app's requirements and design become more complex.

So if you ask me why you should use RxJava, I would say concision: it puts all the complex logic into one chain.
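RxJava is not part of the JDK, so the sketch below only imitates the shape of such a chain with java.util.stream and CompletableFuture. It is not RxJava code. In RxJava 1.x the corresponding steps would be operators such as Observable.from(), flatMap(), map(), subscribeOn(Schedulers.io()), observeOn(AndroidSchedulers.mainThread()) and subscribe(). The class name ChainLoader and the string stand-ins for files, Bitmaps and the view are assumptions of mine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ChainLoader {
    /** Same job as a nested-thread loader, written as one flat chain. */
    static List<String> loadAll(List<List<String>> folders) {
        ExecutorService io = Executors.newCachedThreadPool();          // background work
        ExecutorService uiThread = Executors.newSingleThreadExecutor(); // fake UI thread
        List<String> view = new ArrayList<>();                          // fake imageCollectorView
        try {
            CompletableFuture
                .supplyAsync(() -> folders.stream()
                        .flatMap(List::stream)           // every folder -> its files
                        .map(file -> "bitmap:" + file)   // every file -> a pretend Bitmap
                        .collect(Collectors.toList()), io)
                .thenAcceptAsync(view::addAll, uiThread) // hand the result to the "UI thread"
                .join();                                 // wait, only so the sketch can return
        } finally {
            io.shutdown();
            uiThread.shutdown();
        }
        return view;
    }
}
```

One difference to keep in mind: this sketch delivers all images in one batch, while an RxJava chain would deliver them one by one. Also, an RxJava 1.x version written with anonymous inner classes, as was usual on Android at the time, takes more lines than this lambda-based sketch.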

How to use RxJava

Before we talk about how to use RxJava, I will spend some time going through its API and philosophy.

  1. Concept: an extension of the observer pattern
  2. Basic philosophy

    • Create Observer

      An Observer is responsible for listening for events and reacting when they are triggered.

      Besides Observer, there is another class, Subscriber, an abstract class that implements Observer. Subscriber extends the API of Observer, but the basic functions are exactly the same.
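As a rough illustration of the three callbacks, here is a small self-contained sketch. The real interface lives in the library as rx.Observer. I re-declare a local copy here (an assumption, so the sketch compiles without RxJava), and RecordingObserver and the emitted strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class ObserverSketch {
    /** Local copy of the three callbacks; the library's own type is rx.Observer. */
    interface Observer<T> {
        void onNext(T item);            // a new event arrived
        void onCompleted();             // the sequence ended normally
        void onError(Throwable error);  // the sequence ended with a failure
    }

    /** An observer that records what it hears, so the calls can be inspected. */
    static class RecordingObserver implements Observer<String> {
        final List<String> log = new ArrayList<>();

        @Override public void onNext(String item) { log.add("onNext: " + item); }
        @Override public void onCompleted() { log.add("onCompleted"); }
        @Override public void onError(Throwable error) { log.add("onError: " + error.getMessage()); }
    }

    /** Plays the role of an event source by calling the observer directly. */
    static List<String> demo() {
        RecordingObserver observer = new RecordingObserver();
        observer.onNext("Hello");
        observer.onNext("World");
        observer.onCompleted();
        return observer.log;
    }
}
```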

      In fact, an Observer is converted into a Subscriber inside RxJava when it is subscribed. So if you only use the basic functions, the two are exactly the same. The only differences are:

      • onStart(): A new method in Subscriber. It is called when the subscription has just begun, before any event is sent, and can be used for preparation such as resetting data. It is optional, and its default implementation is empty.

      • unsubscribe(): Cancels the subscription. After it is called, the Subscriber will not accept any more events. Usually, you can check isUnsubscribed() before calling it. unsubscribe() is very important, because the Observable keeps a reference to the Subscriber after subscribe() is called. If that reference is not released in time, it will cause a memory leak! So keep in mind: call unsubscribe() in a proper place to release the strong reference once you are done.
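The two extras can be sketched in plain Java as follows. This is a simplified model under my own assumptions, not the library's rx.Subscriber: the local types are re-declared so the sketch compiles on its own, and the demo() source loop is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SubscriberSketch {
    interface Observer<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Observer plus the two extras: an onStart() hook and the ability to unsubscribe. */
    abstract static class Subscriber<T> implements Observer<T>, Subscription {
        private volatile boolean unsubscribed;

        public void onStart() { } // optional hook, empty by default

        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override public void onStart() { log.add("onStart"); }
            @Override public void onNext(String item) { log.add("onNext: " + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable error) { log.add("onError"); }
        };

        subscriber.onStart(); // preparation happens before the first event
        for (String event : new String[] {"a", "b", "c"}) {
            if (subscriber.isUnsubscribed()) {
                break; // a well-behaved source stops emitting here
            }
            subscriber.onNext(event);
            if (event.equals("b")) {
                subscriber.unsubscribe(); // the consumer loses interest after "b"
            }
        }
        if (!subscriber.isUnsubscribed()) { // the usual guard before unsubscribing
            subscriber.unsubscribe();
        }
        return log;
    }
}
```

In this model "c" is never delivered, because the source checks isUnsubscribed() before each event.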

    • Create observable

      The Observable decides when and how events are triggered. RxJava uses the create() method to make an Observable and define its rules.

      create() accepts an OnSubscribe object as its parameter. When the Observable is subscribed, the call() method of OnSubscribe is called automatically and the events are triggered in sequence (for example, three calls to onNext() followed by one call to onCompleted()). This is what we call the observer pattern.

      create() is the most basic way to build an event sequence. There are some shortcuts for creating one more quickly:

      just(T...): Sends out all the parameters in sequence.

      from(T[])/from(Iterable<? extends T>): Sends out each element of the array or Iterable in sequence, so the result is the same as passing the elements to just(T...).
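Here is a minimal plain-Java model of that mechanism: three onNext() calls and one onCompleted(), all fired from call(). The Observable, OnSubscribe and Observer types below are local stand-ins that I re-declare for illustration. The library's real classes are more involved, and the three strings are made up.

```java
import java.util.ArrayList;
import java.util.List;

class CreateSketch {
    interface Observer<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    /** The "rules" of an observable: what to send to whoever subscribes. */
    interface OnSubscribe<T> {
        void call(Observer<? super T> observer);
    }

    static class Observable<T> {
        private final OnSubscribe<T> onSubscribe;

        private Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
            return new Observable<T>(onSubscribe);
        }

        void subscribe(Observer<? super T> observer) {
            onSubscribe.call(observer); // the stored rules run for this observer
        }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Observable<String> observable = Observable.create(new OnSubscribe<String>() {
            @Override public void call(Observer<? super String> observer) {
                observer.onNext("Hello");
                observer.onNext("Hi");
                observer.onNext("Aloha");
                observer.onCompleted();
            }
        });
        observable.subscribe(new Observer<String>() {
            @Override public void onNext(String item) { log.add("onNext: " + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable error) { log.add("onError"); }
        });
        return log;
    }
}
```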
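To show why the two shortcuts end up equivalent, here is a small model in which both simply walk over their input and finish with onCompleted(). It is a sketch under my own assumptions: the Observable interface is a one-method stand-in, collect() is a hypothetical helper, and the varargs signature follows the notation used in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JustFromSketch {
    interface Observer<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Stand-in for an observable: something an observer can subscribe to. */
    interface Observable<T> {
        void subscribe(Observer<? super T> observer);
    }

    @SafeVarargs
    static <T> Observable<T> just(T... items) {
        return from(Arrays.asList(items));
    }

    static <T> Observable<T> from(T[] items) {
        return from(Arrays.asList(items));
    }

    static <T> Observable<T> from(final Iterable<? extends T> items) {
        return new Observable<T>() {
            @Override public void subscribe(Observer<? super T> observer) {
                for (T item : items) {
                    observer.onNext(item); // one event per element, in order
                }
                observer.onCompleted();    // then the sequence ends
            }
        };
    }

    /** Hypothetical helper: subscribes and returns everything that was received. */
    static <T> List<String> collect(Observable<T> source) {
        final List<String> log = new ArrayList<>();
        source.subscribe(new Observer<T>() {
            @Override public void onNext(T item) { log.add("onNext: " + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable error) { log.add("onError"); }
        });
        return log;
    }
}
```

With this model, collect(just("a", "b", "c")) and collect(from(new String[] {"a", "b", "c"})) record the same four events.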

    • Subscribe

      After the Observable and the Observer are created, use subscribe() to connect them:

      observable.subscribe(observer) or observable.subscribe(subscriber)

      You may be confused by the subscribe() method: it looks as if the observable subscribes to the observer/subscriber, instead of the observer/subscriber subscribing to the observable. Yes, you are right, but if the API were changed to observer.subscribe(observable), it would break the fluent chaining API, which is not good for the overall design. For convenience, you can simply read subscribe() as getSubscribedBy().

      In essence, subscribe() does three things:

      • Call Subscriber.onStart(). This method is optional.
      • Call OnSubscribe.call(Subscriber). This is where the event logic starts. So in RxJava, an Observable does not start delivering events when it is created; instead, it starts when subscribe() is called.
      • Return the Subscriber as a Subscription (Subscriber implements Subscription), so you can unsubscribe at any time.
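The three steps can be modelled in a few lines of plain Java. This is only a sketch of the core idea, with locally re-declared stand-in types. I am assuming nothing about the library's actual source, which does more than this.

```java
import java.util.ArrayList;
import java.util.List;

class SubscribeSketch {
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    abstract static class Subscriber<T> implements Subscription {
        private boolean unsubscribed;

        public void onStart() { }
        public abstract void onNext(T item);
        public abstract void onCompleted();
        public abstract void onError(Throwable error);

        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    interface OnSubscribe<T> {
        void call(Subscriber<? super T> subscriber);
    }

    static class Observable<T> {
        private final OnSubscribe<T> onSubscribe;

        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        Subscription subscribe(Subscriber<? super T> subscriber) {
            subscriber.onStart();         // 1. optional preparation hook
            onSubscribe.call(subscriber); // 2. events start flowing only now
            return subscriber;            // 3. handed back as a Subscription
        }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Observable<Integer> observable = new Observable<Integer>(s -> {
            log.add("call");
            s.onNext(1);
            s.onCompleted();
        });
        log.add("created"); // nothing has been emitted yet

        Subscription subscription = observable.subscribe(new Subscriber<Integer>() {
            @Override public void onStart() { log.add("onStart"); }
            @Override public void onNext(Integer item) { log.add("onNext: " + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable error) { log.add("onError"); }
        });
        subscription.unsubscribe();
        log.add("unsubscribed: " + subscription.isUnsubscribed());
        return log;
    }
}
```

The "created" entry comes first in the log, which is the point of step 2: creating the observable sends nothing.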

      You can also pass plain actions to subscribe(), and RxJava will automatically generate a Subscriber that wraps them.

      Action0 has only one method, call(), which takes no parameter and returns nothing. It maps to the onCompleted() method of Subscriber.

      Action1 is similar: it has only one method, void call(T param), which takes one parameter, so it can be mapped to onNext(T obj) or onError(Throwable error).

      ActionX: RxJava provides a family of interfaces of this form (Action2, Action3 and so on) to handle different numbers of parameters.
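The wrapping can be pictured like this. The sketch re-declares Action0 and Action1 locally, and wrap() is a hypothetical helper of mine that plays the part of the generated Subscriber. In the library, the wrapping happens inside subscribe().

```java
import java.util.ArrayList;
import java.util.List;

class ActionSketch {
    interface Action0 { void call(); }
    interface Action1<T> { void call(T value); }

    interface Observer<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Hypothetical helper: turns three small actions into one full observer. */
    static <T> Observer<T> wrap(final Action1<? super T> onNextAction,
                                final Action1<Throwable> onErrorAction,
                                final Action0 onCompletedAction) {
        return new Observer<T>() {
            @Override public void onNext(T item) { onNextAction.call(item); }
            @Override public void onError(Throwable error) { onErrorAction.call(error); }
            @Override public void onCompleted() { onCompletedAction.call(); }
        };
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Action1<String> onNextAction = item -> log.add("onNext: " + item);
        Action1<Throwable> onErrorAction = error -> log.add("onError: " + error.getMessage());
        Action0 onCompletedAction = () -> log.add("onCompleted");

        Observer<String> observer = wrap(onNextAction, onErrorAction, onCompletedAction);
        observer.onNext("a");
        observer.onNext("b");
        observer.onCompleted();
        return log;
    }
}
```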

    • Usage

      Here are a couple of simple scenarios.

      Print array

      Get drawable and display it

    But this is still useless!!!

    Yes! You are right! Used in the plain way shown so far, RxJava triggers and consumes events on the same thread. But in most uses of the observer pattern, what we want is to handle things in the background and get notified in the foreground. So let's introduce another important part of RxJava: the Scheduler.

  3. Scheduler

    By default, RxJava does everything on the same thread: events are triggered on the thread where subscribe() is called, and they are consumed on the thread where they are triggered. Fortunately, RxJava provides a very cool tool, Scheduler, to help us switch between threads.

    Basically, a Scheduler is like a thread controller. RxJava uses it to specify what kind of thread a piece of code runs on. For example,

    • Schedulers.immediate(): The default Scheduler, which keeps running on the current thread.
    • Schedulers.newThread(): Always launches a new thread to run the task.
    • Schedulers.io(): I/O-related operations should be put on this Scheduler. It is similar to newThread(); the only difference is that io() uses a thread pool and tries to reuse idle threads.
    • Schedulers.computation(): All CPU-intensive computing tasks should be on this Scheduler. It uses a fixed-size thread pool whose size equals the number of CPU cores. DON'T put I/O operations on computation(): the time spent waiting for I/O wastes CPU.
    • AndroidSchedulers.mainThread(): Android-specific; runs tasks on the Android main thread.
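If you know java.util.concurrent, a rough analogy may help. The sketch below maps four of the schedulers above to JDK executors. This is only my analogy for how each one behaves as described, not how RxJava implements them, and the class and method names are made up. AndroidSchedulers.mainThread() is left out because it needs Android.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerSketch {
    /** Like immediate(): run the task right here, on the calling thread. */
    static Executor immediate() {
        return Runnable::run;
    }

    /** Like newThread(): every task gets a brand-new thread. */
    static Executor newThread() {
        return task -> new Thread(task).start();
    }

    /** Like io(): a pool without a size limit that reuses idle threads. */
    static ExecutorService io() {
        return Executors.newCachedThreadPool();
    }

    /** Like computation(): a fixed pool with one thread per CPU core. */
    static ExecutorService computation() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    static boolean immediateRunsOnCaller() {
        final Thread caller = Thread.currentThread();
        final boolean[] same = new boolean[1];
        immediate().execute(() -> same[0] = Thread.currentThread() == caller);
        return same[0];
    }

    static boolean newThreadRunsElsewhere() {
        final Thread caller = Thread.currentThread();
        final CompletableFuture<Boolean> result = new CompletableFuture<>();
        newThread().execute(() -> result.complete(Thread.currentThread() != caller));
        return result.join(); // wait for the other thread to report back
    }
}
```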

    With these Schedulers, you can now use subscribeOn() and observeOn() to control the threads.

    • subscribeOn(): Specifies the thread on which subscribe() happens, that is, the thread on which Observable.OnSubscribe runs. We usually call it the thread where events are triggered.
    • observeOn(): Specifies the thread on which the Subscriber runs. We usually call it the thread where events are consumed.

    For example, if an Observable emitting 1, 2, 3, 4 is given subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()), the events 1, 2, 3, 4 are triggered on an I/O thread and the logging runs on the main thread.

    The same change applies to the example of displaying an image: load it on the I/O thread and display it on the main thread.
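The thread hop itself can be imitated with two named single-thread executors. Again, this is a plain-Java model under my own assumptions, not RxJava: the thread named "io" stands in for Schedulers.io(), the thread named "main" stands in for the Android main thread, and ThreadHopSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopSketch {
    /** A single-thread executor whose only thread carries the given name. */
    static ExecutorService named(final String name) {
        return Executors.newSingleThreadExecutor(task -> new Thread(task, name));
    }

    static List<String> demo() {
        final ExecutorService io = named("io");     // where events are triggered
        final ExecutorService main = named("main"); // where events are consumed
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch done = new CountDownLatch(1);

        io.execute(() -> {
            for (int i = 1; i <= 4; i++) {
                final int value = i;
                final String producedOn = Thread.currentThread().getName();
                // Hop to the consumer thread for the "logging" step.
                main.execute(() -> log.add(value + " produced on " + producedOn
                        + ", consumed on " + Thread.currentThread().getName()));
            }
            main.execute(done::countDown); // queued last, so it runs after all four events
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return new ArrayList<String>(log);
    }
}
```

Each of the four log entries names "io" as the producing thread and "main" as the consuming thread.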

Written on July 27, 2016