Wednesday, May 29, 2013

Processing Pipelines with TPL Dataflow

Pipelining is a well-known design pattern. It is used when a stream of data elements is processed through a series of predetermined steps, where the output of one step serves as the input to the next. Steps might also be conditionally linked. The concept is similar to an assembly line in a manufacturing plant, where a continuous stream of raw material passes through a series of steps to become a finished product. It must be remembered that pipelining does not speed up the processing of a single element (responsiveness), but it is phenomenal for improving the throughput of the overall system.

Let's consider a simple dataflow pipeline where each data element passes through square, offset and doubling stages. The output of each stage is used as the input of the following stage. A list of elements is passed through these stages, producing an output list of processed elements.

In order to abstract the implementation of the processor, let us introduce an interface. We will provide different implementations of this processor and discuss the details of each.
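As a sketch, the abstraction could be a small interface; the name IProcessor and the int-based signature are assumptions for illustration, not the original listing:

```csharp
using System.Collections.Generic;

// Hypothetical abstraction over the different processors; the name
// and signature are assumptions for illustration.
public interface IProcessor
{
    // Runs every input element through the square, offset and
    // double stages and returns the processed elements.
    IList<int> Process(IEnumerable<int> input);
}
```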

Linear Processor
This is the simplest implementation of the processing pipeline. It divides the processing steps into separate methods. We apply the same series of operations to all elements in a loop and keep adding the resulting elements to a separate collection. When the loop finishes execution, we have our desired results in the collection, which is then returned.
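A minimal sketch of the linear processor, assuming an offset of +1 (the actual constant is not given in the post):

```csharp
using System.Collections.Generic;

// Linear implementation: one loop, the three stages applied in order.
// The offset constant (+1) is an assumption for illustration.
public class LinearProcessor
{
    private static int Square(int x) { return x * x; }
    private static int Offset(int x) { return x + 1; }
    private static int Double(int x) { return x * 2; }

    public IList<int> Process(IEnumerable<int> input)
    {
        var results = new List<int>();
        foreach (var element in input)
        {
            // The output of each stage feeds the next one.
            results.Add(Double(Offset(Square(element))));
        }
        return results; // results are in input order
    }
}
```

For an input of { 1, 2, 3 } this yields { 4, 10, 20 }: each element is squared, offset by one and doubled in turn.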

Although this is the simplest and easiest of all the dataflow pipeline implementations, it doesn't make use of any parallelism. The time to process all elements through the pipeline is linearly proportional to the size of the input collection. So if we assign x, y and z time units to the square, offset and double operations respectively, then processing n elements requires n(x + y + z) time units.

For more complex steps, we might want to refactor our code by extracting the implementation of these operations into a separate type. This would make unit testing these steps a lot easier.
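For instance, the stage implementations could live in a dedicated type; the name Operations and the +1 offset are assumptions:

```csharp
// Stage implementations extracted into a separate type so that each
// operation can be unit tested in isolation. The type name and the
// offset constant (+1) are assumptions for illustration.
public static class Operations
{
    public static int Square(int x) { return x * x; }
    public static int Offset(int x) { return x + 1; }
    public static int Double(int x) { return x * 2; }
}
```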

Parallel Loop Processor
Based on the recommendations from Patterns & Practices, the Parallel Loop pattern is suitable when we need to apply independent operations to a list of elements. This fits our requirement: the Square, Offset and Double operations applied to each element are independent of the other elements. For simplicity's sake, we are not discussing cancellation and exception handling for these loops. Here we have also made use of System.Collections.Concurrent.ConcurrentBag<T>. This type was introduced in .NET Framework 4.0 and is one of the implementations of IProducerConsumerCollection<T>, introduced in the same framework version. ConcurrentBag<T> introduces shared state, but it is a thread-safe type.
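A sketch of the parallel loop version, again assuming an offset of +1; note the shared ConcurrentBag<int> collecting the results:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Parallel Loop implementation. ConcurrentBag<int> is thread safe,
// so concurrent loop iterations can add results without extra locking.
// The offset constant (+1) is an assumption for illustration.
public class ParallelLoopProcessor
{
    public ICollection<int> Process(IEnumerable<int> input)
    {
        var results = new ConcurrentBag<int>();
        Parallel.ForEach(input, element =>
        {
            var value = element * element;  // square
            value = value + 1;              // offset
            value = value * 2;              // double
            results.Add(value);
        });
        return results; // note: no ordering guarantee
    }
}
```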

The only caveat is actually a feature of the Parallel Loop pattern: we cannot determine the order in which elements are processed. We can verify that by looking at the data returned from the Process() method. In our case, the order of the results must match the order of the input collection, so the result from this processor would be logically wrong.

Pipeline Processor
As we discussed above, we cannot use the Parallel Loop pattern when the order of elements in the result matters. The Patterns & Practices team recommends another pattern for such a requirement, called Pipelining. The pattern is used in situations where the elements of a collection pass through a series of steps, with the output of one step serving as input to the next. It improves throughput on multi-core machines by distributing the steps across multiple cores.

Pipelining requires us to connect the processing elements with producer/consumer based collections. Each step produces elements which are then consumed by the next processing element in the pipeline. The producer/consumer collections make each step independent of its immediate predecessor and successor. The overall throughput of the pipeline is limited by its slowest step.

In the following example, we have used BlockingCollection<T>, an implementation of IProducerConsumerCollection<T> that was also introduced in .NET Framework 4.0. It provides an ideal implementation for producer/consumer scenarios. Consumers can wait using the GetConsumingEnumerable() method of the collection; as elements are added to the collection, they are enumerated by the consumers. The collection also supports notifying consumers that the producer has completed, after which no more elements can be added, and the enumeration ends once the existing elements have been consumed. It is a very common bug to forget calling GetConsumingEnumerable() and iterate the collection directly; even I made that mistake when writing the code below. In that case, the loop finishes as soon as there are no elements currently in the collection.

As we finish processing all elements of the input collection, we can notify the consumers by calling the CompleteAdding() method of BlockingCollection<T>. The consumers waiting on the collection finish processing the remaining elements and complete their iteration. In a processing pipeline, each processing element must call the same method on its own output collection, because the next processing element is waiting on that collection. In this way, the completion signal is passed along the pipeline. Before returning the resulting collection to the caller, we need to wait until the last element in the pipeline finishes execution. Since all the processing elements are implemented as tasks, we can simply call Wait() on the last task in the pipeline.
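Putting these pieces together, a sketch of the pipeline processor could look like this: one task per stage, connected by BlockingCollection<int> buffers (the +1 offset is an assumption):

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Pipeline implementation: each stage runs in its own task and the
// stages are connected with BlockingCollection<int> buffers. The
// offset constant (+1) is an assumption for illustration.
public class PipelineProcessor
{
    public IList<int> Process(IEnumerable<int> input)
    {
        var squared = new BlockingCollection<int>();
        var offset = new BlockingCollection<int>();
        var doubled = new BlockingCollection<int>();

        var squareTask = Task.Run(() =>
        {
            foreach (var x in input) squared.Add(x * x);
            squared.CompleteAdding(); // signal the next stage
        });

        var offsetTask = Task.Run(() =>
        {
            // GetConsumingEnumerable blocks until elements arrive and
            // ends once CompleteAdding has been called upstream.
            foreach (var x in squared.GetConsumingEnumerable()) offset.Add(x + 1);
            offset.CompleteAdding();
        });

        var doubleTask = Task.Run(() =>
        {
            foreach (var x in offset.GetConsumingEnumerable()) doubled.Add(x * 2);
            doubled.CompleteAdding();
        });

        // Waiting on the last task guarantees the whole pipeline is done.
        doubleTask.Wait();
        return doubled.ToList();
    }
}
```

Because each buffer is consumed by a single task and BlockingCollection<T> is FIFO by default, the results come back in input order.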

Dataflow Block Processor
Microsoft introduced TPL Dataflow to make it easier to construct dataflow pipelines and networks. It hasn't been released as part of the .NET Framework yet, but you can get it as a NuGet package.

After installing the package, you should see a reference to System.Threading.Tasks.Dataflow added to the project's assembly references.

There are three main interfaces in the TPL Dataflow library: ISourceBlock<TOutput>, ITargetBlock<TInput> and IPropagatorBlock<TInput, TOutput>. All blocks inherit from IDataflowBlock. This becomes very useful when chaining different blocks together to construct a processing pipeline. Any source block can be linked to another dataflow block; with multiple destination blocks, a message is routed to whichever target accepts it first, which matches the producer/consumer scenario. The library also provides BroadcastBlock<T>, which routes a message to multiple destination blocks.
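A small sketch of the broadcast behaviour, using the System.Threading.Tasks.Dataflow namespace from the NuGet package (the block wiring here is illustrative, not from the original post):

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the TPL Dataflow NuGet package

// BroadcastBlock<T> offers each message to every linked target, unlike
// a plain source block, where the first target to accept a message
// consumes it.
var results = new ConcurrentBag<string>();
var broadcast = new BroadcastBlock<int>(x => x); // cloning function
var first = new ActionBlock<int>(x => results.Add("first:" + x));
var second = new ActionBlock<int>(x => results.Add("second:" + x));

var options = new DataflowLinkOptions { PropagateCompletion = true };
broadcast.LinkTo(first, options);
broadcast.LinkTo(second, options);

broadcast.Post(42);
broadcast.Complete();
Task.WaitAll(first.Completion, second.Completion);
// both targets have received the message 42
```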

From a coding perspective, the dataflow block based approach is a middle ground between the Linear and Pipeline processors discussed above. We don't need to create separate tasks manually or use a BlockingCollection, so it is nearly as simple as the linear processor, yet it supports the producer/consumer scenario through internal input and output queues, which makes pipelining possible. We can implement our dataflow pipeline as in the following diagram:

We need a consolidation block to keep adding the results to a collection. When the processing pipeline finishes execution, it just returns the consolidated results. Source and propagator blocks are a lot like observable collections, where the next element is pushed to the recipient without the recipient needing to ask for it.

In the above code, we construct the whole pipeline in the constructor of the processor. If the dataflow blocks are pure in nature, we could even do that in a static constructor, keeping the blocks as class members. As depicted in the previous image, we have created and linked four blocks to square, offset, double and consolidate the results.
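The original listing is not reproduced here, but a minimal sketch of such a processor might look as follows. For brevity it builds the pipeline inside Process rather than in the constructor (completed blocks cannot be reused), and the +1 offset is an assumption:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow; // from the TPL Dataflow NuGet package

// Dataflow implementation: a TransformBlock per stage plus an
// ActionBlock that consolidates the results. The offset constant (+1)
// is an assumption for illustration.
public class DataflowProcessor
{
    public IList<int> Process(IEnumerable<int> input)
    {
        var results = new List<int>();

        var square = new TransformBlock<int, int>(x => x * x);
        var offset = new TransformBlock<int, int>(x => x + 1);
        var dbl = new TransformBlock<int, int>(x => x * 2);
        var consolidate = new ActionBlock<int>(x => results.Add(x));

        // PropagateCompletion forwards the completion signal down the chain.
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        square.LinkTo(offset, options);
        offset.LinkTo(dbl, options);
        dbl.LinkTo(consolidate, options);

        foreach (var element in input) square.Post(element);
        square.Complete();             // start the completion chain
        consolidate.Completion.Wait(); // wait for the last block
        return results;
    }
}
```

Since TransformBlock preserves the order of its messages by default, the results come back in input order; Process(new[] { 1, 2, 3 }) yields { 4, 10, 20 }.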

We also need to chain the completion signal. When the first block receives the completion signal, it is propagated along the dataflow pipeline. Calling Complete() on a dataflow block means that no more messages are expected. After processing the elements already in its queue, the block passes the signal on to the next block in the pipeline (if configured to do so, for example via the PropagateCompletion link option). At the end, we wait for the last block in the pipeline to finish processing by using its IDataflowBlock.Completion task, and then return the results.

Dataflow Blocks also support handling exceptions and cancellation during the block execution. Let's keep that for a later discussion.
