Wednesday, March 2, 2011

Rx - Executing IObservable in Parallel in a WPF application

In this post we are going to take the asynchronous code execution using Reactive extension a bit further. We would see how we can have multiple IObservable (s) are executed in parallel. Rx supports this using Observable.ForkJoin. We can have code executed in parallel which doesn't return any data, return same or different data.

It's not blocking rather still asynchronous:
Let's create a simple view with a single message and a button. We need to execute the IObservable in parallel when button is clicked.
<Window x:Class="WpfApp_Reactive_ForkJoin.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:local="clr-namespace:WpfApp_Reactive_ForkJoin"
        Title="MainWindow" Height="350" Width="525">
    <Window.DataContext>
        <local:MainWindowViewModel />
    </Window.DataContext>
    <Grid>
        <TextBlock Height="47" HorizontalAlignment="Left" Margin="70,63,0,0" 
                   Name="textBlock1" Text="{Binding Message}" VerticalAlignment="Top" Width="384" />
        <Button Content="Start Operations" Height="38" HorizontalAlignment="Left" Margin="68,134,0,0"
                Name="btnStartOperations" VerticalAlignment="Top" Width="143" 
                Command="{Binding OperationCommand}"/>
    </Grid>
</Window>
The view is assigning an instance of MainWindowViewModel as the DataContext of the Window. The view expects Message property from the view model to bind it to the Text property of the TextBlock. It also expects an ICommand OperationCommand in the view model. We are implementing INotifyPropertyChanged so we need to provide PropertyChanged event in this view model. It has Message property which is bound to the Text property of the TextBlock in the view.
class MainWindowViewModel : INotifyPropertyChanged
{
    #region Constructor
    public MainWindowViewModel()
    {

    }
    #endregion Constructor

    #region Public Properties

    string _message = "Operation not started yet...";
    public string Message
    {
        get { return _message; }
        set
        {
            _message = value;
            OnPropertyChanged("Message");
        }
    }

    #endregion Properties

    #region INotifyPropertyChanged implementation

    public event PropertyChangedEventHandler PropertyChanged = delegate { };
    private void OnPropertyChanged(string propertyName)
    {
        PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
    }

    #endregion INotifyPropertyChanged implementation

    #region Operation Command

    RelayCommand _operationCommand;
    public ICommand OperationCommand
    {
        get
        {
            if (_operationCommand == null)
            {
                _operationCommand = new RelayCommand(param => this.StartOperation(),
                    param => this.CanStartOperation);
            }
            return _operationCommand;
        }
    }

    bool CanStartOperation
    {
        get
        {
            return true;
        }
    }

    [DebuggerHidden]
    public void StartOperation()
    {
        Message = "Operation running";

        Observable.ForkJoin(
            Observable.Start(() => { 
                                       Message = "In 1st Observable.Start";
                                       Debug.WriteLine("Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                                       Thread.Sleep(2000);
                                   }),
            Observable.Start(() => { 
                                       Message = "In 2nd Observable.Start";
                                       Debug.WriteLine("Executing 2nd on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                                       Thread.Sleep(2000);
                                   }),
            Observable.Start(() => { 
                                       Message = "In 3rd Observable.Start";
                                       Debug.WriteLine("Executing 3rd on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                                       Thread.Sleep(10000);
                                       throw new System.Exception("exception");
                                   })
            )
            .Subscribe(
                        (result) => { 
                                        Message = result.ToString();
                                        Debug.WriteLine("OnNext: Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                                        Thread.Sleep(2000); 
                                    },
                        (ex) => 
                                { 
                                    Message = ex.Message;
                                    Debug.WriteLine("OnError: Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                                    Thread.Sleep(2000); 
                                },
                        () => { 
                                Message = "Completed";
                                Debug.WriteLine("Completed: Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                                Thread.Sleep(2000);
                              });            
    }

    #endregion Operation Command
}
You can see that we have used an instance of RelayCommand for OperationCommand. Yes, this is the same RelayCommand which is from Josh Smith's MVVM article. Since we want the button always to be enabled so CanStartOperation always returns true. We have used StartOperation as Execute method for OperationCommand.

http://msdn.microsoft.com/en-us/magazine/dd419663.aspx

The following marble diagram might be used to represent the behavior of Fork.Join when all asynchronous operations actually return data. All this data is combined to generate a single OnNext with combined data. Since there is no exception so OnCompleted is generated when all data streams are completed.


Can Only Observable.Start be used?
In the above example we have used Observable.Start. We can use any IObservable no matter how it is created. We can even use IObservable (s) resulting from Observable.ToAsync from methods we want to execute asynchronously.
Observable.ForkJoin(
    Observable.Start(() => { 
                               Message = "In 1st Observable.Start";
                               Debug.WriteLine("Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                               Thread.Sleep(2000);
                           }),
    Observable.Start(() => { 
                               Message = "In 2nd Observable.Start";
                               Debug.WriteLine("Executing 2nd on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                               Thread.Sleep(2000);
                           }),
    Observable.ToAsync(myMethod)()
    )
    .Subscribe(
                (result) => { 
                                Message = result.ToString();
                                Debug.WriteLine("OnNext: Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                                Thread.Sleep(2000); 
                            },
                (ex) => 
                        { 
                            Message = ex.Message;
                            Debug.WriteLine("OnError: Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                            Thread.Sleep(2000); 
                        },
                () => { 
                        Message = "Completed";
                        Debug.WriteLine("Completed: Executing 1st on Thread: {0}", Thread.CurrentThread.ManagedThreadId);
                        Thread.Sleep(2000);
                      });            

Observable.ForkJoin and Exception Handling:
If any of the IObservable results in an exception the no OnNext message is generated from the resulting IObservable but a single OnError is generated. We can show this on a marble diagram as follows:


Operations returning different Data:
For Observable.ForkJoin, we can combine IObservable(s) which return no data or same / different data. Yes, it is possible that we have IObservable resulting in different data. We can use a different Overload of Observable.ForkJoin for this.
public void StartOperation()
{
    Observable.ForkJoin<int, string, string>(
        Observable.Start<int>(() =>
            {
                Thread.Sleep(2000);
                return 1;
            }),
        Observable.ToAsync<string>(myMethod)(),
        (StudentId, StudentName) => { return string.Format("Student- Id: {0}, Name: {1}", StudentId, StudentName); }
    ).Subscribe(
            (result) => { Message = result; },
            (ex) => { Message = ex.Message; });
}

private string myMethod()
{
    Thread.Sleep(5000);
    return "Muhammad";
}
When we run the application and click StartOperation. After some delay(simulated operational delay in parallel streams using Thread.Sleep) the window is updated. We can also use IObservables directly created using Observable.Create() method
public void StartOperation()
{
    Observable.ForkJoin<int, string, string>(
        Observable.Create<int>(
            (o) =>
            {
                Thread.Sleep(2000);
                o.OnNext(1);
                o.OnCompleted();

                return () => { };
            }),
            Observable.CreateWithDisposable<string>(
            (o) =>
            {
                Thread.Sleep(2000);
                o.OnNext("Muhammad");
                o.OnCompleted();

                return Disposable.Empty;
            }),
            (StudentId, StudentName) => { return string.Format("Student- Id: {0}, Name: {1}", StudentId, StudentName); }
    ).Subscribe(
            (result) => { Message = result; },
            (ex) => { Message = ex.Message; });

}
Let's run this. When we click the button the view updates as follows:


Many OnNext messages on Individual IObservable (s)
Observable.ForkJoin combines the last values of each Observable stream, combines them and generates OnNext message on the resulting IObservable.
public void StartOperation()
{
    Observable.ForkJoin<int, string, string>(
        Observable.Create<int>(
            (o) =>
            {
                Thread.Sleep(2000);
                o.OnNext(1);
                o.OnNext(2);
                o.OnCompleted();

                return () => { };
            }),
            Observable.Create<string>(
            (o) =>
            {
                Thread.Sleep(2000);
                o.OnNext("Muhammad");
                o.OnCompleted();

                return () => { };
            }),
            (StudentId, StudentName) => { return string.Format("Student- Id: {0}, Name: {1}", StudentId, StudentName); }
    ).Subscribe(
            (result) => { Message = result; },
            (ex) => { Message = ex.Message; });

}
When we run this, the first IObservable places two OnNext with data 1 and 2 sequentially. The second IObservable results in data "Muhammad". Let's run this:


It is because the third argument from this overload of Observable.ForkJoin has been used as a selector function. It has combined StudentId (int) and StudentName (string) into a string and placed OnNext on the resulting Observable.ForkJoin IObservable. When OnNext is received from the resulting IObservable, it has updated the Message property with the same message. Since this is bound to the view using WPF Binding System, the view is updated with this message.

Cleanup after Parallel Operations:
If we need to do some cleanup after these parallel operations are completed then we can do that in the Finally block executed after IObservable from Observable.ForkJoin finishes. As we know this gets executed no matter if these parallel operations were successful resulting in OnCompleted or there was some exception from any of the operations. In the latter case this gets executed after OnError is executed.

Observable.ForkJoin and Thread Management:
All the IObservable (s) in ForkJoin would be executing on different ThreadPool threads. The resulting IObservable would also be running on a TheredPool thread. Since ThreadPool threads might be recycled, it is possible that the same thread seems to be used for OnNext and OnCompleted as one of the threads used for executing one of the IObservable code. It must be remembered that OnNext, OnCompleted will always be executing on the same thread.
public void StartOperation()
{
    Message = "Operation running";

    Observable.ForkJoin(
        Observable.Start(() =>
        {
            Message = "In 1st Observable.Start";
            Debug.WriteLine("Executing 1st on Thread: {0}, IsThreadPoolThread: {1} ", 
                Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
            Thread.Sleep(2000);
        }),
        Observable.Start(() =>
        {
            Message = "In 2nd Observable.Start";
            Debug.WriteLine("Executing 2nd on Thread: {0}, IsThreadPoolThread: {1}",
                Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
            Thread.Sleep(2000);
        }),                
        Observable.Start(() => 
        { 
            Message = "In 3rd Observable.Start";
            Debug.WriteLine("Executing 3rd on Thread: {0}, IsThreadPoolThread: {1}", 
                Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
            Thread.Sleep(2000);                                           
        }))
        .Subscribe(
                    (result) =>
                    {
                        Message = result.ToString();
                        Debug.WriteLine("OnNext: Executing on Thread: {0}, IsThreadPoolThread: {1}", 
                            Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
                        Thread.Sleep(2000);
                    },
                    (ex) =>
                    {
                        Message = ex.Message;
                        Debug.WriteLine("OnError: Executing on Thread: {0}, IsThreadPoolThread: {1}", 
                            Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
                        Thread.Sleep(2000);
                    },
                    () =>
                    {
                        Message = "Completed";
                        Debug.WriteLine("Completed: Executing on Thread: {0}, IsThreadPoolThread: {1}", 
                            Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
                        Thread.Sleep(2000);
                    });
}

Executing 1st on Thread: 3, IsThreadPoolThread: True
Executing 2nd on Thread: 12, IsThreadPoolThread: True
Executing 3rd on Thread: 11, IsThreadPoolThread: True
OnNext: Executing on Thread: 11, IsThreadPoolThread: True
Completed: Executing on Thread: 11, IsThreadPoolThread: True

Executing 1st on Thread: 11, IsThreadPoolThread: True
Executing 2nd on Thread: 10, IsThreadPoolThread: True
Executing 3rd on Thread: 12, IsThreadPoolThread: True
OnNext: Executing on Thread: 12, IsThreadPoolThread: True
Completed: Executing on Thread: 12, IsThreadPoolThread: True

Executing 1st on Thread: 11, IsThreadPoolThread: True
Executing 2nd on Thread: 10, IsThreadPoolThread: True
Executing 3rd on Thread: 12, IsThreadPoolThread: True
OnNext: Executing on Thread: 3, IsThreadPoolThread: True
Completed: Executing on Thread: 3, IsThreadPoolThread: True


Download Code:

No comments: