300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > 1.x到2.x的迁移:可观察与可观察:RxJava FAQ

1.x到2.x的迁移:可观察与可观察:RxJava FAQ

时间:2021-09-23 15:50:34

相关推荐

1.x到2.x的迁移:可观察与可观察:RxJava FAQ

标题不是错误。rx.Observable1.x的io.reactivex.Observable与2.x的io.reactivex.Observable完全不同。 盲目升级rx依赖关系并重命名项目中的所有导入将进行编译(稍作更改),但不能保证相同的行为。 在项目的早期,Observablein 1.x中没有背压的概念,但后来包含了背压。 这到底是什么意思? 假设我们有一个流,每1毫秒产生一个事件,但是处理一个这样的项目需要1。 您会发现从长远来看,它不可能以这种方式工作:

import rx.Observable; //RxJava 1.ximport rx.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(putation()).subscribe(x -> sleep(Duration.ofSeconds(1)));

MissingBackpressureException在几百毫秒内MissingBackpressureException。 但是这个异常是什么意思呢? 好吧,基本上,这是一个安全网(或如果有的话,请进行健全性检查),以防止损害应用程序。 RxJava自动发现生产者溢出了消费者,并主动终止了流以避免进一步的损害。 那么,如果我们只是在这里和那里搜索并替换少量进口商品,该怎么办?

import io.reactivex.Observable;//RxJava 2.ximport io.reactivex.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(putation()).subscribe(x -> sleep(Duration.ofSeconds(1)));

例外不见了! 我们的吞吐量也是如此……一段时间后,应用程序停止运行,一直处于无休止的GC循环中。 您会看到,RxJava 1.x中的Observable在各处都有断言(绑定队列,检查等),确保您没有在任何地方溢出。 例如,默认情况下,1.x中的observeOn()运算符的队列限制为128个元素。 当在整个堆栈上正确实现背压时,observeOn()运算符会要求上游传递不超过128个元素来填充其内部缓冲区。 然后,与此调度程序分开的线程(工作人员)从该队列中拾取事件。 当队列几乎变空时,observeOn()运算符会要求更多(request()方法)。 当生产者不遵守背压请求并发送比允许的更多的数据时,此机制就会破裂,从而使消耗者有效溢出。observeOn()运算符内的内部队列已满,而interval()运算符仍在发出新事件-因为这就是interval()要做的事情。

在1.x中Observable到的发现了这种溢出,并通过MissingBackpressureException快速失败。 字面上的意思是:我尽了最大努力使系统保持健康状态,但是我的上游不尊重背压-缺少背压实现。 但是,在2.x中Observable到的没有这种安全机制。 希望你会成为一个好公民,生产缓慢或消费快速的人。 当系统运行Observable,两个Observable的行为相同。 但是,在1.x负载下,快速失败,而2.x则缓慢而痛苦地失败。

这是否意味着RxJava 2.x向后退? 恰恰相反! 在2.x中,有一个重要的区别:

Observable不在乎背压,这极大地简化了其设计和实现。 根据定义,它应用于建模不支持背压的流,例如用户界面事件Flowable确实支持背压,并已采取所有安全措施。 换句话说,计算管道中的所有步骤都确保您不会溢出使用者。

2.x在可以支持背压的流(简单地说“如果需要可以减慢”)和不支持背压的流之间进行了重要区分。 从类型系统的角度来看,很清楚,我们正在处理哪种源及其保证。 那么我们应该如何将interval()示例迁移到RxJava 2.x? 比您想像的容易:

Flowable.interval(1, MILLISECONDS).observeOn(putation()).subscribe(x -> sleep(Duration.ofSeconds(1)));

这么简单。 您可能会问自己一个问题,为什么Flowable可以有一个interval()运算符,按照定义,它不能支持背压? 假设所有interval()均以恒定速率传递事件后,它就不会放慢速度! 好吧,如果您看一下interval()的声明,您会注意到:

@BackpressureSupport(BackpressureKind.ERROR)

简而言之,这告诉我们,每当无法再保证背压时,RxJava都会处理它并抛出MissingBackpressureException。 这正是我们运行Flowable.interval()程序时发生的事情–它快速失败,而不是破坏整个应用程序的稳定性。

因此,总结起来,每当您从1.x看到Observable时,您可能想要的是从2.x开始的Flowable。 至少,除非您的流按定义不支持背压。 尽管名称相同,但这两个主要版本中的Observable很大不同。 但是,一旦您进行搜索并从Observable替换为Flowable您将注意到迁移并不是那么简单。 这与API更改无关,区别更加深刻。

在2.x中没有直接等效于Observable.create()简单Flowable.create()。 过去我过度使用Observable.create()工厂方法是一个错误。create()允许您以任意速率发出事件,而完全忽略了背压。 2.x拥有一些友好的功能来处理背压请求,但它们需要仔细设计流。 这将在下一个常见问题解答中介绍。

翻译自: //08/1-x-2-x-migration-observable-vs-observable-rxjava-faq.html

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。
相关阅读
RxJava 2.x 入门

RxJava 2.x 入门

2020-03-04

RxJava 2.x 教程

RxJava 2.x 教程

2024-02-15

RxJava 3.x 使用总结

RxJava 3.x 使用总结

2024-06-14

JAVA RxJava 2.x

JAVA RxJava 2.x

2023-07-08