Why are Subjects not recommended in .NET Reactive Extensions?

I am currently getting to grips with the Reactive Extensions framework for .NET and I am working my way through the various introduction resources I've found (mainly http://www.introtorx.com)

Our application involves a number of hardware interfaces that detect network frames, these will be my IObservables, I then have a variety of components that will consume those frames or perform some manner of transform on the data and produce a new type of frame. There will also be other components that need to display every n'th frame for example. I am convinced that Rx is going to be useful for our application, however I am struggling with the implementation details for the IObserver interface.

Most (if not all) of the resources I have been reading have said that I should not implement the IObservable interface myself but use one of the provided functions or classes. From my research it appears that creating a Subject<IBaseFrame> would provide me what I need, I would have my single thread that reads data from the hardware interface and then calls the OnNext function of my Subject<IBaseFrame> instance. The different IObserver components would then receive their notifications from that Subject.

My confusion is coming from the advice give in the appendix of this tutorial where it says:

Avoid the use of the subject types. Rx is effectively a functional programming paradigm. Using subjects means we are now managing state, which is potentially mutating. Dealing with both mutating state and asynchronous programming at the same time is very hard to get right. Furthermore, many of the operators (extension methods) have been carefully written to ensure that correct and consistent lifetime of subscriptions and sequences is maintained; when you introduce subjects, you can break this. Future releases may also see significant performance degradation if you explicitly use subjects.

My application is quite performance critical, I am obviously going to test the performance of using the Rx patterns before it goes in to production code; however I am worried that I am doing something that is against the spirit of the Rx framework by using the Subject class and that a future version of the framework is going to hurt performance.

Is there a better way of doing what I want? The hardware polling thread is going to be running continuously whether there are any observers or not (the HW buffer will back up otherwise), so this is a very hot sequence. I need to then pass the received frames out to multiple observers.

Any advice would be greatly appreciated.

40502 次浏览

The quoted block text pretty much explains why you shouldn't be using Subject<T>, but to put it simpler, you are combining the functions of observer and observable, while injecting some sort of state in between (whether you're encapsulating or extending).

This is where you run into trouble; these responsibilities should be separate and distinct from each other.

That said, in your specific case, I'd recommend that you break your concerns into smaller parts.

First, you have your thread that is hot, and always monitoring the hardware for signals to raise notifications for. How would you do this normally? Events. So let's start with that.

Let's define the EventArgs that your event will fire.

// The event args that has the information.
public class BaseFrameEventArgs : EventArgs
{
public BaseFrameEventArgs(IBaseFrame baseFrame)
{
// Validate parameters.
if (baseFrame == null) throw new ArgumentNullException("IBaseFrame");


// Set values.
BaseFrame = baseFrame;
}


// Poor man's immutability.
public IBaseFrame BaseFrame { get; private set; }
}

Now, the class that will fire the event. Note, this could be a static class (since you always have a thread running monitoring the hardware buffer), or something you call on-demand which subscribes to that. You'll have to modify this as appropriate.

public class BaseFrameMonitor
{
// You want to make this access thread safe
public event EventHandler<BaseFrameEventArgs> HardwareEvent;


public BaseFrameMonitor()
{
// Create/subscribe to your thread that
// drains hardware signals.
}
}

So now you have a class that exposes an event. Observables work well with events. So much so that there's first-class support for converting streams of events (think of an event stream as multiple firings of an event) into IObservable<T> implementations if you follow the standard event pattern, through the static FromEventPattern method on the Observable class.

With the source of your events, and the FromEventPattern method, we can create an IObservable<EventPattern<BaseFrameEventArgs>> easily (the EventPattern<TEventArgs> class embodies what you'd see in a .NET event, notably, an instance derived from EventArgs and an object representing the sender), like so:

// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();


// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
FromEventPattern<BaseFrameEventArgs>(
h => source.HardwareEvent += h,
h => source.HardwareEvent -= h);

Of course, you want an IObservable<IBaseFrame>, but that's easy, using the Select extension method on the Observable class to create a projection (just like you would in LINQ, and we can wrap all of this up in an easy-to-use method):

public IObservable<IBaseFrame> CreateHardwareObservable()
{
// The event source.
// Or you might not need this if your class is static and exposes
// the event as a static event.
var source = new BaseFrameMonitor();


// Create the observable.  It's going to be hot
// as the events are hot.
IObservable<EventPattern<BaseFrameEventArgs>> observable = Observable.
FromEventPattern<BaseFrameEventArgs>(
h => source.HardwareEvent += h,
h => source.HardwareEvent -= h);


// Return the observable, but projected.
return observable.Select(i => i.EventArgs.BaseFrame);
}

In general you should avoid using Subject, however for the thing you are doing here I think they work quite well. I asked a similar question when I came across the "avoid subjects" message in Rx tutorials.

To quote Dave Sexton (of Rxx)

"Subjects are the stateful components of Rx. They are useful for when you need to create an event-like observable as a field or a local variable."

I tend to use them as the entry point into Rx. So if I have some code that needs to say 'something happened' (like you have), I would use a Subject and call OnNext. Then expose that as an IObservable for others to subscribe to (you can use AsObservable() on your subject to make sure nobody can cast to a Subject and mess things up).

You could also achieve this with a .NET event and use FromEventPattern, but if I'm only going to turn the event into an IObservable anyway, I don't see the benefit of having an event instead of a Subject (which might mean I'm missing something here)

However, what you should avoid quite strongly is subscribing to an IObservable with a Subject, i.e. don't pass a Subject into the IObservable.Subscribe method.

If we ignore my dogmatic ways and ignore "subjects are good/bad" all together. Let us look at the problem space.

I bet you either have 1 of 2 styles of system you need to ingrate to.

  1. The system raises an event or a call back when a message arrives
  2. You need to poll the system to see if there are any message to process

For option 1, easy, we just wrap it with the appropriate FromEvent method and we are done. To the Pub!

For option 2, we now need to consider how we poll this and how to do this effciently. Also when we get the value, how do we publish it?

I would imagine that you would want a dedicated thread for polling. You wouldn't want some other coder hammering the ThreadPool/TaskPool and leaving you in a ThreadPool starvation situation. Alternatively you don't want the hassle of context switching (I guess). So assume we have our own thread, we will probably have some sort of While/Sleep loop that we sit in to poll. When the check finds some messages we publish them. Well all of this sounds perfect for Observable.Create. Now we probably cant use a While loop as that wont allow us to ever return a Disposable to allow cancellation. Luckily you have read the whole book so are savvy with Recursive scheduling!

I imagine something like this could work. #NotTested

public class MessageListener
{
private readonly IObservable<IMessage> _messages;
private readonly IScheduler _scheduler;


public MessageListener()
{
_scheduler = new EventLoopScheduler();


var messages = ListenToMessages()
.SubscribeOn(_scheduler)
.Publish();


_messages = messages;
messages.Connect();
}


public IObservable<IMessage> Messages
{
get {return _messages;}
}


private IObservable<IMessage> ListenToMessages()
{
return Observable.Create<IMessage>(o=>
{
return _scheduler.Schedule(recurse=>
{
try
{
var messages = GetMessages();
foreach (var msg in messages)
{
o.OnNext(msg);
}
recurse();
}
catch (Exception ex)
{
o.OnError(ex);
}
});
});
}


private IEnumerable<IMessage> GetMessages()
{
//Do some work here that gets messages from a queue,
// file system, database or other system that cant push
// new data at us.
//
//This may return an empty result when no new data is found.
}
}

The reason I really don't like Subjects, is that is usually a case of the developer not really having a clear design on the problem. Hack in a subject, poke it here there and everywhere, and then let the poor support dev guess at WTF was going on. When you use the Create/Generate etc methods you are localizing the effects on the sequence. You can see it all in one method and you know no-one else is throwing in a nasty side effect. If I see a subject fields I now have to go looking for all the places in a class it is being used. If some MFer exposes one publicly, then all bets are off, who knows how this sequence is being used! Async/Concurrency/Rx is hard. You don't need to make it harder by allowing side effects and causality programming to spin your head even more.

Often when you're managing a Subject, you're actually just reimplementing features already in Rx, and probably in not as robust, simple and extensible a way.

When you're trying to adapt some asynchronous data flow into Rx (or create an asynchronous data flow from one that's not currently asynchronous), the most common cases are usually:

  • The source of data is an event: As Lee says, this is the simplest case: use FromEvent and head to the pub.

  • The source of data is from a synchronous operation and you want polled updates, (eg a webservice or database call): In this case you could use Lee's suggested approach, or for simple cases, you could use something like Observable.Interval.Select(_ => <db fetch>). You may want to use DistinctUntilChanged() to prevent publishing updates when nothing has changed in the source data.

  • The source of data is some kind of asynchronous api that calls your callback: In this case, use Observable.Create to hook up your callback to call OnNext/OnError/OnComplete on the observer.

  • The source of data is a call that blocks until new data is available (eg some synchronous socket read operations): In this case, you can use Observable.Create to wrap the imperative code that reads from the socket and publishes to the Observer.OnNext when data is read. This may be similar to what you're doing with the Subject.

Using Observable.Create vs creating a class that manages a Subject is fairly equivalent to using the yield keyword vs creating a whole class that implements IEnumerator. Of course, you can write an IEnumerator to be as clean and as good a citizen as the yield code, but which one is better encapsulated and feels a neater design? The same is true for Observable.Create vs managing Subjects.

Observable.Create gives you a clean pattern for lazy setup and clean teardown. How do you achieve this with a class wrapping a Subject? You need some kind of Start method... how do you know when to call it? Or do you just always start it, even when no one is listening? And when you're done, how do you get it to stop reading from the socket/polling the database, etc? You have to have some kind of Stop method, and you have to still have access not just to the IObservable you're subscribed to, but the class that created the Subject in the first place.

With Observable.Create, it's all wrapped up in one place. The body of Observable.Create is not run until someone subscribes, so if no one subscribes, you never use your resource. And Observable.Create returns a Disposable that can cleanly shutdown your resource/callbacks, etc - this is called when the Observer unsubscribes. The lifetimes of the resources you're using to generate the Observable are neatly tied to the lifetime of the Observable itself.

It is bad to generalize that Subjects are not good to use for a public interface. While it is certainly true, that this is not the way a reactive programming approach should look like, it is definitively a good improvement/refactoring option for your classic code.

If you have a normal property with an public set accessor and you want to notify about changes, there speaks nothing against replacing it with a BehaviorSubject. INPC or additional other events are just not that clean and it personally wears me off. For this purpose you can and should use BehaviorSubjects as public properties instead of normal properties and ditch INPC or other events.

Additionally the Subject-interface makes the users of your interface more aware about the functionality of your properties and are more likely to subscribe instead of just getting the value.

It is the best to use if you want others to listen/subscribe to changes of a property.