如何获得RxJS主题或可观察对象的当前值?

我有一个Angular 2服务:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';


@Injectable()
export class SessionStorage extends Storage {
private _isLoggedInSource = new Subject<boolean>();
isLoggedIn = this._isLoggedInSource.asObservable();
constructor() {
super('session');
}
setIsLoggedIn(value: boolean) {
this.setItem('_isLoggedIn', value, () => {
this._isLoggedInSource.next(value);
});
}
}

一切都很好。但是我有另一个不需要订阅的组件,它只需要在某个时间点获得isLoggedIn的当前值。我该怎么做呢?

317530 次浏览

SubjectObservable没有当前值。当一个值被触发时,它被传递给订阅者,并且Observable对它完成处理。

如果你想要一个当前值,使用BehaviorSubject,它正是为此目的而设计的。BehaviorSubject保留最后发出的值,并立即将其发出给新的订阅者。

它还有一个方法getValue()来获取当前值。

我遇到过类似的情况,在Subject的价值到达后,迟来的订阅者才订阅它。

我发现ReplaySubject类似于BehaviorSubject,在这种情况下就像一个魅力。 这里有一个更好的解释链接:http://reactivex.io/rxjs/manual/overview.html#replaysubject

应该从Observable/Subject中获取值的唯一方法是订阅!

如果你正在使用getValue(),你正在做一些声明式范例中的命令。getValue()会做一些有趣的事情:如果主题已经取消订阅,它会抛出一个错误,如果主题已经死亡,它会阻止你获得一个值,因为它是错误的,等等。但是,再一次地,它是在罕见情况下的一个逃生口。

有几种方法可以以“Rx-y”方式从Subject或Observable中获取最新的值:

  1. 使用BehaviorSubject:但是实际上订阅它。当你第一次订阅BehaviorSubject时,它会同步发送它接收到的或初始化的前一个值。
  2. 使用ReplaySubject(N):这将缓存N值并将它们重放给新订阅者。
  3. A.withLatestFrom(B):当可观察对象A发出时,使用此操作符从可观察对象B中获取最新的值。将在数组[a, b]中给你两个值。
  4. A.combineLatest(B):每次AB触发时,使用此操作符从AB中获取最新的值。会在数组中给你两个值。
  5. shareReplay():通过ReplaySubject创建一个可观察对象多播,但允许你在错误时重试可观察对象。(基本上它给你承诺缓存行为)。
  6. publishReplay()publishBehavior(initialValue)multicast(subject: BehaviorSubject | ReplaySubject)等:其他利用BehaviorSubjectReplaySubject的操作符。同一事物的不同风格,它们基本上通过将所有通知汇集到一个主题来多播源可观察对象。你需要调用connect()来订阅带有主题的源。

我在子组件中遇到了同样的问题,最初它必须拥有Subject的当前值,然后订阅Subject以侦听更改。我只是在服务中维护当前值,以便组件可以访问它,例如:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';


@Injectable()
export class SessionStorage extends Storage {


isLoggedIn: boolean;


private _isLoggedInSource = new Subject<boolean>();
isLoggedIn = this._isLoggedInSource.asObservable();
constructor() {
super('session');
this.currIsLoggedIn = false;
}
setIsLoggedIn(value: boolean) {
this.setItem('_isLoggedIn', value, () => {
this._isLoggedInSource.next(value);
});
this.isLoggedIn = value;
}
}

需要当前值的组件可以直接从服务中访问它,即:

sessionStorage.isLoggedIn

不确定这是否是正确的做法:)

你可以将最后一个发出的值与Observable分开存储。然后在需要的时候阅读它。

let lastValue: number;


const subscription = new Service().start();
subscription
.subscribe((data) => {
lastValue = data;
}
);

类似的答案被否决了。但我认为我可以证明我在这里提出的建议适用于有限的情况。


虽然可观察对象确实没有当前的值,但通常它会有立即可用的值。例如,对于redux / flux / akita存储,您可以根据一些可观察的数据从中央存储请求数据,这些值通常会立即可用。

如果是这种情况,那么当你subscribe时,值将立即返回。

假设你有一个对服务的调用,在调用完成时,你想从你的商店它可能不会发射中获取某物的最新值:

你可以尝试这样做(你应该尽可能地把事情“放在管道里”):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
.subscribe(([ serviceCallResponse, customer] => {


// we have serviceCallResponse and customer
});

这样做的问题是它会阻塞,直到第二个可观察对象发出一个值,而这可能永远不会发生。

我发现自己最近需要评估一个可观察对象只有当值立即可用时,更重要的是,我需要能够检测,如果它不是。我最后是这样做的:

 serviceCallResponse$.pipe()
.subscribe(serviceCallResponse => {


// immediately try to subscribe to get the 'available' value
// note: immediately unsubscribe afterward to 'cancel' if needed
let customer = undefined;


// whatever the secondary observable is
const secondary$ = store$.select(x => x.customer);


// subscribe to it, and assign to closure scope
sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
sub.unsubscribe();


// if there's a delay or customer isn't available the value won't have been set before we get here
if (customer === undefined)
{
// handle, or ignore as needed
return throwError('Customer was not immediately available');
}
});

请注意,对于上述所有函数,我都使用subscribe来获取值(正如@Ben讨论的那样)。不使用.value属性,即使我有BehaviorSubject

const observable = of('response')


function hasValue(value: any) {
return value !== null && value !== undefined;
}


function getValue<T>(observable: Observable<T>): Promise<T> {
return observable
.pipe(
filter(hasValue),
first()
)
.toPromise();
}


const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

你可以在这里查看如何实现它的完整文章。 https://www.imkrish.com/blog/development/simple-way-get-value-from-observable < / p >

虽然这听起来可能有点夸张,这只是另一个“可能”的解决方案,以保持Observable类型和减少样板…

你总是可以创建扩展getter来获取Observable的当前值。

要做到这一点,你需要在global.d.ts types声明文件中扩展Observable<T>接口。然后在observable.extension.ts文件中实现扩展getter,最后将类型和扩展文件都包含到应用程序中。

你可以参考这个StackOverflow回答来了解如何将扩展包含到你的Angular应用中。

// global.d.ts
declare module 'rxjs' {
interface Observable<T> {
/**
* _Extension Method_ - Returns current value of an Observable.
* Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
*/
value: Observable<T>;
}
}


// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
get <T>(this: Observable<T>): Observable<T> {
return this.pipe(
filter(value => value !== null && value !== undefined),
first());
},
});


// using the extension getter example
this.myObservable$.value
.subscribe(value => {
// whatever code you need...
});

最好的方法是使用Behaviur Subject,下面是一个例子:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5

可以创建订阅,然后在获取第一个发出的项之后销毁订阅。在下面的例子中,pipe()是一个函数,它使用一个可观察对象作为输入,并返回另一个可观察对象作为输出,同时不修改第一个可观察对象。

使用Angular 8.1.0"rxjs": "6.5.3""rxjs-observable": "0.0.7"创建的示例

  ngOnInit() {


...


// If loading with previously saved value
if (this.controlValue) {


// Take says once you have 1, then close the subscription
this.selectList.pipe(take(1)).subscribe(x => {
let opt = x.find(y => y.value === this.controlValue);
this.updateValue(opt);
});


}
}

有两种方法可以实现这一点。

  1. BehaviorSubject有一个方法getValue(),你可以得到一个特定时间点的值。

  2. 你可以直接订阅BehaviorSubject,你可以将订阅的值传递给类成员、字段或属性。

我不会同时推荐这两种方法。

在第一种方法中,它是一种方便的方法,您可以随时获取值,您可以将其称为该时间点的当前快照。问题是你可以在你的代码中引入竞争条件,你可以在许多不同的地方和不同的时间调用这个方法,这很难调试。

第二种方法是大多数开发人员在他们想要订阅时使用的原始值,您可以跟踪订阅,以及何时取消订阅以避免进一步的内存泄漏,如果您真的非常想将其绑定到一个变量,并且没有其他方法来连接它,则可以使用这种方法。

我建议,再看看你的用例,你在哪里使用它?例如,当你调用任何API时,你想要确定用户是否登录,你可以结合其他观察数据:

const data$ = apiRequestCall$().pipe(
// Latest snapshot from BehaviorSubject.
withLatestFrom(isLoggedIn),
// Allow call only if logged in.
filter(([request, loggedIn]) => loggedIn)
// Do something else..
);

在angular的情况下,你可以通过管道data$ | async直接将它使用到UI。