RxJS: 我如何“手动”更新一个可观察的?

我想我一定是误解了一些基本的东西,因为在我看来,这应该是可观察到的最基本的情况,但在我的一生中,我不能从医生那里弄清楚如何做到这一点。

基本上,我希望能够做到这一点:

// create a dummy observable, which I would update manually
var eventObservable = rx.Observable.create(function(observer){});
var observer = eventObservable.subscribe(
function(x){
console.log('next: ' + x);
}
...
var my_function = function(){
eventObservable.push('foo');
//'push' adds an event to the datastream, the observer gets it and prints
// next: foo
}

但是我还没有找到像 push这样的方法。我使用这个来处理点击,我知道他们有 Observable.fromEvent来处理,但是我试图用它来处理 React,我更愿意在回调中简单地更新数据流,而不是使用一个完全不同的事件处理系统。所以基本上我想要这个:

$( "#target" ).click(function(e) {
eventObservable.push(e.target.text());
});

我得到的最接近的是使用 observer.onNext('foo'),但这似乎并不实际工作,这是调用观察者,这似乎是不正确的。观察者应该是对数据流做出反应的东西,而不是改变它,对吗?

我是不是不理解观察者和可观察者之间的关系?

102934 次浏览

在 RX 中,观察者和可观察者是不同的实体。观察者订阅观察者。可观察对象通过调用观察对象的方法向其观察对象发送项目。如果需要调用 Observable.create()范围之外的观察器方法,可以使用 Subject,它是一个同时充当观察器和可观察器的代理。

你可以这样做:

var eventStream = new Rx.Subject();


var subscription = eventStream.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});


var my_function = function() {
eventStream.next('foo');
}

你可在此找到更多有关课题的资料:

我相信 Observable.create()不采取一个 观察者作为回调参数,而是一个发射器。所以如果你想给你的 显而易见增加一个新值,试试这个:

var emitter;
var observable = Rx.Observable.create(e => emitter = e);
var observer = {
next: function(next) {
console.log(next);
},
error: function(error) {
console.log(error);
},
complete: function() {
console.log("done");
}
}
observable.subscribe(observer);
emitter.next('foo');
emitter.next('bar');
emitter.next('baz');
emitter.complete();


//console output
//"foo"
//"bar"
//"baz"
//"done"

是的“主题”让它变得更容易,在同一个对象中提供“可观察”和“观察者”,但这并不完全相同,因为“主题”允许你将多个观察者订阅到同一个“可观察者”,当一个“可观察者”只向最后订阅的观察者发送数据时,你可以有意识地使用它。 如果你想修补的话,这是 JsBin

Var Observer = Observatory able.scrib (

函数(x){

 console.log('next: ' +

Var my _ function = function (){

Push (‘ hello’)

更新可观测数据的方法之一。