Moving On

by nick Tue, July 06 2010 12:49

After a number of very interesting years at Macquarie Bank, I have accepted a very exciting offer to join Microsoft as a Senior Software Development Engineer. Deciding to leave Sydney (and Australia) wasn’t an easy decision – it’s a wonderful place to work and, more importantly, to live – but life is about taking those opportunities to push yourself and expand your horizons.

So it’s with a little trepidation and a lot of excitement that the family unit (me, wife, 4 kids, dog) will be heading to Redmond sometime in the next few months (visas permitting).

Tags:

CCR Lightning Talk

by nick Tue, June 29 2010 17:30

I delivered a 10 (well, maybe 11) minute lightning talk last night at the Sydney Alt.Net meeting. It was a lot of fun. The slides are here. There are some links here to various resources I mentioned during the talk and when chatting to people afterwards.

And also: this blog!


Tags:

CCR | Programming Models

Finite State Machines in F#

by nick Sun, January 03 2010 13:39

Finite State Machines (FSMs) are a popular model of computation in a number of fields. Erlang has a behaviour for modelling FSMs that abstracts away the concurrency model and allows the state machine to be modelled as a set of pure functions that respond to events in a given state with a sequence of actions and a new state. It’s straightforward to reproduce this in F#.

First we define a recursive data structure that captures a state transition:

type NextState<'s,'e> = NextState of ('s -> 'e -> NextState<'s,'e>) * 's

Here, the type variables 's and 'e represent the possible states and possible events respectively. NextState is a single-case union wrapping a 2-tuple of a function and the current state. The function ('s -> 'e -> NextState<'s,'e>) takes a state and an event and produces a new NextState. These functions represent the logical state of the FSM, whereas the state variable represents the physical state of the FSM. For example, a traffic light may have the logical state Stop and the physical state { Red=true ; Amber=false ; Green=false }.

So far, so good. But we still need an active object / agent / actor to run the FSM for us. Here we can leverage F# mailboxes and write a generalised reactive loop that works with any set of functions that return a NextState<'s,'e>. The result of calling spawn is a function 'e -> unit that allows events to be triggered and dispatched to the FSM via the mailbox.

let spawn init = 
    MailboxProcessor.Start(fun inbox ->
        let rec loop ns =
            async {
                let! input = inbox.Receive()
                return! loop <| match ns with NextState(f,s) -> f s input
            }
        init() |> loop).Post
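For comparison, here is a rough C# sketch of the same shape using only the BCL. It is an assumption-laden translation, not part of the original: a delegate stands in for the logical state, a record for NextState, and a BlockingCollection drained by a single task stands in for the F# MailboxProcessor. The names StateFn, NextState and Fsm.Spawn are hypothetical, and unlike the F# spawn it also returns a way to close the mailbox and a Task yielding the final physical state, which makes it easier to test.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical: a logical state is a function from (physical state, event) to the next transition.
delegate NextState<S, E> StateFn<S, E>(S state, E evt);

// Hypothetical: pairs the next logical state (a function) with the next physical state.
record NextState<S, E>(StateFn<S, E> Fn, S State);

static class Fsm
{
    // Starts one consumer that applies events strictly one at a time, in arrival order,
    // so the state functions never run concurrently with each other.
    public static (Action<E> Post, Action Close, Task<S> Final) Spawn<S, E>(Func<NextState<S, E>> init)
    {
        var mailbox = new BlockingCollection<E>();
        var final = Task.Run(() =>
        {
            var ns = init();
            // Unlike the F# mailbox, this blocks a pool thread while waiting for events.
            foreach (var evt in mailbox.GetConsumingEnumerable())
                ns = ns.Fn(ns.State, evt);
            return ns.State; // reached once Close() has been called and the mailbox drained
        });
        Action<E> post = mailbox.Add;
        Action close = mailbox.CompleteAdding;
        return (post, close, final);
    }
}
```

Because every event goes through a single consumer, the state functions need no locking, which is the same property the F# mailbox provides.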

This is pretty much all we need to model an FSM in a reasonably succinct way. As an exercise, let’s borrow the example from the Erlang documentation:

A door with a code lock could be viewed as an FSM. Initially, the door is locked. Anytime someone presses a button, this generates an event. Depending on what buttons have been pressed before, the sequence so far may be correct, incomplete or wrong. If it is correct, the door is unlocked for 30 seconds (30000 ms). If it is incomplete, we wait for another button to be pressed. If it is wrong, we start all over, waiting for a new button sequence.

First, let’s define the events that the door lock recognises along with its state.

type Event =
    | Key of char
    | Timeout

type State = {
    Code  : string
    SoFar : string option
}

Next, let’s write the function that determines whether the current state, which includes any digits entered so far, combined with another digit is enough to unlock the door. We can use an active pattern for this:

let (|Complete|Incomplete|Wrong|) (state, nextDigit: char) =
    // Append the new digit to the digits entered so far (if any)
    let sofar' =
        match state.SoFar with
        | Some digits -> digits + string nextDigit
        | None -> string nextDigit
    if state.Code = sofar' then
        Complete 
    elif sofar'.Length < state.Code.Length then
        Incomplete sofar'
    else
        Wrong

Finally, we can write the actual functions that move the door lock from state to state. We need a function that puts the door lock into a valid initial state and then a function for each of the valid states (locked or unlocked). Because functions are used to model the logical states, we have to use the let rec … and … syntax to declare these functions such that they can refer to each other in a mutually recursive manner. The only actions performed are those on a state change where a message is printed out.

let rec init code = 
    printfn "Initialised and locked."
    NextState (locked, { Code = code; SoFar = None })

and locked state = function
    | Key nextDigit ->
        match (state,nextDigit) with
        | Complete -> 
            printfn "Unlocked."
            NextState (unlocked, { state with SoFar = None })
        | Incomplete digits -> 
            NextState (locked, { state with SoFar = Some digits })
        | Wrong -> 
            NextState (locked, { state with SoFar = None })
    | Timeout ->
        NextState (locked, state)
        
and unlocked state = function
    | Key _ ->
        // Key presses are ignored while the door is unlocked
        NextState (unlocked, state)
    | Timeout ->
        printfn "Locked."
        NextState (locked, { state with SoFar = None })

We can run this altogether in FSI with a little test like this:

> let pid = spawn (fun() -> init "1234");;

val pid : (Event -> unit)

>
Initialised and locked.

> Key '1' |> pid;;
val it : unit = ()
> Key '2' |> pid;;
val it : unit = ()
> Key '3' |> pid;;
val it : unit = ()
> Key '4' |> pid;;
Unlocked.
val it : unit = ()
> Timeout |> pid;;
val it : unit = ()
>
Locked.
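For readers coming from C#, a rough translation of the code lock follows. It is a sketch, not a port of any library: the record types, the StateFn delegate and the Log list (a stand-in for printfn) are hypothetical, and it drives the transitions synchronously rather than through a mailbox. Like the F# active pattern, it treats any entry shorter than the code as Incomplete, even if it has already stopped being a prefix of the code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical C# counterparts of the F# Event and State types.
abstract record Event;
sealed record Key(char Digit) : Event;
sealed record Timeout : Event;
record LockState(string Code, string SoFar); // SoFar == null plays the role of None

delegate Transition StateFn(LockState state, Event evt);
record Transition(StateFn Next, LockState State);

static class CodeLock
{
    // Stand-in for printfn so the actions can be inspected.
    public static readonly List<string> Log = new();

    public static Transition Init(string code)
    {
        Log.Add("Initialised and locked.");
        return new Transition(Locked, new LockState(code, null));
    }

    static Transition Locked(LockState s, Event e)
    {
        if (e is not Key key)
            return new Transition(Locked, s);                    // Timeout: stay locked
        var soFar = (s.SoFar ?? "") + key.Digit;
        if (soFar == s.Code)                                     // Complete
        {
            Log.Add("Unlocked.");
            return new Transition(Unlocked, s with { SoFar = null });
        }
        if (soFar.Length < s.Code.Length)                        // Incomplete
            return new Transition(Locked, s with { SoFar = soFar });
        return new Transition(Locked, s with { SoFar = null });  // Wrong: start again
    }

    static Transition Unlocked(LockState s, Event e)
    {
        if (e is Timeout)
        {
            Log.Add("Locked.");
            return new Transition(Locked, s with { SoFar = null });
        }
        return new Transition(Unlocked, s);                      // key presses are ignored
    }
}
```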

Conclusion

The model is quite a nice way to approach FSMs, but there are still a few deficiencies compared to the Erlang support. There’s no support for a default handler that can process events in any state, nor is there any support for actually triggering a timeout. We’ll address these in a later post, along with some other, more minor enhancements.

Tags:

FSpread

by nick Sun, May 31 2009 14:33

I have recently been looking at the Spread Toolkit. I have written a set of bindings that allow F# clients to participate in a Spread network and posted them up to CodePlex. From the project site:

Spread is an open source toolkit that provides a high performance messaging service that is resilient to faults across local and wide area networks. It has a client-daemon architecture, with most of the functionality implemented in the Spread daemon and a simple tcp-based protocol for communication with clients.


By providing consistent, ordered and reliable membership messages to processes participating in a network, those processes can implement truly-distributed algorithms for fault-tolerance, load-balancing and leader-election, without further agreement/consensus protocols.

This project provides an F# implementation of the Spread client-side protocol, allowing F# projects to participate in a Spread network.

The sample projects provided include:

  1. A console application for joining and leaving groups, as well as sending messages to those groups.
  2. A server application that may have many instances, each agreeing on who is the leader, with the rest replicating that state. Instances may come and go. The server demonstrates how a new leader may be elected in the event of the previous one failing. The server supports client subscriptions and load-balances these amongst the leader/replicas. Client subscriptions are migrated to new nodes in the event of node failures.
  3. A client application that subscribes to the server application for its current state.

Tags:

Overlapped Socket I/O for High Throughput

by nick Thu, March 12 2009 14:31

One of the features of overlapped I/O on Windows is the ability to post multiple requests ahead of time. This can be used to good effect on sockets you expect to receive a lot of data. In this case, you post multiple asynchronous receives ahead of time and the OS notifies you as each one completes. Because you posted more than one receive, the OS can be already filling the next buffer whilst you are processing the current one.

If, however, you process the completion notifications via an I/O completion port (IOCP) that is serviced by a thread pool (generally considered the most efficient and scalable way to process I/O on Windows), there’s an interesting wrinkle that can spoil your day. Although the OS will signal the I/O completions in the order you posted the original requests, the servicing of those notifications is subject to all the usual race conditions you’d expect in an unconstrained thread pool, i.e. your code can process the notifications out of order.

For example, say you post 4 asynchronous receives to a socket whose completions are signalled via an IOCP that is serviced by 8 threads calling GetQueuedCompletionStatus (assume an 8-core box, so 8 threads is the ideal). At some point, enough data arrives on the socket to satisfy the first 2 of the 4 receives, and so the OS dispatches 2 completion notifications to the IOCP. Assuming that there are at least 2 idle threads in the pool, each will be woken in turn to process a notification. But which one actually completes first depends on other scheduling activity in the system and is therefore unpredictable. This is not just a problem at the native-code level: asynchronous socket I/O under the CLR is subject to the same inherent race conditions.

The following code shows how, by associating each pending receive with a single CCR Port, the complexities of dealing with multiple I/O notifications can be considerably reduced. The ports are held in a queue whose ordering represents the expected completion order.

First, let’s define an iterator function that continually reads data from a socket, posting it to a specified port, by utilising a specified number of pre-issued receives. This internally uses a queue that holds a port for each outstanding I/O operation.

/// <summary>
/// Read data from a socket.
/// </summary>
/// <param name="socket">The socket</param>
/// <param name="numRecvs">The number of pending receives</param>
/// <param name="dataPort">The port where inbound data is enqueued</param>
/// <returns></returns>
IEnumerator<ITask> Read(Socket socket, int numRecvs, Port<byte[]> dataPort)
{
    // Create a queue of ports (QoP)
    var queue = new Queue<Port<SocketAsyncEventArgs>>(numRecvs);

Then, we’ll populate the queue. Each entry in the queue is a port that will receive a message when the corresponding I/O completes.

    // Populate the QoP
    for (int i = 0; i < numRecvs; ++i)
    {
        var port = new Port<SocketAsyncEventArgs>();
        var sae = new SocketAsyncEventArgs();
        var buf = new byte[1024];
        sae.SetBuffer(buf, 0, buf.Length);
        sae.Completed += (sender, args) => port.Post(sae);
        if (!socket.ReceiveAsync(sae))
            port.Post(sae);

        queue.Enqueue(port);
    }

For brevity I have just assigned a regular byte buffer to the operation - in performance-sensitive production code you would more likely use a portion of a much larger array, allocated from the large object heap to avoid the compaction issues of non-LOH objects under pinning.

And then, the internal processing itself. This just loops round, dequeueing the port associated with the next operation. The port is first tested for the presence of the completion message. If it’s not there (yet), the loop yields to a one-time receiver that waits for it. Once the completed operation is in hand, the loop either (a) copies and forwards the received data before reissuing the receive and requeuing the port, or (b) disposes of the I/O operation and drops it. Ultimately, once all the operations have been retired, the loop exits.

    while (queue.Count > 0)
    {
        var port = queue.Dequeue();

        SocketAsyncEventArgs sae = null;
        if (!port.Test(out sae))
            yield return port.Receive(s => sae = s);

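        // A failed receive, or a zero-byte receive (the peer has closed the
        // connection), retires this operation rather than reissuing it.
        if (sae.SocketError != SocketError.Success || sae.BytesTransferred == 0)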
        {
            sae.Dispose();
            continue;
        }

        dataPort.Post(sae.Buffer.CopyBytes(0, sae.BytesTransferred));
        if (!socket.ReceiveAsync(sae))
            port.Post(sae);
        queue.Enqueue(port);
    }
}

For reference, the CopyBytes() operation is just the following extension method. The data has to be copied, because the buffer associated with the operation will be re-used when the operation is re-submitted.

static class ByteBufferExtensions
{
    public static byte[] CopyBytes(this byte[] src, int offset, int count)
    {
        var dst = new byte[count];
        Buffer.BlockCopy(src, offset, dst, 0, count);
        return dst;
    }
}
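Picking up the earlier note about buffers, one way to carve the receive buffers out of a single large allocation is sketched below. ReceiveBuffers.Create is a hypothetical helper, and the 85,000-byte large-object-heap threshold is a CLR implementation detail rather than a contract. If you adopt it, the copy in the read loop must start at the operation's offset instead of 0, i.e. sae.Buffer.CopyBytes(sae.Offset, sae.BytesTransferred).

```csharp
using System;
using System.Collections.Generic;
using System.Net.Sockets;

static class ReceiveBuffers
{
    // Hypothetical helper: gives each SocketAsyncEventArgs its own non-overlapping
    // window onto one shared array. When count * segmentSize >= 85,000 bytes the
    // CLR allocates the array on the large object heap, which is not compacted by
    // default, so pinning it during I/O does not get in the way of compacting the
    // small-object heap.
    public static List<SocketAsyncEventArgs> Create(int count, int segmentSize)
    {
        var slab = new byte[count * segmentSize];
        var operations = new List<SocketAsyncEventArgs>(count);
        for (int i = 0; i < count; i++)
        {
            var sae = new SocketAsyncEventArgs();
            sae.SetBuffer(slab, i * segmentSize, segmentSize);
            operations.Add(sae);
        }
        return operations;
    }
}
```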

On a final note, this code could be slightly optimised by setting the PortMode property of each port to OptimizedSingleReissueReceiver.
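The ordering trick itself does not depend on the CCR. As a hedged illustration, here is the queue-of-ports idea expressed with plain .NET tasks, with a TaskCompletionSource playing the part of each Port; OrderedCompletions.DrainInOrder is a hypothetical name. Completions may be signalled in any order and from any thread, yet the consumer always observes them in the order the operations were posted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class OrderedCompletions
{
    // Hypothetical analogue of the CCR queue of ports: one TaskCompletionSource per
    // pending operation, held in posting order. Draining strictly from the head means
    // results come out in posting order, however the completions raced on the pool.
    public static async Task<List<T>> DrainInOrder<T>(Queue<TaskCompletionSource<T>> pending)
    {
        var results = new List<T>();
        while (pending.Count > 0)
        {
            var next = pending.Dequeue();
            // Like port.Test, this continues immediately if the result is already there;
            // otherwise it waits for this particular operation only.
            results.Add(await next.Task);
        }
        return results;
    }
}
```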

Tags:

AsyncLineReader

by nick Thu, February 26 2009 13:35

There was recently an interesting question on stackoverflow.com regarding the asynchronous processing of a proposed line-buffered network protocol. By line-buffered, I mean a protocol that does not transmit the length of the data up-front, but rather simply delimits each line with a newline ‘\n’ character. The question detailed a theoretical protocol, in which ‘commands’ were sent over the network via line-buffering. Each command contained some lines of data and was delimited by two special ‘marker’ lines i.e.

COMMAND
line 1 of command data
line 2 of command data

line n of command data
END COMMAND

I thought that solving this via CCR would be an interesting problem. The full source code for this example can be found at the end of this post and requires Microsoft Robotics Developer Studio Express Edition.

Leaving aside the asynchronous requirements, consider the basic processing steps for our command protocol.

Processing pipeline for command protocol

The steps themselves are straightforward. When going from bytes to chars, we’ll assume that a straight ASCII-to-Unicode translation will suffice. Then we need to split our char buffers into lines. Each char buffer will contain zero or more newline characters, so our framing must remember any incomplete (partial) line left at the end of a buffer. Once we have lines, we discard any input until COMMAND arrives, then we gather the lines of command data until we detect END COMMAND.
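Leaving the asynchrony aside, the three transformations can be prototyped as plain C# iterators, which is a handy way to check the framing logic before moving it into CCR tasks. This is a synchronous sketch, not the code from the post; CommandPipeline and its methods are hypothetical names. Like the framer later in the post, it treats '\r' and '\n' as line terminators and drops empty lines.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

static class CommandPipeline
{
    // Stage 1: bytes -> chars (a straight ASCII translation).
    public static IEnumerable<char[]> ToChars(IEnumerable<byte[]> chunks)
    {
        foreach (var chunk in chunks)
            yield return Encoding.ASCII.GetChars(chunk);
    }

    // Stage 2: chars -> lines; a partial line is carried over into the next chunk.
    public static IEnumerable<string> ToLines(IEnumerable<char[]> chunks)
    {
        var builder = new StringBuilder();
        foreach (var chunk in chunks)
            foreach (var ch in chunk)
            {
                if (ch != '\r' && ch != '\n') { builder.Append(ch); continue; }
                if (builder.Length > 0) { yield return builder.ToString(); builder.Clear(); }
            }
    }

    // Stage 3: lines -> commands; input outside COMMAND ... END COMMAND is ignored.
    public static IEnumerable<List<string>> ToCommands(IEnumerable<string> lines)
    {
        List<string> current = null;
        foreach (var line in lines)
        {
            if (current == null) { if (line == "COMMAND") current = new List<string>(); }
            else if (line == "END COMMAND") { yield return current; current = null; }
            else current.Add(line);
        }
    }
}
```

Feeding it chunks that split lines arbitrarily, such as "junk\nCOMM", "AND\r\nline 1\nline" and " 2\nEND COMMAND\n", yields a single command containing "line 1" and "line 2".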

Let’s start at the top, with a simple non-blocking loop that prints the data associated with each arriving command. This loop, like the others presented here, is implemented using CCR iterators. In our case, the handler just dumps the data out to the console.

IEnumerator<ITask> ProcessCommands(Socket client)
{
    var endPoint = client.RemoteEndPoint;
    var commandPort = new Port<List<string>>();
    var donePort = TaskQueue.RunTask(
        new IterativeTask<Socket, Port<List<string>>>(client, commandPort, ProcessLines));
    var done = false;
    while (!done)
    {
        yield return Arbiter.Choice(
            commandPort.Receive(commandLines =>
            {
                Console.WriteLine("Received from " + endPoint + ":");
                commandLines.ForEach(Console.WriteLine);
            }),
            donePort.Receive(ignored => done = true));
    }
}

The key to this routine (and to the others that follow) is the pair of ports created at the top. The commandPort is the port through which our routine receives completed command data. This port links our routine to ProcessLines, which is a subroutine spawned to gather lines into commands. The second port, donePort, will provide a signal when ProcessLines is done. Note that ProcessLines does not explicitly post to this port; the CCR will automatically do this when the subroutine completes. I have used a small extension method to make the syntax of spawning these sub-tasks a little simpler:

static class TaskQueueExtensions
{
    public static Port<EmptyValue> RunTask(this DispatcherQueue taskQueue, ITask task)
    {
        var donePort = new Port<EmptyValue>();
        Arbiter.ExecuteToCompletion(taskQueue, task, donePort);
        return donePort;
    }
}

Finally, the main body of our routine just sits in a loop, processing the commands until the signal from the donePort is received.

ProcessLines follows the same pattern of spawning a sub-task (in this case, ProcessChars) and using two ports, one to receive input from the sub-task and one to detect completion of the sub-task. The main body of ProcessLines demonstrates how easy it is to write non-blocking sequential logic over our simple command protocol using iterators. Note that the signature of this routine includes the port passed from the parent routine.

For the purposes of the example, we just ignore any input prior to receiving COMMAND.

IEnumerator<ITask> ProcessLines(Socket client, Port<List<string>> commandPort)
{
    var done = false;
    var linePort = new Port<string>();
    var donePort = TaskQueue.RunTask(
        new IterativeTask<Socket, Port<string>>(client, linePort, ProcessChars));
    while (!done)
    {
        bool fCommandStarted = false;
        yield return Arbiter.Choice(
            linePort.Receive(line => fCommandStarted = line == "COMMAND"),
            donePort.Receive(ignored => done = true));
        if (fCommandStarted)
        {
            var commandLines = new List<string>();
            bool fCommandEnded = false;
            while (!done && !fCommandEnded)
            {
                yield return Arbiter.Choice(
                    linePort.Receive(line =>
                    {
                        if (line == "END COMMAND")
                        {
                            commandPort.Post(commandLines);
                            fCommandEnded = true; // leave the inner loop and wait for the next COMMAND
                        }
                        else
                            commandLines.Add(line);
                    }),
                    donePort.Receive(ignored => done = true));
            }
        }
    }
}

ProcessChars follows the same pattern again. It turns consecutive chunks of char buffers (posted from subroutine ProcessBytes) into sequences of lines. For clarity, I’ve moved the actual framing code into a lambda expression assigned to framer. This routine will append chars to the string buffer until it encounters a newline, at which point it will post the contents of the buffer and reset it.

IEnumerator<ITask> ProcessChars(Socket client, Port<string> linePort)
{
    var done = false;
    var charBufferPort = new Port<char[]>();
    var donePort = TaskQueue.RunTask(
        new IterativeTask<Socket, Port<char[]>>(client, charBufferPort, ProcessBytes));
    var builder = new StringBuilder();
    var framer = new Handler<char[]>(buffer =>
    {
        foreach (var ch in buffer)
        {
            if (ch != '\r' && ch != '\n')
            {
                builder.Append(ch);
                continue;
            }
            if (builder.Length > 0)
            {
                linePort.Post(builder.ToString());
                builder = new StringBuilder();
            }
        }
    });
    while (!done)
    {
        yield return Arbiter.Choice(
            charBufferPort.Receive(framer),
            donePort.Receive(ignored => done = true));
    }
}

And finally, ProcessBytes does the actual reading off the socket, converting the ASCII byte buffers into Unicode char buffers and posting back the results. It just loops round until a zero-byte receive is detected and then it exits. For significant numbers of connections, I wouldn’t recommend a Gen-0 allocated byte-buffer, but for the purposes of this exercise it’s fine.

IEnumerator<ITask> ProcessBytes(Socket client, Port<char[]> charBufferPort)
{
    var iarPort = new Port<IAsyncResult>();
    var buffer = new byte[4096];
    var done = false;
    do
    {
        client.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, iarPort.Post, null);
        yield return iarPort.Receive(iar =>
        {
            var rcvd = client.EndReceive(iar);
            if (rcvd > 0)
                charBufferPort.Post(Encoding.ASCII.GetChars(buffer, 0, rcvd));
            else
                done = true;
        });
    } while (!done);
}

So, what do we have so far? Well, we have truly asynchronous processing in these routines and none of them block any thread if they have nothing to do. In addition, each routine runs concurrently with the others so the pipeline is parallelised. This means that if, for example, a connection sent through commands in quick succession, each stage of the pipeline could be working on the current packet, whilst the previous stage processed the next packet (assuming a 4-core box for the four stages).

But despite our non-blocking, thread-efficient, asynchronous, (reasonably) scalable solution, the most satisfying aspect is that the CCR iterator support enabled us to write the pipeline stages as fairly intuitive pieces of sequential, looping logic.
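On newer versions of .NET (Core 3.0 and later), a similar shape can be approximated without the CCR using System.Threading.Channels. In the hedged sketch below, ChannelPipeline.Stage is a hypothetical helper: each stage runs as its own task, and completing a stage’s input completes its output, which loosely mirrors the cascade of donePort signals. It is an analogue for comparison only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

static class ChannelPipeline
{
    // Hypothetical helper: runs 'step' over every item read from 'input' on its
    // own task and writes the results to a new channel. When 'input' completes,
    // the output is completed too, so completion cascades down the pipeline.
    public static ChannelReader<TOut> Stage<TIn, TOut>(
        ChannelReader<TIn> input, Func<TIn, IEnumerable<TOut>> step)
    {
        var output = Channel.CreateUnbounded<TOut>();
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in input.ReadAllAsync())
                    foreach (var result in step(item))
                        await output.Writer.WriteAsync(result);
                output.Writer.Complete();
            }
            catch (Exception e)
            {
                // Pass the failure downstream instead of losing it on the pool thread.
                output.Writer.Complete(e);
            }
        });
        return output.Reader;
    }
}
```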

And finally…

What about exception handling? What if the socket connection is rudely aborted? How would such an exception be propagated through the pipeline? Where would the socket be cleaned up? What if an exception occurred in one of the intermediate stages of the pipeline – how do we clean-up then?

This is where CCR causalities really shine. The following is the section of code from the full sample that deals with the newly accepted socket and spawns the original ProcessCommands task.

var client = server.EndAccept(iar);
var endPoint = client.RemoteEndPoint;

var errorPort = new Port<Exception>();
Dispatcher.AddCausality(new Causality(endPoint.ToString(), errorPort));

Console.WriteLine("{0} connected.", endPoint);
var donePort = TaskQueue.RunTask(new IterativeTask<Socket>(client, ProcessCommands));

Activate(errorPort.Receive(error =>
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine(error);
    Console.ResetColor();
}));

Activate(donePort.Receive(ignored =>
{
    using (client)
    {
        Console.WriteLine("{0} disconnected.", endPoint);
    }
}));

The steps are quite simple. We create and activate a new causality with our errorPort attached. This causality will flow with any spawned tasks or posted messages. We then spawn ProcessCommands, which as we have seen, itself spawns sub-tasks and so on.

Then we activate a receiver for any unhandled exception detected by the causality (I’ve deliberately not written any explicit exception-handling code in the pipeline stages). This receiver just dumps the exception to the console. Finally, we activate a receiver that is triggered on the donePort, which will receive a signal when ProcessCommands exits. This writes a disconnection message and closes the socket.

Now, if the socket is rudely disconnected, the EndReceive will throw a SocketException. This will cause two things to happen. First, the exception will be caught by the causality and posted to our errorPort, where it will be dumped to the console. Second, the exception has caused ProcessBytes to exit, so that triggers a cascading set of donePort posts, as each pipeline stage detects the exit of its prior stage. Ultimately that causes ProcessCommands to exit, our donePort to receive a signal, and then we close the socket.

What if an intermediate stage fails, say ProcessChars? Well, just as before, we’ll get the exception through the causality so we can dump that. And ultimately, ProcessCommands will still exit because its previous pipeline stages have also exited. But at this point, we are left with ProcessBytes still spinning on a connected socket, posting char buffers to a port no-one (i.e. ProcessChars) is reading any more.

But this won’t be a problem (thankfully). When ProcessCommands exits, and our donePort signals, we’ll close the socket anyway. This will likely cause ProcessBytes to exit via an exception. The causality will catch this exception and post it to our errorPort, but our one-shot errorPort handler has already fired and won’t fire again. And the donePort signal raised by ProcessBytes now exiting won’t go anywhere either because the handler of that signal, ProcessChars, has already gone away.

And really finally…

It was an interesting exercise to tackle this particular problem and I think it shows off iterators and causalities rather well. There’s still room for further enhancements, e.g. handling receive timeouts at the command level, which time-permitting, I’ll look at in a future post.

Tags:

Stefan Tilkov on REST

by nick Sat, January 31 2009 13:30

An interesting presentation by Stefan Tilkov on REST and its principles.

I find that many people equate SOA with web-services and in turn, web-services with scalability and flexibility, because the web is scalable and flexible right?

The web really fired the debate about 'stateless' architectures being the key to scalability (actually this is the 'web' taking accolades that strictly belong to HTTP, but never mind). This is not so surprising: in the enterprise space there are many problems of scale, and people’s minds are often focused on what they can do to improve it. HTTP seemed a straightforward way to address this. Hence WS-*. In my opinion, though, the scalability of some arbitrary system is largely an issue for that system internally; it's entirely possible to build 'scalable-enough' systems without Fielding's thesis on your desk. What often matters more, but is not addressed as aggressively, is the composition of these systems to form more useful composite systems.

The true benefit of a restful approach (for me) is the architectural style that deliberately defines a small number of verbs and a strict model for the use of those verbs. This seems to me to capture the essence of what is required for composition of services, particularly that 80% of composition which is unforeseen. The difficulty of post-facto service composition is often what really hinders the emergence of real enterprise architecture and in turn why systems-integration (which is what much of enterprise development often really reduces to) can be such a fraught exercise. Using WS-* to wire your applications together is much like defining standards for electricity cabling and then requiring a unique plug-socket at the end of each individual cable.

Tags:

CCR Article on InfoQ.com

by nick Thu, January 29 2009 13:29

I have posted an introductory article on the CCR at infoq.com.

Tags:

George Chrysanthakopoulos on Channel 9

by nick Sun, January 18 2009 13:29

George talks to Erik Meijer about coordination and (incidentally) concurrency.

Even if you’ve not used the CCR, it’s worth a watch. If his passion for an alternative to the existing mainstream programming models doesn’t at least raise your level of curiosity, then nothing will.

Tags:

IAsyncResult and WaitHandle lifetime

by nick Wed, January 07 2009 13:28

The BeginXXX/EndXXX asynchronous invocation pattern in .NET has BeginXXX return an IAsyncResult. In the past, I’ve not normally done anything with this, rather preferring to use the one passed into the callback e.g.

    var cmd = new SqlCommand();
    // Initialise the command... 
    cmd.BeginExecuteReader(iar =>
        {
            try
            {
                using (var rdr = cmd.EndExecuteReader(iar))
                {
                    // Deal with the results...
                }
            }
            catch(Exception e)
            {
                // Deal with the error...
            }
        }, null);

Surprisingly, this code has a subtle leak (of sorts). And it’s to do with a property of the IAsyncResult that’s not even used, the AsyncWaitHandle. Now this is normally only used if you really need to block to wait for the outcome of the operation, e.g.

    var conn = new SqlConnection(connstr);
    var cmd = new SqlCommand();
    // Initialise the command... 
    var iar = cmd.BeginExecuteReader();
    iar.AsyncWaitHandle.WaitOne();
    try
    {
        using (var rdr = cmd.EndExecuteReader(iar))
        {
            // Deal with the results...
        }
    }
    catch(Exception e)
    {
        // Deal with the error...
    }

The WaitHandle is really just a thin veneer over a native Windows HANDLE, so when is it actually created and subsequently cleaned up? Well, unfortunately, it depends. The documentation advises implementers of IAsyncResult of two things:

  1. The object that implements IAsyncResult does not need to create the WaitHandle until the AsyncWaitHandle property is read.
  2. (the) AsyncWaitHandle should be kept alive until the user calls the method that concludes the asynchronous operation. At that time the object behind AsyncWaitHandle can be discarded.

The italics are mine; what this means in practice is that you have to determine what your exact flavour of BeginXXX/EndXXX actually returns.

For example, the System.Data.SqlClient async operations return an IAsyncResult that both

  1. Eagerly creates the WaitHandle (regardless of whether you ever use it)
  2. Doesn’t actually dispose of the WaitHandle when EndXXX is called.

This means that, even if you use the first of the above two usage patterns and never explicitly reference the WaitHandle, the handle remains open after the operation completes until it is garbage-collected and finalized. That could be a while, especially if the IAsyncResult lived long enough to be promoted through a GC generation or two. And if you hammer your database hard enough, you’ll see a significant spike in the number of open handles in your process. You also incur the overhead of having the WaitHandle itself placed on the finalization queue.
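For contrast, here is a minimal sketch of an IAsyncResult that follows both of the documented recommendations: the handle is created only if someone asks for it, and it is released as soon as the operation is concluded. LazyAsyncResult, its Complete/End methods and the HandleCreated property are hypothetical and exist for illustration only; a real implementation would also carry the operation’s result or exception.

```csharp
using System;
using System.Threading;

// Hypothetical IAsyncResult: lazily created WaitHandle, disposed when End is called.
sealed class LazyAsyncResult : IAsyncResult
{
    readonly object gate = new object();
    ManualResetEvent handle; // stays null unless someone reads AsyncWaitHandle
    volatile bool completed;

    public LazyAsyncResult(object state) => AsyncState = state;

    public object AsyncState { get; }
    public bool CompletedSynchronously => false;
    public bool IsCompleted => completed;
    public bool HandleCreated { get { lock (gate) return handle != null; } }

    public WaitHandle AsyncWaitHandle
    {
        get
        {
            // Created on first access, already signalled if the operation has finished.
            lock (gate) return handle ??= new ManualResetEvent(completed);
        }
    }

    // Called by the operation when it finishes.
    public void Complete()
    {
        lock (gate)
        {
            completed = true;
            handle?.Set();
        }
    }

    // The EndXXX side: wait only if necessary, then release the handle eagerly.
    public void End()
    {
        if (!completed) AsyncWaitHandle.WaitOne();
        lock (gate)
        {
            handle?.Dispose();
            handle = null;
        }
    }
}
```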

In order to follow Microsoft’s own advice about eagerly disposing of unmanaged resources, we need to amend the first sample:

    cmd.BeginExecuteReader(iar =>
        {
            using (iar.AsyncWaitHandle)
            {
                try
                {
                    using (var rdr = cmd.EndExecuteReader(iar))
                    {
                        // Deal with the results...
                    }
                }
                catch (Exception e)
                {
                    // Deal with the error...
                }
            }
        }, null);

Extension methods can be used to reduce clutter and increase modularity:

    public static class SqlCommandExtensions
    {
        public static void BeginSafeExecuteReader(this SqlCommand cmd, AsyncCallback callback)
        {
            cmd.BeginExecuteReader(iar =>
                {
                    using (iar.AsyncWaitHandle)
                    {
                        callback(iar);
                    }
                }, null);
        }
    }

Which allows a solution very close to the original, but with deterministic disposal of the WaitHandle:

    cmd.BeginSafeExecuteReader(iar => 
        {
            try
            {
                using (var rdr = cmd.EndExecuteReader(iar))
                {
                    // Deal with the results...
                }
            }
            catch (Exception e)
            {
                // Deal with the error...
            }
        });

(By the way, the WaitHandle in the IAsyncResult returned from methods in the System.Net namespace is both lazily created and cleaned up on EndXXX, which means none of the above is necessary for async operations in that particular namespace).
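The BeginSafeExecuteReader extension above is specific to SqlCommand. A hedged, more general variant simply wraps any AsyncCallback; AsyncCallbacks.DisposingWaitHandle is a hypothetical name. Given the System.Net behaviour just described, it is only worth applying where the handle is created eagerly: on a lazily-created implementation, reading AsyncWaitHandle forces the very allocation you were trying to avoid.

```csharp
using System;

static class AsyncCallbacks
{
    // Hypothetical helper: returns a callback that runs 'callback' and then disposes
    // the IAsyncResult's WaitHandle, even if 'callback' throws.
    public static AsyncCallback DisposingWaitHandle(AsyncCallback callback) =>
        iar =>
        {
            using (iar.AsyncWaitHandle)
            {
                callback(iar);
            }
        };
}
```

With SqlCommand this would be used as cmd.BeginExecuteReader(AsyncCallbacks.DisposingWaitHandle(iar => ...), null), keeping the body of the callback exactly as in the original sample.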

Tags: