Thursday, February 17, 2011

IObservable.Throttle causes OnNext on ThreadPool thread

This is a continuation of our following post:

http://shujaatsiddiqi.blogspot.com/2011/02/mvvm-observable-inotifypropertychangedp.html

In the above post we discussed about using PropertyChanged event of INotifyPropertyChanged by observing EventStream support of Reactive Extensions Rx. In this post we change the view model a bit. We remove the implementation of INotifyPropertyChanged. We rather define it as a DependencyObject to support a different change notification mechanism from view model. This is to demonstrate the following:

IObservable.Throttle causes OnNext to be generated on a ThreadPool thread instead.

We will see what problems might be caused by this and how we can work around this.

public class StudentViewModel : DependencyObject
{
#region Model
private Student _model;
#endregion Model

#region Constructor

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Subscribe((arg) =>
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
});
}

#endregion Constructor

#region Dependency Properties

public static DependencyProperty StudentIdProperty =
DependencyProperty.Register("StudentId", typeof(int), typeof(StudentViewModel));
public int StudentId
{
get { return (int)GetValue(StudentIdProperty); }
set { SetValue(StudentIdProperty, value); }
}

public static DependencyProperty StudentNameProperty =
DependencyProperty.Register("StudentName", typeof(string), typeof(StudentViewModel));
public string StudentName
{
get { return (string)GetValue(StudentNameProperty); }
set { SetValue(StudentNameProperty, value); }
}

#endregion Dependency Properties

#region Overriden methods of DependencyObject
protected override void OnPropertyChanged(DependencyPropertyChangedEventArgs e)
{
base.OnPropertyChanged(e);
switch (e.Property.Name)
{
case "StudentId":
_model.StudentId = (int)e.NewValue;
break;
case "StudentName":
_model.StudentName = (string)e.NewValue;
break;
}
}
#endregion Overriden methods of DependencyObject
}

We have update the properties StudentId and StudentName to be Dependency properties. To synchronize the model's corresponding properties we have overridden OnPropertyChanged method of DependencyObject. The argument DependencyPropertyChangedEventArgs , not only, provides us the details about the property being updated but it also provides the old and new values of this property. As you can see we are just using the new values of these properties to update the corresponding model's properties.

We have kept the handling of model's PropertyChanged event observation using Reactive extension's Observable event stream support available in CoreEx assembly. Let's run the application now and open two child windows.


When we enter StudentName in one child window, the same changes are reflected in the other child window.


Now we add throttling support in order to avoid frequent updates to the other child windows due to updates in model's properties. Let's update the constructor of StudentViewModel as follows:

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe((arg) =>
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
});
}

This definition is same as previous. We have just used Throttling for PropertyChanged event for 500 milliseconds. Now we run the application. Open two child windows and enter data in StudentName property in one of the child window. As soon as we settle, the application shuts down. Let's see what is going on by updating the code further as follows:

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe((arg) =>
{
try
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
}
});
}

Now we run the application and enter data in StudentName text box as previous. The exception handling logic causes the following MessageBox to be displayed.


This is because of the reason that IObservable.Throttle causes OnNext to be placed in a ThreadPool thread.


Using Invoke / BeginInvoke on Dispatcher:
The first solution could be to directly using Dispatcher.Invoke / Dispatcher.BeginInvoke. We can specify the code that we want to execute in the Action delegate. The delegate's code is executed here on UI thread. We update the constructor as follows:

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Throttle(TimeSpan.FromMilliseconds(500))
.Subscribe((arg) =>
{
Dispatcher.BeginInvoke(new Action(() =>
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
}), null);

});
}

Now we run the application. Again we open two windows and enter data in StudentName text box. We see that the data is successfully updated in the other window now. Let's insert the break point again in OnNext code.


Using other overload of Throttle:
We can use other overload of IObservable.Throttle method. This requires an additional overload which has an additional parameter of type IScheduler. IScheduler is a Rx interface available in CoreEx assembly and System.Concurrency namespace.

public StudentViewModel(Student student)
{
_model = student;

Observable.FromEvent<PropertyChangedEventArgs>(_model, "PropertyChanged")
.Where(et => et.EventArgs.PropertyName == "StudentName")
.Throttle(TimeSpan.FromMilliseconds(500), Scheduler.Dispatcher)
.Subscribe((arg) =>
{
this.StudentId = _model.StudentId;
this.StudentName = _model.StudentName;
});
}

We have specified Scheduler.Dispatcher for other parameter. This is also available in System.Concurrency namespace. It is available in CoreEx assembly. This is saving us from dispatching in our code but Throttle's generated OnNext messages will be generated in the UI thread. Let's run the application and open two child windows like before.


Download:

No comments: