Overlapped Socket I/O for High Throughput

by nick Thu, March 12 2009 14:31

One of the features of overlapped I/O on Windows is the ability to post multiple requests ahead of time. This can be used to good effect on sockets on which you expect to receive a lot of data. In this case, you post multiple asynchronous receives ahead of time and the OS notifies you as each one completes. Because you posted more than one receive, the OS can already be filling the next buffer whilst you are processing the current one.

If, however, you process the completion notifications via an I/O completion port (IOCP) serviced by a thread pool (generally considered the most efficient and scalable way to process I/O on Windows), there’s an interesting wrinkle that can spoil your day. Although the OS signals the I/O completions in the order you posted the original requests, the servicing of those notifications is subject to all the normal multi-threaded race conditions you’d expect in an unconstrained thread pool, i.e. your code can process the notifications out of order.

For example, suppose you post 4 asynchronous receives to a socket, and their completions are signalled via an IOCP that 8 threads service by calling GetQueuedCompletionStatus (assume an 8-core box, so 8 threads is the ideal). At some point, enough data arrives on the socket to satisfy the first 2 of the 4 receives, so the OS dispatches 2 completion notifications to the IOCP. Assuming that there are at least 2 idle threads in the pool, each will be woken in turn to process a notification. But which thread finishes processing its notification first depends on other scheduling activity in the system and is therefore unpredictable. This is not just a problem at the native-code level: asynchronous socket I/O under the CLR is subject to the same inherent race conditions.
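The effect is easy to see without any sockets at all. The following is a minimal sketch, not real IOCP code: it uses the .NET thread pool as a stand-in for the IOCP worker threads, and the class name CompletionOrderDemo and the spin-wait that simulates processing are assumptions made purely for illustration. Work items are queued in ascending order, and the method reports the order in which they were actually handled.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Illustrative stand-in for IOCP worker threads; no sockets are involved.
static class CompletionOrderDemo
{
    // Queues 'count' work items in ascending order and returns the order
    // in which the pool threads finished handling them.
    public static List<int> Run(int count)
    {
        var handled = new List<int>(count);
        var tasks = new Task[count];

        for (int i = 0; i < count; ++i)
        {
            int id = i; // per-iteration copy for the closure
            tasks[i] = Task.Run(() =>
            {
                // Simulate a small, variable amount of processing.
                Thread.SpinWait(1000 * (1 + id % 3));
                lock (handled)
                    handled.Add(id);
            });
        }

        Task.WaitAll(tasks);
        return handled;
    }
}
```

Calling CompletionOrderDemo.Run(8) and printing the result with string.Join(",", ...) will typically show a sequence that is not strictly ascending on a multi-core machine, although the exact order varies from run to run.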

The following code shows how, by associating each pending receive with a single Concurrency and Coordination Runtime (CCR) Port, the complexities of dealing with multiple I/O notifications can be considerably reduced. The ports are held in a queue whose ordering represents the expected order of completion.

First, let’s define an iterator function that continually reads data from a socket, posting it to a specified port, by utilising a specified number of pre-issued receives. This internally uses a queue that holds a port for each outstanding I/O operation.

/// <summary>
/// Read data from a socket.
/// </summary>
/// <param name="socket">The socket</param>
/// <param name="numRecvs">The number of pending receives</param>
/// <param name="dataPort">The port where inbound data is enqueued</param>
/// <returns>An iterator of CCR tasks that ends once every pending receive has failed</returns>
IEnumerator<ITask> Read(Socket socket, int numRecvs, Port<byte[]> dataPort)
{
    // Create a queue of ports (QoP)
    var queue = new Queue<Port<SocketAsyncEventArgs>>(numRecvs);

Then, we’ll populate the queue. Each entry in the queue is a port that will receive a message when the corresponding I/O completes.

    // Populate the QoP
    for (int i = 0; i < numRecvs; ++i)
    {
        var port = new Port<SocketAsyncEventArgs>();
        var sae = new SocketAsyncEventArgs();
        var buf = new byte[1024];
        sae.SetBuffer(buf, 0, buf.Length);
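        // Forward the asynchronous completion to this operation's port
        sae.Completed += (sender, args) => port.Post(sae);
        // ReceiveAsync returns false when the receive completed synchronously,
        // in which case Completed is not raised, so post the result ourselves
        if (!socket.ReceiveAsync(sae))
            port.Post(sae);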

        queue.Enqueue(port);
    }

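For brevity I have just assigned a small, separately allocated byte array to each operation. In performance-sensitive production code you would more likely give each operation a slice of one much larger array allocated on the large object heap (LOH). Buffers are pinned while a receive is outstanding, and pinned objects on the ordinary heap get in the way of compaction and cause fragmentation; the LOH is not normally compacted, so pinning there does no such harm.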

And then, the internal processing itself. This just loops round, dequeuing the port associated with the next expected completion. The port is first tested for the presence of the completion message. If it’s not there (yet), the iterator yields to a one-time receiver that resumes it when the message arrives. It then either (a) copies and forwards the received data before reissuing the receive and requeuing the port, or (b) disposes of the failed I/O operation and drops its port. Ultimately, once all the operations have failed, the loop exits.

    while (queue.Count > 0)
    {
        var port = queue.Dequeue();

        SocketAsyncEventArgs sae = null;
        if (!port.Test(out sae))
            yield return port.Receive(s => sae = s);

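        // Retire this operation if it failed. This assumes a stream (TCP) socket,
        // where a successful zero-byte receive means the peer closed the connection;
        // without this check the loop would keep reissuing receives that return nothing.
        if (sae.SocketError != SocketError.Success || sae.BytesTransferred == 0)
        {
            sae.Dispose();
            continue;
        }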

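        // Copy the received bytes out before the buffer is handed back to the socket
        dataPort.Post(sae.Buffer.CopyBytes(sae.Offset, sae.BytesTransferred));
        // Reissue the receive; post the result ourselves if it completed synchronously
        if (!socket.ReceiveAsync(sae))
            port.Post(sae);
        // Requeue the port at the tail to preserve the expected completion order
        queue.Enqueue(port);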
    }
}
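The earlier remark about using a portion of a much larger array could be sketched roughly as follows. BufferSlab and TryAssign are hypothetical names invented for this illustration, not part of the CCR or the .NET socket API. The sketch is deliberately simple: it is not thread-safe and never returns slices to the slab.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical helper: carves one large array into fixed-size slices.
sealed class BufferSlab
{
    private readonly byte[] slab;
    private readonly int sliceSize;
    private int next; // index of the next unused slice

    public BufferSlab(int sliceSize, int sliceCount)
    {
        this.sliceSize = sliceSize;
        // Arrays of 85,000 bytes or more are allocated on the large object heap.
        slab = new byte[sliceSize * sliceCount];
    }

    // Points the operation at the next free slice.
    // Returns false when no slices remain.
    public bool TryAssign(SocketAsyncEventArgs sae)
    {
        if ((next + 1) * sliceSize > slab.Length)
            return false;

        sae.SetBuffer(slab, next * sliceSize, sliceSize);
        ++next;
        return true;
    }
}
```

With a shared slab, each operation’s data starts at sae.Offset within sae.Buffer, so any code that copies the received bytes has to start from that offset rather than from index 0.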

For reference, the CopyBytes() operation is just the following extension method. The data has to be copied, because the buffer associated with the operation will be re-used when the operation is re-submitted.

static class ByteBufferExtensions
{
    public static byte[] CopyBytes(this byte[] src, int offset, int count)
    {
        var dst = new byte[count];
        Buffer.BlockCopy(src, offset, dst, 0, count);
        return dst;
    }
}
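To see why the copy matters, here is a small sketch that simulates buffer reuse without a socket. CopyDemo and SimulateReuse are illustrative names only, and the copy helper repeats the body of the extension method above so that the sketch compiles on its own.

```csharp
using System;

static class CopyDemo
{
    // Same logic as the CopyBytes extension method, repeated for self-containment.
    static byte[] CopyBytes(byte[] src, int offset, int count)
    {
        var dst = new byte[count];
        Buffer.BlockCopy(src, offset, dst, 0, count);
        return dst;
    }

    public static string SimulateReuse()
    {
        var recvBuffer = new byte[] { 1, 2, 3, 4 };

        byte[] alias = recvBuffer;                  // just another reference to the same array
        byte[] copy = CopyBytes(recvBuffer, 0, 4);  // independent snapshot of the data

        // Simulate the next receive overwriting the reused buffer.
        recvBuffer[0] = 99;

        return string.Format("alias={0}, copy={1}", alias[0], copy[0]);
    }
}
```

SimulateReuse() returns "alias=99, copy=1": the reference that was handed on without copying sees the overwritten data, whereas the copy still holds what was originally received.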

On a final note, this code could be slightly optimised by setting the PortMode property of each port to OptimizedSingleReissueReceiver.
