300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > RxJava详解(基于2.X版本的功能操作符)

RxJava详解(基于2.X版本的功能操作符)

时间:2022-06-30 06:52:25

相关推荐

RxJava详解(基于2.X版本的功能操作符)

本章节讲述RxJava基于2.X版本的功能操作符

1.subscribe()

<1> 作用

订阅,即连接观察者 & 被观察者。

<2> 代码&结果

/weixin_37730482/article/details/69280013

有多个重载的方法。

2.Observable.subscribeOn() &Observable.observeOn()

<1> 作用

指定被观察者&观察者所在的线程。

<2> 代码&结果

/weixin_37730482/article/details/74460807

3.delay()

<1> 作用

使得被观察者延迟一段时间再发送事件。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import java.util.concurrent.TimeUnit;import io.reactivex.Observable;import io.reactivex.ObservableEmitter;import io.reactivex.ObservableOnSubscribe;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava delay功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext("被观察者发送数据");Log.d("TAG", "被观察者发送数据 开始发送数据...");}}}).delay(5, TimeUnit.SECONDS)//被观察者延时5秒发送数据.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 被观察者发送数据 开始发送数据...//5秒后打印D/TAG: 观察者 onNext 方法 value.toString()----:被观察者发送数据

<4> 关闭页面

D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅

<5> delay重载方法

delay(long delay,TimeUnit unit)参数1:时间参数2:时间单位delay(long delay,TimeUnit unit,mScheduler scheduler)参数1:时间参数2:时间单位参数3:线程调度器delay(long delay,TimeUnit unit,boolean delayError)指定延迟时间 & 错误延迟 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常。参数1:时间参数2:时间单位参数3:错误延迟参数delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟。参数1:时间参数2:时间单位参数3:线程调度器参数4:错误延迟参数

4.doXXX()

<1> 作用

在某个事件的生命周期中调用。如发送事件前的初始化、发送事件后的回调请求等等。

具体方法如下

doOnSubscribe():观察者订阅时调用。doOnEach():被观察者调用一次执行一次。doOnNext():onNext方法调用前调用。doAfterNext():onNext方法调用后调用。doOnComplete():onComplete方法执行时调用。doOnError():onError方法执行时调用。doOnTerminate():正常发送完成&异常完成都执行。doFinally():最后执行。doAfterTerminate():真正的最后执行。

<2> 代码

package com.wjn.rxdemo.rxjava;import android.os.Bundle;import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import com.wjn.rxdemo.R;import io.reactivex.Notification;import io.reactivex.Observable;import io.reactivex.ObservableEmitter;import io.reactivex.ObservableOnSubscribe;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;import io.reactivex.functions.Action;import io.reactivex.functions.Consumer;public class RxJavaActivity extends AppCompatActivity {@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);method();}/*** 创建 RxJava doXXX功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext(new RuntimeException());e.onComplete();}}}).doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) {Log.d("TAG", "生命周期 doOnSubscribe方法 执行...----:" + disposable.isDisposed());}}).doOnEach(new Consumer<Notification<Object>>() {@Overridepublic void accept(Notification<Object> objectNotification) {Log.d("TAG", "生命周期 doOnEach方法 执行...----:" + objectNotification.toString());}}).doOnNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doOnNext方法 执行...----:" + o.toString());}}).doAfterNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doAfterNext方法 执行...----:" + o.toString());}}).doOnComplete(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doOnComplete方法 执行...");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) {Log.d("TAG", "生命周期 doOnError方法 执行...----:" + throwable.toString());}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {Log.d("TAG", "生命周期 doOnTerminate方法 执行...");}}).doAfterTerminate(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doAfterTerminate方法 执行...");}}).doFinally(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doFinally方法 执行...");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("TAG", "观察者 onSubscribe方法 执行...");}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法执行...value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {Log.d("TAG", "观察者 onError 方法执行...e.toString()----:" + e.toString());}@Overridepublic void onComplete() {Log.d("TAG", "观察者 onComplete方法 执行...");}});}}

<3> 结果

被观察者 执行 e.onNext(1); 即发生第一次正确的onNext方法D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnNextNotification[1]D/TAG: 生命周期 doOnNext方法 执行...----:1D/TAG: 观察者 onNext 方法执行...value.toString()----:1D/TAG: 生命周期 doAfterNext方法 执行...----:1**************************************************************************被观察者 执行 e.onNext(new RuntimeException()); 即发生第二次错误的onNext方法D/TAG: 生命周期 doOnEach方法 执行...----:OnNextNotification[java.lang.RuntimeException]D/TAG: 生命周期 doOnNext方法 执行...----:java.lang.RuntimeExceptionD/TAG: 观察者 onNext 方法执行...value.toString()----:java.lang.RuntimeExceptionD/TAG: 生命周期 doAfterNext方法 执行...----:java.lang.RuntimeException*****************************************************************************被观察者 执行 e.onComplete(); 即发生最后接收发生数据D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...

<4> 上述代码修改create操作符,只发送onComplete。

if (!e.isDisposed()) {e.onComplete();}

结果

D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...

<5> 上述代码修改create操作符,只发送onError。

if (!e.isDisposed()) {e.onError(new RuntimeException());}

结果

D/TAG: 生命周期 doOnSubscribe方法 执行...----:falseD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnErrorNotification[java.lang.RuntimeException]D/TAG: 生命周期 doOnError方法 执行...----:java.lang.RuntimeExceptionD/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onError 方法执行...e.toString()----:java.lang.RuntimeExceptionD/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...

<6> 上述代码修改create操作符,为empty操作符。

代码

package com.wjn.rxdemo.rxjava;import android.os.Bundle;import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import com.wjn.rxdemo.R;import io.reactivex.Notification;import io.reactivex.Observable;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;import io.reactivex.functions.Action;import io.reactivex.functions.Consumer;public class RxJavaActivity extends AppCompatActivity {@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);method();}/*** 创建 RxJava doXXX功能操作符*/public void method() {Observable.empty().doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) {Log.d("TAG", "生命周期 doOnSubscribe方法 执行...----:" + disposable.isDisposed());}}).doOnEach(new Consumer<Notification<Object>>() {@Overridepublic void accept(Notification<Object> objectNotification) {Log.d("TAG", "生命周期 doOnEach方法 执行...----:" + objectNotification.toString());}}).doOnNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doOnNext方法 执行...----:" + o.toString());}}).doAfterNext(new Consumer<Object>() {@Overridepublic void accept(Object o) {Log.d("TAG", "生命周期 doAfterNext方法 执行...----:" + o.toString());}}).doOnComplete(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doOnComplete方法 执行...");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) {Log.d("TAG", "生命周期 doOnError方法 执行...----:" + throwable.toString());}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {Log.d("TAG", "生命周期 doOnTerminate方法 执行...");}}).doAfterTerminate(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doAfterTerminate方法 执行...");}}).doFinally(new Action() {@Overridepublic void run() {Log.d("TAG", "生命周期 doFinally方法 执行...");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {Log.d("TAG", "观察者 onSubscribe方法 执行...");}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法执行...value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {Log.d("TAG", "观察者 onError 方法执行...e.toString()----:" + e.toString());}@Overridepublic void onComplete() {Log.d("TAG", "观察者 onComplete方法 执行...");}});}}

结果

D/TAG: 生命周期 doOnSubscribe方法 执行...----:trueD/TAG: 观察者 onSubscribe方法 执行...D/TAG: 生命周期 doOnEach方法 执行...----:OnCompleteNotificationD/TAG: 生命周期 doOnComplete方法 执行...D/TAG: 生命周期 doOnTerminate方法 执行...D/TAG: 观察者 onComplete方法 执行...D/TAG: 生命周期 doFinally方法 执行...D/TAG: 生命周期 doAfterTerminate方法 执行...

5.onErrorReturn()

<1> 作用

遇到观察者发送错误时,发送1个特殊事件 正常终止。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;import io.reactivex.ObservableEmitter;import io.reactivex.ObservableOnSubscribe;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;import io.reactivex.functions.Function;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava onErrorReturn功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).onErrorReturn(new Function<Throwable, Object>() {@Overridepublic Object apply(Throwable throwable) throws Exception {return "Code:123";}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:Code:123

<4> 关闭页面

D/TAG: onDestroy方法 执行时是否断开----:true

6.onErrorResumeNext()&onExceptionResumeNext()

<1> 作用

遇到错误时,发送1个新的Observable。

onErrorResumeNext():拦截的错误 = Throwable。

onExceptionResumeNext():拦截的错误 = Exception。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;import io.reactivex.ObservableEmitter;import io.reactivex.ObservableOnSubscribe;import io.reactivex.ObservableSource;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;import io.reactivex.functions.Function;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava onErrorResumeNext功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).onErrorResumeNext(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {return Observable.just("遇到观察者发送的错误数据 修改后的内容");}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:遇到观察者发送的错误数据 修改后的内容

7.retry()

<1> 作用

重试,即当出现错误时,让被观察者(Observable)重新发射数据。Throwable 和 Exception都可拦截。

retry():出现错误时,让被观察者重新发送数据。若一直错误,则一直重新发送。如果不出现错误,不重复执行。

retry(long time):出现错误时,让被观察者重新发送数据。可设置重试次数。如果不出现错误,不重复执行。

retry(long times, Predicate<? super Throwable> predicate) 出现错误后,判断是否需要重新发送数据 如果需要 重试times次。如果不出现错误,不重复执行。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;import io.reactivex.ObservableEmitter;import io.reactivex.ObservableOnSubscribe;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava retry功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).retry(2)//重试两次.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三

重试2次,一共执行3次。

<4> 关闭页面

D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅

8.retryUntil()

<1> 作用

出现错误后,判断是否需要重新发送数据。类似retry(long times, Predicate<? super Throwable> predicate) 出现错误后,判断是否需要重新发送数据 如果需要 重试times次。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;import io.reactivex.ObservableEmitter;import io.reactivex.ObservableOnSubscribe;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;import io.reactivex.functions.BooleanSupplier;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava Map变换符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onError(new Throwable());}}}).retryUntil(new BooleanSupplier() {@Overridepublic boolean getAsBoolean() throws Exception {return false;//一直重试}}).subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

return false;//一直重试return true;//不重试

9.retryWhen()

<1> 作用

遇到错误时,将发生的错误传递给一个新的被观察者。并决定是否需要重新订阅原始被观察者。

<2> 代码

<3> 结果

10.repeat()

<1> 作用

重复不断地发送被观察者事件。

repeat():重复不断地发送被观察者事件。

repeat(long times):重复 times次发送被观察者事件。

<2> 代码

package com.example.rxjava20;import android.os.Bundle;import android.util.Log;import androidx.appcompat.app.AppCompatActivity;import io.reactivex.Observable;import io.reactivex.ObservableEmitter;import io.reactivex.ObservableOnSubscribe;import io.reactivex.Observer;import io.reactivex.disposables.Disposable;public class MainActivity extends AppCompatActivity {private Disposable disposable;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);method();}/*** 创建 RxJava repeat功能操作符*/public void method() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> e) throws Exception {if (null == e) {return;}if (!e.isDisposed()) {e.onNext(1);e.onNext("张三");e.onComplete();}}}).repeat(2)//重复两次.subscribe(new Observer<Object>() {@Overridepublic void onSubscribe(Disposable d) {if (null == d) {return;}disposable = d;Log.d("TAG", "观察者 onSubscribe 方法 是否断开连接----:" + disposable.isDisposed());}@Overridepublic void onNext(Object value) {Log.d("TAG", "观察者 onNext 方法 value.toString()----:" + value.toString());}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}/*** onDestroy方法*/@Overrideprotected void onDestroy() {super.onDestroy();if (null != disposable) {Log.d("TAG", "onDestroy方法 执行时是否断开----:" + disposable.isDisposed());if (!disposable.isDisposed()) {//没有断开disposable.dispose();//断开Log.d("TAG", "onDestroy方法 断开订阅");}}}}

<3> 结果

D/TAG: 观察者 onSubscribe 方法 是否断开连接----:falseD/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三D/TAG: 观察者 onNext 方法 value.toString()----:1D/TAG: 观察者 onNext 方法 value.toString()----:张三

<4> 关闭页面

D/TAG: onDestroy方法 执行时是否断开----:falseD/TAG: onDestroy方法 断开订阅

11.repeatWhen()

<1> 作用

有条件地、重复发送 被观察者事件。

<2> 代码

<3> 结果

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。