Tuesday, February 15, 2011

WPF MVVM - Retry & Catch for Observables in Reactive Extensions (Rx)

In this post we simulate a client-server communication scenario. It shows how observables can be used to retry a connection to a server and then switch to a different server after a few unsuccessful attempts. We will keep the view simple: a single TextBlock shows the connection status. If the client is connected, the status is displayed with a green foreground; otherwise it is shown in red.

<Window x:Class="Wpf_MVVM_Observables.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:local="clr-namespace:Wpf_MVVM_Observables"
        Title="MainWindow" Height="350" Width="525">
    <Window.DataContext>
        <local:MainWindowViewModel />
    </Window.DataContext>
    <Grid>
        <Label Content="Connection Status:" Height="25" HorizontalAlignment="Left"
               Margin="12,114,0,0" Name="lblConnectionStatusMessage" VerticalAlignment="Top" Width="122" />
        <TextBlock Height="27" HorizontalAlignment="Left" Margin="140,119,0,0"
                   Name="textBlockConnectionStatusMessage" Text="{Binding ConnectionStatusMessage}"
                   VerticalAlignment="Top" Width="221" >
            <TextBlock.Style>
                <Style>
                    <Setter Property="TextBlock.Foreground" Value="Red" />
                    <Style.Triggers>
                        <DataTrigger Binding="{Binding ConnectionStatusMessage}" Value="Connected">
                            <Setter Property="TextBlock.Foreground" Value="Green" />
                        </DataTrigger>
                    </Style.Triggers>
                </Style>
            </TextBlock.Style>
        </TextBlock>
    </Grid>
</Window>

This view expects MainWindowViewModel to be in the same assembly. The view model should have a property named ConnectionStatusMessage. The message text is shown by binding this property directly to the Text property of the TextBlock. The foreground of the message is managed through a DataTrigger on the TextBlock, which lets us react to the value of ConnectionStatusMessage from the DataContext. If the value is “Connected”, the foreground is set to Green; otherwise it stays Red, the default set by the style. This is how the view should appear in both states:

When connected:


When Disconnected (with specific connection status message):


The simplest view model to fulfill the requirements of the above view can be developed as presented below:

using System.ComponentModel;

class MainWindowViewModel : INotifyPropertyChanged
{
    #region Properties

    private string _connectionStatusMessage;

    public string ConnectionStatusMessage
    {
        get { return _connectionStatusMessage; }
        set
        {
            _connectionStatusMessage = value;
            // Tell the binding engine that the displayed text must be refreshed.
            OnPropertyChanged("ConnectionStatusMessage");
        }
    }

    #endregion Properties

    #region INotifyPropertyChanged implementation

    // The empty delegate means the event is never null, so no null check is needed below.
    public event PropertyChangedEventHandler PropertyChanged = delegate { };

    private void OnPropertyChanged(string propertyName)
    {
        PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
    }

    #endregion INotifyPropertyChanged implementation
}

As expected by the view, it has a string property, ConnectionStatusMessage, and it supports change notification by implementing INotifyPropertyChanged. To verify that the view logic actually works, we can use a DispatcherTimer to keep flipping the status message and watch the effect in the view. We can update the view model by adding this code:

private DispatcherTimer _timer;

public MainWindowViewModel()
{
    _connectionStatusMessage = "Disconnected";

    _timer = new DispatcherTimer();
    _timer.Interval = TimeSpan.FromSeconds(3);
    _timer.Tick += new EventHandler(_timer_Tick);
    _timer.Start();
}

void _timer_Tick(object sender, EventArgs e)
{
    // Flip between the two states on every tick.
    ConnectionStatusMessage = (_connectionStatusMessage == "Connected") ? "Disconnected" : "Connected";
}

We have used DispatcherTimer to flip the status message between Connected and Disconnected every 3 seconds. When we run the application, we see that the foreground is Green when the status message is Connected and Red when it is Disconnected. To use DispatcherTimer we need to add the System.Windows.Threading namespace. When we are done testing, the above code can be safely removed from the view model.

Now suppose that we have a Server connection type which is observable. We will see what we can gain just by making this type Observable. Here we are using static methods of the Observable and Observer classes, which are available in the System.Linq namespace in the Rx build used for this post (in current releases of System.Reactive, Observable lives in System.Reactive.Linq and Observer in System.Reactive). We need to add a reference to the System.Reactive assembly (currently Reactive Extensions has to be installed separately).
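The code in this post creates Server objects and reads their ServerStatus property, but the type itself is not listed. A minimal sketch that would satisfy that usage, assuming nothing more than a settable string property, could look like this:

```csharp
// Hypothetical minimal Server type (an assumption: the post does not show its definition).
public class Server
{
    // Status text that the observer copies into ConnectionStatusMessage.
    public string ServerStatus { get; set; }
}
```

A real connection type would carry more than a status string; one property is enough for the simulation here.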

Observable.Create allows us to define an Observable without actually implementing the IObservable interface. We specify the code to execute when an observer subscribes to it. This is an example of a Cold Observable, i.e. it only starts producing values when an observer subscribes to it. Since server communication is generally handled on a separate worker thread, we have created a new background thread [IsBackground = true]. This also keeps the UI responsive. Observable.Create has two overloads. In one, the delegate we pass in returns an IDisposable; in the other, it returns an Action delegate. Either way, this is the code executed when the observer gets unsubscribed. We have written the return value as a statement lambda. All the Thread.Sleep statements are there just to keep each message on the UI for a while so that we can see it on the display.

public MainWindowViewModel()
{
    IObservable<Server> observableServerConnection = Observable.Create<Server>((o) =>
    {
        // Simulate the server on a background thread so the UI stays responsive.
        new Thread(() =>
        {
            for (int i = 0; i < 3; i++)
            {
                Thread.Sleep(2000);
                o.OnNext(new Server() { ServerStatus = "Connected" });
            }
            o.OnError(new Exception("Server Disconnected"));
        }) { IsBackground = true }.Start();

        // Unsubscription code: runs when the observer is unsubscribed.
        return () =>
        {
            ConnectionStatusMessage = "Running Disconnect Procedure!";

            Thread.Sleep(2000);
            ConnectionStatusMessage = "Disconnected";
        };
    });

    IObserver<Server> observer = Observer.Create<Server>(
        (server) => this.ConnectionStatusMessage = server.ServerStatus,
        (ex) =>
        {
            this.ConnectionStatusMessage = "Getting Disconnected";
            Thread.Sleep(2000);
        });

    observableServerConnection.Subscribe(observer);
}

Like Observable, Observer also allows us to define an IObserver without implementing the interface in a class of our own. We can specify the handlers for OnNext, OnCompleted and OnError. There are different overloads available. At the minimum, we need to specify the handler for OnNext. Here we have specified the handlers for OnNext and OnError. For the remaining method, the default implementation in the Observer class is used. For this example, the connection status is updated on the display as follows:

After each OnNext message (step 1):


After OnError is called from Observable (step 2)


Observer is automatically un-subscribed (step 3-a)


Observer is automatically un-subscribed (step 3-b)



To understand the above example better, we can draw a marble diagram. The Observable sends three OnNext messages. After that it calls OnError on the Observer. After this, the unsubscription code is executed. This explains the order of messages:

- Connected (Due to OnNext)
- Getting Disconnected (Due to OnError)
- Running Disconnect Procedure! (Due to unsubscription)
- Disconnected (Due to unsubscription)

Note that the unsubscription code is executed only after the OnError handler has finished executing.
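The same ordering can be checked without a UI. The following is a minimal console-style sketch, not the code from the post: it assumes a current System.Reactive package, emits synchronously instead of on a background thread, and records messages in a list rather than in ConnectionStatusMessage. The class name OrderingSketch is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class OrderingSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        IObservable<string> connection = Observable.Create<string>(o =>
        {
            // Three status updates, then a simulated failure.
            for (int i = 0; i < 3; i++)
            {
                o.OnNext("Connected");
            }
            o.OnError(new Exception("Server Disconnected"));

            // Unsubscription code.
            return () => log.Add("Disconnected");
        });

        // All three handlers are supplied to show that OnCompleted is not called after an error.
        IObserver<string> observer = Observer.Create<string>(
            status => log.Add(status),
            ex => log.Add("Getting Disconnected"),
            () => log.Add("Completed"));

        connection.Subscribe(observer);
        return log;
    }
}
```

Printing the list returned by OrderingSketch.Run() should show the three Connected entries first, then Getting Disconnected, and the Disconnected entry from the unsubscription code last.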


Retry a few times before giving up:
Now assume a requirement that asks us to try reconnecting to a server a few times before giving up. We want to show how easy it is for an Observable to handle a situation like this. Just comment out the subscription in the previous code and replace it with the following code:

observableServerConnection.Retry(2).Subscribe(observer);

This code subscribes to the source at most two times. If the second attempt also ends in an error, OnError of the Observer is called with that exception. The order of messages displayed on the screen would be as follows:

- Connected
- Running Disconnect Procedure!
- Disconnected
- Connected
- Getting Disconnected
- Running Disconnect Procedure!
- Disconnected

After seeing the marble diagram below, the above sequence of messages should make sense. While the Observable is still retrying, Retry absorbs the error, so only the unsubscription code runs before the next attempt and "Getting Disconnected" is not shown. For the last attempt, the error is passed on, and the unsubscription code is executed after OnCompleted / OnError has been called on the Observer.
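The retry behaviour can also be sketched outside WPF. The example below is an illustration under assumptions, not the post's code: it uses a current System.Reactive package, emits synchronously, and the class name RetrySketch and the attempt counter are hypothetical additions.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RetrySketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        int attempt = 0;

        IObservable<string> connection = Observable.Create<string>(o =>
        {
            // Every new subscription made by Retry runs this block again.
            attempt++;
            o.OnNext("Connected (attempt " + attempt + ")");
            o.OnError(new Exception("Server Disconnected"));

            // Unsubscription code, run once per attempt.
            return () => log.Add("Disconnected");
        });

        // Retry(2) allows two subscriptions in total; only the last error reaches the handler.
        connection.Retry(2).Subscribe(
            status => log.Add(status),
            ex => log.Add("Getting Disconnected: " + ex.Message));

        return log;
    }
}
```

The returned list should contain one Connected entry per attempt but only a single Getting Disconnected entry, because the first error is consumed by Retry.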



After a certain number of attempts, connect to a different server:
What if we need to connect to a different server after a certain number of failed attempts? In the example below, we make two attempts to connect to the first server. After the second failure, the code does not give up but attempts to connect to a different server. See how easy this becomes when the server connection is implemented as an Observable. Here we have used the Catch method of Observable. We just need to update the constructor of MainWindowViewModel. We have created a second Observable that mirrors the existing one. If the Observable on which Catch is defined does not finally end in an error, this second Observable is never subscribed to.

public MainWindowViewModel()
{
    // First server: reports "Connected" three times, then fails.
    IObservable<Server> observableServerConnection =
        Observable.Create<Server>((o) =>
        {
            new Thread(() =>
            {
                for (int i = 0; i < 3; i++)
                {
                    Thread.Sleep(2000);
                    o.OnNext(new Server() { ServerStatus = "Connected" });
                }

                o.OnError(new Exception("Server Disconnected"));
                //o.OnCompleted();
            }) { IsBackground = true }.Start();

            return () =>
            {
                ConnectionStatusMessage = "Running Disconnect Procedure!";

                Thread.Sleep(2000);

                ConnectionStatusMessage = "Disconnected";
            };
        });

    // Second server: same behavior, with its own disconnect messages.
    IObservable<Server> observableServerConnection2 =
        Observable.Create<Server>((o) =>
        {
            new Thread(() =>
            {
                for (int i = 0; i < 3; i++)
                {
                    Thread.Sleep(2000);
                    o.OnNext(new Server() { ServerStatus = "Connected" });
                }

                o.OnError(new Exception("Server Disconnected2"));
                //o.OnCompleted();
            }) { IsBackground = true }.Start();

            return () =>
            {
                ConnectionStatusMessage = "Running Disconnect Procedure 2!";

                Thread.Sleep(2000);

                ConnectionStatusMessage = "Disconnected2";
            };
        });

    IObserver<Server> observer =
        Observer.Create<Server>(
            (server) => this.ConnectionStatusMessage = server.ServerStatus,
            (ex) =>
            {
                this.ConnectionStatusMessage = "Getting Disconnected";
                Thread.Sleep(2000);
            });

    //observableServerConnection.Subscribe(observer);

    // Two attempts on the first server, then fall back to the second one.
    observableServerConnection.Retry(2).Catch<Server>(observableServerConnection2).Subscribe(observer);
}

The view goes through these steps when the application is run:

- Connected
- Running Disconnect Procedure!
- Disconnected
- Connected
- Running Disconnect Procedure!
- Disconnected
- Connected
- Getting Disconnected
- Running Disconnect Procedure 2!
- Disconnected2
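The two Observable.Create blocks in the constructor differ only in their messages, so the fallback chain can be condensed for experimentation. The sketch below is an assumption-laden illustration rather than the post's implementation: it targets a current System.Reactive package, emits synchronously, logs to a list, and the names FallbackSketch and CreateFailingConnection are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FallbackSketch
{
    // Hypothetical helper: a connection that reports "Connected" once and then fails.
    static IObservable<string> CreateFailingConnection(string name, List<string> log)
    {
        return Observable.Create<string>(o =>
        {
            o.OnNext(name + ": Connected");
            o.OnError(new Exception(name + " disconnected"));

            // Unsubscription code for this server.
            return () => log.Add(name + ": Disconnected");
        });
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        IObservable<string> primary = CreateFailingConnection("Server1", log);
        IObservable<string> secondary = CreateFailingConnection("Server2", log);

        // Two attempts on the primary server, then switch to the secondary one.
        primary.Retry(2).Catch(secondary).Subscribe(
            status => log.Add(status),
            ex => log.Add("Getting Disconnected: " + ex.Message));

        return log;
    }
}
```

Only the failure of the second server should reach the error handler here; both failures of the first server are handled inside the Retry and Catch chain.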
