What is the difference between an Observer and a Subscriber?

I am trying to decipher the following function:

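Subscription getCar(id, Observer<Car> observer) {
    return getCarDetails(id, new Observer<CarDetails>() {
        @Override
        public void onNext(CarDetails details) {
            // Unwrap the Car from the details and forward it to the caller's observer.
            observer.onNext(details.getCar());
        }
        // onCompleted() and onError() are not shown in the snippet.
    });
}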

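I got a good introduction to RxJava from http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/, but it only mentions Observer in passing, saying that most of the time you will use a Subscriber to consume the items emitted by an Observable.

Can someone explain to me

  1. What is an Observer?
  2. How is an Observer different from a Subscriber?
  3. What does the code snippet above do?

The Javadoc makes it look just like a Subscriber. The Javadoc for Subscriber says that it implements Observer and Subscription. I am very confused.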


EDITED: with @Alrid's comment

tl;dr

public abstract class Subscriber<T> implements Observer<T>, Subscription

So a Subscriber is an implementation of Observer with additional semantics around subscription (mostly un-subscription). The code in your question simply accepts the Observer interface rather than an implementation, which is usual programming practice.

The code also returns a Subscription, perhaps because its author thought the client should only have access to the Subscription methods, without access to the elements produced by the observable. That may be a programmer error.
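
As a rough illustration of the class signature above, here is a minimal sketch that does not use RxJava at all. Observer, Subscription and Subscriber below are simplified stand-ins that only copy the shape of the RxJava 1 types, and emit() and takeTwo() are hypothetical helpers invented for this example. It shows the one thing a Subscriber adds to an Observer: it can stop the flow by unsubscribing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SubscriberSketch {
    // Simplified stand-ins for the RxJava 1 types (assumption: not the real library).
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    // Same shape as: abstract class Subscriber<T> implements Observer<T>, Subscription
    abstract static class Subscriber<T> implements Observer<T>, Subscription {
        private volatile boolean unsubscribed;

        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    // Hypothetical source: pushes items until the subscriber has unsubscribed.
    static <T> void emit(List<T> items, Subscriber<T> subscriber) {
        for (T item : items) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            subscriber.onNext(item);
        }
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    }

    // Collects items from the source and unsubscribes after the second one.
    static List<Integer> takeTwo(List<Integer> source) {
        List<Integer> received = new ArrayList<>();
        emit(source, new Subscriber<Integer>() {
            @Override public void onNext(Integer item) {
                received.add(item);
                if (received.size() == 2) {
                    unsubscribe();
                }
            }
            @Override public void onError(Throwable e) { }
            @Override public void onCompleted() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeTwo(Arrays.asList(1, 2, 3, 4))); // prints [1, 2]
    }
}
```

A plain Observer passed to the same source would have no way to say "stop"; it could only ignore further items.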

long story

You really should read the content of this website (or book): http://www.introtorx.com. It is about Rx.Net, but the concepts are the very same; they were created by Erik Meijer, and the RxJava implementors followed them (where applicable to the Java language).

This page will interest you (it is the second chapter): KeyTypes

There you'll read, in the first paragraphs:

There are two key types to understand when working with Rx, and a subset of auxiliary types that will help you to learn Rx more effectively. The IObserver and IObservable form the fundamental building blocks for Rx, while implementations of ISubject reduce the learning curve for developers new to Rx.

...

Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as multicast delegates or events (which are usually multicast delegates).

Even if the types / API are a bit different, you will learn a lot from this book, probably far more than from some blogs.

What this book does not say (...because it is in the RxJava implementation)

The main RxJava developer at the time introduced a slight variation (see PR #792) that made it possible to distinguish two types of contracts:

  • notification -> Observer
  • (un)subscription -> Subscription

This change made it possible to better express and separate these concerns in the implementing classes of the RxJava library.

However, as a library user, using the actual implementations provided by the RxJava library should be good enough.

Implementing a Subscriber requires much more knowledge, work and care. The subscription semantics are very important and depend on the type of the source observable (hot or cold? expensive to create?).


Exposing Subscriber rather than Observer in cases such as the one above will not interfere with the code in most cases, but it is not the intended use unless those un-subscription semantics are needed. In the end, implementing a Subscriber may lead you into pitfalls such as:

  1. spending resources on functionality you will not use
  2. being unable to inherit from another class
  3. writing incorrect un-subscription code
  4. copy/pasting incorrect code, or correct code written for a different context

(Edit: This is apparently only true of RxJava 1.)

  1. An Observer is an object that can get data from a data source (an Observable). The data source pushes data to it by calling the observer's onNext().

  2. A Subscriber is an Observer that can also unsubscribe from that data source (through the Subscription interface).

  3. The getCar() function is trying to return cars, but there's no direct method to do that. But there is a function to get car details (getCarDetails()) which will call an observer with all the car details. So it calls that function and passes it an observer that, when it gets data, will fetch the car data from the details and pass it on to its own observer.
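
To make point 3 concrete, here is a self-contained sketch of that forwarding pattern. Everything in it is an assumption made for illustration: the Observer and Subscription interfaces are simplified stand-ins for the RxJava 1 types, Car and CarDetails are hypothetical classes, the id is assumed to be a String, and getCarDetails() is a fake that emits a single item synchronously. The real function from the question is not shown anywhere in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GetCarSketch {
    // Simplified stand-ins for the RxJava 1 interfaces (assumption: not the real library).
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable e);
        void onCompleted();
    }

    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    // Hypothetical domain classes.
    static final class Car {
        final String name;
        Car(String name) { this.name = name; }
    }

    static final class CarDetails {
        private final Car car;
        CarDetails(Car car) { this.car = car; }
        Car getCar() { return car; }
    }

    // Fake data source: synchronously emits one CarDetails, then completes.
    static Subscription getCarDetails(String id, Observer<CarDetails> observer) {
        observer.onNext(new CarDetails(new Car("car-" + id)));
        observer.onCompleted();
        return new Subscription() {
            @Override public void unsubscribe() { }
            @Override public boolean isUnsubscribed() { return true; }
        };
    }

    // The pattern from the question: wrap the caller's Observer<Car> in an
    // Observer<CarDetails> that unwraps each item and forwards it.
    static Subscription getCar(String id, final Observer<Car> observer) {
        return getCarDetails(id, new Observer<CarDetails>() {
            @Override public void onNext(CarDetails details) {
                observer.onNext(details.getCar());
            }
            @Override public void onError(Throwable e) { observer.onError(e); }
            @Override public void onCompleted() { observer.onCompleted(); }
        });
    }

    // Usage helper: collects the names of the cars delivered for one id.
    static List<String> carNames(String id) {
        final List<String> names = new ArrayList<>();
        getCar(id, new Observer<Car>() {
            @Override public void onNext(Car car) { names.add(car.name); }
            @Override public void onError(Throwable e) { }
            @Override public void onCompleted() { }
        });
        return names;
    }

    public static void main(String[] args) {
        System.out.println(carNames("42")); // prints [car-42]
    }
}
```

Unlike the snippet in the question, this sketch also forwards onError() and onCompleted(), so the outer observer learns when the stream ends or fails.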

In RxJava 2, org.reactivestreams.Subscriber is an interface that complies with the Reactive Streams specification.

The main difference from Observer is that the new Subscriber supports backpressure.

An Observer subscribes to an Observable, and a Subscriber subscribes to a Flowable (which implements org.reactivestreams.Publisher).
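
To give a feel for what backpressure means here, the following sketch uses java.util.concurrent.Flow from the JDK (Java 9+), whose interfaces correspond to the Reactive Streams ones, instead of RxJava itself. The range() publisher and requestOnly() helper are hypothetical, and the publisher is deliberately simplified: it is synchronous and skips parts of the specification such as rejecting non-positive requests. The point it illustrates is that the subscriber decides how many items it is willing to receive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {
    // Hypothetical publisher: emits 1..count, but never more items than were requested.
    static Flow.Publisher<Integer> range(final int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean done;

            @Override public void request(long n) {
                // Emit at most n items per request; this is the backpressure contract.
                for (long i = 0; i < n && next <= count && !done; i++) {
                    subscriber.onNext(next++);
                }
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    // Subscribes to range(available) and requests only `demand` items.
    static List<Integer> requestOnly(final int demand, int available) {
        final List<Integer> received = new ArrayList<>();
        range(available).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(demand);
            }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(requestOnly(3, 10)); // prints [1, 2, 3]
    }
}
```

An Observer has no request() step: the source pushes items at whatever pace it likes, and the observer has to cope.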

See detailed description here.

Also, in RxJava 2, if you want to be able to unsubscribe, you should use ResourceObserver for an Observable and ResourceSubscriber for a Flowable.

Check this question