Sunday, February 13, 2011

MVVM - Joining Rx's IObservables using LINQ to IObservable in a WPF Application

In this post, we will see how we can combine two IObservables using LINQ. The project builds on the code developed in this post:

http://shujaatsiddiqi.blogspot.com/2011/02/mvvm-view-model-iobserver-observing.html

We will just be discussing the changes required for this example. For the discussion of complete code please refer to the link provided above.

As in the previous post, the view model observes the data streams from two IObservables. It needs to combine them using LINQ, as described above. The interesting part is seeing how the data is combined as it is pushed by each IObservable: as soon as data is available, it is combined automatically by the operators provided by Reactive Extensions (Rx). Since we don't need to display the data separately in the view, we update the view to have just one ListBox. The ListBox should show the Id and Name of each student. It should also show the name of the course the student is enrolled in, which is obtained by observing CourseModel.


After completion of the data stream, the user should be notified by changing the Fill color of the ellipse to green. The expected final display is shown as follows:



To update the view as described above, we need to update the design of the view as follows:

<Window x:Class="WpfApp_MVVM_ReactiveModel.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:local="clr-namespace:WpfApp_MVVM_ReactiveModel"
        Title="MainWindow" Height="350" Width="525" >
    <Window.DataContext>
        <local:MainWindowViewModel />
    </Window.DataContext>
    <Grid>
        <ListBox Height="250" HorizontalAlignment="Left" Margin="-2,0,0,0"
                 Name="listBox1" VerticalAlignment="Top" Width="493"
                 ItemsSource="{Binding StudentList}">
            <ListBox.ItemTemplate>
                <DataTemplate >
                    <StackPanel Orientation="Horizontal">
                        <TextBlock Text="{Binding StudentId, StringFormat=Id:{0}}" />
                        <TextBlock Text=" " />
                        <TextBlock Text="{Binding StudentName, StringFormat=Name:{0}}" />
                        <TextBlock Text=" " />
                        <TextBlock Text="{Binding CourseName, StringFormat=Name:{0}}" />
                    </StackPanel>
                </DataTemplate>
            </ListBox.ItemTemplate>
        </ListBox>
        <Ellipse Height="39" HorizontalAlignment="Left" Margin="370,257,0,0"
                 Name="ellipse1" Stroke="Black" VerticalAlignment="Top" Width="116" >
            <Ellipse.Style>
                <Style TargetType="{x:Type Ellipse}">
                    <Style.Setters>
                        <Setter Property="Fill" Value ="Blue" />
                    </Style.Setters>
                    <Style.Triggers>
                        <DataTrigger Binding="{Binding IsLoaded}" Value="true" >
                            <Setter Property="Fill" Value="Green" />
                        </DataTrigger>
                    </Style.Triggers>
                </Style>
            </Ellipse.Style>
        </Ellipse>
    </Grid>
</Window>

The view is still using MainWindowViewModel as its DataContext. You can see that an additional TextBlock has been added to the item template of the ListBox. It is bound to CourseName on each item of the collection bound to the list box. The DataContext is expected to have a collection, StudentList, whose items have the properties StudentId, StudentName and CourseName. This is implemented as an ObservableCollection so that, as items are added to the collection, the view can update itself using the notification mechanism built into this type of collection. The DataContext should also have a boolean property, IsLoaded, which turns true when the data is completely loaded. This is used by the DataTrigger in the view to fill the notification ellipse with green. We need to implement INotifyPropertyChanged just because of this property, so that the view is notified of the change and the data trigger can change the fill color of the ellipse.

class MainWindowViewModel : INotifyPropertyChanged
{
    private readonly ObservableCollection<StudentViewModel> _studentList =
        new ObservableCollection<StudentViewModel>();

    public ObservableCollection<StudentViewModel> StudentList
    {
        get { return _studentList; }
    }

    private bool _isLoaded = false;
    public bool IsLoaded
    {
        get { return _isLoaded; }
        set
        {
            _isLoaded = value;
            onPropertyChanged("IsLoaded");
        }
    }

    public MainWindowViewModel()
    {
        var courseModel2 = new CourseModel();
        var studentModel2 = new StudentsModel();

        (from c in courseModel2
         from s in studentModel2
         where s.CourseId == c.CourseId
         select new { s.StudentName, s.StudentId, c.CourseName })
            .Subscribe(
                (on) => _studentList.Add(new StudentViewModel(on)),
                (ex) => Console.WriteLine(@"On error"),
                () => IsLoaded = true);
    }

    #region INotifyPropertyChanged implementation
    public event PropertyChangedEventHandler PropertyChanged;
    private void onPropertyChanged(string propertyname)
    {
        if (PropertyChanged != null)
        {
            PropertyChanged(this, new PropertyChangedEventArgs(propertyname));
        }
    }
    #endregion INotifyPropertyChanged implementation
}

The focal point of this post is the constructor of MainWindowViewModel. Here we are combining two IObservable based collections using LINQ. This is a new flavor of LINQ released with Rx, called LINQ to IObservable, and it is available in the System.Reactive assembly. Let's have a close look at the constructor.
public MainWindowViewModel()
{
    var courseModel2 = new CourseModel();
    var studentModel2 = new StudentsModel();

    (from c in courseModel2
     from s in studentModel2
     where s.CourseId == c.CourseId
     select new { s.StudentName, s.StudentId, c.CourseName })
        .Subscribe(
            (on) => _studentList.Add(new StudentViewModel(on)),
            (ex) => Console.WriteLine(@"On error"),
            () => IsLoaded = true);
}

So we are cross-joining instances of CourseModel and StudentsModel. This creates a Cartesian product, which we filter down to only those pairs that have a matching CourseId.

As specified in the definition of the view, we are interested in StudentId and StudentName from StudentsModel and CourseName from CourseModel.

The cross product, or Cartesian product, of two or more IObservables is also an IObservable. This enables us to subscribe to it by providing the OnNext, OnError and OnCompleted handlers of an observer. Remember that MainWindowViewModel itself is not the observer of the new IObservable created by combining CourseModel and StudentsModel. The Rx runtime takes care of how the lambda expressions provided for OnNext, OnError and OnCompleted are executed.
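To make the idea of "an observer built from three lambdas" more concrete, here is a minimal sketch of how such a wrapper could look. DelegateObserver is a hypothetical name used only for illustration; it is not the class that Rx uses internally, and the real implementation may well differ. The only assumption is the standard IObserver<T> interface from .NET 4.0.

```csharp
using System;

// Hypothetical helper for illustration; this is not the class Rx uses internally.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;
    private bool _stopped; // set once OnError or OnCompleted has run

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    // Forward values only while the stream is still active.
    public void OnNext(T value)
    {
        if (!_stopped) _onNext(value);
    }

    // An error ends the stream; later notifications are ignored.
    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;
        _onError(error);
    }

    // Completion also ends the stream.
    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;
        _onCompleted();
    }
}
```

An instance created as new DelegateObserver<int>(x => Console.WriteLine(x), ex => { }, () => Console.WriteLine("done")) could be handed to any IObservable<int>.Subscribe call. The view model never implements IObserver itself; it only supplies the three delegates.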

The overload of the Subscribe method that we have used takes three arguments: OnNext, OnError and OnCompleted, in that order. We are not doing anything useful in the lambda expression provided for OnError; it just writes a fixed message to the console. In the OnCompleted expression we set IsLoaded to true. This triggers the DataTrigger in the view and changes the fill color of the notification ellipse to green. The parameter of OnNext is of the anonymous type created in the select clause, so it has the properties StudentName, StudentId and CourseName. As data becomes available in OnNext, we create a StudentViewModel object and add it to the StudentList ObservableCollection. This is bound to the ListBox in the view, which shows the new element. Although we cannot use foreach with IObservable, as it is a feature of IEnumerable only, the query can be understood as a nested foreach loop over the two collections that keeps only the pairs with a matching CourseId.
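As a rough analogy only, the nested loop could be sketched like this for ordinary in-memory collections. The Course and Student classes here are hypothetical stand-ins for the elements pushed by the two models, and the formatted string simply mirrors what the ListBox displays. This is the pull-based IEnumerable picture; the IObservable version pushes data and behaves differently in time.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-ins for the elements pushed by the two models.
class Course
{
    public int CourseId { get; set; }
    public string CourseName { get; set; }
}

class Student
{
    public int StudentId { get; set; }
    public string StudentName { get; set; }
    public int CourseId { get; set; }
}

static class NestedLoopJoin
{
    // Pairs every course with every student and keeps the pairs with a matching CourseId.
    public static List<string> Join(IEnumerable<Course> courses, IEnumerable<Student> students)
    {
        var rows = new List<string>();
        foreach (var c in courses)
        {
            foreach (var s in students)
            {
                if (s.CourseId == c.CourseId)
                {
                    rows.Add(string.Format("Id:{0} Name:{1} Course:{2}",
                        s.StudentId, s.StudentName, c.CourseName));
                }
            }
        }
        return rows;
    }
}
```

With two courses and three students, the loop body runs six times and only the matching pairs end up in the result.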


Since the combined collections are IObservable based, we don't have all the data available at the same time. This makes it a very special form of join. Each element of courseModel2 is kept in memory until OnCompleted is executed by studentModel2. As elements are received in the background through OnNext from studentModel2, they are combined with each element of courseModel2. This creates as many observers for studentModel2 as the total number of elements pushed by courseModel2: as the elements of courseModel2 continue to be received, a new observer is added to the list of observers of studentModel2 for each of them.


We should be able to safely make a summarized statement about it as follows:

All the elements pushed through OnNext of courseModel2 are combined with all the elements pushed by studentModel2. If either of these IObservables pushed elements through OnNext before this LINQ query was subscribed, those elements are not considered for joining.


As soon as OnCompleted is executed for the observer created at run time, it is automatically unsubscribed by disposing its subscription. As we have seen in previous posts, disposing the unsubscriber returned by Subscribe results in the observer being unsubscribed. I think this is the default behavior provided for this dynamic observer by the Rx runtime.

This needs updates in StudentViewModel. We need to add a property, CourseName, as used in MainWindowViewModel. There is one more interesting thing: the parameter of the constructor is declared as dynamic. This is because of the anonymous type constructed in the LINQ query. In the constructor we rely on duck typing: we know which properties the object has, but we cannot refer to its type by name, because the name is generated by the compiler. The dynamic type is a feature introduced with .NET Framework 4.0 [http://msdn.microsoft.com/en-us/library/dd264736.aspx].

class StudentViewModel : INotifyPropertyChanged
{
    public StudentViewModel(dynamic student)
    {
        this.StudentId = student.StudentId;
        this.StudentName = student.StudentName;
        this.CourseName = student.CourseName;
    }

    private string _courseName;
    public string CourseName
    {
        get { return _courseName; }
        set
        {
            _courseName = value;
            OnPropertyChanged("CourseName");
        }
    }

    private string _studentName;
    public string StudentName
    {
        get { return _studentName; }
        set
        {
            _studentName = value;
            OnPropertyChanged("StudentName");
        }
    }

    private int _studentId;
    public int StudentId
    {
        get { return _studentId; }
        set
        {
            _studentId = value;
            OnPropertyChanged("StudentId");
        }
    }

    #region INotifyPropertyChanged implementation

    public event PropertyChangedEventHandler PropertyChanged;
    private void OnPropertyChanged(string propertyName)
    {
        if (PropertyChanged != null)
        {
            PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
        }
    }

    #endregion INotifyPropertyChanged implementation
}
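The dynamic constructor parameter can be tried out in isolation. The following is a small sketch, with DynamicDemo, Describe and Sample being hypothetical names chosen for this illustration. One caveat worth stating: anonymous types are internal to the assembly that creates them, so this kind of dynamic access is only expected to work when the object is created and consumed in the same assembly, as it is in this project.

```csharp
using System;

static class DynamicDemo
{
    // Member access on a dynamic value is resolved at run time.
    // A missing property would fail with a RuntimeBinderException, not a compile error.
    public static string Describe(dynamic student)
    {
        int id = student.StudentId;
        string name = student.StudentName;
        string course = student.CourseName;
        return string.Format("{0}: {1} ({2})", id, name, course);
    }

    // Builds an anonymous object shaped like the one produced by the select clause.
    public static string Sample()
    {
        var row = new { StudentName = "Student: 1", StudentId = 1, CourseName = "Course: 2" };
        return Describe(row);
    }
}
```

Calling DynamicDemo.Sample() formats the three properties without the method ever knowing the compiler-generated type name. A named class for the query result would give compile-time checking instead, at the cost of one more type.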

We also need to update Student to add a new property, CourseId. It is used as the key to join the StudentsModel and CourseModel IObservable collections. It is of type int.
class Student
{
    public string StudentName { get; set; }
    public int StudentId { get; set; }
    public int CourseId { get; set; }
}

Since we have updated the Student type, we need to update the definition of StudentsModel to set the value of CourseId. We derive it with a simple rule: if _studentId is odd, CourseId is set to 2; otherwise it is set to 1. We are also reducing the number of times the timer event pushes data. OnNext is called on the first three ticks, and on the fourth tick we call OnCompleted on the observers. As we call OnCompleted, we also stop the timer, so that it does not keep running in the background.
class StudentsModel : IObservable<Student>
{
    private int _studentId = 1;

    //Timer to notify observers about updates in the reactive observable
    private readonly DispatcherTimer _t = new DispatcherTimer();

    public List<IObserver<Student>> Observers =
        new List<IObserver<Student>>();

    public StudentsModel()
    {
        _t.Interval = TimeSpan.FromMilliseconds(3000);
        _t.Tick += new EventHandler(Tick);
        _t.Start();
    }

    void Tick(object sender, EventArgs e)
    {
        var student = new Student()
        {
            StudentId = _studentId,
            StudentName = string.Format("Student: {0}", _studentId),
            CourseId = ((int)(_studentId % 2)) + 1
        };

        if (_studentId <= 3)
        {
            Observers.ForEach((observer) => observer.OnNext(student));
            _studentId++;
        }
        else
        {
            Observers.ForEach((observer) => observer.OnCompleted());
            _t.Stop();
        }
    }

    public IDisposable Subscribe(IObserver<Student> observer)
    {
        IDisposable unSubscriber = new Unsubscriber<Student>(Observers, observer);
        if (!Observers.Contains(observer))
        {
            Observers.Add(observer);
        }
        return unSubscriber;
    }
}
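The sequence of notifications produced by Tick can be replayed without a DispatcherTimer. The sketch below uses hypothetical names (TickSimulation, CourseIdFor, Run) and records the notifications as strings; it only mirrors the branching and the CourseId rule of the Tick handler above, not the real timing.

```csharp
using System;
using System.Collections.Generic;

static class TickSimulation
{
    // Same rule as in Tick: odd ids map to course 2, even ids map to course 1.
    public static int CourseIdFor(int studentId)
    {
        return (studentId % 2) + 1;
    }

    // Replays up to maxTicks timer ticks and records what each tick would send.
    public static List<string> Run(int maxTicks)
    {
        var log = new List<string>();
        int studentId = 1;
        for (int tick = 0; tick < maxTicks; tick++)
        {
            if (studentId <= 3)
            {
                log.Add(string.Format("OnNext(StudentId={0}, CourseId={1})",
                    studentId, CourseIdFor(studentId)));
                studentId++;
            }
            else
            {
                log.Add("OnCompleted");
                break; // corresponds to _t.Stop()
            }
        }
        return log;
    }
}
```

TickSimulation.Run(10) yields three OnNext entries followed by a single OnCompleted entry, which matches the three students shown in the list box.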

Now let's run the application. It runs successfully and we see the elements being added to the list box.



But there is a weird thing. The notification ellipse is still filled blue. It should have turned green if OnCompleted of the LINQ result had been executed in MainWindowViewModel. This means that OnCompleted is not being executed for the result. You can prove that by inserting a breakpoint in the lambda expression used for OnCompleted. But the main question is: why is it not executed?

As discussed above, as many observers are created for studentModel2 as the number of elements pushed by courseModel2 after the LINQ statement is executed, so there will be two observers here. If we insert a breakpoint in the Tick event handler for timer _t in StudentsModel, we see that OnCompleted is executed only once. We need to execute OnCompleted for all observers to fix this. This is really weird because we are using ForEach on the Observers list. It should have executed OnCompleted for all observers, and there should have been no observer left in the list before the next statement [_t.Stop()] is executed. But the breakpoint shows that OnCompleted is executed only once, and one observer is still lingering.


This seems to get fixed if we update the code as follows:

void Tick(object sender, EventArgs e)
{
    var student = new Student()
    {
        StudentId = _studentId,
        StudentName = string.Format("Student: {0}", _studentId),
        CourseId = ((int)(_studentId % 2)) + 1
    };

    if (_studentId <= 3)
    {
        Observers.ForEach((observer) => observer.OnNext(student));
        _studentId++;
    }
    else
    {
        Observers.ToList().ForEach((observer) => observer.OnCompleted());
        _t.Stop();
    }
}

Now run this again with a breakpoint in the lambda expression used for OnCompleted.



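We have fixed this just by calling ToList() on a collection which is already a generic List. At first this looks like an issue with deferred execution of the lambda, but the more likely explanation is that the list is being modified while it is iterated. When OnCompleted is called on an observer, its subscription is disposed, and the Unsubscriber removes that observer from Observers while ForEach is still walking the same list. The remaining observer then moves into the position that has already been visited, so it is skipped. ToList() creates a copy, so ForEach runs over a snapshot that is not affected by the removals. Copying the list has a small cost, but whatever works...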
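The effect can be reproduced without Rx. The sketch below is an assumption-laden illustration: SnapshotDemo is a hypothetical name, the observers are plain strings, and the explicit index-based loop imitates how List<T>.ForEach appears to behave on .NET 4.0 (newer runtimes are expected to throw an InvalidOperationException instead when the list is modified during ForEach). Removing the current item stands in for the observer unsubscribing itself on completion.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class SnapshotDemo
{
    // Walks the live list by index while each "observer" removes itself.
    // Returns how many callbacks actually ran.
    public static int CompleteOnLiveList(List<string> observers)
    {
        int called = 0;
        for (int i = 0; i < observers.Count; i++)
        {
            string current = observers[i];
            called++;                  // stands in for observer.OnCompleted()
            observers.Remove(current); // stands in for the automatic unsubscription
        }
        return called;
    }

    // Walks a copy of the list, so removals from the original do not disturb the loop.
    public static int CompleteOnSnapshot(List<string> observers)
    {
        int called = 0;
        foreach (string current in observers.ToList())
        {
            called++;
            observers.Remove(current);
        }
        return called;
    }
}
```

With two observers in the list, the live walk runs the callback once and leaves one observer behind, while the snapshot walk runs it twice and leaves the list empty. That matches what the breakpoint showed before and after the ToList() change.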

Now that OnCompleted has been executed for all the observers involved in the LINQ query, it is safe to execute the OnCompleted of the result, and the Rx runtime does exactly that. This sets IsLoaded to true and results in the notification ellipse being filled green.


