Ayende @ Rahien


Message passing, performance

I got some replies about the async event loop post, mentioning LMAX Disruptor and performance. I decided to see for myself what the fuss was all about.

You can read about the LMAX Disruptor, but basically, it is a very fast single-process messaging library.

I wondered what that meant, so I wrote my own messaging library:

public class Bus<T>
{
    readonly Queue<T> q = new Queue<T>();

    public void Enqueue(T msg)
    {
        lock (q)
        {
            q.Enqueue(msg);
        }
    }

    public bool TryDequeue(out T msg)
    {
        lock (q)
        {
            if (q.Count == 0)
            {
                msg = default(T);
                return false;
            }
            msg = q.Dequeue();
            return true;
        }
    }
}

I think that you’ll agree that this is a thing of beauty and elegant coding. I then tested this with the following code:

public static void Read(Bus<string> bus)
{
    int count = 0;
    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        string msg;
        while (bus.TryDequeue(out msg))
        {
            count++;
        }
    }
    sp.Stop();

    Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec",
        count, sp.Elapsed, (count / sp.Elapsed.TotalSeconds));
}

public static void Send(Bus<string> bus)
{
    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        for (int i = 0; i < 1000; i++)
        {
            bus.Enqueue("test");
        }
    }
}

var bus = new Bus<string>();

ThreadPool.QueueUserWorkItem(state => Send(bus));

ThreadPool.QueueUserWorkItem(state => Read(bus));

The result of this code?

145,271,000 msgs in 00:00:10.4597977 for 13,888,510 ops/sec

Now, what happens when we use the DataFlow’s BufferBlock as the bus?

public static async Task ReadAsync(BufferBlock<string> bus)
{
    int count = 0;
    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        try
        {
            await bus.ReceiveAsync(TimeSpan.FromMilliseconds(5));
            count++;
        }
        catch (TaskCanceledException)
        {
            // ReceiveAsync timed out; loop around and check the clock again.
        }
    }
    sp.Stop();

    Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec",
        count, sp.Elapsed, (count / sp.Elapsed.TotalSeconds));
}

public static async Task SendAsync(BufferBlock<string> bus)
{
    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        for (int i = 0; i < 1000; i++)
        {
            await bus.SendAsync("test");
        }
    }
}

What we get is:

43,268,149 msgs in 00:00:10 for 4,326,815 ops/sec.
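Part of the cost in my BufferBlock reader is that every single receive pays for a 5 ms timeout setup. A variant that waits once for availability and then drains everything that is already buffered avoids that per-message overhead. This is not from the original benchmark, just a sketch (`DrainAsync` is a name I made up); `OutputAvailableAsync` and `TryReceive` are standard TPL Dataflow APIs, but whether this actually closes the gap would need measuring:

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Drain
{
    // Sketch: instead of awaiting (and possibly timing out) per message,
    // wait once for availability, then drain synchronously with TryReceive.
    public static async Task<int> DrainAsync(BufferBlock<string> bus, TimeSpan duration)
    {
        int count = 0;
        var sp = Stopwatch.StartNew();
        while (sp.Elapsed < duration)
        {
            // Completes with false once the block is completed and empty.
            if (!await bus.OutputAvailableAsync())
                break;

            // Drain everything currently buffered without further awaits.
            string msg;
            while (bus.TryReceive(out msg))
                count++;
        }
        return count;
    }
}
```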

I then decided to check what happens with the .NET port of the LMAX Disruptor. Here is the code:

public class Holder
{
    public string Val;
}

internal class CounterHandler : IEventHandler<Holder>
{
    public int Count;

    public void OnNext(Holder data, long sequence, bool endOfBatch)
    {
        Count++;
    }
}

static void Main(string[] args)
{
    var disruptor = new Disruptor.Dsl.Disruptor<Holder>(() => new Holder(), 1024, TaskScheduler.Default);
    var counterHandler = new CounterHandler();
    disruptor.HandleEventsWith(counterHandler);

    var ringBuffer = disruptor.Start();

    var sp = Stopwatch.StartNew();
    while (sp.Elapsed.TotalSeconds < 10)
    {
        for (var i = 0; i < 1000; i++)
        {
            long sequenceNo = ringBuffer.Next();

            ringBuffer[sequenceNo].Val = "test";

            ringBuffer.Publish(sequenceNo);
        }
    }
    Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec",
        counterHandler.Count, sp.Elapsed, (counterHandler.Count / sp.Elapsed.TotalSeconds));
}

And the resulting performance is:

29,791,996 msgs in 00:00:10.0003334 for 2,979,100 ops/sec

Now, I’ll be the first to agree that this is absolutely not even close to being a fair benchmark. It is testing wildly different things. The Disruptor is using a ring buffer, the BufferBlock wasn’t, and the original Bus implementation just used an unbounded queue.

But that is a very telling benchmark as well, pretty much because the differences don’t matter. What I need this for is network protocol handling. As such, even assuming that every single byte is a message, we would have to go far beyond what any reasonable pipe could be expected to handle.
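For completeness, the same Bus shape can be backed by ConcurrentQueue<T> from System.Collections.Concurrent, which is lock-free internally. This is a sketch I did not benchmark in the post (ConcurrentBus is a name I made up), and it is a one-line swap that removes the explicit lock entirely:

```csharp
using System.Collections.Concurrent;

// Sketch: same API as the Bus<T> above, backed by the lock-free
// ConcurrentQueue<T> instead of lock + Queue<T>.
public class ConcurrentBus<T>
{
    readonly ConcurrentQueue<T> q = new ConcurrentQueue<T>();

    public void Enqueue(T msg)
    {
        q.Enqueue(msg);
    }

    public bool TryDequeue(out T msg)
    {
        // ConcurrentQueue.TryDequeue already has the try-pattern shape we want.
        return q.TryDequeue(out msg);
    }
}
```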

Comments

Romain Verdier
07/14/2014 09:56 AM

Just a remark: instantiating the Disruptor with a SingleThreadedClaimStrategy and a YieldingWaitStrategy gives me the following results:

Queue: 142 708 000 msgs in 00:00:10.0045894 for 14 264 254 ops/sec
Disruptor: 328 055 000 msgs in 00:00:10.0009212 for 32 802 477 ops/sec

Scooletz
07/14/2014 11:58 AM

@Romain, @Ayende, exactly. You used a monitor-based wait strategy instead of a better/faster one.

Joseph Daigle
07/14/2014 12:13 PM

The LMAX Disruptor is a very interesting piece of software designed for a fairly specialized use case: high concurrency with lock-free "queues". The non-blocking, lock-free nature of the Disruptor is all about reducing CPU context switching and keeping data cached as close to the CPU as possible at all times. But is this appropriate for all projects? Is it appropriate for projects that don't need 100k or 1000k TPS throughput? Perhaps not. It does not surprise me that you can find ways to outperform the Disruptor with naive benchmarks.

Here's a really neat video: http://www.infoq.com/presentations/LMAX

Craig
07/14/2014 01:53 PM

The Disruptor's goals are twofold: low latency and high throughput. They are related, but you can still reap the advantages of low latency even if you don't have huge demands on throughput.

Peter
07/14/2014 02:20 PM

A test with ConcurrentQueue would be interesting; it seems it was written for exactly this type of case...

Bartosz Adamczewski
07/14/2014 03:21 PM

@Peter

ConcurrentQueue will give you poor locality, hence many cache misses, so the perf will not beat a concurrent ring buffer, but it will still be better than a lock around a Queue.

chris mckelt
07/15/2014 02:13 PM

Wonder if the F# MailboxProcessor would perform better under this scenario? http://msdn.microsoft.com/en-us/library/ee370357.aspx

Ayende Rahien
07/15/2014 07:35 PM

Chris, Give it a try, report the results.

Jordan Terrell
07/15/2014 09:05 PM

Set up a test harness to try out different scenarios. More scenarios to come...

https://github.com/iSynaptic/MessagingShootout

Comments have been closed on this topic.