The Monadic Task Queue

by nick Tue, December 23 2008 13:27

With apologies to Greg Neverov, whose fascinating article I have borrowed heavily from and probably butchered quite badly in the process…

Introduction

In a previous life, I presented the outline of an approach for consuming the CCR from within F# asynchronous workflows. It works okay, but I was unhappy with one particular aspect of it – the removal of the task queue concept from the wrapper. The task queue (or dispatcher queue) is an important part of the CCR fabric and provides for the dispatcher to round-robin amongst multiple task queues, preventing some task starvation that the CLR thread-pool is subject to. Additionally, the CcrServiceBase class allows for a derived class to declare the task queue on construction, pass it to the base and then effectively hide it from normal arbiter activation, which cuts down considerably on typing.

I wanted a way to retain the task queue concept, but also to eliminate it’s explicit use from much of the machinery within the F# wrapper. Anyway, whilst trying to get my head round monads/computation expressions/workflows, it (very) slowly dawned on me that amongst other things, monads are a useful way of threading some state through a program without having to explicitly declare and use it at every turn. In this case, that state is the task queue itself.

A Simple Example

I’ll go into some of the gory details a bit later, but the essence of the workflow support is that you can write the arbitrations without referencing the task queue directly and then run the computation ‘over’ the task queue. Here is a very simple server, that listens for a string or a stop message, and prints the string or stops on receipt. Note, that just like asynchronous workflows and existing CCR patterns, no thread is blocked whilst waiting for a message.

First let’s define our possible inputs to the server:

type Message =
| Input of string
| Stop

Now let’s define the server itself. It’s just a function that loops round, listening on a port for a message:

let server port =
let rec loop =
ccr {
let! msg = Receive.from port
match msg with
| Input s ->
printfn "%s" s
do! loop
| Stop ->
printfn "done"
}
loop

For those unfamiliar with F# computation expressions, the magic is in ‘ccr’, ‘let!’ and ‘do!’ constructs. The essence of this is that the ‘ccr’ bit is an instance of a type (known as a builder), that supports the rewriting of this computation as (in this case) regular continuation passing, and the ‘let!’ and ‘do!’ keywords are a signal to the F# compiler to enlist the help of the builder at the correct points in the computation.

Now let’s create a CCR dispatcher and task queue - together these will provide the context under which our server will run - and a port so we can send it messages.

let dispatcher = new Dispatcher( Options = DispatcherOptions.UseBackgroundThreads )
let taskQueue = new DispatcherQueue("test", dispatcher)
let msgQueue = new Port<Message>()

Now, we can launch the server.

server msgQueue |> run taskQueue

‘run’ is another function that takes the task queue and threads it through to the workflow, effectively bootstrapping it. Now we can send messages to our server

msgQueue.Post (Input "Hello")
msgQueue.Post (Input "World")
msgQueue.Post (Stop)

Other Arbitrations

Before I go into the mechanism itself, let’s look at some of the other arbitrations that the CCR supports and how nicely F# allows us to express them within the workflow concept.

Possibly the simplest variation on the simple Receive above, is one adjusted for a timeout. In this case we return an option of the port type, using None to indicate the timeout.
        ccr {
let! r = Receive.from (port,5000)
match r with
| Some msg ->
match msg with
| Input s ->
printfn "%s" s
do! loop
| Stop ->
printfn "done"
| None ->
printfn "Timed out"
}

Next, a simple two-port choice, where we can receive a string from one port and an int from an other, using the in-built F# Choice union-type. The timeout version follows the same pattern as above and is not shown here.

        ccr {
let! msg = Receive.from_either (p1,p2)
match msg with
| Choice2_1 s ->
printfn "%s" s
| Choice2_2 i ->
printfn "%i" i
do! loop
}

And a two-port join, this time using a tuple.

        ccr {
let! s,i = Receive.from_both (p1,p2)
printfn "%s, %i" s i
do! loop
}

It’s fairly trivial to add support for most of the other arbitrations, including the various flavours of MultipleItemReceive. The n-item Choice doesn’t fit so well with the statically typed F# Choice type, because the latter only supports up to 7 possible choices, but this would normally be enough. And the subtleties of Interleave can’t easily be replicated in this model, particularly the concurrent receiver groups and the round-robin behaviour of the interleave, but we will attempt to tackle these problems at a later date.

You’ll notice that iterators disappear completely – the F# syntax gives us a way to structure our data dependencies in a different way.

The Implementation

You can find the full .fs file here, but the the basics are as follows:

First, we define a type that will describe the task within the computation workflow.

type CcrTask<'a> = CcrTask of (DispatcherQueue * ('a -> unit) * (exn -> unit) -> unit)
type Result<'a> = Success of 'a | Error of exn

‘CcrTask’ is just a function that takes a task queue and two continuations, one for the successful case and one for the failure case, and returns nothing. ‘Result’ is a type to help us deal with the result of an invocation.

Some of the underlying monadic operations that the builder will require to support those constructs such as ‘let!’ and ‘do!’.

let mreturn x = CcrTask (fun (q, ck, ek) -> ck x)

let apply (CcrTask m) (q, ck, ek) = m (q, ck, ek)

let tryApply f x (q, ck, ek )=
match (try Success (f x) with e -> Error e) with
| Success m -> apply m (q, ck, ek)
| Error e -> ek e

let bind m f = CcrTask (fun (q, ck, ek) ->
apply m (q, (fun x -> tryApply f x (q, ck, ek)), ek))

Our ‘run’ operation that bootstraps the workflow:

let run q m = 
apply m (q, (fun x -> ()), (fun x -> ()))

And our definition and declaration of the builder that assists the compiler in rewriting the computation as we require:

type Builder() =
member b.Return(x) = mreturn x
member b.Bind(m, k) = bind m k
member b.TryWith(m, cth) = tryWith m cth
member b.TryFinally(m, fin) = tryFinally m (liftM fin (mreturn ()))

member b.Let(x, f) = CcrTask (fun args -> apply (f x) args)
member b.Delay(f) = CcrTask (fun args -> apply (f ()) args)
member b.Zero() = mreturn ()
member b.Combine(m1, m2) = bind m1 (fun () -> m2)

let ccr = new Builder()

And finally a type with various overloads to support the various receiver arbitrations. I have included the receive and its timeout variant here. The other arbiters and some helper functions are available in the download.

type Receive() =

[<OverloadID("from")>]
static member from port =
CcrTask (fun (q, ck, ek) -> receive(port, ck) |> activate q)

[<OverloadID("from_with_timeout_ms")>]
static member from (port, timeout) =
CcrTask (fun (q, ck, ek) ->
let tp = new Port<DateTime>()
q.EnqueueTimer(TimeSpan.FromMilliseconds(float timeout), tp)
[|
receive (port, fun m -> ck (Some m));
receive (tp, fun _ -> ck None)
|]
|> choice
|> activate q)

Conclusion

Through computation expressions, F# allows much of the power of the CCR to be expressed more succinctly than in C#. In future articles, I’ll look at ways of structuring actor-oriented programs using these two technologies.

Tags: