Friday, February 25, 2011

WPF - Performance Improvement for MVVM Applications - Part # 2

This is the second part of our discussion about performance improvement of an MVVM based application. You can find the other part of this discussion here:

- Part - 1: http://shujaatsiddiqi.blogspot.com/2011/01/wpf-performance-improvement-for-mvvm.html

- Part - 3: http://shujaatsiddiqi.blogspot.com/2011/03/wpf-performance-improvement-for-mvvm.html

In this example we would see how we can utilize Lazy Initialization to improve the performance of MVVM based application. Lazy initialization is a newly available features of .net framework 4.0. As I remember this is based on Tomas Petricek's suggestion which has been incorporated in .net framework recently. This is a part of effort to incorporate lazy execution in c# code. You might already know that LINQ queries support lazy evaulation already. They are only evaluated when the first time they are enumerated. If they are never enumerated then never such evaluation takes place. With new Lazy initialization feature, the framework now also supports lazy initialization.

http://msdn.microsoft.com/en-us/vcsharp/bb870976

In this example we would base our discussion on a hierarchical view model. The main view is consisted of some collection based controls. Each of these control also needs to define its template to specify how different items in the collection would be displayed on the screen.

Let's consider a view for a Course with just two students. The data of students should be displayed in tab items in a TabControl. The student information should only have two fields, their first and last names. Student's First name should be displayed as header of its TabItem. If the data is not entered yet then it should display "Default" in the header. The XAML definition of the view is as follows:
<Window x:Class="WpfApp_MVVM_LazyInitialize.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:local="clr-namespace:WpfApp_MVVM_LazyInitialize"
Title="MainWindow" Height="350" Width="525">
<Window.DataContext>
<local:MainWindowViewModel />
</Window.DataContext>
<Grid>        
<TabControl Height="288" HorizontalAlignment="Left" Margin="12,12,0,0" Name="tabControl1" VerticalAlignment="Top" Width="480"
ItemsSource="{Binding StudentViewModels}" >
<TabControl.ItemTemplate>
<DataTemplate>
<TextBlock Text="{Binding StudentFirstName}" />
</DataTemplate>
</TabControl.ItemTemplate>
<TabControl.ContentTemplate>                
<DataTemplate>
<Grid>
<Label Content="First Name" Height="27" HorizontalAlignment="Left" 
Margin="12,29,0,0" Name="label1" VerticalAlignment="Top" Width="105" />
<TextBox Height="30" HorizontalAlignment="Left" Margin="123,29,0,0" 
Name="textBoxFirstName" VerticalAlignment="Top" Width="345" 
Text="{Binding Path=StudentFirstName}" />
<Label Content="Last Name" Height="27" HorizontalAlignment="Left" Margin="12,65,0,0" 
Name="label2" VerticalAlignment="Top" Width="105" />
<TextBox Height="30" HorizontalAlignment="Left" Margin="123,65,0,0" 
Name="textBoxLastName" VerticalAlignment="Top" Width="345" 
Text ="{Binding StudentLastName}" />
</Grid>                    
</DataTemplate>
</TabControl.ContentTemplate>
</TabControl>
</Grid>
</Window>

We would be needing two DataTemplates for TabControl. One is for Header i.e. ItemTemplate and the other is for the content of each TabItem i.e. ContentTemplate. The ItemsSource of this TabControl is bound to StudentViewModels collection. Each member of this collection should have atleast two properties StudentFirstName and StudentLastName. The ItemTemplate is just a TextBlock. The Text property of this TextBlock is bound to item's StudentFirstName property. The ContentTemplate has two TextBox(es). One of them is bound to StudentFirstName and the other is StudentLastName properties. We have kept the code behind of the view as default.

The DataContext of the above view is set as a new instance of MainWindowViewModel. It implements INotifyPropertyChanged so it needs to provide definition of PropertyChanged event as part of its contract. As required by the view, it provides a collection StudentViewModels. It is provided as an ObservableCollection. Each member of this generic collection is specified as of the type StudentViewModel.
class MainWindowViewModel : INotifyPropertyChanged
{
ObservableCollection<StudentViewModel> _studentViewModels = 
new ObservableCollection<StudentViewModel>();

public ObservableCollection<StudentViewModel> StudentViewModels
{
get
{
return _studentViewModels;
}
}

public MainWindowViewModel()
{
_studentViewModels.Add(new StudentViewModel());
_studentViewModels.Add(new StudentViewModel());
}

public event PropertyChangedEventHandler PropertyChanged;
private void OnPropertyChanged(string propertyName)
{
if (PropertyChanged != null)
{
PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
}
}
}

In the constructor of the above view model we have added two members to the collection. As expected they are of type StudentViewModel. They would be displayed as Tab items in the view. The definition of StudentViewModel is as follows:
class StudentViewModel : INotifyPropertyChanged
{
Lazy<Student> _model = new Lazy<Student>();

string _studentFirstName;
public string StudentFirstName
{
get { return _studentFirstName; }
set
{
if (_studentFirstName != value)
{
_studentFirstName = value;
_model.Value.StudentFirstName = value;
OnPropertyChanged("StudentFirstName");
}
}
}

string _studentLastName;
public string StudentLastName
{
get { return _studentLastName; }
set
{
if (_studentLastName != value)
{
_studentLastName = value;
_model.Value.StudentLastName = value;
OnPropertyChanged("StudentLastName");
}
}
}

public StudentViewModel()
{
_studentFirstName = "Default";
}

public event PropertyChangedEventHandler PropertyChanged;
private void OnPropertyChanged(string propertyName)
{
if (PropertyChanged != null)
{
PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
}
}
}

As expected by the view it has two properties StudentFirstName and StudentLastName. It implements INotifyPropertyChanged to support change notification.

The above view model uses Student as model. When the view model receives updates in these properties through WPF Biniding System, it just passes on those updates to the model. The instances initialized with Lazy initialization feature are accessed using Value property of the Lazy object reference. We have accessed StudentName and StudentLastName properties using the same. Let us assume that it were a model which requires a heavy instantiation. We are just displaying a MessageBox in the constructor. This would show a MessageBox from the constructor. In this way we would realize exactly when the constructor is called. This is how are trying to understand Lazy initialization of Student.
class Student
{
public string StudentFirstName { get; set; }
public string StudentLastName { get; set; }

public Student()
{
MessageBox.Show("Student constructor called");
}
}

Now let us run this. The application is shown as follows:



Enter some data in the First Name field of the first Tab Item. As soon as the focus is changed to the other field, the default Binding of Text property of TextBox triggers the source updates on LostFocus.


When you close the message box you can see the header of the TabItem being updated with the FirstName you entered.



Deciding which Constructor to use for initialization:
We can also decide which constructor of the type we want to use for initialization using the constructors of Lazy class. Lazy<T> allows Func<T> delegate as argument in some of its overloads.

In a multithreaded scenario:
In a multithreaded scenario the first thread using the lazy instance causes the initialization causes the initialization procedure and its value would be seen by the other threads. Although the other threads will also cause the initialization but their value will not be used. In a multithreaded scenario, like this, we can use any overload of EnsureInitialized method of Lazyintializer for even better performance. This can also use any of the default constructor or specialized constructor using Func delegate.

http://msdn.microsoft.com/en-us/library/system.threading.lazyinitializer.ensureinitialized.aspx

Download Code:

Thursday, February 24, 2011

Rx - Executing anonymous code asynchronously

In the previous post we discuss about how we can execute a method asynchronously using Reactive Extensions. In this post we will discuss how we can execute anonymous code asynchronously using Reactive Extensions. Rx provides this support using Observable.Start.

Invoking Actions (No data returned):
Let us start with discussing how we can execute code asynchronously with no data to return.


The event handler of Click event of the button is as follows:

private void button2_Click(object sender, RoutedEventArgs e)
{
Observable.Start(() =>
{
Thread.Sleep(2000);

return;
}
)
.ObserveOnDispatcher()
.Subscribe(
(result) => { ;},
(ex) => this.Background = Brushes.Red,
() => this.Background = Brushes.Green);
}

This code just executes simple code asynchronously in a ThreadPool thread. It generates OnNext, OnCompleted and OnError messages on the Dispatcher thread as we are observing on Dispatcher. After atleast two seconds the background of window should turn green.

Invoking anonymous code (data returned from lambda) asynchronously:
We can also execute anonymous asynchronous code using Observable.Start which actually return a value. This code is in the form of lambda expressions or lambda statements which explicitly return data through return statement. Let us add a button on the same window. When user clicks the button it should update the operand boxes with any random number.

private void btnFillRandomOperands_Click(object sender, RoutedEventArgs e)
{
Observable.Start<int>(
() =>
{
Thread.Sleep(2000);
return new Random().Next(1, 10);
}).ObserveOnDispatcher<int>()
.Subscribe(
(result) =>
{
this.textBoxOperand1.Text = result.ToString();
this.textBoxOperand2.Text = result.ToString();
},
(ex) => this.Background = Brushes.Red,
() => this.Background = Brushes.Green
);

}

The above code uses Random class to generate a random number between 1 and 10. The returned number is provided to OnNext handler by Rx runtime. This handler updates both operand text boxes with this number. After this, the runtime places OnCompleted handler which turns the background of the window to green.

When we run this and click this button, it results in the following window after atleast 2 seconds. Definitely, it might be a different random number generated and filled in the operands text boxes.



Passing Arguments to Code executed by Observable.Start:
It doesn’t seem that we can have any parameters to Observable.Start directly. But we can use our old technique of using Captured Variables. As we know that lambdas can capture variables from the scope they are generated from. The variables used by lambda like this are called Captured variables. Like a regular lambda statement executed asynchronously, these variables also have Closure issues. I have discussed here how to avoid closure issues by assigning the value of captured variables to local variables inside lambdas.

http://shujaatsiddiqi.blogspot.com/2010/12/wpf-dispatcherbegininvoke-and-closures.html

private void button1_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

Observable.Start(() =>
{
Thread.Sleep(2000);

return operand1 + operand2;
})
.ObserveOnDispatcher()
.Subscribe(
(result) => { this.textBoxResult.Text = result.ToString(); });

operand1 = 3;
operand2 = 4;
}

In the above case what do you guess would be shown in the result textbox if we enter 2.1 and 3.5 in the two operand text boxes. If you guess 5.6 then you would be surprised to see this result.


This is the result of modified captured variables as the modified value of those captured variables are used which are 3 and 4. The result of addition of 3 and 4 is 7, which is displayed in the Result text box. In order to fix this, we can define two new local variables localOperand1 and localOperand2 and initialize them with the values of captured variables.

private void button1_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

Observable.Start(() =>
{
decimal localOperand1 = operand1;
decimal localOperand2 = operand2;

Thread.Sleep(2000);

return localOperand1 + localOperand2;
})
.ObserveOnDispatcher()
.Subscribe(
(result) => { this.textBoxResult.Text = result.ToString(); });

Thread.Sleep(2);

operand1 = 3;
operand2 = 4;
}

When we run the application and enter data 2.1 and 3.5 then the correct result is displayed in the Result text box.


Although Resharper still shows warning but it is just fine as we have seen the correct result being calculated.


Observable.Start and Exceptions in the code executed:
As a result of an exception Observable.Start behaves same way as Observable.ToAsync. We can represent the behavior in marble diagram as follows:


Let us update the code executed asynchronously using Observable.Start so that if the sum of number entered is either zero or lesser, it should throw an Exception.

[DebuggerStepThrough]
private void button1_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

Observable.Start(() =>
{
decimal localOperand1 = operand1;
decimal localOperand2 = operand2;
decimal sum = localOperand1 + localOperand2;

Thread.Sleep(2000);

if (sum <= 0)
{
throw new System.Exception("Error in computation");
}

return sum;
})
.ObserveOnDispatcher()
.Subscribe(
(result) => { this.textBoxResult.Text = result.ToString(); },
(ex) => this.Background = Brushes.Red,
() => this.Background = Brushes.Green);

}

You can notice that we are still subscribing on Dispatcher. In addition to OnNext, we have provided the code for OnCompleted and OrError. The window should turn green when the operation completed successfully resulting in OnCompleted. In case of exception, OnError is executed.



Using IScheduler to run Asynchronous code:
Like Observable.ToAsync, Observable.Start also provides overloads to run the asynchronous code using different IScheduler. We can run code which returns or does not return any data in the form of lambda and we can run it using the IScheduler of our choice. You can see that just by providing Scheduler.Dispatcher info to Start we can turn this method to run on UI thread:

Observable.Start(() =>
{
Thread.Sleep(2000);

return;
}, Scheduler.Dispatcher
)

Note:
There is one more overload of Observable.Start which returns ListObservable<TSource>. Since it is not about executing anonymous asynchronous code, we are discussing it here.

Download Code:

Wednesday, February 23, 2011

WPF - ASynchronous Function using Observable.ToASync [Rx ASynchronous Delegate] - Part # 2

This is the second part of our discussion about how we can execute the similar functionality as provided by asynchronous anonymous delegates in .net. In previous post we discuss how we can mimic the functionality provided by Func<...> delegate.

http://shujaatsiddiqi.blogspot.com/2011/02/wpf-asynchronous-function-using.html

In this post we will discuss about Action<...> delegate. As we know that we can not return any value from this compared to Func<...> delegate. As we know that .net provides 16 different generic overloads of Action<...> delegates in order to accommodate delegates for methods having up to 16 parameters. Similarly, 16 different overloads of Observable.ToAsync have been provided.

Let us add this method to our class:

[DebuggerStepThrough]
private void ProcessOperandsFireAndForget(decimal operand1, decimal operand2)
{
Thread.Sleep(5000);
if (operand1 <= 0 || operand2 <= 0)
{
throw new System.Exception("Exception generated");
}
}

Since it returns void, we can use Action delegate for this. In Reactive Extension we can use Observable.ToAsync for this method. It just producing a delay of 5 seconds. If any of the operands are zero or negative, it is resulting in an exception with message "Exception generated".

In order to run this we add another button on the window. The handler for Click event for the button can be as follows:

private void btnFireAndForget_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

Observable.ToAsynclt;decimal, decimal>(ProcessOperandsFireAndForget, Scheduler.TaskPool)(operand1, operand2)
.ObserveOnDispatcher()
.Subscribe<Unit>(
(result) => { ; },
(ex) => this.Background = Brushes.Red,
() => this.Background = Brushes.Green);

}

Before running this code, let me explain the expected behavior. Basically you should be expecting the same behavior as of a method which actually returns a value. We can see that from marble diagram:

Graceful method execution:



Exception in method execution:


Now one thing is of interest which you might have already noticed. It is about OnNext message. Since OnNext passes some data in its argument, what data runtime would be passing in this as this method has void return type. Just to clarify this, I have used a generic overload of Subscribe method. See Unit type there? Basically is a new IEquatable struct provided in .net framework. Basically the message is generated. For methods with no return types, OnNext messages are generated with this type.



IScheduler support for Observable.ToAsync:
As I have told you that various overloads have been provided for Observable.ToAsync have been provided to accommodate all different overloads of Func and Action delegates. Make it at least double. Basically with each overload provided to support a particular number of arguments, one overload is provided to support an IScheduler so that we could decide which IScheduler to use to run this asynchronous code. Using these overloads we can specify which Scheduler we want to use to execute the method. Whatever thread is used to execute the method, would be the same thread on which OnNext, OnCompleted and OnError messages are generated.

Note:
We can also execute methods without any parameters with / without any return type using Observable.ToAsync. It would be following the same marble diagrams as presented in this post.

Download Code:

Tuesday, February 22, 2011

WPF - ASynchronous Function using Observable.ToASync [Rx ASynchronous Delegate] - Part # 1

This is the first part of our discussion about asynchronous method execution using Reactive Extension. The second part of this discussion can be found here:

http://shujaatsiddiqi.blogspot.com/2011/02/wpf-asynchronous-function-using_23.html

In this post we discuss how we can execute a method asynchronously using the features provided in Reactive Extensions Rx. We discussed the usage of asynchronous delegate in a WPF application in the following post:

http://shujaatsiddiqi.blogspot.com/2010/12/asynchronous-delegate-exception-
model.html

This is basically the similar concept in Rx.

<Window x:Class="WpfApp_AsynchObserver.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
Title="MainWindow" Height="350" Width="525">
<Grid>
<TextBox Height="27" HorizontalAlignment="Left" Margin="138,22,0,0"
Name="textBoxOperand1" VerticalAlignment="Top" Width="311" />
<TextBox Height="27" HorizontalAlignment="Left" Margin="138,55,0,0"
Name="textBoxOperand2" VerticalAlignment="Top" Width="311" />
<TextBox Height="27" HorizontalAlignment="Left" Margin="138,143,0,0"
Name="textBoxResult" VerticalAlignment="Top" Width="311" />
<Button Content="Sum" Height="28" HorizontalAlignment="Left" Margin="139,88,0,0"
Name="button1" VerticalAlignment="Top" Width="143" Click="button1_Click" />
<Label Content="Operand 1" Height="27" HorizontalAlignment="Left" Margin="12,22,0,0"
Name="label1" VerticalAlignment="Top" Width="120" />
<Label Content="Operand 2" Height="27" HorizontalAlignment="Left" Margin="12,55,0,0"
Name="label2" VerticalAlignment="Top" Width="120" />
<Label Content="Result" Height="27" HorizontalAlignment="Left" Margin="12,143,0,0"
Name="label3" VerticalAlignment="Top" Width="120" />
</Grid>
</Window>


The code behind is as follows:

public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
}

private decimal ProcessOperands(decimal operand1, decimal operand2)
{
decimal sum;
sum = operand1 + operand2;

return sum;
}

private void button1_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

this.textBoxResult.Text = ProcessOperands(operand1, operand2).ToString();
}
}

Lets run the application now. Everything is working great. We enter numeric digits in the two operands fields. When we click Enter, the result appears in the Result text box. Now let us change the method call (ProcessOperands) to be an asynchronous call using Observable.ToAsync. We need to update the button’s click handler as follows:

private void button1_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

Observable.ToAsync<decimal, decimal, decimal>(ProcessOperands)(operand1, operand2)
.Subscribe(
(result) => { this.textBoxResult.Text = result.ToString(); }
);
}

The code clearly shows that we have used an overload of ToAsync which takes two decimal values as arguments and returns a decimal value. We have used this as our ProcessOperand method is defined like that. You can see that we have used the value provided in the onNext parameter to populate the textBoxResult. Basically onNext is placed when the async code is finished execution and result is available. If we compare it to the code we have written for Asynchronous delegates, we realize that Rx has internally executed code for BeginInvoke and also called EndInvoke for us getting the result using IAsyncResult. If we look at the marble diagram, it should be like this.



In the marble diagram, this has shown to be placing an OnCompleted message afterwards. Let us verify that this event is generated. In the following code, we are updating the Background color to green when OnCompleted message is received from the Observable generated for the method.

private void button1_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

Observable.ToAsync<decimal, decimal, decimal>(ProcessOperands)(operand1, operand2)
.Subscribe(
(result) => { this.textBoxResult.Text = result.ToString(); },
() => this.Background = Brushes.Green
);
}

Now we run the application and enter values in the operands text boxes. When we click the button, the textBoxResult is updated with the sum of two operands. The window also turns green. This is due to the code we have written in OnCompleted handler.



Long running methods and Rx Thread Management for generating asynchronous block:
Now you might be thinking that this is a very simple example. What if the method is a long running method taking a few seconds. Would it still be the same code. Let’s mimic that this method is taking a few seconds by putting Thread.Sleep in method code as follows:

private decimal ProcessOperands(decimal operand1, decimal operand2)
{
decimal sum;
sum = operand1 + operand2;

Thread.Sleep(5000);
}

If you run this and enter data in the input text boxes and click Sum. You get the following exception:



Why is this exception generated by just delaying in the asynchronous method? Basically we have caused the subscription to be on a ThreadPool thread. Since we are delaying for 5 seconds, all Rx messages (OnNext, OnCompleted and OnError) seem to be dispatched on the calling thread of subscriber.

It seems if there is a delay of more than 3 milliseconds then the OnNext messages are dispatched on a ThreadPool thread otherwise they are dispatched on the thread of the subscriber. In this case it is UI thread. If the delay is 3 milliseconds or lesser these messages are always dispatched on UI thread. This is true even though we know that ToAsync is causing the method to be executed on a ThreadPool thread.

Fig: When OnNext is received in more than 3 milliseconds:



Fig: When OnNext is received in 3 milliseconds or less:



How exceptions are handled?

As we have discussed [http://shujaatsiddiqi.blogspot.com/2010/12/asynchronous-delegate-exception-model.html] the runtime is silent about exceptions for Async delegates if we don’t call EndInvoke. It is not the case with Observable.ToAsync. It always generate exception message OnError when there is an exception in the method executed asynchronously. Let us change the code of ProcessOperands so that it throws an exception.

[DebuggerStepThrough]
private decimal ProcessOperands(decimal operand1, decimal operand2)
{
decimal sum;
sum = operand1 + operand2;

if (sum < 0)
{
throw new System.Exception("Exception in processing data!");
}

return sum;
}

The above code is resulting an exception if the sum of these two operands is negative. We also need to update the code so that IObservable generated from Observable.ToAsync has a non-default OnError handler. I have decorated the method with DebuggerStepThrough attribute so that Debugger doesn’t bother me as I have FCEs turned on. Let’s update button1_Click as follows:

private void button1_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

Observable.ToAsync<decimal, decimal, decimal>(ProcessOperands)(operand1, operand2)
.Subscribe(
(result) => { this.textBoxResult.Text = result.ToString(); },
(ex) => this.Background = Brushes.Red,
() => this.Background = Brushes.Green
);
}

It just changes the background of the window to Red if there is an OnError message from the method. Let’s run the application and enter data 2 and -5 in the operand fields. As we know that their sum is -3, the background of the window should turn Red.


We can show this in marble diagram as follows:



Observing on Non-UI thread using different IScheduler:

Now we suppose that the method would always take more than 3 milliseconds. Can we still use ToAsync feature of Observable? Yes we can!

We just need to Observe on the Dispatcher. This can be done by using ObserveOn feature of IObservable. There are two methods provided for this purpose. One method allows us to generate the messages on any IScheduler (including built-in schedulers like Dispatcher, ThreadPool, TaskPool, Immediate, CurrentThread, NewThread). We can use any of the four overloads of this method. We can also directly use ObserveOnDispatcher method from IObservable. We have used the same as below.

private void button1_Click(object sender, RoutedEventArgs e)
{
decimal operand1 = Decimal.Parse(this.textBoxOperand1.Text);
decimal operand2 = Decimal.Parse(this.textBoxOperand2.Text);

Observable.ToAsync<decimal, decimal, decimal>(ProcessOperands)(operand1, operand2)
.ObserveOnDispatcher<decimal>()
.Subscribe(
(result) => { this.textBoxResult.Text = result.ToString(); },
(ex) => this.Background = Brushes.Red,
() => this.Background = Brushes.Green
);
}

Now we run the application. Although the method is being executed on a separate ThreadPool thread but the OnNext, OnError and OnCompleted messages are dispatched on UI thread.

Fig: Method executed on a ThreadPool thread


Fig: OnNext message generated on Main UI Thread


Fig: OnCompleted placed on Main UI thread


Fig: OnError placed on Main UI thread



Limitations:
Since this is a way to implement Anonymous asynchronous delegates using Reactive Extensions so it seems to have has the same limitation as anonymous asynchronous delegate (Func) has. We can not have out or ref parameters in the method that we want to execute asynchronously. In non-reactive model, we might easily implement named asynchronous delegate as a work around but in the reactive extension world there doesn't seem to be any alternative.

Download Code:

Thursday, February 17, 2011

IObservable.Throttle causes OnNext on ThreadPool thread

This is a continuation of our following post:

http://shujaatsiddiqi.blogspot.com/2011/02/mvvm-observable-inotifypropertychangedp.html

In the above post we discussed about using PropertyChanged event of INotifyPropertyChanged by observing EventStream support of Reactive Extensions Rx. In this post we change the view model a bit. We remove the implementation of INotifyPropertyChanged. We rather define it as a DependencyObject to support a different change notification mechanism from view model. This is to demonstrate the following:

IObservable.Throttle causes OnNext to be generated on a ThreadPool thread instead.

We will see what problems might be caused by this and how we can work around this.

public class StudentViewModel : DependencyObject
{
#region Model
private Student _model;
#endregion Model

#region Constructor

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Subscribe((arg) =>
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
});
}

#endregion Constructor

#region Dependency Properties

public static DependencyProperty StudentIdProperty =
DependencyProperty.Register("StudentId", typeof(int), typeof(StudentViewModel));
public int StudentId
{
get { return (int)GetValue(StudentIdProperty); }
set { SetValue(StudentIdProperty, value); }
}

public static DependencyProperty StudentNameProperty =
DependencyProperty.Register("StudentName", typeof(string), typeof(StudentViewModel));
public string StudentName
{
get { return (string)GetValue(StudentNameProperty); }
set { SetValue(StudentNameProperty, value); }
}

#endregion Dependency Properties

#region Overriden methods of DependencyObject
protected override void OnPropertyChanged(DependencyPropertyChangedEventArgs e)
{
base.OnPropertyChanged(e);
switch (e.Property.Name)
{
case "StudentId":
_model.StudentId = (int)e.NewValue;
break;
case "StudentName":
_model.StudentName = (string)e.NewValue;
break;
}
}
#endregion Overriden methods of DependencyObject
}

We have update the properties StudentId and StudentName to be Dependency properties. To synchronize the model's corresponding properties we have overridden OnPropertyChanged method of DependencyObject. The argument DependencyPropertyChangedEventArgs , not only, provides us the details about the property being updated but it also provides the old and new values of this property. As you can see we are just using the new values of these properties to update the corresponding model's properties.

We have kept the handling of model's PropertyChanged event observation using Reactive extension's Observable event stream support available in CoreEx assembly. Let's run the application now and open two child windows.


When we enter StudentName in one child window, the same changes are reflected in the other child window.


Now we add throttling support in order to avoid frequent updates to the other child windows due to updates in model's properties. Let's update the constructor of StudentViewModel as follows:

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe((arg) =>
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
});
}

This definition is same as previous. We have just used Throttling for PropertyChanged event for 500 milliseconds. Now we run the application. Open two child windows and enter data in StudentName property in one of the child window. As soon as we settle, the application shuts down. Let's see what is going on by updating the code further as follows:

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe((arg) =>
{
try
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
}
});
}

Now we run the application and enter data in StudentName text box as previous. The exception handling logic causes the following MessageBox to be displayed.


This is because of the reason that IObservable.Throttle causes OnNext to be placed in a ThreadPool thread.


Using Invoke / BeginInvoke on Dispatcher:
The first solution could be to directly using Dispatcher.Invoke / Dispatcher.BeginInvoke. We can specify the code that we want to execute in the Action delegate. The delegate's code is executed here on UI thread. We update the constructor as follows:

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe((arg) =>
{
Dispatcher.BeginInvoke(new Action(() =>
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
}), null);

});
}

Now we run the application. Again we open two windows and enter data in StudentName text box. We see that the data is successfully updated in the other window now. Let's insert the break point again in OnNext code.


Using other overload of Throttle:
We can use other overload of IObservable.Throttle method. This requires an additional overload which has an additional parameter of type IScheduler. IScheduler is a Rx interface available in CoreEx assembly and System.Concurrency namespace.

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Throttle(TimeSpan.FromMilliseconds(500), Scheduler.Dispatcher)
.Subscribe((arg) =>
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
});
}

We have specified Scheduler.Dispatcher for other parameter. This is also available in System.Concurrency namespace. It is available in CoreEx assembly. This is saving us from dispatching in our code but Throttle's generated OnNext messages will be generated in the UI thread. Let's run the application and open two child windows like before.


Download:

Tuesday, February 15, 2011

WPF MVVM - Retry & Catch for Observables in Reactive Extension Rx

In this post we are going to simulate a scenario of client server communication. This would simulate how observables can be used to attempt to reconnect to the server and then switching to a different server after a few unsuccessful attempts. We will keep the view simple. We would just keep a TextBlock to show the status of connection. If it is connected, the status should be displayed with green foreground, otherwise it should be shown with Red foreground.

<Window x:Class="Wpf_MVVM_Observables.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:local="clr-namespace:Wpf_MVVM_Observables"
Title="MainWindow" Height="350" Width="525">
<Window.DataContext>
<local:MainWindowViewModel />
</Window.DataContext>
<Grid>
<Label Content="Connection Status:" Height="25" HorizontalAlignment="Left"
Margin="12,114,0,0" Name="lblConnectionStatusMessage" VerticalAlignment="Top" Width="122" />
<TextBlock Height="27" HorizontalAlignment="Left" Margin="140,119,0,0"
Name="textBlockConnectionStatusMessage" Text="{Binding ConnectionStatusMessage}"
VerticalAlignment="Top" Width="221" >
<TextBlock.Style>
<Style>
<Setter Property="TextBlock.Foreground" Value="Red" />
<Style.Triggers>
<DataTrigger Binding="{Binding ConnectionStatusMessage}" Value="Connected">
<Setter Property="TextBlock.Foreground" Value="Green" />
</DataTrigger>
</Style.Triggers>
</Style>
</TextBlock.Style>
</TextBlock>
</Grid>
</Window>

This is expecting MainWindowViewModel in the same assembly. The view model should have a property, named ConnectionStatusMessage. The text of connection message is shown by binding it directly to the Text property of the TextBlock. The foreground of the message is managed through a DataTrigger on the TextBlock. DataTrigger allows us to trigger based on the values of ConnectionStatusMessage from DataContext. If the value is “Connected”, we are setting it as Green, otherwise, it would keep it as Red. Below are the way the view should appear in both states:

When connected:


When Disconnected (with specific connection status message):


The simplest view model to fulfill the requirements of the above view can be developed as presented below:

class MainWindowViewModel : INotifyPropertyChanged
{
#region Properties

private string _connectionStatusMessage;

public string ConnectionStatusMessage
{
get { return _connectionStatusMessage; }
set
{
_connectionStatusMessage = value;
OnPropertyChanged("ConnectionStatusMessage");
}
}

#endregion Properties

#region INotifyPropertyChanged implementation

public event PropertyChangedEventHandler PropertyChanged = delegate { };

private void OnPropertyChanged(string propertyName)
{
PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
}

#endregion INotifyPropertyChanged implementation
}

As expected by the view, it has a property ConnectionStatusMessage. This is a string property. This is supporting change notification by implementing INotifyPropertyChanged. To verify if the view logic actually works, we can use DispatcherTimer and keep flipping the status message and see the effects in the view. We can update the view model by adding this code:

private DispatcherTimer _timer;

public MainWindowViewModel()
{
_connectionStatusMessage = "Disconnected";

_timer = new DispatcherTimer();
_timer.Interval = TimeSpan.FromSeconds(3);
_timer.Tick += new EventHandler(_timer_Tick);
_timer.Start();
}

void _timer_Tick(object sender, EventArgs e)
{
ConnectionStatusMessage = (_connectionStatusMessage == "Connected") ? "Disconnected" : "Connected";
}

We have just used DispatcherTimer to flip the status message between Connected and Disconnected after every 3 seconds. When we run the application we notice that when status message is Connected, the foreground appears as Green, otherwise for Disconnected, it appears as Red. We would need to add System.Windows.Threading namespace for using DispatcherTimer. When we are done testing, the above code can be safely removed from the view model.

Now we consider that we have a Server connection type which is observable. We will see the possible gains, by just making these types Observable. Here we are using static methods of Observable and Observer classes available in System.Linq namespace. We need to add the reference of System.Reactive assembly (currently need to install reactive extensions).

Observable.Create allows to define an Observable without actually implementing IObservable interface. We can specify the code we need to execute when an observer subscribes to this. This is an example of Cold Observable i.e. it only exists when an observable subscribes to it. Since server communications are generally maintained on a separate worker thread, we have created a new background thread [IsBackground = true]. This would also keep the UI responsive. Observable.Create has two overloads. One overload returns an IDisposable. The other one returns an Action delegate. This is the code executed when the observer gets unsubscribed. We have specified the return statement in the form of a lambda statement. All the Thread.Sleep statements are just to keep the different messages on the UI for a while so that we could see those messages on the display.

public MainWindowViewModel()
{
IObservable<Server> observableServerConnection = Observable.Create<Server>((o) =>
{
new Thread(() => {
for (int i = 0; i < 3; i++)
{
Thread.Sleep(2000);
o.OnNext(new Server() { ServerStatus = "Connected"});
}
o.OnError(new Exception("Server Disconnected"));
}) {IsBackground = true}.Start();

return () =>
{
ConnectionStatusMessage = "Running Disconnect Procedure!";

Thread.Sleep(2000);
ConnectionStatusMessage = "Disconnected";
};
});

IObserver<Server> observer = Observer.Create<Server>(
(server) => this.ConnectionStatusMessage = server.ServerStatus,
(ex) => {
this.ConnectionStatusMessage = "Getting Disconnected";
Thread.Sleep(2000);
});

observableServerConnection.Subscribe(observer);

}

Like Observable, Observer also allows us to define an IObserver without actually implementing it by a class. We can specify the handlers for OnNext, OnCompleted and OnError. There are different overloads available. At the minimum, we need to specify the handler for OnNext. Here we have specified the handlers for OnNext and OnError. For the remaining methods, default implementation of these methods in Observer class are used. For this example, the connection status is updated on the display as follows:

After each OnNext message (step 1):


After OnError is called from Observable (step 2)


Observer is automatically un-subscribed (step 3-a)


Observer is automatically un-subscribed (step 3-b)



In order to understand the above example better, we can draw a marble diagram. You can see that the Observable keeps on sending OnNext messages ten times. After that it sends calls OnError on the Observer. After this, the unsubscription code is executed. This explains the order of messages:

- Connected (Due to OnNext)
- Getting Disconnected (Due to OnError)
- Running Disconnect Procedure! (Due to unsubscription)
- Disconnected (Due to unsubscription)

Now you can see that unsubscription code is executed when OnError is finished execution.


Attempt to Retry few times before giving up:
Now we assume a requirement which directs us to try to reconnect a few times to a server before giving it up. We want to show that how easily it is for Observable to work in a situation like this. Just comment the subscription in the previous code and update it with the following code:

observableServerConnection.Retry(2).Subscribe(observer);

This code attempts to subscribe two times. If there is an error after two times, OnError of Observable is called with the exact exception message. The order of messages displayed on the screen would be as follows:

- Connected
- Running Disconnect Procedure!
- Disconnected
- Connected
- Getting Disconnected
- Running Disconnect Procedure!
- Disconnected

After seeing the marble diagram below the above sequence of messages should make sense. It can tell why OnError is executed before unsubscription code when observable is retrying. Basically, for the last Observable, the unsubscription code would be executed after OnCompleted / OnError is called on the Observable.



After Certain number of attempts, connect to a different server:
What if we need to connect to a different server after making a certain number of attempts to a server. In the example below, we are trying to reconnect two times to the server. After failing again, it just doesn't give up but attempt to connect to a different server. Look at how easy it could become if we implement Server's connection as Observable. Here we have used Catch method of Observable. We just need to update the constructor or MainWindowViewModel. We have created a new Observable like already existing Observable. If there is no error finally in the Observable on which Catch is defined then this Observable is not subscribed.

public MainWindowViewModel()
{
IObservable<Server> observableServerConnection =
Observable.Create<Server>((o) =>
{
new Thread(() =>
{
for (int i = 0; i < 3; i++)
{
Thread.Sleep(2000);
o.OnNext(new Server()
{ServerStatus = "Connected"});
}

o.OnError(new Exception("Server Disconnected"));
//o.OnCompleted();
}) {IsBackground = true}.Start();

return () =>
{
ConnectionStatusMessage =
"Running Disconnect Procedure!";

Thread.Sleep(2000);

ConnectionStatusMessage = "Disconnected";
};
});

IObservable<Server> observableServerConnection2 =
Observable.Create<Server>((o) =>
{
new Thread(() =>
{
for (int i = 0; i < 3; i++)
{
Thread.Sleep(2000);
o.OnNext(new Server() { ServerStatus = "Connected" });
}

o.OnError(new Exception("Server Disconnected2"));
//o.OnCompleted();
}) { IsBackground = true }.Start();

return () =>
{
ConnectionStatusMessage =
"Running Disconnect Procedure 2!";

Thread.Sleep(2000);

ConnectionStatusMessage = "Disconnected2";
};
});

IObserver<Server> observer =
Observer.Create<Server>(
(server) => this.ConnectionStatusMessage = server.ServerStatus,
(ex) =>
{
this.ConnectionStatusMessage = "Getting Disconnected";
Thread.Sleep(2000);
});

//observableServerConnection.Subscribe(observer);

observableServerConnection.Retry(2).Catch<Server>(observableServerConnection2).Subscribe(observer);

}

The view goes through these steps when application is run:

- Connected
- Running Disconnect Procedure!
- Disconnected
- Connected
- Running Disconnect Procedure!
- Disconnected
- Connected
- Getting Disconnected
- Running Disconnect Procedure 2!
- Disconnected2

Download:

Sunday, February 13, 2011

MVVM - Joining Rx's IObservables using LINQ to IObservable in a WPF Application

In this post, we will see how we can combine two IObservable(s) using LINQ. The project is developed on top of the code as developed in this post:

http://shujaatsiddiqi.blogspot.com/2011/02/mvvm-view-model-iobserver-observing.html

We will just be discussing the changes required for this example. For the discussion of complete code please refer to the link provided above.

As in the previous post, the view model will be observing the data streams from to IObservable. It needs to combine them both using LINQ (as specified above). It would be interesting as we would see how data is combined when data is pushed by the IObservable. As soon as the data is available, it would automatically combine it using different operators provided by Reactive Extensions (Rx). Since we don't need to display data separately in the view, we update the view just to have one ListBox. The ListBox should show Id and Name of each Student. It should also show the name of the course the student is enrolled in, which could be obtained from observing CourseModel.


After completion of data stream, the use should be notified by changing the Fill color of ellipse as green. The expected final display is shown as follows:



To update the view as described above, we need to update the design of the view as follows:

<Window x:Class="WpfApp_MVVM_ReactiveModel.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
xmlns:local="clr-namespace:WpfApp_MVVM_ReactiveModel"
Title="MainWindow" Height="350" Width="525" >
<Window.DataContext>
<local:MainWindowViewModel />
</Window.DataContext>
<Grid>
<ListBox Height="250" HorizontalAlignment="Left" Margin="-2,0,0,0"
Name="listBox1" VerticalAlignment="Top" Width="493"
ItemsSource="{Binding StudentList}">
<ListBox.ItemTemplate>
<DataTemplate >
<StackPanel Orientation="Horizontal">
<TextBlock Text="{Binding StudentId, StringFormat=Id:{0}}" />
<TextBlock Text=" " />
<TextBlock Text="{Binding StudentName, StringFormat=Name:{0}}" />
<TextBlock Text=" " />
<TextBlock Text="{Binding CourseName, StringFormat=Name:{0}}" />
</StackPanel>
</DataTemplate>
</ListBox.ItemTemplate>
</ListBox>
<Ellipse Height="39" HorizontalAlignment="Left" Margin="370,257,0,0"
Name="ellipse1" Stroke="Black" VerticalAlignment="Top" Width="116" >
<Ellipse.Style>
<Style TargetType="{x:Type Ellipse}">
<Style.Setters>
<Setter Property="Fill" Value ="Blue" />
</Style.Setters>
<Style.Triggers>
<DataTrigger Binding="{Binding IsLoaded}" Value="true" >
<Setter Property="Fill" Value="Green" />
</DataTrigger>
</Style.Triggers>
</Style>
</Ellipse.Style>
</Ellipse>
</Grid>
</Window>

The view is still using MainWindowViewModel as DataContext. You can see that an additional TextBlock is added to the template of items of ListBox. It is bound to CourseName from each item of the collection bound to the list box. The DataContext is expected to have a collection StudentList with items having properties StudentId, StudentName and CourseName. This is implemented as an ObservableCollection so that as the items are entered in the collection, the view could update itself using the notification mechanism built-in for this type of collection. It should also have a boolean property, IsDataLoaded. It should turn true when data is completely loaded. This is used in DataTrigger in the view to turn the notification ellipse to be filled as Green. We need to implement INotifyPropertyChanged just because of this property so that view can be notified for this change and the data triggered could trigger the fill color of the ellipse.

class MainWindowViewModel : INotifyPropertyChanged
{
private readonly ObservableCollection<StudentViewModel> _studentList =
new ObservableCollection<StudentViewModel>();

public ObservableCollection<StudentViewModel> StudentList
{
get { return _studentList; }
}

private bool _isLoaded = false;
public bool IsLoaded
{
get { return _isLoaded; }
set
{
_isLoaded = value;
onPropertyChanged("IsLoaded");
}
}

public MainWindowViewModel()
{
var courseModel2 = new CourseModel();
var studentModel2 = new StudentsModel();

(from c in courseModel2
from s in studentModel2
where s.CourseId == c.CourseId
select new { s.StudentName, s.StudentId, c.CourseName })
.Subscribe(
(on) => _studentList.Add(new StudentViewModel(on)),
(ex) => Console.WriteLine(@"On error"),
() => IsLoaded = true);

}

#region INotifyPropertyChanged implementation
public event PropertyChangedEventHandler PropertyChanged;
private void onPropertyChanged(string propertyname)
{
if (PropertyChanged != null)
{
PropertyChanged(this, new PropertyChangedEventArgs(propertyname));
}
}
#endregion INotifyPropertyChanged implementation
}

The focal point of this post is the constructor of MainWindowViewModel. Here we are combining to IObservable based collections using LINQ. This is a special new flavor of LINQ released with Rx. This is called LINQ to IObservable. This is available in System.Reactive assembly. Let's have a close look at the constructor.
>
public MainWindowViewModel()
{
var courseModel2 = new CourseModel();
var studentModel2 = new StudentsModel();

(from c in courseModel2
from s in studentModel2
where s.CourseId == c.CourseId
select new { s.StudentName, s.StudentId, c.CourseName })
.Subscribe(
(on) => _studentList.Add(new StudentViewModel(on)),
(ex) => Console.WriteLine(@"On error"),
() => IsLoaded = true);

}

So we are cross-joining instances of CourseModel and StudentsModel. This would be creating a Cartesian Product. We are filtering it for only those which have matching CourseId(s).

As specified in the definition of the view, we are interested in StudentId and StudentName from StudentsModel and CourseName from CourseModel.

The cross product of Cartesian Product of two or more IObservable(s) is also an IObservable. This enables us to subscribe to it by providing the OnNext, OnCompleted and OnError of Observer. It is to remember that MainWindowViewModel is not the observable for the new IObservable created by combining CourseModel and StudentsModel. Rx run-time would take care of how the provided lambda expressions for OnNext, OnCompleted and OnError are to be executed.

The overload of Subscribe method that we have used takes three arguments OnNext, OnError and OnCompleted respectively. We are not doing anything useful in the lambda expression provided for OnError except just writing on the exception message on the console. In the OnCompleted expressoin we are setting IsLoaded to true. This would trigger the DataTrigger in the view and change the fill color of Notification ellipse to Green. The type of parameter of OnNext is the anonymous type created for Select. This would have same elements with similar names. As we have data available in OnNext, we are creating a StudentViewModel object and add it to the StudentsList ObservableCollection . This is bound to the ListBox in the view which would show this new element. Though we can not use foreach for IObservable as it is a feature of just IEnumerable, we can understand it by this nested foreach loop. Don't even try this:


Since the combined collections have IObservable based, we don't have the data available at the same time. This makes it very special form of joining. This would keep each element of courseModel2 in memory until OnCompleted is not executed from studentModel2. As the elements are being received in background OnNext for studentModel they are combined with each element of courseModel. This would be creating equal number of Observers for studentModel2 as the total number of elements pushed by courseModel2. As the elements of courseModel2 continue to be received they are added to the list of Observers of studentModel2.


We should be able to safely make a summarized statement about it as follows:

All the elements pushed through OnNext of courseModel2 would be combined with all the elements pushed by studentModel2. If there is any OnNext of either of these IObservable before this LINQ, that would not be considered for joining.


As soon as the OnCompleted is executed for this Observer created at runtime, it would automatically unsubscribe it by disposing itself. As we have seen in previous posts that disposing an unsubscriber would result in its unsubscription. I think this is the default implementation provided for this dynamic Observer by Rx runtime.

This would need updates in StudentViewModel. We need to add a property CourseName as used in MainWindowViewModel. There is one more interesting thing, the datatype of parameter for constructor is specified as dynamic. This is basically because of anonymous type constructed during LINQ operation. In the constructor, we are using it as a duck type datatype, we know what properties would it have but we don't know it's name as it would be assigned any arbitrary name at run-time. This is a special feature of .net framework 4.0 [http://msdn.microsoft.com/en-us/library/dd264736.aspx].

class StudentViewModel : INotifyPropertyChanged
{
public StudentViewModel(dynamic student)
{
this.StudentId = student.StudentId;
this.StudentName = student.StudentName;
this.CourseName = student.CourseName;
}

private string _courseName;
public string CourseName
{
get { return _courseName; }
set
{
_courseName = value;
OnPropertyChanged("CourseName");
}
}

private string _studentName;
public string StudentName
{
get { return _studentName; }
set
{
_studentName = value;
OnPropertyChanged("StudentName");
}
}

private int _studentId;
public int StudentId
{
get { return _studentId; }
set
{
_studentId = value;
OnPropertyChanged("StudentId");
}
}

#region INotifyPropertyChanged implementation

public event PropertyChangedEventHandler PropertyChanged;
private void OnPropertyChanged(string propertyName)
{
if (PropertyChanged != null)
{
PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
}
}

#endregion INotifyPropertyChanged implementation
}

So we need to update Student to add a new property, CourseId. This would be used as a key to join StudentsModel and CourseModel IObservable collections. This is int type.
>
class Student
{
public string StudentName { get; set; }
public int StudentId { get; set; }
public int CourseId { get; set; }
}

Since we have updated Student type, we need to update the definition for StudentsModel. This is to specify value of CourseId. We are just deriving it based on a algorithm [If _studentId is odd then set CourseId as 2, otherwise, set it as 1]. We are also reducing the number of times that timer event would execute. You can see that we are calling Observer's OnCompleted just when the event has been executed 3 times. As we call OnCompleted on Observer, we are also stopping the timer, this would not keep the timer running on the background.
>
class StudentsModel : IObservable<Student>
{
private int _studentId = 1;

//Timer to notify observers about updates in the reactive observable
private readonly DispatcherTimer _t = new DispatcherTimer();

public List<IObserver<Student>> Observers =
new List<IObserver<Student>>();

public StudentsModel()
{
_t.Interval = TimeSpan.FromMilliseconds(3000);
_t.Tick += new EventHandler(Tick);
_t.Start();
}

void Tick(object sender, EventArgs e)
{
var student = new Student()
{
StudentId = _studentId,
StudentName = string.Format("Student: {0}", _studentId),
CourseId = ((int)(_studentId % 2)) + 1
};

if (_studentId <= 3)
{
Observers.ForEach((observer) => observer.OnNext(student));
_studentId++;
}
else
{
Observers.ForEach((observer) => observer.OnCompleted());
_t.Stop();
}
}

public IDisposable Subscribe(IObserver<Student> observer)
{
IDisposable unSubscriber = new Unsubscriber<Student>(Observers, observer);
if (!Observers.Contains(observer))
{
Observers.Add(observer);
}
return unSubscriber;
}
}

Now Let's run the application. The application runs successfully and we see the elements being added to the list box.



But there is a weird thing. The notification ellipse is still filled as blue. It should have turned Green if OnCompleted of the LINQ result is executed in MainWindowViewModel. It means that OnCompleted ,for this, is not executed. You can prove that by inserting break point in the lambda expression used for OnCompleted. But the main question is why it is not executed??

Basically, we have discussed that equal numbers of Observers would be created for studentModel2 as the number of elements pushed by courseModel2 after the LINQ statment is executed. So there are will be two observers for this. If we insert break-point in the Tick event handler for timer _t in StudentsModel then we realize that OnCompleted is executed only once. We need to execute OnCompleted for all observers to fix this. This is really weird because we are using Foreach on Obsevers List. It should have executed the OnCompleted for all observers. There should have been no Observer in the list before next statement is executed [_t.Stop()]. But if we put a break point then it seems that OnCompleted is executed only once as one Observer is still lingering on.


This seems to get fixed if we update the code as follows:

void Tick(object sender, EventArgs e)
{
var student = new Student()
{
StudentId = _studentId,
StudentName = string.Format("Student: {0}", _studentId),
CourseId = ((int)(_studentId % 2)) + 1
};

if (_studentId <= 3)
{
Observers.ForEach((observer) => observer.OnNext(student));
_studentId++;
}
else
{
Observers.ToList().ForEach((observer) => observer.OnCompleted());
_t.Stop();
}
}

Now run this and insert the break point appropriately.



We have just called ToList() on a collection which is already a Generic List to fix this. This shows some issue with the deferred execution of Lambdas of foreach here, which is executing OnCompleted for each observer. This would definitely introduce some performance delay but whatever works...

Now since all the OnCompleted have been executed for all the collections involved in the LINQ query, it is safe to executed the OnCompleted of the result. Rx runtime does exactly that. This sets IsLoaded to true and results in filling the Notification ellipse as Green.



Download Code: