There was recently an interesting question on stackoverflow.com regarding the asynchronous processing of a proposed line-buffered network protocol. By line-buffered, I mean a protocol that does not transmit the length of the data up front, but simply terminates each line with a newline (‘\n’) character. The question described a hypothetical protocol in which ‘commands’ were sent over the network as lines. Each command contained some lines of data and was delimited by two special ‘marker’ lines, i.e.:
COMMAND
line 1 of command data
line 2 of command data
…
line n of command data
END COMMAND
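To make the wire format concrete, here is a small sketch of how a client might encode one command. `CommandWire` and `EncodeCommand` are hypothetical names (not part of the original sample), and the sketch assumes plain ASCII text so that it matches the server's decoding:

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not from the original sample: builds the bytes a client
// would send for one command under the line-buffered protocol described above.
static class CommandWire
{
    public static byte[] EncodeCommand(IEnumerable<string> dataLines)
    {
        var sb = new StringBuilder();
        sb.Append("COMMAND\n");               // opening marker line
        foreach (var line in dataLines)
        {
            sb.Append(line).Append('\n');     // each data line is newline-terminated
        }
        sb.Append("END COMMAND\n");           // closing marker line
        // Assumption: ASCII text, matching the server's Encoding.ASCII decoding.
        return Encoding.ASCII.GetBytes(sb.ToString());
    }
}
```

Note that the protocol has no escaping, so a data line that is itself `END COMMAND`, or that contains a newline, would be misread by the receiver.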
I thought that solving this with the Concurrency and Coordination Runtime (CCR) would be an interesting exercise. The full source code for this example can be found at the end of this post and requires Microsoft Robotics Developer Studio Express Edition.
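Leaving aside the asynchronous requirements, consider the basic processing steps for our command protocol: read bytes from the socket, convert the bytes to chars, frame the chars into lines, and gather the lines into commands.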

The steps themselves are straightforward. When going from bytes to chars, we’ll assume that a straight ASCII-to-Unicode translation will suffice. Then we need to split our char buffers into lines. Each char buffer may contain zero or more newline characters, so our framing must remember any incomplete (partial) line left at the end of a buffer. Once we have lines, we discard any input until COMMAND arrives, then gather the lines of command data until we detect END COMMAND.
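The framing requirement, remembering a partial line between buffers, can be sketched without any CCR machinery. This is a minimal synchronous sketch under my own assumptions: the hypothetical `LineFramer` class returns completed lines instead of posting them to a port, and, like the framer used in ProcessChars, it treats both '\r' and '\n' as terminators and skips empty lines.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical, CCR-free sketch of the framing step. Chars arrive in arbitrarily
// sized buffers and a line may be split across buffers, so the partial line is
// kept in 'pending' between calls to Feed.
sealed class LineFramer
{
    private readonly StringBuilder pending = new StringBuilder();

    public List<string> Feed(char[] buffer)
    {
        var lines = new List<string>();
        foreach (var ch in buffer)
        {
            if (ch != '\r' && ch != '\n')
            {
                pending.Append(ch);          // still inside a line
                continue;
            }
            if (pending.Length > 0)          // '\r' or '\n' ends a non-empty line
            {
                lines.Add(pending.ToString());
                pending.Length = 0;          // reset for the next line
            }
        }
        return lines;                        // a trailing partial line stays in 'pending'
    }
}
```

For example, feeding `"COMMAND\r\nline 1 of com"` yields just `COMMAND` and keeps `line 1 of com` pending; a following `"mand data\n"` then completes `line 1 of command data`.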
Let’s start at the top, with a simple non-blocking loop that prints the data associated with each arriving command. This loop, like the others presented here, is implemented using CCR iterators. In our case, the handler just dumps the data out to the console.
IEnumerator<ITask> ProcessCommands(Socket client)
{
    var endPoint = client.RemoteEndPoint;
    // Completed commands arrive here from the ProcessLines stage.
    var commandPort = new Port<List<string>>();
    // Signalled by the CCR when ProcessLines completes.
    var donePort = TaskQueue.RunTask(
        new IterativeTask<Socket, Port<List<string>>>(client, commandPort, ProcessLines));
    var done = false;
    while (!done)
    {
        // Wait, without blocking a thread, for either a command or completion.
        yield return Arbiter.Choice(
            commandPort.Receive(commandLines =>
            {
                Console.WriteLine("Received from " + endPoint + ":");
                commandLines.ForEach(Console.WriteLine);
            }),
            donePort.Receive(ignored => done = true));
    }
}
The key to this routine (and to the others that follow) is the pair of ports created at the top. The commandPort is the port through which our routine receives completed command data; it links our routine to ProcessLines, a subroutine spawned to gather lines into commands. The second port, donePort, signals when ProcessLines is done. Note that ProcessLines does not explicitly post to this port; the CCR does this automatically when the subroutine completes. I have used a small extension method to make the syntax of spawning these sub-tasks a little simpler:
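static class TaskQueueExtensions
{
    // Runs the task to completion on the given queue and returns a port
    // that receives a signal when the task finishes.
    public static Port<EmptyValue> RunTask(this DispatcherQueue taskQueue, ITask task)
    {
        var donePort = new Port<EmptyValue>();
        Arbiter.ExecuteToCompletion(taskQueue, task, donePort);
        return donePort;
    }
}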
Finally, the main body of our routine just sits in a loop, processing commands until the signal from donePort is received.
ProcessLines follows the same pattern of spawning a sub-task (in this case, ProcessChars) and using two ports, one to receive input from the sub-task and one to detect completion of the sub-task. The main body of ProcessLines demonstrates how easy it is to write non-blocking sequential logic over our simple command protocol using iterators. Note that the signature of this routine includes the port passed from the parent routine.
For the purposes of the example, we just ignore any input prior to receiving COMMAND.
IEnumerator<ITask> ProcessLines(Socket client, Port<List<string>> commandPort)
{
    var done = false;
    var linePort = new Port<string>();
    var donePort = TaskQueue.RunTask(
        new IterativeTask<Socket, Port<string>>(client, linePort, ProcessChars));
    while (!done)
    {
        // Discard lines until the COMMAND marker arrives.
        bool fCommandStarted = false;
        yield return Arbiter.Choice(
            linePort.Receive(line => fCommandStarted = line == "COMMAND"),
            donePort.Receive(ignored => done = true));
        if (fCommandStarted)
        {
            // Gather command data until END COMMAND (or until the input stage exits).
            var commandLines = new List<string>();
            bool fCommandEnded = false;
            while (!done && !fCommandEnded)
            {
                yield return Arbiter.Choice(
                    linePort.Receive(line =>
                    {
                        if (line == "END COMMAND")
                        {
                            commandPort.Post(commandLines);
                            fCommandEnded = true;   // go back to waiting for COMMAND
                        }
                        else
                            commandLines.Add(line);
                    }),
                    donePort.Receive(ignored => done = true));
            }
        }
    }
}
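Stripped of the ports and iterators, ProcessLines is a two-state machine: either waiting for COMMAND, or collecting data until END COMMAND. The following hypothetical `CommandAssembler` is a CCR-free sketch of that logic (my own illustration, not code from the sample). After END COMMAND it must return to waiting and start a fresh list for the next command; otherwise later lines would be appended to a list that has already been handed off.

```csharp
using System.Collections.Generic;

// Hypothetical, CCR-free sketch of the ProcessLines logic as a state machine.
// Lines before COMMAND are ignored; lines between the markers are collected.
sealed class CommandAssembler
{
    private List<string> current;          // null while waiting for COMMAND

    // Returns the completed command when END COMMAND arrives, otherwise null.
    public List<string> Accept(string line)
    {
        if (current == null)
        {
            if (line == "COMMAND")
                current = new List<string>();  // start collecting a new command
            return null;                       // anything else is discarded
        }
        if (line == "END COMMAND")
        {
            var completed = current;
            current = null;                    // back to waiting for COMMAND
            return completed;
        }
        current.Add(line);                     // a line of command data
        return null;
    }
}
```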
ProcessChars follows the same pattern again. It turns consecutive char buffers (posted by the ProcessBytes subroutine) into lines. For clarity, I’ve moved the actual framing code into a lambda expression assigned to framer. The framer appends chars to a StringBuilder until it encounters a newline (or carriage return), at which point it posts the buffered line and starts a new one; empty lines are skipped.
IEnumerator<ITask> ProcessChars(Socket client, Port<string> linePort)
{
    var done = false;
    var charBufferPort = new Port<char[]>();
    var donePort = TaskQueue.RunTask(
        new IterativeTask<Socket, Port<char[]>>(client, charBufferPort, ProcessBytes));
    // Holds the current (possibly partial) line between char buffers.
    var builder = new StringBuilder();
    var framer = new Handler<char[]>(buffer =>
    {
        foreach (var ch in buffer)
        {
            if (ch != '\r' && ch != '\n')
            {
                builder.Append(ch);
                continue;
            }
            // '\r' or '\n' terminates a line; empty lines (e.g. from "\r\n") are skipped.
            if (builder.Length > 0)
            {
                linePort.Post(builder.ToString());
                builder = new StringBuilder();
            }
        }
    });
    while (!done)
    {
        yield return Arbiter.Choice(
            charBufferPort.Receive(framer),
            donePort.Receive(ignored => done = true));
    }
}
And finally, ProcessBytes does the actual reading off the socket, converting the ASCII byte buffers into Unicode char buffers and posting back the results. It loops until a zero-byte receive (meaning the remote end has closed the connection) is detected, and then it exits. For significant numbers of connections, I wouldn’t recommend a Gen-0 allocated byte buffer like this one (it is pinned while each asynchronous receive is outstanding, which can fragment the heap), but for the purposes of this exercise it’s fine.
IEnumerator<ITask> ProcessBytes(Socket client, Port<char[]> charBufferPort)
{
    var iarPort = new Port<IAsyncResult>();
    var buffer = new byte[4096];
    var done = false;
    do
    {
        // Start an asynchronous receive; the completion callback posts to iarPort.
        client.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, iarPort.Post, null);
        yield return iarPort.Receive(iar =>
        {
            var rcvd = client.EndReceive(iar);
            // GetChars copies into a new char[], so the byte buffer can be reused.
            if (rcvd > 0)
                charBufferPort.Post(Encoding.ASCII.GetChars(buffer, 0, rcvd));
            else
                done = true;   // zero-byte receive: the remote end closed the connection
        });
    } while (!done);
}
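The decoding step relies on two properties of `Encoding.ASCII.GetChars(byte[], int, int)`: it decodes only the requested range, and it returns a newly allocated char[], so the single byte buffer can safely be reused for the next receive while earlier char buffers are still queued. The small sketch below (the `AsciiDecodeDemo` wrapper is hypothetical) also shows the cost of the ASCII assumption: with the default fallback, bytes above 0x7F are decoded as '?'.

```csharp
using System.Text;

// Hypothetical wrapper around the same call ProcessBytes makes.
static class AsciiDecodeDemo
{
    public static char[] Decode(byte[] buffer, int received)
    {
        // Decodes only the first 'received' bytes into a new char[];
        // later writes to 'buffer' do not affect the returned chars.
        return Encoding.ASCII.GetChars(buffer, 0, received);
    }
}
```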
So, what do we have so far? Well, we have truly asynchronous processing in these routines, and none of them blocks a thread when it has nothing to do. In addition, each routine runs concurrently with the others, so the pipeline is parallelised. This means that if, for example, a connection sent commands in quick succession, each stage of the pipeline could be working on the current chunk of data whilst the previous stage processed the next one (assuming a 4-core box for the four stages).
But beyond being non-blocking, thread-efficient, asynchronous and (reasonably) scalable, the most satisfying aspect of this solution is that the CCR iterator support let us write the pipeline stages as fairly intuitive pieces of sequential, looping logic.
And finally…
What about exception handling? What if the socket connection is rudely aborted? How would such an exception be propagated through the pipeline? Where would the socket be cleaned up? What if an exception occurred in one of the intermediate stages of the pipeline? How do we clean up then?
This is where CCR causalities really shine. The following section of code from the full sample deals with the newly accepted socket and spawns the original ProcessCommands task.
var client = server.EndAccept(iar);
var endPoint = client.RemoteEndPoint;
// Unhandled exceptions from tasks spawned under this causality are posted here.
var errorPort = new Port<Exception>();
Dispatcher.AddCausality(new Causality(endPoint.ToString(), errorPort));
Console.WriteLine("{0} connected.", endPoint);
var donePort = TaskQueue.RunTask(new IterativeTask<Socket>(client, ProcessCommands));
// One-shot receiver: report the first unhandled exception.
Activate(errorPort.Receive(error =>
{
    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine(error);
    Console.ResetColor();
}));
// One-shot receiver: when ProcessCommands exits, close the socket.
Activate(donePort.Receive(ignored =>
{
    using (client)
    {
        Console.WriteLine("{0} disconnected.", endPoint);
    }
}));
The steps are quite simple. We create and activate a new causality with our errorPort attached. This causality will flow with any spawned tasks or posted messages. We then spawn ProcessCommands, which, as we have seen, itself spawns sub-tasks, and so on.
Then we activate a receiver that fires should the causality detect an unhandled exception (I’ve deliberately not written any explicit exception handling code in the pipeline stages). This receiver just dumps the exception to the console. Finally, we activate a receiver on the donePort, which receives a signal when ProcessCommands exits. This receiver writes a disconnection message and closes the socket.
Now, if the socket is rudely disconnected, EndReceive will throw a SocketException. This causes two things to happen. First, the exception will be caught by the causality and posted to our errorPort, where it will be dumped to the console. Second, the exception causes ProcessBytes to exit, which triggers a cascading set of donePort posts as each pipeline stage detects the exit of its prior stage. Ultimately that causes ProcessCommands to exit, our donePort to receive a signal, and the socket to be closed.
What if an intermediate stage fails, say ProcessChars? Well, just as before, we’ll get the exception through the causality, so we can dump that. And ProcessCommands will still exit, because the exit of ProcessChars cascades through ProcessLines via the donePorts. But at this point we are left with ProcessBytes still spinning on a connected socket, posting char buffers to a port that no one (i.e. ProcessChars) is reading any more.
But this won’t be a problem (thankfully). When ProcessCommands exits and our donePort signals, we close the socket anyway. This will most likely cause ProcessBytes to exit via an exception. The causality will catch this exception and post it to our errorPort, but our one-shot errorPort handler has already fired and won’t fire again. Likewise, the donePort signal raised when ProcessBytes exits goes nowhere, because the only receiver of that signal, ProcessChars, has already gone away.
And really finally…
It was an interesting exercise to tackle this particular problem, and I think it shows off iterators and causalities rather well. There’s still room for further enhancements, e.g. handling receive timeouts at the command level, which, time permitting, I’ll look at in a future post.