RxJAVA在Android開發中已經算是無人不知無人不曉了,加之它與Retrofit等流行框架的完美結合,已經成為Android項目開發的必備利器。隨手記作為一個大型項目,引入三方框架一直比較慎重,但也從今年初開始,正式引入了RxJava2.0,并配合Retrofit對項目的網絡框架和繁瑣的異步邏輯進行重構。
RxJava的作用:
就是異步RxJava的使用,可以使“邏輯復雜的代碼”保持極強的閱讀性。
RxAndorid的作用:
Android中RxAndorid與RxJava配合使用; RxAndorid 封裝了
AndroidSchedulers.mainThread(),Android開發者使用過程中,可以輕松的將任務post Andorid主線程中,執行頁面更新操作。
RxJava的優點
簡單來講RxJava是一個簡化異步調用的庫,但其實它更是一種優雅的編程方式和編程思想,當你熟悉RxJava的使用方式之后,會很容易愛上它。 我總結它的優點主要有兩個方面:
- 簡潔,免除傳統異步代碼邏輯中的callback hell
- 增加業務邏輯代碼的可讀性
Rx的操作符有哪些
剛接觸Rx的人面對一堆各式各樣的操作符會覺得不知如何去學習記憶,其實你只需要從整體上了解Rx操作符的類別和掌握一些使用頻率較高的操作符就足夠了,至于其他的操作符,你只需要知道它的使用場景和掌握如何快速理解一個操作符的方法,就可以在需要的時候快速拿來用了。 下圖是我根據官方文檔總結的Rx操作符的分類及每個類別下的代表性操作符

Rx操作符的原理
要了解操作符的原理,肯定要從源碼入手嘍。所以我們先來簡單擼一遍Rx的最基本的Create操作符的源碼。 Rx的源碼目錄結構是比較清晰的,我們先從Observable.create方法來分析
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("s");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 創建的Observer中多了一個回調方法onSubscribe,傳遞參數為Disposable ,Disposable相當于RxJava1.x中的Subscription,用于解除訂閱。
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});復制代碼
create方法如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}復制代碼
代碼很簡單,第一行判空不用管,第二行調用RxJavaPlugins的方法是為了實現Rx的hook功能,我們暫時也無需關注,在一般情況下,第二行代碼會直接返回它的入參即ObservableCreate對象,ObservableCreate是Observable的子類,實現了Observable的一些抽象方法比如subscribeActual。事實上Rx的每個操作符都對應Observable的一個子類。 這里create方法接受的是一個ObservableOnSubscribe的接口實現類:
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}復制代碼
通過注釋可以知道這個接口的作用是通過一個subscribe方法接受一個ObservableEmitter類型的實例,俗稱發射器。 Observable.create方法執行時,我們傳入的就是一個ObservableOnSubscribe類型的匿名內部類,并實現了它的subscribe方法,然后它又被傳入create方法的返回對象ObservableCreate,最終成為ObservableCreate的成員source
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...復制代碼
接著我們來看Observable的subscribe方法,它的入參是一個Observer(即觀察者,也就是事件接收者)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}復制代碼
最終它會調用它的子類ObservableCreate的subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}復制代碼
在subscribeActual里首先創建了用于發射事件的CreateEmitter對象parent,CreateEmitter實現了接口Emitter和Disposable,并持有observer。 這段代碼的關鍵語句是source.subscribe(parent),這行代碼執行后,就會觸發事件源進行發射事件,即e.onNext("s")會被調用。細心的同學也會注意到這行代碼之前,parent先被傳入了observer的onSubscribe()方法,而在上面我們說過,observer的onSubscribe()方法接受一個Disposable類型的參數,可以用于解除訂閱,之所以能夠解除訂閱,正是因為在觸發事件發射之前調用了observer的onSubscribe(),給了我們調用CreateEmitter的解除訂閱的方法dispose()的機會。 繼續來看CreateEmitter的onNext()方法,它最終是通過調用observer的onNext()方法將事件發射出去的
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 在真正發射之前,會先判斷該CreateEmitter是否已經解除訂閱
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}復制代碼
至此,Rx事件源的創建和訂閱的流程就走通了。
下面我們從map操作符來入手看一下Rx操作符的原理,map方法如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mApper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}復制代碼
map方法接受一個Function類型的參數mapper,返回了一個ObservableMap對象,它也是繼承自Observable,而mapper被傳給了ObservableMap的成員function,同時當前的源Observable被傳給ObservableMap的成員source,進入ObservableMap類
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}復制代碼
可以看到這里用到了裝飾者模式,ObservableMap持有來自它上游的事件源source,MapObserver持有來自它下游的事件接收者和我們實現的轉換方法function,在subscribeActual()方法中完成ObservableMap對source的訂閱,觸發MapObserver的onNext()方法,繼而將來自source的原始數據經過函數mapper轉換后再發射給下游的事件接收者,從而實現map這一功能。
現在我們終于能夠來總結一下包含多個操作符時的訂閱流程了,以下面這段代碼為例
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("holen");
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return s.length();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});復制代碼
執行代碼時,自上而下每一步操作符都會創建一個新的Observable(均為Observable的子類,對應不同的操作符),當執行create時,創建并返回了ObservableCreate,當執行map時,創建并返回了ObservableMap,并且每一個新的Observable都持有它上游的源Observable(即source)及當前涉及到的操作函數function。當最后一步執行訂閱方法subscribe時會觸發ObservableMap的subscribeActual()方法,并將最下游的Observer包裝成MapObserver,同時該方法又會繼續調用它所持有ObservableCreate的訂閱方法(即執行source.subscribe),由此也會觸發ObservableCreate的subscribeActual()方法,此時我們的發射器CreateEmitter才會調用它的onNext()方法發射事件,再依次調用MapObserver的操作函數mapper和onNext()方法,最終將事件傳遞給了最下游的Observer的onNext()方法。
我簡單的將這段邏輯用下面這幅圖來表示

操作符舉例
1. map

map
可以看到,這幅圖表達的意思是一個源先后發射了1、2、3的三個item,而經過操作符一轉換,就變成了一個發射了10、20、30三個item的新的。描述操作符的長方框中也清楚的說明了該操作符進行了何種具體的轉換操作(圖中的10*x只是一個例子,這個具體的轉換函數是可以自定義的)。于是,我們就很快速地理解了操作符的含義和用法,簡單來講,它就是通過一個函數將一個發射的item逐個進行某種轉換。示例代碼:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer * 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer result) throws Exception {
Log.i(TAG, "accept : " + result +"n" );
}
});復制代碼
輸出結果:

2. zip

根據的寶石圖,可以知道zip操作符的作用是把多個源發射的item通過特定函數組合在一起,然后發射組合后的item。從圖中還可以看到一個重要的信息是,最終發射的item是對上面的兩個源發射的item按照發射順序逐個組合的結果,而且最終發射的等item的發射時間是由組合它的和等item中發射時間較晚的那個item決定的,也正是如此,操作符經常可以用在需要同時組合處理多個網絡請求的結果的業務場景中。示例代碼:
Observable.zip(Observable.just(1, 2, 3),
Observable.just("A", "B", "C"),
new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, "zip : accept : " + s + "n");
}
});復制代碼
輸出結果:

3. concat

從寶石圖可以看出,操作符的作用就是將兩個源發射的item連接在一起發射出來。這里的連接指的是整體連接,被操作后產生的會先發射第一個源的所有item,然后緊接著再發射第二個源的所有的item。示例代碼:
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "concat : " + integer + "n");
}
});復制代碼
輸出結果:

4.flatMap
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
...
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
看看ObservableFlatMap代碼
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
是不是和MAP超級像,我們這幾看MergeObserver onNext做了什么
@Override
public void onNext(T t) {
...
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
...
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
addInner(inner);
p.subscribe(inner);
break;
}
}
}
省略了很多代碼,我們看主要邏輯,獲取到flatMap生成的observableSource,然后 p.subscribe(inner);注意這里的P不是observable 看innerObserver的onNext做了什么
//這里的onNext事件由 p.subscribe(inner)觸發
@Override
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
parent.drain();
}
}
void tryEmit(U value, InnerObserver<T, U> inner) {
if (get() == 0 && compareAndSet(0, 1)) {
actual.onNext(value);
if (decrementAndGet() == 0) {
return;
}
} else {
SimpleQueue<U> q = inner.queue;
if (q == null) {
q = new SpscLinkedArrayQueue<U>(bufferSize);
inner.queue = q;
}
q.offer(value);
if (getAndIncrement() != 0) {
return;
}
}
drainLoop();
}
在這里我們終于看到我們定義的observer接收到了onNext事件
Rx操作符的應用場景
說了這么多,其實我們最關心的還是Rx操作符的應用場景。其實只要存在異步的地方,都可以優雅地使用Rx操作符。比如很多流行的Rx周邊開源項目
