日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線(xiàn)咨詢(xún)客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

RxJAVA在Android開(kāi)發(fā)中已經(jīng)算是無(wú)人不知無(wú)人不曉了,加之它與Retrofit等流行框架的完美結(jié)合,已經(jīng)成為Android項(xiàng)目開(kāi)發(fā)的必備利器。隨手記作為一個(gè)大型項(xiàng)目,引入三方框架一直比較慎重,但也從今年初開(kāi)始,正式引入了RxJava2.0,并配合Retrofit對(duì)項(xiàng)目的網(wǎng)絡(luò)框架和繁瑣的異步邏輯進(jìn)行重構(gòu)。

RxJava的作用:

就是異步RxJava的使用,可以使“邏輯復(fù)雜的代碼”保持極強(qiáng)的閱讀性。

RxAndorid的作用:

Android中RxAndorid與RxJava配合使用; RxAndorid 封裝了
AndroidSchedulers.mainThread(),Android開(kāi)發(fā)者使用過(guò)程中,可以輕松的將任務(wù)post Andorid主線(xiàn)程中,執(zhí)行頁(yè)面更新操作。

RxJava的優(yōu)點(diǎn)

簡(jiǎn)單來(lái)講RxJava是一個(gè)簡(jiǎn)化異步調(diào)用的庫(kù),但其實(shí)它更是一種優(yōu)雅的編程方式和編程思想,當(dāng)你熟悉RxJava的使用方式之后,會(huì)很容易愛(ài)上它。 我總結(jié)它的優(yōu)點(diǎn)主要有兩個(gè)方面:

  • 簡(jiǎn)潔,免除傳統(tǒng)異步代碼邏輯中的callback hell
  • 增加業(yè)務(wù)邏輯代碼的可讀性

Rx的操作符有哪些

剛接觸Rx的人面對(duì)一堆各式各樣的操作符會(huì)覺(jué)得不知如何去學(xué)習(xí)記憶,其實(shí)你只需要從整體上了解Rx操作符的類(lèi)別和掌握一些使用頻率較高的操作符就足夠了,至于其他的操作符,你只需要知道它的使用場(chǎng)景和掌握如何快速理解一個(gè)操作符的方法,就可以在需要的時(shí)候快速拿來(lái)用了。 下圖是我根據(jù)官方文檔總結(jié)的Rx操作符的分類(lèi)及每個(gè)類(lèi)別下的代表性操作符

 

 

 

Rx操作符的原理

要了解操作符的原理,肯定要從源碼入手嘍。所以我們先來(lái)簡(jiǎn)單擼一遍Rx的最基本的Create操作符的源碼。 Rx的源碼目錄結(jié)構(gòu)是比較清晰的,我們先從Observable.create方法來(lái)分析

Observable.create(new ObservableOnSubscribe<String>() {
  @Override
  public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
      e.onNext("s");
  }
}).subscribe(new Observer<String>() {
  @Override
  public void onSubscribe(@NonNull Disposable d) {
    // 創(chuàng)建的Observer中多了一個(gè)回調(diào)方法onSubscribe,傳遞參數(shù)為Disposable ,Disposable相當(dāng)于RxJava1.x中的Subscription,用于解除訂閱。
  }

  @Override
  public void onNext(@NonNull String s) {

  }

  @Override
  public void onError(@NonNull Throwable e) {

  }

  @Override
  public void onComplete() {

  }
});復(fù)制代碼

create方法如下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}復(fù)制代碼

代碼很簡(jiǎn)單,第一行判空不用管,第二行調(diào)用RxJavaPlugins的方法是為了實(shí)現(xiàn)Rx的hook功能,我們暫時(shí)也無(wú)需關(guān)注,在一般情況下,第二行代碼會(huì)直接返回它的入?yún)⒓碠bservableCreate對(duì)象,ObservableCreate是Observable的子類(lèi),實(shí)現(xiàn)了Observable的一些抽象方法比如subscribeActual。事實(shí)上Rx的每個(gè)操作符都對(duì)應(yīng)Observable的一個(gè)子類(lèi)。 這里create方法接受的是一個(gè)ObservableOnSubscribe的接口實(shí)現(xiàn)類(lèi):

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}復(fù)制代碼

通過(guò)注釋可以知道這個(gè)接口的作用是通過(guò)一個(gè)subscribe方法接受一個(gè)ObservableEmitter類(lèi)型的實(shí)例,俗稱(chēng)發(fā)射器。 Observable.create方法執(zhí)行時(shí),我們傳入的就是一個(gè)ObservableOnSubscribe類(lèi)型的匿名內(nèi)部類(lèi),并實(shí)現(xiàn)了它的subscribe方法,然后它又被傳入create方法的返回對(duì)象ObservableCreate,最終成為ObservableCreate的成員source

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ...復(fù)制代碼

接著我們來(lái)看Observable的subscribe方法,它的入?yún)⑹且粋€(gè)Observer(即觀(guān)察者,也就是事件接收者)

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);

       ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

       subscribeActual(observer);
   } catch (NullPointerException e) { // NOPMD
       throw e;
   } catch (Throwable e) {
       Exceptions.throwIfFatal(e);
       // can't call onError because no way to know if a Disposable has been set or not
       // can't call onSubscribe because the call might have set a Subscription already
       RxJavaPlugins.onError(e);

       NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
       npe.initCause(e);
       throw npe;
   }
}復(fù)制代碼

最終它會(huì)調(diào)用它的子類(lèi)ObservableCreate的subscribeActual方法:

@Override
protected void subscribeActual(Observer<? super T> observer) {
   CreateEmitter<T> parent = new CreateEmitter<T>(observer);
   observer.onSubscribe(parent);

   try {
       source.subscribe(parent);
   } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
   }
}復(fù)制代碼

在subscribeActual里首先創(chuàng)建了用于發(fā)射事件的CreateEmitter對(duì)象parent,CreateEmitter實(shí)現(xiàn)了接口Emitter和Disposable,并持有observer。 這段代碼的關(guān)鍵語(yǔ)句是source.subscribe(parent),這行代碼執(zhí)行后,就會(huì)觸發(fā)事件源進(jìn)行發(fā)射事件,即e.onNext("s")會(huì)被調(diào)用。細(xì)心的同學(xué)也會(huì)注意到這行代碼之前,parent先被傳入了observer的onSubscribe()方法,而在上面我們說(shuō)過(guò),observer的onSubscribe()方法接受一個(gè)Disposable類(lèi)型的參數(shù),可以用于解除訂閱,之所以能夠解除訂閱,正是因?yàn)樵谟|發(fā)事件發(fā)射之前調(diào)用了observer的onSubscribe(),給了我們調(diào)用CreateEmitter的解除訂閱的方法dispose()的機(jī)會(huì)。 繼續(xù)來(lái)看CreateEmitter的onNext()方法,它最終是通過(guò)調(diào)用observer的onNext()方法將事件發(fā)射出去的

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {


   private static final long serialVersionUID = -3434801548987643227L;

   final Observer<? super T> observer;

   CreateEmitter(Observer<? super T> observer) {
       this.observer = observer;
   }

   @Override
   public void onNext(T t) {
       if (t == null) {
           onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
           return;
       }
       // 在真正發(fā)射之前,會(huì)先判斷該CreateEmitter是否已經(jīng)解除訂閱
       if (!isDisposed()) {
           observer.onNext(t);
       }
   }
   ...
}復(fù)制代碼

至此,Rx事件源的創(chuàng)建和訂閱的流程就走通了。

下面我們從map操作符來(lái)入手看一下Rx操作符的原理,map方法如下

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mApper) {
   ObjectHelper.requireNonNull(mapper, "mapper is null");
   return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}復(fù)制代碼

map方法接受一個(gè)Function類(lèi)型的參數(shù)mapper,返回了一個(gè)ObservableMap對(duì)象,它也是繼承自O(shè)bservable,而mapper被傳給了ObservableMap的成員function,同時(shí)當(dāng)前的源Observable被傳給ObservableMap的成員source,進(jìn)入ObservableMap類(lèi)

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}復(fù)制代碼

可以看到這里用到了裝飾者模式,ObservableMap持有來(lái)自它上游的事件源source,MapObserver持有來(lái)自它下游的事件接收者和我們實(shí)現(xiàn)的轉(zhuǎn)換方法function,在subscribeActual()方法中完成ObservableMap對(duì)source的訂閱,觸發(fā)MapObserver的onNext()方法,繼而將來(lái)自source的原始數(shù)據(jù)經(jīng)過(guò)函數(shù)mapper轉(zhuǎn)換后再發(fā)射給下游的事件接收者,從而實(shí)現(xiàn)map這一功能。

現(xiàn)在我們終于能夠來(lái)總結(jié)一下包含多個(gè)操作符時(shí)的訂閱流程了,以下面這段代碼為例

Observable.
        create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("holen");
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(@NonNull String s) throws Exception {
                return s.length();
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });復(fù)制代碼

執(zhí)行代碼時(shí),自上而下每一步操作符都會(huì)創(chuàng)建一個(gè)新的Observable(均為Observable的子類(lèi),對(duì)應(yīng)不同的操作符),當(dāng)執(zhí)行create時(shí),創(chuàng)建并返回了ObservableCreate,當(dāng)執(zhí)行map時(shí),創(chuàng)建并返回了ObservableMap,并且每一個(gè)新的Observable都持有它上游的源Observable(即source)及當(dāng)前涉及到的操作函數(shù)function。當(dāng)最后一步執(zhí)行訂閱方法subscribe時(shí)會(huì)觸發(fā)ObservableMap的subscribeActual()方法,并將最下游的Observer包裝成MapObserver,同時(shí)該方法又會(huì)繼續(xù)調(diào)用它所持有ObservableCreate的訂閱方法(即執(zhí)行source.subscribe),由此也會(huì)觸發(fā)ObservableCreate的subscribeActual()方法,此時(shí)我們的發(fā)射器CreateEmitter才會(huì)調(diào)用它的onNext()方法發(fā)射事件,再依次調(diào)用MapObserver的操作函數(shù)mapper和onNext()方法,最終將事件傳遞給了最下游的Observer的onNext()方法。

我簡(jiǎn)單的將這段邏輯用下面這幅圖來(lái)表示

 

 

 

操作符舉例

1. map

 

 

map

可以看到,這幅圖表達(dá)的意思是一個(gè)源先后發(fā)射了1、2、3的三個(gè)item,而經(jīng)過(guò)操作符一轉(zhuǎn)換,就變成了一個(gè)發(fā)射了10、20、30三個(gè)item的新的。描述操作符的長(zhǎng)方框中也清楚的說(shuō)明了該操作符進(jìn)行了何種具體的轉(zhuǎn)換操作(圖中的10*x只是一個(gè)例子,這個(gè)具體的轉(zhuǎn)換函數(shù)是可以自定義的)。于是,我們就很快速地理解了操作符的含義和用法,簡(jiǎn)單來(lái)講,它就是通過(guò)一個(gè)函數(shù)將一個(gè)發(fā)射的item逐個(gè)進(jìn)行某種轉(zhuǎn)換。示例代碼:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(@NonNull Integer integer) throws Exception {
        return integer * 10;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer result) throws Exception {
        Log.i(TAG, "accept : " + result +"n" );
    }
});復(fù)制代碼

輸出結(jié)果:

 

 

 

2. zip

 

 

根據(jù)的寶石圖,可以知道zip操作符的作用是把多個(gè)源發(fā)射的item通過(guò)特定函數(shù)組合在一起,然后發(fā)射組合后的item。從圖中還可以看到一個(gè)重要的信息是,最終發(fā)射的item是對(duì)上面的兩個(gè)源發(fā)射的item按照發(fā)射順序逐個(gè)組合的結(jié)果,而且最終發(fā)射的等item的發(fā)射時(shí)間是由組合它的和等item中發(fā)射時(shí)間較晚的那個(gè)item決定的,也正是如此,操作符經(jīng)常可以用在需要同時(shí)組合處理多個(gè)網(wǎng)絡(luò)請(qǐng)求的結(jié)果的業(yè)務(wù)場(chǎng)景中。示例代碼:

Observable.zip(Observable.just(1, 2, 3),
        Observable.just("A", "B", "C"),
        new BiFunction<Integer, String, String>() {
            @Override
            public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
                return integer + s;
            }
        })
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG, "zip : accept : " + s + "n");
            }
        });復(fù)制代碼

輸出結(jié)果:

 

3. concat

 

 

從寶石圖可以看出,操作符的作用就是將兩個(gè)源發(fā)射的item連接在一起發(fā)射出來(lái)。這里的連接指的是整體連接,被操作后產(chǎn)生的會(huì)先發(fā)射第一個(gè)源的所有item,然后緊接著再發(fā)射第二個(gè)源的所有的item。示例代碼:

Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.i(TAG, "concat : " + integer + "n");
            }
        });復(fù)制代碼

輸出結(jié)果:

 

4.flatMap

  @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
     ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));

看看ObservableFlatMap代碼

 public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

是不是和MAP超級(jí)像,我們這幾看MergeObserver onNext做了什么

@Override
        public void onNext(T t) {
             ...
               p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");

            ...

            subscribeInner(p);
        }
         @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                  
                } else {
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    addInner(inner);
                    p.subscribe(inner);
                    break;
                }
            }
        }

省略了很多代碼,我們看主要邏輯,獲取到flatMap生成的observableSource,然后 p.subscribe(inner);注意這里的P不是observable 看innerObserver的onNext做了什么

    //這里的onNext事件由 p.subscribe(inner)觸發(fā)
  @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }
        
        void tryEmit(U value, InnerObserver<T, U> inner) {
            if (get() == 0 && compareAndSet(0, 1)) {
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    return;
                }
            } else {
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            drainLoop();
        }

在這里我們終于看到我們定義的observer接收到了onNext事件

Rx操作符的應(yīng)用場(chǎng)景

說(shuō)了這么多,其實(shí)我們最關(guān)心的還是Rx操作符的應(yīng)用場(chǎng)景。其實(shí)只要存在異步的地方,都可以?xún)?yōu)雅地使用Rx操作符。比如很多流行的Rx周邊開(kāi)源項(xiàng)目

 

分享到:
標(biāo)簽:RxJava
用戶(hù)無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定