Wednesday, February 9, 2011

WPF - IObservable Collection based Reactive Models In MVVM (Rx)

This post is a continuation of the discussion we started in last post. This would take this discussion a little further deep in the incorporation of Reactive Extension (Rx) in MVVM. In previous post we discussed how Rx is the mathematical dual of Linq to Object. Additionally, the interfaces IObservable and IObserver are mathematical dual of IEnumerator and IEnumerable.

In this post, we are going to discuss the following:

1. IObservable as a Collection of items.
2. Observing Projections of IObservable.
3. Dispatching the observation on UI thread.

Example App:
As an example application, we would be using a Window with a list (for Students) and a notification at the bottom. The notification ellipse should remain yellow until the data is being loaded.


As soon as the data load is completed, the notification ellipse should be updated and filled with green as follows:


Collection based Reactive Models:
Let's create a window. The window should be populate with a list of students. As the list becomes completed, it should show some form of notification. For notification, we are adding an ellipsis.

<Window x:Class="WpfApp_MVVM_ReactiveModel.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:local="clr-namespace:WpfApp_MVVM_ReactiveModel"
Title="MainWindow" Height="350" Width="525" >
<Window.DataContext>
<local:MainWindowViewModel />
</Window.DataContext>
<Grid>
<ListBox Height="262" HorizontalAlignment="Left" Margin="4,2,0,0"
Name="listBox1" VerticalAlignment="Top" Width="493"
ItemsSource="{Binding StudentList}">
<ListBox.ItemTemplate>
<DataTemplate >
<StackPanel Orientation="Horizontal">
<TextBlock Text="{Binding StudentId, StringFormat=Id:{0}}" />
<TextBlock Text=" " />
<TextBlock Text="{Binding StudentName, StringFormat=Name:{0}}" />
</StackPanel>
</DataTemplate>
</ListBox.ItemTemplate>
</ListBox>
<Ellipse Height="31" HorizontalAlignment="Left" Margin="322,270,0,0" Name="ellipse1"
Stroke="Black" VerticalAlignment="Top" Width="166" >
<Ellipse.Style>
<Style>
<Setter Property="Ellipse.Fill" Value="Yellow" />
<Style.Triggers>
<DataTrigger Binding="{Binding IsDataStreamFinished}" Value="true">
<Setter Property="Ellipse.Fill" Value="Green" />
</DataTrigger>
</Style.Triggers>
</Style>
</Ellipse.Style>
</Ellipse>
</Grid>
</Window>

As apparent from the above definition that the DataContext is expected to have at least two properties StudentList and IsDataStreamFinished. The data from StudentList is shown in the ListBox. A customized DataTemplate is defined to specify exactly how each member form the collection would be displayed. Each member of the collection StudentList is expected to have two properties StudentId and StudentName. As soon as the value of IsDataStreamFinished becomes true the ellipse fills with green color. This indicates the completion of data stream.

As per the expectation of above view, the following view model is provided. It has both properties available i.e. IsDataStreamFinished and StudentList. This view model supports change notification of these properties by implementing INotifyPropertyChanged interface. This requires to provide PropertyChanged event. This is raised through the setter of these properties providing the name of properties using magic strings.

class MainWindowViewModel : IObserver<Student>, INotifyPropertyChanged
{
#region Properties

private ObservableCollection<StudentViewModel> _studentList;
public ObservableCollection<StudentViewModel> StudentList
{
get { return _studentList; }
set
{
_studentList = value;
onPropertyChanged("StudentList");
}
}

private bool _isDataStreamFinished;
public bool IsDataStreamFinished
{
get { return _isDataStreamFinished; }
set
{
_isDataStreamFinished = value;
onPropertyChanged("IsDataStreamFinished");
}
}

#endregion Properties

#region Observable Model

//Model
private StudentsModel model;

#endregion Observable Model

IDisposable unsubscriber;

#region Constructor

public MainWindowViewModel()
{
_studentList = new ObservableCollection<StudentViewModel>();

model = new StudentsModel();
unsubscriber = model.Subscribe(this);
}

#endregion Constructor

#region IObserver implementation
public void OnCompleted()
{
IsDataStreamFinished = true;
}

public void OnError(Exception error)
{
//throw new NotImplementedException();
}

public void OnNext(Student value)
{
//throw new NotImplementedException();
_studentList.Add(new StudentViewModel(value));
}

#endregion IObserver implementation

#region INotifyPropertyChanged implementation

public event PropertyChangedEventHandler PropertyChanged;
private void onPropertyChanged(string propertyname)
{
if (PropertyChanged != null)
{
PropertyChanged(this, new PropertyChangedEventArgs(propertyname));
}
}

#endregion INotifyPropertyChanged implementation
}

The view model also implements IObserver of Student i.e. IObserver<Student>. This makes it subscribe to the collection of Student supporting observation by implementing IObservable<Student> like our model defined later in the discussion. Implementing IObserver<Student> requires it to provide the definition of three methods i.e. OnNext, OnCompleted and OnError. The IObservable executes OnNext of each observer which is still subscribed. Similarly, OnCompleted is executed by IObservable when the collection subscribed by IObserver is already completed. This might also be a subset of the actual collection. OnError is resulted in case of any exception providing the detail of exception in the method parameter.

In this example, OnComplete() results in setting IsDataStreamFinished to true. This would execute the data trigger in the view and fill the ellipse with Green. OnNext() is resulting in the addition of a new instance of StudentViewModel (constructed by using Student available through method parameter). The definition of StudentViewModel and Student is as follows:

class StudentViewModel : INotifyPropertyChanged
{
public StudentViewModel(Student student)
{
this.StudentId = student.StudentId;
this.StudentName = student.StudentName;
}

#region Properties

private string _studentName;
public string StudentName
{
get { return _studentName; }
set
{
_studentName = value;
OnPropertyChanged("StudentName");
}
}

private int _studentId;
public int StudentId
{
get { return _studentId; }
set
{
_studentId = value;
OnPropertyChanged("StudentId");
}
}

#endregion Properties

#region INotifyPropertyChanged implementation

public event PropertyChangedEventHandler PropertyChanged;
private void OnPropertyChanged(string propertyName)
{
if (PropertyChanged != null)
{
PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
}
}

#endregion INotifyPropertyChanged implementation
}

class Student
{
public string StudentName { get; set; }
public int StudentId { get; set; }
}

In the above definition StudentViewModel implements INotifyPropertyChanged to support change notifications.

The Unsubscriber is the same as we used in previous post:

class Unsubscriber<T> : IDisposable
{
private List<IObserver<T>> _observers;
private IObserver<T> _observer;

public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
{
this._observers = observers;
this._observer = observer;
}

public void Dispose()
{
if (_observer != null && _observers.Contains(_observer))
_observers.Remove(_observer);
}
}

Now let us look at the definition of the model. It implements IObervable<Student>. This would enable it to support subscription of observers requiring the subscription. The only requirement is to provide definition of Subscribe method accepting the IObservable of same type. As you can guess that in order to execute OnNext, OnCompleted and OnError of these observers, the IObservable, somehow, needs to maintain observers in a form of list. The below model has Observers List for the same purpose.

class StudentsModel : IObservable<Student>
{
private int _studentId;

public List<IObserver<Student>> observers = new List<IObserver<Student>>();

public StudentsModel()
{
t.Interval = TimeSpan.FromSeconds(1);
t.Tick += new EventHandler(t_Tick);

t.Start();
}

void t_Tick(object sender, EventArgs e)
{
_studentId++;
//throw new NotImplementedException();
var student = new Student()
{
StudentId = _studentId,
StudentName = string.Format("M{0}", _studentId)
};

if (_studentId <= 20)
{
Observers.ForEach((observer) => observer.OnNext(student));
}
else
{
Observers.ForEach((observer) => observer.OnCompleted());
t.Stop();
}
}

DispatcherTimer t = new DispatcherTimer();

public IDisposable Subscribe(IObserver<Student> observer)
{
IDisposable unSubscriber = new Unsubscriber<Student>(Observers, observer);
if (!Observers.Contains(observer))
{
Observers.Add(observer);
}
return unSubscriber;
}
}

As the Observer calls Subscribe, it is added to the Observers list. This method returns an IDisposable based object. This object can serve as an unsubscriber for getting updates through OnNext, OnCompleted and OnNext through IObservable. As we call dispose on the unsubscriber object, it removes the corresponding observer from the Observers list. Please see a wishful definition of unsubscriber from the last post.

In order to simulate updates from another source, we have used Timer. This is a DispatcherTimer with an interval of 1 second. For each timer tick, we are creating a new Student instance. We are calling OnNext of each subscriber announcing the availability of a new Student. As we have seen in the view model, it would add a new StudentViewModel to its list. This continues till 20 students are added.


After which the timer stops simulating the end of data availability. This results in the execution of OnCompleted method of each observer.



Selective Members through IObservable:
Being a dual of IEnumerable, IObservable supports the features provided by IEnumerable. It supports all standard LSQO (LINQ Standard Query Operators) plus additional operators. These additional operators have also been copied to IEnumerable later on.

It must be remembered that we don't need to subscribe to the whole collection provided by IObservable, we might subscribe to selective updates. Below, we are subscribing to only students whose StudentId is even. We just need to update the subscription and the rest of the code should be equally useful. Below is the updated code for the constructor for MainWindowViewModel for this requirement.

public MainWindowViewModel()
{
_studentList = new ObservableCollection<StudentViewModel>();

model = new StudentsModel();

unsubscriber = (from m in model
where m.StudentId % 2 == 0
select m).Subscribe(this);
}

The LINQ clause above is really LINQ but it is not LINQ to IEnumerable but it is LINQ to IObservable (shipped for Reactive extension support). This is just subscribing to a subset of model whose StudentId can be divided by 2 (definition of even number). We might want to take separate examination for even and odd students because of the limited seating availability in the examination hall. When we run the application, the final display is as follows:



In order to see another example of selection from IObservable, in the following code, we are subscribing for just 10 Student(s) after first 2 Student(s). The subscription code is as follows:

public MainWindowViewModel()
{
_studentList = new ObservableCollection<StudentViewModel>();
model = new StudentsModel();
unsubscriber = model.Skip(2).Take<Student>(10).Subscribe(this);
}

When we run it, the finalized view becomes available as follows:


As the subscription data becomes available, OnCompleted is automatically called and Dispose is called on the observer removing it from the Observers list in IObservable implementation.

Updates on a non-UI Thread:
I think it would be a usual case that IObserver will have to observe IObservable on a non-UI thread specially if IObservable is model and IObserver is a view model in MVVM.

Let's change the Timer to System.Timers.Timer and use its Elapsed event to simulate updates. We know that System.Timers.Timer is a multithreaded timer. Its event handler are executed on a ThreadPool thread. If we generate OnNext, OnCompleted and OnError updates then this can simulate updates in a non-UI thread.

Let's update the definition of StudentsModel as follows:

class StudentsModel : IObservable<Student>
{
private int _studentId;

public List<IObserver<Student>> Observers =
List<IObserver<Student>>();

public StudentsModel()
{
//For updates on non-UI thread
t.Elapsed += new ElapsedEventHandler(t_Elapsed);

t.Start();
}

//For updates on a non-UI thread
void t_Elapsed(object sender, ElapsedEventArgs e)
{
_studentId++;
var student = new Student()
{
StudentId = _studentId,
StudentName = string.Format("M{0}", _studentId)
};

if (_studentId <= 20)
{
Observers.ForEach((observer) => observer.OnNext(student));
}
else
{
Observers.ForEach((observer) => observer.OnCompleted());
t.Stop();
}
}

//For updates on a non-UI thread
Timer t = new Timer(1000);

public IDisposable Subscribe(IObserver<Student> observer)
{
IDisposable unSubscriber = new Unsubscriber<Student>(Observers, observer);
if (!Observers.Contains(observer))
{
Observers.Add(observer);
}
return unSubscriber;
}
}

It builds just fine but when we run this it results in the following:


Basically this exception is generated even for the very first Student added to the StudentList. Since StudentList is bound to the ListBox on the view and UI related elements have affinity to UI thread in WPF, that is why this exception is resulted. Being called from Elapsed event of System.Timers.Timer, OnNext is being executed on a ThreadPool thread.

As you could guess, we need to find out a way to observe on UI thread. So, even if these updates OnNext, OnCompleted and OnError are resulted in a non-UI thread. The framework should automatically dispatch them in UI thread to be used by observer. This is basically a decision for each subscription i.e. It is possible that some IObservers(s) are observing on UI thread and others are just fine for a non-UI thread.

Rx supports such dispatching. I am not sure but it might be using the same mechanism as used by BackgroundWorker for executing its ReportProgress and WorkCompleted events. That uses DispatcherSynchronizationContext provided by ASynchronousOpertionManager. Let's subscribe on UI thread. In the following code, we are still interested in students with even Ids. Here we are using ObserveOnDispathcher method of Observable class. This is available in System.Linq namespace in System.Reactive assembly.

public MainWindowViewModel()
{
_studentList = new ObservableCollection<StudentViewModel>();

model = new StudentsModel();

//observe on UI thread
unsubscriber = Observable.ObserveOnDispatcher<Student>(
(from m in model
where m.StudentId % 2 == 0
select m)
).Subscribe(this);

}

When we run the application, it runs successfully now. The finalized display is as follows:



Download:

2 comments:

Anonymous said...

I'm new to WPF and the MVVM pattern and this article seems to really dive deep into providing clarity. but after completing this tutorial, i get errors which say "The name 'Observable' does not exist in the current context" in the MainWindowViewModel.cs file. The only other place where Observable in listed is in #region Observable Model. But have used StudentsModel instead. Am i missing something?

Anonymous said...

The file is missing on your skydrive. Could you please update the link? Thanks