在RxJs 5中共享Angular Http网络调用结果的正确方法是什么?

通过使用Http,我们调用一个方法来进行网络调用,并返回一个Http可观察对象:

getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}

如果我们获取这个可观察对象并向其添加多个订阅者:

let network$ = getCustomer();


let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

我们要做的是确保这不会导致多个网络请求。

这似乎是一个不寻常的场景,但实际上很常见:例如,如果调用者订阅了可观察对象以显示错误消息,并使用异步管道将其传递给模板,那么我们已经有两个订阅者了。

在RxJs 5中正确的方法是什么?

也就是说,这似乎工作得很好:

getCustomer() {
return this.http.get('/someUrl').map(res => res.json()).share();
}

但是这是RxJs 5中惯用的方法吗,或者我们应该用别的方法来代替?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res.json())部分现在都是无用的,因为现在默认假设JSON结果。

111569 次浏览

您是否试过运行已经拥有的代码?

因为你正在从getJSON()产生的承诺构造Observable,所以网络请求是在任何人订阅之前发出的。最终的承诺由所有订阅者共享。

var promise = jQuery.getJSON(requestUrl); // network call is executed now
var o = Rx.Observable.fromPromise(promise); // just wraps it in an observable
o.subscribe(...); // does not trigger network call
o.subscribe(...); // does not trigger network call
// ...

编辑:从2021年起,正确的方法是使用rxj原生提出的shareReplay操作符。详见下面的回答。


缓存数据,如果可用,则返回此数据,否则发出HTTP请求。

import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';


@Injectable()
export class DataService {
private url: string = 'https://cors-test.appspot.com/test';
  

private data: Data;
private observable: Observable<any>;


constructor(private http: Http) {}


getData() {
if(this.data) {
// if `data` is available just return it as `Observable`
return Observable.of(this.data);
} else if(this.observable) {
// if `this.observable` is set then the request is in progress
// return the `Observable` for the ongoing request
return this.observable;
} else {
// example header (not necessary)
let headers = new Headers();
headers.append('Content-Type', 'application/json');
// create the request, store the `Observable` for subsequent subscribers
this.observable = this.http.get(this.url, {
headers: headers
})
.map(response =>  {
// when the cached data is available we don't need the `Observable` reference anymore
this.observable = null;


if(response.status == 400) {
return "FAILURE";
} else if(response.status == 200) {
this.data = new Data(response.json());
return this.data;
}
// make it shared so more than one subscriber can get the result
})
.share();
return this.observable;
}
}
}

< a href = " https://plnkr.co/edit/WpDtCkS4gslYwousZlmi?p=preview" rel="noreferrer">活塞示例 . p=preview" rel="noreferrer">

本文https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html很好地解释了如何使用shareReplay进行缓存。

我把问题打了星号,但我会试着试一下。

//this will be the shared observable that
//anyone can subscribe to, get the value,
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);


getCustomer().subscribe(customer$);


//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));


//here's the second subscriber
setTimeout(() => {
customer$.subscribe(val => console.log('subscriber 2: ' + val));
}, 1000);


function getCustomer() {
return new Rx.Observable(observer => {
console.log('api request');
setTimeout(() => {
console.log('api response');
observer.next('customer object');
observer.complete();
}, 500);
});
}

下面是证明:)

只有一个结论:getCustomer().subscribe(customer$)

我们不是在订阅getCustomer()的api响应,我们是在订阅一个ReplaySubject,它是可观察的,它也可以订阅一个不同的可观察对象,并且(这很重要)持有它最后发出的值,并将它重新发布给它的任何(ReplaySubject的)订阅者。

根据@Cristian的建议,这是一种适用于HTTP可观察对象的方法,它只发射一次,然后就完成了:

getCustomer() {
return this.http.get('/someUrl')
.map(res => res.json()).publishLast().refCount();
}

只要在地图之后和任何订阅之前调用分享()

在我的例子中,我有一个通用服务(RestClientService.ts),它进行其余调用,提取数据,检查错误,并将可观察对象返回到具体的实现服务(f.ex)。: ContractClientService.ts),最后这个具体的实现将可观察对象返回给de ContractComponent。Ts,这个订阅来更新视图。

RestClientService.ts:

export abstract class RestClientService<T extends BaseModel> {


public GetAll = (path: string, property: string): Observable<T[]> => {
let fullPath = this.actionUrl + path;
let observable = this._http.get(fullPath).map(res => this.extractData(res, property));
observable = observable.share();  //allows multiple subscribers without making again the http request
observable.subscribe(
(res) => {},
error => this.handleError2(error, "GetAll", fullPath),
() => {}
);
return observable;
}


private extractData(res: Response, property: string) {
...
}
private handleError2(error: any, method: string, path: string) {
...
}


}

ContractService.ts:

export class ContractService extends RestClientService<Contract> {
private GET_ALL_ITEMS_REST_URI_PATH = "search";
private GET_ALL_ITEMS_PROPERTY_PATH = "contract";
public getAllItems(): Observable<Contract[]> {
return this.GetAll(this.GET_ALL_ITEMS_REST_URI_PATH, this.GET_ALL_ITEMS_PROPERTY_PATH);
}


}

ContractComponent.ts:

export class ContractComponent implements OnInit {


getAllItems() {
this.rcService.getAllItems().subscribe((data) => {
this.items = data;
});
}


}

根据这个文章

事实证明,通过添加publishReplay(1)和refCount,我们可以轻松地将缓存添加到可观察对象中。

所以if语句内部只是追加

.publishReplay(1)
.refCount();

.map(...)

我找到了一种将http get结果存储到sessionStorage并将其用于会话的方法,这样它就永远不会再次调用服务器。

我用它来调用github API,以避免使用限制。

@Injectable()
export class HttpCache {
constructor(private http: Http) {}


get(url: string): Observable<any> {
let cached: any;
if (cached === sessionStorage.getItem(url)) {
return Observable.of(JSON.parse(cached));
} else {
return this.http.get(url)
.map(resp => {
sessionStorage.setItem(url, resp.text());
return resp.json();
});
}
}
}

供您参考,sessionStorage限制是5M(或4.75M)。因此,它不应该用于大型数据集。

------ edit -------------
如果你想用F5刷新数据,它使用内存数据而不是sessionStorage;< / p >

@Injectable()
export class HttpCache {
cached: any = {};  // this will store data
constructor(private http: Http) {}


get(url: string): Observable<any> {
if (this.cached[url]) {
return Observable.of(this.cached[url]));
} else {
return this.http.get(url)
.map(resp => {
this.cached[url] = resp.text();
return resp.json();
});
}
}
}

我写了一个缓存类,

/**
* Caches results returned from given fetcher callback for given key,
* up to maxItems results, deletes the oldest results when full (FIFO).
*/
export class StaticCache
{
static cachedData: Map<string, any> = new Map<string, any>();
static maxItems: number = 400;


static get(key: string){
return this.cachedData.get(key);
}


static getOrFetch(key: string, fetcher: (string) => any): any {
let value = this.cachedData.get(key);


if (value != null){
console.log("Cache HIT! (fetcher)");
return value;
}


console.log("Cache MISS... (fetcher)");
value = fetcher(key);
this.add(key, value);
return value;
}


static add(key, value){
this.cachedData.set(key, value);
this.deleteOverflowing();
}


static deleteOverflowing(): void {
if (this.cachedData.size > this.maxItems) {
this.deleteOldest(this.cachedData.size - this.maxItems);
}
}


/// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
/// However that seems not to work. Trying with forEach.
static deleteOldest(howMany: number): void {
//console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
let iterKeys = this.cachedData.keys();
let item: IteratorResult<string>;
while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
//console.debug("    Deleting: " + item.value);
this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
}
}


static clear(): void {
this.cachedData = new Map<string, any>();
}


}

由于我们使用它的方式不同,所以它都是静态的,但是可以随意地将它变成一个正常的类和服务。我不确定angular是否会一直保持一个实例(对于Angular2来说是新的)。

我是这样使用它的:

            let httpService: Http = this.http;
function fetcher(url: string): Observable<any> {
console.log("    Fetching URL: " + url);
return httpService.get(url).map((response: Response) => {
if (!response) return null;
if (typeof response.json() !== "array")
throw new Error("Graph REST should return an array of vertices.");
let items: any[] = graphService.fromJSONarray(response.json(), httpService);
return array ? items : items[0];
});
}


// If data is a link, return a result of a service call.
if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
{
// Make an HTTP call.
let url = this.data[verticesLabel][name]["link"];
let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
if (!cachedObservable)
throw new Error("Failed loading link: " + url);
return cachedObservable;
}

我认为可能有更聪明的方法,它将使用一些Observable技巧,但这对我的目的来说已经很好了。

只需使用这个缓存层,它就可以完成您需要的一切,甚至还可以管理ajax请求的缓存。

http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

用起来就这么简单

@Component({
selector: 'home',
templateUrl: './html/home.component.html',
styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
constructor(AjaxService:AjaxService){
AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
}


articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}

层(作为一个可注入的angular服务)是

import { Injectable }     from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable }     from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
public data:Object={};
/*
private dataObservable:Observable<boolean>;
*/
private dataObserver:Array<any>=[];
private loading:Object={};
private links:Object={};
counter:number=-1;
constructor (private http: Http) {
}
private loadPostCache(link:string){
if(!this.loading[link]){
this.loading[link]=true;
this.links[link].forEach(a=>this.dataObserver[a].next(false));
this.http.get(link)
.map(this.setValue)
.catch(this.handleError).subscribe(
values => {
this.data[link] = values;
delete this.loading[link];
this.links[link].forEach(a=>this.dataObserver[a].next(false));
},
error => {
delete this.loading[link];
}
);
}
}


private setValue(res: Response) {
return res.json() || { };
}


private handleError (error: Response | any) {
// In a real world app, we might use a remote logging infrastructure
let errMsg: string;
if (error instanceof Response) {
const body = error.json() || '';
const err = body.error || JSON.stringify(body);
errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
} else {
errMsg = error.message ? error.message : error.toString();
}
console.error(errMsg);
return Observable.throw(errMsg);
}


postCache(link:string): Observable<Object>{


return Observable.create(observer=> {
if(this.data.hasOwnProperty(link)){
observer.next(this.data[link]);
}
else{
let _observable=Observable.create(_observer=>{
this.counter=this.counter+1;
this.dataObserver[this.counter]=_observer;
this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
_observer.next(false);
});
this.loadPostCache(link);
_observable.subscribe(status=>{
if(status){
observer.next(this.data[link]);
}
}
);
}
});
}
}

更新:Ben Lesh说在5.2.0之后的下一个小版本中,你将能够调用shareplay()来真正地缓存。

以前……

首先,不要使用share()或publishReplay(1). refcount(),它们是相同的,它的问题是,它只在可观察对象处于活动状态时建立连接时共享,如果你在它完成连接后,它会再次创建一个新的可观察对象,翻译,而不是真正的缓存。

Birowski给出了正确的解决方案,即使用ReplaySubject。ReplaySubject将缓存你给它的值(bufferSize),在我们的例子中是1。它不会像share()一样在refCount为零时创建一个新的可观察对象,并且你建立了一个新的连接,这是缓存的正确行为。

这是一个可重用函数

export function cacheable<T>(o: Observable<T>): Observable<T> {
let replay = new ReplaySubject<T>(1);
o.subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
return replay.asObservable();
}

下面是如何使用它

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';


@Injectable()
export class SettingsService {
_cache: Observable<any>;
constructor(private _http: Http, ) { }


refresh = () => {
if (this._cache) {
return this._cache;
}
return this._cache = cacheable<any>(this._http.get('YOUR URL'));
}
}

下面是一个更高级版本的可缓存函数。这个函数允许有自己的查找表+提供自定义查找表的能力。这样的话,你就不用检查了。_cache就像上面的例子。还要注意的是,你传递的不是可观察对象作为第一个参数,而是一个返回可观察对象的函数,这是因为Angular的Http会立即执行,所以通过返回一个延迟执行的函数,如果它已经在缓存中,我们可以决定不调用它。

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
if (!!key && (customCache || cacheableCache)[key]) {
return (customCache || cacheableCache)[key] as Observable<T>;
}
let replay = new ReplaySubject<T>(1);
returnObservable().subscribe(
x => replay.next(x),
x => replay.error(x),
() => replay.complete()
);
let observable = replay.asObservable();
if (!!key) {
if (!!customCache) {
customCache[key] = observable;
} else {
cacheableCache[key] = observable;
}
}
return observable;
}

用法:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")

rxjs 5.3.0

我一直不满意.map(myFunction).publishReplay(1).refCount()

对于多个订阅者,.map()在某些情况下执行myFunction两次(我希望它只执行一次)。一个修复似乎是publishReplay(1).refCount().take(1)

你可以做的另一件事是不使用refCount(),并立即使可观察对象热:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

这将启动HTTP请求,而不考虑订阅者。我不确定在HTTP GET完成之前取消订阅是否会取消它。

我假设@ngx-cache /核心对于维护http调用的缓存功能是有用的,特别是当http调用同时在浏览器服务器平台上进行时。

假设我们有以下方法:

getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}

您可以使用@ngx-cache /核心Cached装饰器来存储在cache storage (storage可以配置,请检查实现ng-seed/universal)上进行HTTP调用的方法的返回值-就在第一次执行时。下一次调用该方法时(无论是在浏览器还是服务器平台上),该值将从cache storage. c中检索。

import { Cached } from '@ngx-cache/core';


...


@Cached('get-customer') // the cache key/identifier
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}

还有一种可能性是使用缓存API来使用缓存方法(hasgetset)。

anyclass.ts

...
import { CacheService } from '@ngx-cache/core';


@Injectable()
export class AnyClass {
constructor(private readonly cache: CacheService) {
// note that CacheService is injected into a private property of AnyClass
}


// will retrieve 'some string value'
getSomeStringValue(): string {
if (this.cache.has('some-string'))
return this.cache.get('some-string');


this.cache.set('some-string', 'some string value');
return 'some string value';
}
}

下面是客户端和服务器端缓存的包列表:

您选择的实现将取决于是否希望unsubscribe()取消您的HTTP请求。

在任何情况下,打印稿修饰符都是标准化行为的好方法。这是我写的:

  @CacheObservableArgsKey
getMyThing(id: string): Observable<any> {
return this.http.get('things/'+id);
}

装饰器定义:

/**
* Decorator that replays and connects to the Observable returned from the function.
* Caches the result using all arguments to form a key.
* @param target
* @param name
* @param descriptor
* @returns {PropertyDescriptor}
*/
export function CacheObservableArgsKey(target: Object, name: string, descriptor: PropertyDescriptor) {
const originalFunc = descriptor.value;
const cacheMap = new Map<string, any>();
descriptor.value = function(this: any, ...args: any[]): any {
const key = args.join('::');


let returnValue = cacheMap.get(key);
if (returnValue !== undefined) {
console.log(`${name} cache-hit ${key}`, returnValue);
return returnValue;
}


returnValue = originalFunc.apply(this, args);
console.log(`${name} cache-miss ${key} new`, returnValue);
if (returnValue instanceof Observable) {
returnValue = returnValue.publishReplay(1);
returnValue.connect();
}
else {
console.warn('CacheHttpArgsKey: value not an Observable cannot publishReplay and connect', returnValue);
}
cacheMap.set(key, returnValue);
return returnValue;
};


return descriptor;
}

rxjs 5.4.0有一个新的shareReplay方法。

作者显式地说“非常适合处理缓存AJAX结果之类的事情”

rxjs PR #2443 feat(shareereplay):添加shareReplaypublishReplay的变体

shareplay返回一个多播源的可观察对象 ReplaySubject。重放主题被错误地从 源,而不是对源的完成。这使得共享重玩 非常适合处理诸如缓存AJAX结果之类的事情 的事情。但是,重复行为不同于分享行为 它不会重复源可观察对象,而是会重复 来源可观察对象的值

使用Rxjs观察者/可观察对象+缓存+订阅的可缓存HTTP响应数据

参见下面的代码

*免责声明:我是rxjs的新手,所以请记住,我可能误用了可观察对象/观察者方法。我的解决方案纯粹是我找到的其他解决方案的集合,是没有找到一个简单的、有充分证据的解决方案的结果。因此,我提供了我完整的代码解决方案(因为我希望已经找到),希望它能帮助其他人。

*注意,此方法松散地基于GoogleFirebaseObservables。不幸的是,我缺乏适当的经验/时间来复制他们在幕后所做的事情。但是下面是提供对一些可缓存数据的异步访问的简单方法。

情况: 'product-list'组件的任务是显示产品列表。该网站是一个单页网页应用程序,带有一些菜单按钮,可以“过滤”页面上显示的产品。

< em > < / em >解决方案:组件“订阅”一个服务方法。service方法返回一个产品对象数组,组件通过订阅回调访问该数组。服务方法将其活动包装在新创建的Observer中,并返回该Observer。在这个观察器中,它搜索缓存的数据并将其传递给订阅者(组件)并返回。否则,它会发出一个http调用来检索数据,订阅响应,在那里您可以处理该数据(例如,将数据映射到您自己的模型),然后将数据传递回订阅者。

的代码

product-list.component.ts

import { Component, OnInit, Input } from '@angular/core';
import { ProductService } from '../../../services/product.service';
import { Product, ProductResponse } from '../../../models/Product';


@Component({
selector: 'app-product-list',
templateUrl: './product-list.component.html',
styleUrls: ['./product-list.component.scss']
})
export class ProductListComponent implements OnInit {
products: Product[];


constructor(
private productService: ProductService
) { }


ngOnInit() {
console.log('product-list init...');
this.productService.getProducts().subscribe(products => {
console.log('product-list received updated products');
this.products = products;
});
}
}

product.service.ts

import { Injectable } from '@angular/core';
import { Http, Headers } from '@angular/http';
import { Observable, Observer } from 'rxjs';
import 'rxjs/add/operator/map';
import { Product, ProductResponse } from '../models/Product';


@Injectable()
export class ProductService {
products: Product[];


constructor(
private http:Http
) {
console.log('product service init.  calling http to get products...');


}


getProducts():Observable<Product[]>{
//wrap getProducts around an Observable to make it async.
let productsObservable$ = Observable.create((observer: Observer<Product[]>) => {
//return products if it was previously fetched
if(this.products){
console.log('## returning existing products');
observer.next(this.products);
return observer.complete();


}
//Fetch products from REST API
console.log('** products do not yet exist; fetching from rest api...');
let headers = new Headers();
this.http.get('http://localhost:3000/products/',  {headers: headers})
.map(res => res.json()).subscribe((response:ProductResponse) => {
console.log('productResponse: ', response);
let productlist = Product.fromJsonList(response.products); //convert service observable to product[]
this.products = productlist;
observer.next(productlist);
});
});
return productsObservable$;
}
}

产品。Ts(模型)

export interface ProductResponse {
success: boolean;
msg: string;
products: Product[];
}


export class Product {
product_id: number;
sku: string;
product_title: string;
..etc...


constructor(product_id: number,
sku: string,
product_title: string,
...etc...
){
//typescript will not autoassign the formal parameters to related properties for exported classes.
this.product_id = product_id;
this.sku = sku;
this.product_title = product_title;
...etc...
}






//Class method to convert products within http response to pure array of Product objects.
//Caller: product.service:getProducts()
static fromJsonList(products:any): Product[] {
let mappedArray = products.map(Product.fromJson);
return mappedArray;
}


//add more parameters depending on your database entries and constructor
static fromJson({
product_id,
sku,
product_title,
...etc...
}): Product {
return new Product(
product_id,
sku,
product_title,
...etc...
);
}
}

下面是我在Chrome中加载页面时看到的输出示例。注意,在初始加载时,产品是从http(调用我的节点休息服务,该服务在端口3000上本地运行)获取的。然后,当我单击导航到产品的“筛选”视图时,产品就会在缓存中找到。

我的Chrome日志(控制台):

core.es5.js:2925 Angular is running in the development mode. Call enableProdMode() to enable the production mode.
app.component.ts:19 app.component url: /products
product.service.ts:15 product service init.  calling http to get products...
product-list.component.ts:18 product-list init...
product.service.ts:29 ** products do not yet exist; fetching from rest api...
product.service.ts:33 productResponse:  {success: true, msg: "Products found", products: Array(23)}
product-list.component.ts:20 product-list received updated products

...[点击菜单按钮过滤产品]…

app.component.ts:19 app.component url: /products/chocolatechip
product-list.component.ts:18 product-list init...
product.service.ts:24 ## returning existing products
product-list.component.ts:20 product-list received updated products

结论:这是我发现的(到目前为止)实现可缓存http响应数据的最简单的方法。在我的angular应用中,每次我导航到产品的不同视图时,产品列表组件都会重新加载。ProductService似乎是一个共享实例,因此在导航期间,ProductService中'products: Product[]'的本地缓存将被保留,后续调用"GetProducts()"将返回缓存的值。最后要注意的是,我读过一些关于当你完成任务时如何关闭可观察对象/订阅以防止“内存泄漏”的评论。我在这里没有包括这一点,但这是需要记住的。

它是.publishReplay(1).refCount();.publishLast().refCount();,因为Angular Http的可观察对象在请求后完成。

这个简单的类缓存结果,因此您可以多次订阅.value,并且只发出一个请求。你也可以使用.reload()来发出新的请求并发布数据。

你可以这样使用它:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));


res.status.subscribe((loading)=>{
console.log('STATUS=',loading);
});


res.value.subscribe((value) => {
console.log('VALUE=', value);
});

来源是:

export class RestResource {


static readonly LOADING: string = 'RestResource_Loading';
static readonly ERROR: string = 'RestResource_Error';
static readonly IDLE: string = 'RestResource_Idle';


public value: Observable<any>;
public status: Observable<string>;
private loadStatus: Observer<any>;


private reloader: Observable<any>;
private reloadTrigger: Observer<any>;


constructor(requestObservableFn: () => Observable<any>) {
this.status = Observable.create((o) => {
this.loadStatus = o;
});


this.reloader = Observable.create((o: Observer<any>) => {
this.reloadTrigger = o;
});


this.value = this.reloader.startWith(null).switchMap(() => {
if (this.loadStatus) {
this.loadStatus.next(RestResource.LOADING);
}
return requestObservableFn()
.map((res) => {
if (this.loadStatus) {
this.loadStatus.next(RestResource.IDLE);
}
return res;
}).catch((err)=>{
if (this.loadStatus) {
this.loadStatus.next(RestResource.ERROR);
}
return Observable.of(null);
});
}).publishReplay(1).refCount();
}


reload() {
this.reloadTrigger.next(null);
}


}

你可以创建一个简单的类Cacheable<>来帮助管理从多个订阅者的http服务器中检索到的数据:

declare type GetDataHandler<T> = () => Observable<T>;


export class Cacheable<T> {


protected data: T;
protected subjectData: Subject<T>;
protected observableData: Observable<T>;
public getHandler: GetDataHandler<T>;


constructor() {
this.subjectData = new ReplaySubject(1);
this.observableData = this.subjectData.asObservable();
}


public getData(): Observable<T> {
if (!this.getHandler) {
throw new Error("getHandler is not defined");
}
if (!this.data) {
this.getHandler().map((r: T) => {
this.data = r;
return r;
}).subscribe(
result => this.subjectData.next(result),
err => this.subjectData.error(err)
);
}
return this.observableData;
}


public resetCache(): void {
this.data = null;
}


public refresh(): void {
this.resetCache();
this.getData();
}


}

使用

声明Cacheable<>对象(假设是服务的一部分):

list: Cacheable<string> = new Cacheable<string>();

和处理程序:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

从组件调用:

//gets data from server
List.getData().subscribe(…)

您可以有多个组件订阅到它。

更多细节和代码示例在这里:http://devinstance.net/articles/20171021/rxjs-cacheable

我们要做的是确保这不会导致多个网络请求。

我个人最喜欢使用async方法来调用网络请求。方法本身不返回值,而是更新同一服务中的BehaviorSubject,组件将订阅该服务。

为什么使用BehaviorSubject而不是Observable?因为,

  • 在订阅时,BehaviorSubject返回最后一个值,而常规可观察对象仅在接收到onnext时触发。
  • 如果你想在非可观察代码(没有订阅)中检索BehaviorSubject的最后一个值,你可以使用getValue()方法。

例子:

customer.service.ts

public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);


public async getCustomers(): Promise<void> {
let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
if (customers)
this.customers$.next(customers);
}

然后,无论在哪里需要,我们都可以订阅customers$

public ngOnInit(): void {
this.customerService.customers$
.subscribe((customers: Customer[]) => this.customerList = customers);
}

或者您可能想直接在模板中使用它

<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

所以现在,在你再次调用getCustomers之前,数据将保留在customers$行为主体中。

如果想要刷新这些数据,该怎么办呢?只需调用getCustomers()

public async refresh(): Promise<void> {
try {
await this.customerService.getCustomers();
}
catch (e) {
// request failed, handle exception
console.error(e);
}
}

使用此方法,我们不必在后续网络调用之间显式保留数据,因为它是由BehaviorSubject处理的。

PS:通常当一个组件被销毁时,摆脱订阅是一个很好的实践,为此你可以使用 answer中建议的方法。

伟大的答案。

或者你可以这样做:

这是rxjs的最新版本。我使用的是RxJS5.5.7版本

import {share} from "rxjs/operators";


this.http.get('/someUrl').pipe(share());

你可以简单地使用< >强ngx-cacheable < / >强!它更适合你的场景。

使用这个的好处

  • 它只调用rest API一次,缓存响应&为下面的请求返回相同的值。
  • 在创建/更新/删除操作后,可以根据需要调用API。

所以,你的服务类将是这样的-

import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';


const customerNotifier = new Subject();


@Injectable()
export class customersService {


// relieves all its caches when any new value is emitted in the stream using notifier
@Cacheable({
cacheBusterObserver: customerNotifier,
async: true
})
getCustomer() {
return this.http.get('/someUrl').map(res => res.json());
}


// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
addCustomer() {
// some code
}


// notifies the observer to refresh the data
@CacheBuster({
cacheBusterNotifier: customerNotifier
})
updateCustomer() {
// some code
}
}

< a href = " https://itnext。io/improve-your-angular-app-performance-by-using-this-simple- observed -cache-decorator-71e81dfa76ae" rel="nofollow noreferrer">这里 . exe是更多参考的链接。

RXJS 5.4.0版本(2017-05-09)增加了对shareReplay的支持。

为什么使用共享回放?

当您有不希望在多个订阅者之间执行的副作用或繁重的计算时,通常需要使用shareReplay。在您知道流的后期订阅者需要访问先前发出的值的情况下,它可能也很有价值。这种在订阅上重放价值的能力是share和shareereplay的区别所在。

你可以很容易地修改一个angular服务来使用它,并返回一个带有缓存结果的可观察对象,它只会进行一次http调用(假设第一次调用成功)。

Angular服务示例

下面是一个使用shareReplay的非常简单的客户服务。

customer.service.ts

import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';


@Injectable({providedIn: 'root'})
export class CustomerService {


private readonly _getCustomers: Observable<ICustomer[]>;


constructor(private readonly http: HttpClient) {
this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
}
    

getCustomers() : Observable<ICustomer[]> {
return this._getCustomers;
}
}


export interface ICustomer {
/* ICustomer interface fields defined here */
}

注意,构造函数中的赋值可以移动到getCustomers方法,但作为从HttpClient are "cold"在构造函数中这样做是可以接受的,因为http调用只会在第一次调用subscribe时进行。

此外,这里还假设初始返回的数据在应用程序实例的生命周期内不会过时。

上面的大多数答案都适用于不接受输入的http请求。每次你想要使用一些输入进行api调用时,都需要重新创建请求。上面唯一可以处理这个的响应是@Arlo的回复

我已经创建了一个稍微简单的装饰器,您可以使用它将响应共享给每个具有相同输入的调用者。与Arlo的回复不同,它不会对延迟的订阅者重放响应,而是将同时发生的请求作为一个请求来处理。如果目标是重放响应给延迟的观察者(也就是缓存的响应),你可以修改下面的代码,用shareReplay(1)替换share():

https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a

import { finalize, Observable, share } from 'rxjs';


export function SharedObservable(): MethodDecorator {
const obs$ = new Map<string, Observable<any>>();
return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
const originalMethod = descriptor.value;
descriptor.value = function (...args: any[]) {
const key = JSON.stringify(args);
if (!obs$.has(key)) {
// We have no observable for this key yet, so we create one
const res = originalMethod.apply(this, args).pipe(
share(), // Make the observable hot
finalize(() => obs$.delete(key)) // Cleanup when observable is complete
);
obs$.set(key, res);
}
// Return the cached observable
return obs$.get(key);
};
return descriptor;
};
}

用法:

@SharedObservable()
myFunc(id: number): Observable<any> {
return this.http.get<any>(`/api/someUrl/${id}`);
}