Angular/RxJS何时应该取消订阅“订阅”

什么时候应该存储Subscription实例并在ngOnDestroy生命周期中调用unsubscribe(),什么时候可以简单地忽略它们?

保存所有订阅会给组件代码带来很多混乱。

HTTP客户端指南忽略这样的订阅:

getHeroes() {this.heroService.getHeroes().subscribe(heroes => this.heroes = heroes,error =>  this.errorMessage = <any>error);}

在同一时间路线和导航指南说:

最终,我们将导航到其他地方。路由器将从DOM中删除此组件并销毁它。在此之前,我们需要自己清理。具体来说,我们必须在Angular销毁组件之前取消订阅。不这样做可能会造成内存泄漏。

我们取消订阅ngOnDestroy方法中的Observable

private sub: any;
ngOnInit() {this.sub = this.route.params.subscribe(params => {let id = +params['id']; // (+) converts string 'id' to a numberthis.service.getHero(id).then(hero => this.hero = hero);});}
ngOnDestroy() {this.sub.unsubscribe();}
351494 次浏览

Angular 2官方留档提供了何时取消订阅以及何时可以安全忽略的解释。看看这个链接:

https://angular.io/docs/ts/latest/cookbook/component-communication.html##双向服务

查找标题为家长和孩子通过服务进行交流的段落,然后是蓝色框:

请注意,我们在AstronautComponent被销毁时捕获订阅并取消订阅。这是一个内存泄漏保护步骤。此应用程序中没有实际风险,因为AstronautComponent的生命周期与应用程序本身的生命周期相同。在更复杂的应用程序中并非总是如此。

我们不将此警卫添加到MissionControlComponent,因为作为父级,它控制MissionService的生命周期。

我希望这对你有帮助。

如果通过调用someObservable.subscribe(),您开始占用一些资源,这些资源必须在组件的生命周期结束时手动释放,那么您应该调用theSubscription.unsubscribe()来防止内存泄漏。

让我们仔细看看你的例子:

getHero()返回http.get()的结果。如果你观察角度2源代码http.get()会创建两个事件侦听器:

_xhr.addEventListener('load', onLoad);_xhr.addEventListener('error', onError);

通过调用unsubscribe(),您可以取消请求以及侦听器:

_xhr.removeEventListener('load', onLoad);_xhr.removeEventListener('error', onError);_xhr.abort();

请注意,_xhr是特定于平台的,但我认为在您的情况下可以安全地假设它是XMLHttpRequest()

通常情况下,这足以证明需要手动unsubscribe()调用。但是根据这个WHATWG规范,一旦XMLHttpRequest()“完成”,即使有事件侦听器附加到它,它也会受到垃圾回收机制的约束。所以我想这就是角2官方指南省略unsubscribe()并让GC清理侦听器的原因。

至于你的第二个例子,它取决于params的实现。截至今天,角官方指南不再显示从params退订。我再次查看src,发现params只是一个行为主体。由于没有使用事件侦听器或定时器,也没有创建全局变量,省略unsubscribe()应该是安全的。

你的问题的底线是始终调用unsubscribe()来防止内存泄漏,除非你确定observable的执行不会创建全局变量、添加事件侦听器、设置计时器或执行任何导致内存泄漏的其他操作。

如果有疑问,请查看该observable的实现。如果observable已经将一些清理逻辑写入其unsubscribe()中,通常是构造函数返回的函数,那么你有充分的理由认真考虑调用unsubscribe()

太长别读

对于这个问题,有两种可观察对象-有限值和无限值。

http Observables产生有限(1)值,类似DOM事件侦听器Observable的东西产生无限值。

如果您手动调用subscribe(不使用异步管道),则从无限 Observables中调用unsubscribe

不要担心有限,RxJ会照顾它们。


来源:

  1. 我在Angular的Gitter这里中找到了Rob Wormald的答案。

    他说(我为清晰而重组,重点是我的):

    如果它的一个单值序列(如超文本传输协议请求)手动清理是不必要的(假设您手动订阅控制器)

    我应该说“如果它是完成的序列”(其中单值序列,la超文本传输协议,是其中之一)

    如果它是一个无限序列你应该退订异步管道为您执行

    他还在Observables的这YouTube视频中提到,"他们自己清理…"在Observables的上下文中是完成(就像Promises一样,它总是完成的,因为它们总是产生一个值并结束-我们从不担心取消订阅Promises以确保它们清理XHR事件侦听器,对吗?)

  2. 角2的范围指南中也读到

    在大多数情况下,我们不需要显式调用unsubscribe方法,除非我们想提前取消,或者我们的Observable比我们的订阅具有更长的生命周期。Observable运算符的默认行为是在.complete().error()消息发布后立即处理订阅。请记住,RxJS被设计为在大多数时候以“火了就忘”的方式使用。

    短语"我们的#0比我们的订阅拥有更长的生命周期"什么时候适用?

    它适用于在Observable完成之前(或“不久”之前)销毁的组件内创建订阅。

    我认为这意味着如果我们订阅了一个http请求或一个发出10个值的Observable,并且我们的组件在http请求返回或10个值被发出之前被销毁,我们仍然可以!

    当请求返回或最终发出第10个值时,Observable将完成并清理所有资源。

  3. 如果我们从同一个Rangle指南中查看这个例子,我们可以看到route.params的订阅确实需要unsubscribe(),因为我们不知道这些params何时会停止更改(发出新值)。

    在这种情况下,路由参数可能仍会发生变化(它们在技术上可能会发生变化,直到应用程序结束),并且由于没有完成,订阅中分配的资源仍将被分配。

  4. 在来自NgEurope的这段视频中,Rob Wormald还表示您不需要取消订阅路由器Observables。他还提到了2016年11月的http服务和这段视频中的ActivatedRoute.params

  5. Angular教程路由章节现在声明如下:

    Router管理它提供的observable并本地化订阅。当组件被销毁时,订阅会被清理,以防止内存泄漏,因此我们不需要取消订阅路由paramsObservable

    这是一个关于Angular文档的GitHub问题的讨论,关于路由器Observables,Ward Bell提到所有这些都在工作中。


我在NGConf与Ward Bell讨论了这个问题(我甚至向他展示了这个答案,他说这个答案是正确的),但他告诉我Angular的文档团队有一个未发布的解决方案(尽管他们正在努力获得批准)。他还告诉我,我可以用即将到来的官方推荐更新我的SO答案。

我们都应该使用的解决方案是在其类代码中对Observables进行.subscribe()调用的所有组件中添加private ngUnsubscribe = new Subject<void>();字段。

然后我们在ngOnDestroy()方法中调用this.ngUnsubscribe.next(); this.ngUnsubscribe.complete();

秘诀(如@张超所述)是在我们的每个.subscribe()调用之前调用takeUntil(this.ngUnsubscribe),这将保证在组件被销毁时所有订阅都将被清理。

示例:

import { Component, OnDestroy, OnInit } from '@angular/core';// RxJs 6.x+ import pathsimport { filter, startWith, takeUntil } from 'rxjs/operators';import { Subject } from 'rxjs';import { BookService } from '../books.service';
@Component({selector: 'app-books',templateUrl: './books.component.html'})export class BooksComponent implements OnDestroy, OnInit {private ngUnsubscribe = new Subject<void>();
constructor(private booksService: BookService) { }
ngOnInit() {this.booksService.getBooks().pipe(startWith([]),filter(books => books.length > 0),takeUntil(this.ngUnsubscribe)).subscribe(books => console.log(books));
this.booksService.getArchivedBooks().pipe(takeUntil(this.ngUnsubscribe)).subscribe(archivedBooks => console.log(archivedBooks));}
ngOnDestroy() {this.ngUnsubscribe.next();this.ngUnsubscribe.complete();}}

备注:添加takeUntil运算符作为最后一个运算符很重要,以防止运算符链中中间Observables的泄漏。


最近,在在Angular的冒险的一集中,Ben Lesh和Ward Bell讨论了如何/何时取消订阅组件的问题。讨论大约在1:05:30开始。

沃德提到"现在有一个可怕的任务直到舞蹈需要很多机器",沙伊·雷兹尼克提到“Angular处理一些订阅,如超文本传输协议和路由”

作为回应,Ben提到现在有一些讨论允许Observables与Angular组件生命周期事件挂钩,Ward建议一个组件可以订阅的生命周期事件的Observable,作为一种知道何时完成作为组件内部状态维护的Observables的方式。

也就是说,我们现在最需要的是解决方案,所以这里有一些其他资源。

  1. RxJs核心团队成员Nicholas Jamieson对takeUntil()模式的建议以及帮助执行它的TSLint规则:https://ncjamieson.com/avoiding-takeuntil-leaks/

  2. 轻量级npm包,公开一个Observable运算符,该运算符将组件实例(this)作为参数,并在ngOnDestroyhttps://github.com/NetanelBasal/ngx-take-until-destroy期间自动取消订阅

  3. 如果您不进行AOT构建(但我们现在都应该进行AOT),上述的另一个变体具有略好的人体工程学:https://github.com/smnbbrv/ngx-rx-collector

  4. 自定义指令*ngSubscribe类似于异步管道,但在模板中创建一个嵌入式视图,因此您可以在整个模板中引用“未包装”值:https://netbasal.com/diy-subscription-handling-directive-in-angular-c8f6e762697f

我在Nicholas博客的评论中提到,过度使用takeUntil()可能是你的组件尝试做太多的迹象,应该考虑将现有组件分成功能演讲组件。然后,你可以将Observable从Feature组件中的| async变成Presentational组件的Input,这意味着任何地方都不需要订阅。阅读有关这种方法这里的更多信息。

你不需要有一堆订阅和手动取消订阅。使用题目外卖直到组合像老板一样处理订阅:

import { Subject } from "rxjs"import { takeUntil } from "rxjs/operators"
@Component({moduleId: __moduleName,selector: "my-view",templateUrl: "../views/view-route.view.html"})export class ViewRouteComponent implements OnInit, OnDestroy {componentDestroyed$: Subject<boolean> = new Subject()
constructor(private titleService: TitleService) {}
ngOnInit() {this.titleService.emitter1$.pipe(takeUntil(this.componentDestroyed$)).subscribe((data: any) => { /* ... do something 1 */ })
this.titleService.emitter2$.pipe(takeUntil(this.componentDestroyed$)).subscribe((data: any) => { /* ... do something 2 */ })
//...
this.titleService.emitterN$.pipe(takeUntil(this.componentDestroyed$)).subscribe((data: any) => { /* ... do something N */ })}
ngOnDestroy() {this.componentDestroyed$.next(true)this.componentDestroyed$.complete()}}

备选办法由@acumartini在评论提出的,使用获取时间而不是外卖直到。你可能更喜欢它,但请注意,这样你的Observable执行不会在组件的ngDestroy上被取消(例如,当你进行耗时的计算或等待来自服务器的数据时)。基于外卖直到的方法没有这个缺点,并导致请求立即取消。感谢@AlexChe在评论中的详细解释

所以这里是代码:

@Component({moduleId: __moduleName,selector: "my-view",templateUrl: "../views/view-route.view.html"})export class ViewRouteComponent implements OnInit, OnDestroy {alive: boolean = true
constructor(private titleService: TitleService) {}
ngOnInit() {this.titleService.emitter1$.pipe(takeWhile(() => this.alive)).subscribe((data: any) => { /* ... do something 1 */ })
this.titleService.emitter2$.pipe(takeWhile(() => this.alive)).subscribe((data: any) => { /* ... do something 2 */ })
// ...
this.titleService.emitterN$.pipe(takeWhile(() => this.alive)).subscribe((data: any) => { /* ... do something N */ })}
ngOnDestroy() {this.alive = false}}

我尝试了seangwright的解决方案(编辑3)

这对于由计时器或间隔创建的Observable不起作用。

但是,我通过使用另一种方法使其工作:

import { Component, OnDestroy, OnInit } from '@angular/core';import 'rxjs/add/operator/takeUntil';import { Subject } from 'rxjs/Subject';import { Subscription } from 'rxjs/Subscription';import 'rxjs/Rx';
import { MyThingService } from '../my-thing.service';
@Component({selector: 'my-thing',templateUrl: './my-thing.component.html'})export class MyThingComponent implements OnDestroy, OnInit {private subscriptions: Array<Subscription> = [];
constructor(private myThingService: MyThingService,) { }
ngOnInit() {const newSubs = this.myThingService.getThings().subscribe(things => console.log(things));this.subscriptions.push(newSubs);}
ngOnDestroy() {for (const subs of this.subscriptions) {subs.unsubscribe();}}}

基于:使用Class继承挂钩到Angular 2组件生命周期

另一种通用方法:

export abstract class UnsubscribeOnDestroy implements OnDestroy {protected d$: Subject<any>;
constructor() {this.d$ = new Subject<void>();
const f = this.ngOnDestroy;this.ngOnDestroy = () => {f();this.d$.next();this.d$.complete();};}
public ngOnDestroy() {// no-op}
}

并使用:

@Component({selector: 'my-comp',template: ``})export class RsvpFormSaveComponent extends UnsubscribeOnDestroy implements OnInit {
constructor() {super();}
ngOnInit(): void {Observable.of('bla').takeUntil(this.d$).subscribe(val => console.log(val));}}

由于seangwright的解决方案(Edit 3)似乎非常有用,我还发现将此功能打包到基本组件中很麻烦,并提示其他项目队友记住在ngOnDestroy上调用Super()来激活此功能。

这个答案提供了一种从超级调用中释放的方法,并使“组件销毁$”成为基本组件的核心。

class BaseClass {protected componentDestroyed$: Subject<void> = new Subject<void>();constructor() {
/// wrap the ngOnDestroy to be an Observable. and set free from calling super() on ngOnDestroy.let _$ = this.ngOnDestroy;this.ngOnDestroy = () => {this.componentDestroyed$.next();this.componentDestroyed$.complete();_$();}}
/// placeholder of ngOnDestroy. no need to do super() call of extended class.ngOnDestroy() {}}

然后您可以自由使用此功能,例如:

@Component({selector: 'my-thing',templateUrl: './my-thing.component.html'})export class MyThingComponent extends BaseClass implements OnInit, OnDestroy {constructor(private myThingService: MyThingService,) { super(); }
ngOnInit() {this.myThingService.getThings().takeUntil(this.componentDestroyed$).subscribe(things => console.log(things));}
/// optional. not a requirement to implement OnDestroyngOnDestroy() {console.log('everything works as intended with or without super call');}
}

我喜欢最后两个答案,但如果子类在ngOnDestroy中引用"this",我就会遇到问题。

我把它修改成这样,看起来它解决了这个问题。

export abstract class BaseComponent implements OnDestroy {protected componentDestroyed$: Subject<boolean>;constructor() {this.componentDestroyed$ = new Subject<boolean>();let f = this.ngOnDestroy;this.ngOnDestroy = function()  {// without this I was getting an error if the subclass had// this.blah() in ngOnDestroyf.bind(this)();this.componentDestroyed$.next(true);this.componentDestroyed$.complete();};}/// placeholder of ngOnDestroy. no need to do super() call of extended class.ngOnDestroy() {}}

Subscription类有一个有趣的特性:

表示可支配资源,例如Observable的执行。订阅有一个重要的方法,即取消订阅,它不接受参数,只是释放订阅持有的资源。
此外,订阅可以通过add()方法组合在一起,add()方法会将子订阅附加到当前订阅。当订阅被取消订阅时,其所有子(及其孙子)也将被取消订阅。

您可以创建一个聚合订阅对象来对您的所有订阅进行分组。你可以通过创建一个空订阅并使用其add()方法向其添加订阅来做到这一点。当你的组件被销毁时,你只需要取消订阅聚合订阅。

@Component({ ... })export class SmartComponent implements OnInit, OnDestroy {private subscriptions = new Subscription();
constructor(private heroService: HeroService) {}
ngOnInit() {this.subscriptions.add(this.heroService.getHeroes().subscribe(heroes => this.heroes = heroes));this.subscriptions.add(/* another subscription */);this.subscriptions.add(/* and another subscription */);this.subscriptions.add(/* and so on */);}
ngOnDestroy() {this.subscriptions.unsubscribe();}}

官方的Edit#3答案(和变体)运行良好,但让我感到困惑的是围绕可观察订阅的业务逻辑的“混乱”。

这是另一种使用包装器的方法。

警告:实验代码

文件subscribeAndGuard.ts用于创建一个新的Observable扩展来包装.subscribe()并在其中包装ngOnDestroy()
用法与.subscribe()相同,除了一个额外的第一个参数引用组件。

import { Observable } from 'rxjs/Observable';import { Subscription } from 'rxjs/Subscription';
const subscribeAndGuard = function(component, fnData, fnError = null, fnComplete = null) {
// Define the subscriptionconst sub: Subscription = this.subscribe(fnData, fnError, fnComplete);
// Wrap component's onDestroyif (!component.ngOnDestroy) {throw new Error('To use subscribeAndGuard, the component must implement ngOnDestroy');}const saved_OnDestroy = component.ngOnDestroy;component.ngOnDestroy = () => {console.log('subscribeAndGuard.onDestroy');sub.unsubscribe();// Note: need to put original back in place// otherwise 'this' is undefined in component.ngOnDestroycomponent.ngOnDestroy = saved_OnDestroy;component.ngOnDestroy();
};
return sub;};
// Create an Observable extensionObservable.prototype.subscribeAndGuard = subscribeAndGuard;
// Ref: https://www.typescriptlang.org/docs/handbook/declaration-merging.htmldeclare module 'rxjs/Observable' {interface Observable<T> {subscribeAndGuard: typeof subscribeAndGuard;}}

这是一个包含两个订阅的组件,一个带有包装器,一个没有。唯一需要注意的是它必须实现OnDestroy(如果需要,带有空主体),否则Angular不知道调用包装的版本。

import { Component, OnInit, OnDestroy } from '@angular/core';import { Observable } from 'rxjs/Observable';import 'rxjs/Rx';import './subscribeAndGuard';
@Component({selector: 'app-subscribing',template: '<h3>Subscribing component is active</h3>',})export class SubscribingComponent implements OnInit, OnDestroy {
ngOnInit() {
// This subscription will be terminated after onDestroyObservable.interval(1000).subscribeAndGuard(this,(data) => { console.log('Guarded:', data); },(error) => { },(/*completed*/) => { });
// This subscription will continue after onDestroyObservable.interval(1000).subscribe((data) => { console.log('Unguarded:', data); },(error) => { },(/*completed*/) => { });}
ngOnDestroy() {console.log('SubscribingComponent.OnDestroy');}}

演示柱塞是这里

附加说明:Re Edit 3-“官方”解决方案,这可以通过在订阅前使用外卖()而不是外卖()以及一个简单的布尔值而不是ngOnDestroy中的另一个Observable来简化。

@Component({...})export class SubscribingComponent implements OnInit, OnDestroy {
iAmAlive = true;ngOnInit() {
Observable.interval(1000).takeWhile(() => { return this.iAmAlive; }).subscribe((data) => { console.log(data); });}
ngOnDestroy() {this.iAmAlive = false;}}

当组件被销毁时,你通常需要取消订阅,但是Angular会越来越多地处理它,例如在Angular4的新次要版本中,他们有这个部分用于路由取消订阅:

您需要退订吗?

如在ActivatedRoute:路线信息部分的一站式商店路由和导航页面,路由器管理着它的observable提供并本地化订阅。订阅是当组件被破坏时清理,防止记忆泄漏,因此您不需要取消订阅路由参数图可观察。

下面的例子是Angular创建一个组件并在之后销毁它的一个很好的例子,看看组件是如何实现OnDestroy的,如果你需要onInit,你也可以在你的组件中实现它,比如实现OnInit, OnDestroy

import { Component, Input, OnDestroy } from '@angular/core';import { MissionService } from './mission.service';import { Subscription }   from 'rxjs/Subscription';
@Component({selector: 'my-astronaut',template: `<p>\{\{astronaut}}: <strong>\{\{mission}}</strong><button(click)="confirm()"[disabled]="!announced || confirmed">Confirm</button></p>`})
export class AstronautComponent implements OnDestroy {@Input() astronaut: string;mission = '<no mission announced>';confirmed = false;announced = false;subscription: Subscription;
constructor(private missionService: MissionService) {this.subscription = missionService.missionAnnounced$.subscribe(mission => {this.mission = mission;this.announced = true;this.confirmed = false;});}
confirm() {this.confirmed = true;this.missionService.confirmMission(this.astronaut);}
ngOnDestroy() {// prevent memory leak when component destroyedthis.subscription.unsubscribe();}}

@陈志立的答案之后,我编写了一个抽象类,用于处理组件中“无限”observables的订阅:

import { OnDestroy } from '@angular/core';import { Subscription } from 'rxjs/Subscription';import { Subject } from 'rxjs/Subject';import { Observable } from 'rxjs/Observable';import { PartialObserver } from 'rxjs/Observer';
export abstract class InfiniteSubscriberComponent implements OnDestroy {private onDestroySource: Subject<any> = new Subject();
constructor() {}
subscribe(observable: Observable<any>): Subscription;
subscribe(observable: Observable<any>,observer: PartialObserver<any>): Subscription;
subscribe(observable: Observable<any>,next?: (value: any) => void,error?: (error: any) => void,complete?: () => void): Subscription;
subscribe(observable: Observable<any>, ...subscribeArgs): Subscription {return observable.takeUntil(this.onDestroySource).subscribe(...subscribeArgs);}
ngOnDestroy() {this.onDestroySource.next();this.onDestroySource.complete();}}

要使用它,只需在角分量中扩展它并调用subscribe()方法,如下所示:

this.subscribe(someObservable, data => doSomething());

它还接受错误并像往常一样完成回调、观察者对象或根本不回调。如果您也在子组件中实现该方法,请记住调用super.ngOnDestroy()

在这里找到Ben Lesh的附加参考:RxJS:不要取消订阅

对上述情况的另一个简短补充是:

  • 总是取消订阅,当订阅的流中的新值不再需要或不重要时,在少数情况下会导致触发数量减少并提高性能。取消订阅的一个很好的例子,例如订阅的数据/事件不再存在的组件或需要对全新流进行新订阅(刷新等)。

关于Angular组件中observables取消订阅的一些最佳实践:

引用自Routing & Navigation

订阅组件中的observable时,你几乎总是在组件被销毁时安排取消订阅。

有一些特殊的observable不需要这一点。ActivatedRoute observables是例外之一。

ActivatedRoute及其可观察对象与路由器本身隔离。当不再需要路由组件并且注入的ActivatedRoute随之死亡时,路由器会销毁它。

请随意取消订阅。这是无害的,从来都不是一个坏习惯。

并对以下链接作出回应:

我在Angular组件中收集了一些关于observables取消订阅的最佳实践与你分享:

  • http observable取消订阅是有条件的,我们应该根据具体情况考虑组件被销毁后运行“订阅回调”的影响。我们知道角取消订阅并清理http observable本身(1)(2)。虽然从资源的角度来看这是真的,但它只说明了一半。假设我们正在谈论直接从组件内调用http,而http的响应时间超过了需要的时间,因此用户关闭了组件。即使组件被关闭并销毁,subscribe()处理程序仍然会被调用。这可能会产生不必要的副作用,在更糟糕的情况下,会导致应用程序状态中断。如果回调中的代码尝试调用刚刚处理的内容,也可能导致异常。然而,与此同时,偶尔也需要它们。比如,假设您正在创建一个电子邮件客户端,并且在电子邮件发送完成时触发了声音-即使组件关闭,您仍然希望发生这种情况(8)。
  • 无需取消订阅完成或错误的observable。但是,这样做没有坏处(7)
  • 尽可能使用AsyncPipe,因为它会在组件销毁时自动取消订阅observable。
  • 取消订阅ActivatedRoute observable,例如route.params,如果它们是在嵌套(使用组件选择器在tpl中添加)或动态组件中订阅的,因为只要父/主机组件存在,它们可能会被订阅多次。在其他情况下,无需取消订阅它们,如上面Routing & Navigation文档的引用中提到的。
  • 取消订阅通过Angular服务公开的组件之间共享的全局observable,因为只要组件初始化,它们就可能被多次订阅。
  • 无需取消订阅应用程序范围服务的内部observable,因为该服务永远不会被销毁,除非你的整个应用程序被销毁,否则没有真正的理由取消订阅,也没有内存泄漏的机会。(6)

    备注:关于作用域服务,即组件提供者,它们在组件被销毁时被销毁。根据文档,在这种情况下,如果我们订阅了该提供者内部的任何observable,我们应该考虑使用OnDestroy生命周期钩子取消订阅,该钩子将在服务被销毁时调用。
  • 使用抽象技术来避免取消订阅可能导致的任何代码混乱。你可以使用takeUntil(3)管理你的订阅,也可以使用(4)在Angular中取消订阅Observables的最简单方法中提到的npm
  • 始终退订FormGroup个可观察对象,例如form.valueChangesform.statusChanges
  • 始终退订Renderer2服务的observable,例如renderer2.listen
  • 作为内存泄漏保护步骤,取消订阅每个observable,直到Angular Docs明确告诉我们哪些observable不需要取消订阅(检查问题:(5)RxJS退订文档(开放))。
  • 奖励:始终使用Angular方法绑定像HostListener这样的事件,因为Angular非常关心在需要时删除事件侦听器,并防止由于事件绑定而导致的任何潜在内存泄漏。

不错的最后提示:如果你不知道observable是否被自动取消订阅/完成,请将complete回调添加到subscribe(...)并检查它是否在组件被销毁时被调用。

如果需要取消订阅,可以使用以下可观察管道方法的运算符

import { Observable, Subject } from 'rxjs';import { takeUntil } from 'rxjs/operators';import { OnDestroy } from '@angular/core';
export const takeUntilDestroyed = (componentInstance: OnDestroy) => <T>(observable: Observable<T>) => {const subjectPropertyName = '__takeUntilDestroySubject__';const originalOnDestroy = componentInstance.ngOnDestroy;const componentSubject = componentInstance[subjectPropertyName] as Subject<any> || new Subject();
componentInstance.ngOnDestroy = (...args) => {originalOnDestroy.apply(componentInstance, args);componentSubject.next(true);componentSubject.complete();};
return observable.pipe(takeUntil<T>(componentSubject));};

它可以像这样使用:

import { Component, OnDestroy, OnInit } from '@angular/core';import { Observable } from 'rxjs';
@Component({ template: '<div></div>' })export class SomeComponent implements OnInit, OnDestroy {
ngOnInit(): void {const observable = Observable.create(observer => {observer.next('Hello');});
observable.pipe(takeUntilDestroyed(this)).subscribe(val => console.log(val));}
ngOnDestroy(): void {}}

运算符包装组件的ngOnDestroy方法。

重要提示:操作员应该是可观察管道中的最后一个。

在SPA应用程序中ngOnDestroy函数(角寿命周期)对于每个订阅,您需要退订它。优势=>以防止状态变得太重。

例如:在组件1中:

import {UserService} from './user.service';
private user = {name: 'test', id: 1}
constructor(public userService: UserService) {this.userService.onUserChange.next(this.user);}

现役:

import {BehaviorSubject} from 'rxjs/BehaviorSubject';
public onUserChange: BehaviorSubject<any> = new BehaviorSubject({});

在组件2中:

import {Subscription} from 'rxjs/Subscription';import {UserService} from './user.service';
private onUserChange: Subscription;
constructor(public userService: UserService) {this.onUserChange = this.userService.onUserChange.subscribe(user => {console.log(user);});}
public ngOnDestroy(): void {// note: Here you have to be sure to unsubscribe to the subscribe item!this.onUserChange.unsubscribe();}

为了处理订阅,我使用了一个“取消订阅者”类。

这里是Unsubber类。

export class Unsubscriber implements OnDestroy {private subscriptions: Subscription[] = [];
addSubscription(subscription: Subscription | Subscription[]) {if (Array.isArray(subscription)) {this.subscriptions.push(...subscription);} else {this.subscriptions.push(subscription);}}
unsubscribe() {this.subscriptions.filter(subscription => subscription).forEach(subscription => {subscription.unsubscribe();});}
ngOnDestroy() {this.unsubscribe();}}

您可以在任何组件/服务/效果等中使用此类。

示例:

class SampleComponent extends Unsubscriber {constructor () {super();}
this.addSubscription(subscription);}

你可以使用最新的#0类来取消订阅Observable,代码不会那么混乱。

我们可以用normal variable做到这一点,但它将在每个新订阅上都是override the last subscription,因此避免这种情况,并且当您处理更多数量的可观察对象以及#2#3等可观察对象类型时,这种方法非常有用

订阅

表示一个可支配资源,例如Observable的执行。订阅有一个重要的方法,即取消订阅,它不接受参数,仅释放订阅持有的资源。

你可以通过两种方式使用它,

  • 您可以直接将订阅推送到订阅数组

     subscriptions:Subscription[] = [];
    ngOnInit(): void {
    this.subscription.push(this.dataService.getMessageTracker().subscribe((param: any) => {//...}));
    this.subscription.push(this.dataService.getFileTracker().subscribe((param: any) => {//...}));}
    ngOnDestroy(){// prevent memory leak when component destroyedthis.subscriptions.forEach(s => s.unsubscribe());}
  • using add() of Subscription

    subscriptions = new Subscription();
    this.subscriptions.add(subscribeOne);this.subscriptions.add(subscribeTwo);
    ngOnDestroy() {this.subscriptions.unsubscribe();}

A Subscription can hold child subscriptions and safely unsubscribe them all. This method handles possible errors (e.g. if any child subscriptions are null).

Hope this helps.. :)

SubSink包,一个简单而一致的取消订阅解决方案

由于没有其他人提到过,我想推荐Ward Bell创建的Subink包:https://github.com/wardbell/subsink#readme

我一直在一个项目中使用它,我们是几个开发人员都在使用它。在每种情况下都有一致的工作方式会有很大帮助。

对于像AsyncSubject这样在发出结果后直接完成的observable,或者例如来自超文本传输协议请求的observables,你不需要取消订阅。为这些调用unsubscribe()没有坏处,但如果observable是closed,则取消订阅方法不会做任何事情

if (this.closed) {return;}

当你有长寿命的observable随着时间的推移发出多个值(例如BehaviorSubjectReplaySubject)时,你需要取消订阅以防止内存泄漏。

你可以轻松创建一个observable,在使用管道运算符从这种长寿命的observable发出结果后直接完成。在这里的一些答案中提到了take(1)管道。但我更喜欢#1管道。与take(1)的区别在于它会:

如果Observable在发送任何下一个通知之前完成,则向观察者的错误回调传递EmptyError

第一个管道的另一个优点是您可以传递一个谓词,它将帮助您返回满足某些条件的第一个值:

const predicate = (result: any) => {// check value and return true if it is the result that satisfies your needsreturn true;}observable.pipe(first(predicate)).subscribe(observer);

First将在发出第一个值(或在传递函数参数时满足谓词的第一个值)后直接完成,因此无需取消订阅。

有时你不确定自己是否有一个长期存在的observable。我并不是说这是一个好做法,但你可以随时添加first管道以确保不需要手动取消订阅。在只发出一个值的observable上添加额外的first管道并没有坏处。

在开发过程中,你可以使用#0管道,如果源observable发出多个事件,它将失败。这可以帮助你探索observable的类型以及是否有必要取消订阅它。

observable.pipe(single()).subscribe(observer);

firstsingle看起来非常相似,两个管道都可以采用可选谓词,但差异很重要,并在这个堆栈溢出的答案在这里中很好地总结了:

第一

将在第一个项目出现后立即发出。将在那之后立即完成。

单一

如果源observable发出多个事件,将失败。


说明我试图在我的回答中尽可能准确和完整地引用官方留档,但如果缺少重要的东西,请发表评论…

更新Angular 9和Rxjs 6解决方案

  1. 在Angular组件的ngDestroy生命周期中使用unsubscribe
class SampleComponent implements OnInit, OnDestroy {private subscriptions: Subscription;private sampleObservable$: Observable<any>;
constructor () {}
ngOnInit(){this.subscriptions = this.sampleObservable$.subscribe( ... );}
ngOnDestroy() {this.subscriptions.unsubscribe();}}
  1. 在Rxjs中使用takeUntil
class SampleComponent implements OnInit, OnDestroy {private unsubscribe$: new Subject<void>;private sampleObservable$: Observable<any>;
constructor () {}
ngOnInit(){this.subscriptions = this.sampleObservable$.pipe(takeUntil(this.unsubscribe$)).subscribe( ... );}
ngOnDestroy() {this.unsubscribe$.next();this.unsubscribe$.complete();}}
  1. 对于一些在ngOnInit调用的操作,当组件初始化时只发生一次。
class SampleComponent implements OnInit {
private sampleObservable$: Observable<any>;
constructor () {}
ngOnInit(){this.subscriptions = this.sampleObservable$.pipe(take(1)).subscribe( ... );}}

我们也有async管道。但是,这个在模板上使用(不在Angular组件中)。

一个Subscription本质上只是有一个取消订阅()函数来释放资源或取消Observable执行。在Angular中,我们必须在组件被销毁时取消订阅Observable。幸运的是,Angular有一个ngOnDestroy钩子,在组件被销毁之前被调用,这使开发人员能够在这里提供清理人员,以避免挂起订阅、打开门户以及未来可能会给我们带来的麻烦

@Component({...})export class AppComponent implements OnInit, OnDestroy {subscription: SubscriptionngOnInit () {var observable = Rx.Observable.interval(1000);this.subscription = observable.subscribe(x => console.log(x));}ngOnDestroy() {this.subscription.unsubscribe()}}

我们将ngOnDestroy添加到我们的AppCompoennt并在this.subscriptionObservable上调用了退订方法

如果有多个订阅:

@Component({...})export class AppComponent implements OnInit, OnDestroy {subscription1$: Subscriptionsubscription2$: SubscriptionngOnInit () {var observable1$ = Rx.Observable.interval(1000);var observable2$ = Rx.Observable.interval(400);this.subscription1$ = observable.subscribe(x => console.log("From interval 1000" x));this.subscription2$ = observable.subscribe(x => console.log("From interval 400" x));}ngOnDestroy() {this.subscription1$.unsubscribe()this.subscription2$.unsubscribe()}}

在我的情况下,我使用@seanwright提出的解决方案的变体:
https://github.com/NetanelBasal/ngx-take-until-destroy

这是ngx火箭/启动器套件项目中使用的文件。您可以在此处until-destroyed.ts访问它

组件看起来像这样

/*** RxJS operator that unsubscribe from observables on destory.* Code forked from https://github.com/NetanelBasal/ngx-take-until-destroy** IMPORTANT: Add the `untilDestroyed` operator as the last one to* prevent leaks with intermediate observables in the* operator chain.** @param instance The parent Angular component or object instance.* @param destroyMethodName The method to hook on (default: 'ngOnDestroy').*/import { untilDestroyed } from '../../core/until-destroyed';
@Component({selector: 'app-example',templateUrl: './example.component.html'})export class ExampleComponent implements OnInit, OnDestroy {
ngOnInit() {interval(1000).pipe(untilDestroyed(this)).subscribe(val => console.log(val));
// ...}

// This method must be present, even if empty.ngOnDestroy() {// To protect you, an error will be thrown if it doesn't exist.}}

这里有很多很棒的答案……

让我添加另一种选择:

import { interval    } from "rxjs";import { takeUntil   } from "rxjs/operators";import { Component   } from "@angular/core";import { Destroyable } from "@bespunky/angular-zen/core";
@Component({selector: 'app-no-leak-demo',template: '👍 Destroyable component rendered. Unload me and watch me cleanup...'})export class NoLeakComponent extends Destroyable{constructor(){super();
this.subscribeToInterval();}
private subscribeToInterval(): void{const value    = interval(1000);const observer = {next    : value => console.log(`👍 Destroyable: ${value}`),complete: ()    => console.log('👍 Observable completed.')};
// ==== Comment one and uncomment the other to see the difference ====        
// Subscribe using the inherited subscribe methodthis.subscribe(value, observer);
// ... or pipe-in the inherited destroyed subject//value.pipe(takeUntil(this.destroyed)).subscribe(observer);}}

实时示例

这里发生了什么

组件/服务扩展了Destroyable(它来自一个名为@bespunky/angular-zen的库)。

该类现在可以简单地使用this.subscribe()takeUntil(this.destroyed),而无需任何额外的样板代码。

要安装库,请使用:
> npm install @bespunky/angular-zen

这是我对这个问题的看法,保持我的生活简单,我选择了在组件被销毁时取消订阅的手动方式。

为此,我创建了一个名为Subscriptor的类,它主要包含静态成员,即:

  • 私有变量订阅-保存所有提供的订阅
  • 订阅设置器-将每个新订阅推送到订阅数组
  • 取消订阅方法-取消订阅订阅数组包含的每个订阅(如果已定义),并清空订阅数组

subscriptor.ts

import { Subscription } from "rxjs";
export class Subscriptor {private static subscriptions: Subscription[] = [];
static set subscription(subscription: Subscription) {Subscriptor.subscriptions.push(subscription);}
static unsubscribe() {Subscriptor.subscriptions.forEach(subscription => subscription ? subscription.unsubscribe() : 0);Subscriptor.subscriptions = [];}}

组件内部的用法如下:

当您想订阅任何服务时,只需将订阅提交给Subscriptor的setter。

ngOnInit(): void {Subscriptor.subscription = this.userService.getAll().subscribe(users => this.users = users);Subscriptor.subscription = this.categoryService.getAll().subscribe(categories => this.categories = categories);Subscriptor.subscription = this.postService.getAll().subscribe(posts => this.posts = posts);}

当您想取消订阅任何服务时,只需调用Subscriptor的取消订阅方法。

ngOnDestroy(): void {Subscriptor.unsubscribe();}

出于性能原因,总是建议从你的可观察订阅中取消订阅以避免内存泄漏,并且有不同的方法可以做到这一点,

顺便说一句,我读了大部分的答案,我没有找到有人在谈论async管道,推荐使用Angular应用程序的Rxjs模式,因为它在离开将被销毁的组件时提供自动订阅和订阅:

请找到一个如何实现它的示例

app.compoennt.ts

import { Component, OnInit } from '@angular/core';import { Observable } from 'rxjs';
import { BookService } from './book.service';import { Book } from './book';
@Component({selector: 'app-observable',templateUrl: './observable.component.html'})export class AppComponent implements OnInit {books$: Observable<Book[]>constructor(private bookService: BookService) { }ngOnInit(): void {this.books$ = this.bookService.getBooksWithObservable();}}

app.compoennt.html

<h3>AsyncPipe with Promise using NgFor</h3><ul><li *ngFor="let book of books$ | async" >Id: \{\{book?.id}}, Name: \{\{book?.name}}</li></ul>

垃圾袋

这个想法受到RxSwift的DisposeBag的启发,所以我决定开发一个类似但简单的结构。

DisposeBag是一种数据结构,它保存对所有打开的订阅的引用。它有助于在我们的组件中处理订阅,同时为我们提供API来跟踪打开订阅的状态。

优势

非常简单的API,让您的代码看起来简单小巧。提供用于跟踪打开订阅状态的API(允许您显示不确定的进度条)没有依赖注入/包。

用法

在组件中:

@Component({selector: 'some-component',templateUrl: './some-component.component.html',changeDetection: ChangeDetectionStrategy.OnPush})export class SomeComponent implements OnInit, OnDestroy {
public bag = new DisposeBag()  
constructor(private _change: ChangeDetectorRef) {}
ngOnInit(): void {
// an observable that takes some time to finish such as an api call.const aSimpleObservable = of(0).pipe(delay(5000))
// this identifier allows us to track the progress for this specific subscription (very useful in template)this.bag.subscribe("submission", aSimpleObservable, () => {this._change.markForCheck() // trigger UI change})}
ngOnDestroy(): void {// never forget to add this line.this.bag.destroy()}}

在模板中:

<!-- will be shown as long as the submission subscription is open --><span *ngIf="bag.inProgress('submission')">submission in progress</span>
<!-- will be shown as long as there's an open subscription in the bag  --><span *ngIf="bag.hasInProgress">some subscriptions are still in progress</span>

实施

import { Observable, Observer, Subject, Subscription, takeUntil } from "rxjs";

/*** This class facilitates the disposal of the subscription in our components.* instead of creating _unsubscribeAll and lots of boilerplates to create different variables for Subscriptions;* you can just easily use subscribe(someStringIdentifier, observable, observer). then you can use bag.inProgress() with* the same someStringIdentifier on you html or elsewhere to determine the state of the ongoing subscription.**  don't forget to add onDestroy() { this.bag.destroy() }**  Author: Hamidreza Vakilian (hvakilian1@gmail.com)* @export* @class DisposeBag*/export class DisposeBag {private _unsubscribeAll: Subject<any> = new Subject<any>();
private subscriptions = new Map<string, Subscription>()

/*** this method automatically adds takeUntil to your observable, adds it to a private map.* this method enables inProgress to work. don't forget to add onDestroy() { this.bag.destroy() }** @template T* @param {string} id* @param {Observable<T>} obs* @param {Partial<Observer<T>>} observer* @return {*}  {Subscription}* @memberof DisposeBag*/public subscribe<T>(id: string, obs: Observable<T>, observer: Partial<Observer<T>> | ((value: T) => void)): Subscription {if (id.isEmpty()) {throw new Error('disposable.subscribe is called with invalid id')}if (!obs) {throw new Error('disposable.subscribe is called with an invalid observable')}
/* handle the observer */let subs: Subscriptionif (typeof observer === 'function') {subs = obs.pipe(takeUntil(this._unsubscribeAll)).subscribe(observer)} else if (typeof observer === 'object') {subs = obs.pipe(takeUntil(this._unsubscribeAll)).subscribe(observer)} else {throw new Error('disposable.subscribe is called with an invalid observer')}
/* unsubscribe from the last possible subscription if in progress. */let possibleSubs = this.subscriptions.get(id)if (possibleSubs && !possibleSubs.closed) {console.info(`Disposebag: a subscription with id=${id} was disposed and replaced.`)possibleSubs.unsubscribe()}
/* store the reference in the map */this.subscriptions.set(id, subs)
return subs}

/*** Returns true if any of the registered subscriptions is in progress.** @readonly* @type {boolean}* @memberof DisposeBag*/public get hasInProgress(): boolean {return Array.from(this.subscriptions.values()).reduce((prev, current: Subscription) => {return prev || !current.closed }, false)}
/*** call this from your template or elsewhere to determine the state of each subscription.** @param {string} id* @return {*}* @memberof DisposeBag*/public inProgress(id: string) {let possibleSubs = this.subscriptions.get(id)if (possibleSubs) {return !possibleSubs.closed} else {return false}}

/*** Never forget to call this method in your onDestroy() method of your components.** @memberof DisposeBag*/public destroy() {this._unsubscribeAll.next(null);this._unsubscribeAll.complete();}}