RxJava调度器用例

在RxJava中,有5种不同的调度程序可供选择:

  1. immediate():创建并返回一个Scheduler,在当前线程上立即执行工作。

  2. trampoline():创建并返回一个Scheduler,该Scheduler在当前线程上排队工作,在当前工作完成后执行。

  3. newThread():创建并返回一个调度器,为每个工作单元创建一个新线程。

  4. computation():创建并返回用于计算工作的调度器。这可以用于事件循环、处理回调和其他计算工作。不要在此调度程序上执行io绑定的工作。使用Schedulers.io()代替。

  5. io ():创建并返回用于io绑定工作的调度器。 该实现由Executor线程池支持,该线程池将根据需要增长。这可以用于异步执行阻塞IO。不要在此调度程序上执行计算工作。使用调度器。李计算()。< / p > < / >

问题:

前3个调度器是不言自明的;然而,我对计算io有点困惑。

  1. 什么是“IO-bound work”?它是用于处理流(java.io)和文件(java.nio.files)吗?它用于数据库查询吗?它是用于下载文件还是访问REST api ?
  2. 计算()newThread ()有什么不同?是不是所有的计算()调用都在单个(后台)线程上,而不是每次都在一个新的(后台)线程上?
  3. 为什么在做IO工作时调用计算()是不好的?
  4. 为什么在做计算工作时调用io ()是不好的?
28461 次浏览

问得好,我认为文档可以提供更多细节。

  1. io()是由一个无界线程池支持的,是那种你会用于非计算密集型任务的东西,那是不会给CPU带来太多负载的东西。与文件系统的交互,与不同主机上的数据库或服务的交互都是很好的例子。
  2. computation()由一个有限的线程池支持,其大小等于可用处理器的数量。如果你试图在多个可用处理器上并行调度CPU密集型工作(比如使用newThread()),那么当线程争夺处理器时,你就会面临线程创建开销和上下文切换开销,这可能会对性能造成很大的影响。
  3. 最好将computation()留给CPU密集型工作,否则你将得不到良好的CPU利用率。
  4. 由于2中讨论的原因,调用io()进行计算工作是不好的。io()是无界的,如果你在io()上并行调度一千个计算任务,那么这一千个任务中的每一个都有自己的线程,并竞争CPU,导致上下文切换成本。

最重要的一点是,与问题中提到的其他线程池不同,Schedulers.ioSchedulers.computation都是由无界线程池支持的。这个特性只在Schedulers.from(执行器)使用newCachedThreadPool创建遗嘱执行人的情况下共享(没有自动回收线程池)。

正如之前的回复和网络上的多篇文章中充分解释的那样,Schedulers.ioSchedulers.computation应使用仔细地进行了优化,因为它们是针对其名称中的工作类型进行优化的。但是,在我看来,它们最重要的作用是为响应式流提供真正的并发性

与新来者的看法相反,响应流本质上不是并发的,而是异步的和顺序的。因此,Schedulers.io应使用只有在I/O操作阻塞时(例如:使用阻塞命令如Apache IOUtils FileUtils.readFileAsString(…))才会冻结调用线程,直到操作完成。

使用异步方法(如Java AsynchronousFileChannel(…))不会在操作期间阻塞调用线程,因此没有必要使用单独的线程。事实上,Schedulers.io线程并不适合异步操作,因为它们不会运行事件循环,而且回调永远不会…被称为。

同样的逻辑也适用于数据库访问或远程API调用。如果可以使用异步或响应式API进行调用,则不要使用Schedulers.io

回到并发性。您可能无法访问异步或响应式API来异步或并发地执行I/O操作,因此惟一的选择是在单独的线程上分派多个调用。唉,反应性流的末端是顺序的,但好消息是flatMap()操作符可以在其核心引入并发性

并发性必须在流构造中构建,通常使用flatMap ()操作符。这个功能强大的操作符可以配置为在内部为你的flatMap ()嵌入式Function<T, R>提供一个多线程上下文。该上下文由多线程调度器(例如Scheduler.ioScheduler.computation)提供。

在RxJava2 < em >调度器< / em >并发性上的文章中找到更多细节,在那里你会找到关于如何顺序和并发地使用调度器的代码示例和详细解释。

希望这能有所帮助,

Softjake

这篇博文提供了一个很好的答案

文章中写道:

Schedulers.io ()由一个无界线程池支持。它用于非cpu密集型I/O类型的工作,包括与文件系统的交互、执行网络调用、数据库交互等。此线程池用于异步执行阻塞IO。

Schedulers.computation ()由一个有限的线程池支持,其大小为可用处理器的数量。它用于计算或cpu密集型工作,如调整图像大小,处理大型数据集等。注意:当分配的计算线程多于可用内核时,由于上下文切换和线程创建开销,性能会下降,因为线程会争夺处理器时间。

Schedulers.newThread ()为每个计划的工作单元创建一个新线程。这个调度程序的开销很大,因为每次都要生成新线程,而且不会发生重用。

Schedulers.from(执行人执行器)创建并返回指定执行程序支持的自定义调度器。要限制线程池中并发线程的数量,请使用Scheduler.from(Executors.newFixedThreadPool(n))。这保证了如果一个任务在所有线程都被占用时被调度,它将被排队。在显式关闭池之前,池中的线程将一直存在。

主线程或AndroidSchedulers.mainThread ()是由RxAndroid扩展库提供给RxJava的。主线程(也称为UI线程)是用户交互发生的地方。应该注意不要重载这个线程,以防止不响应的UI或更糟糕的应用程序不响应(ANR)对话框。

Schedulers.single ()是RxJava 2中的新功能。此调度器由单个线程支持,按请求的顺序依次执行任务。

Schedulers.trampoline ()由一个参与的工作线程以FIFO(先入先出)方式执行任务。它通常在实现递归时使用,以避免增加调用堆栈。