Ayende @ Rahien

Hi!
My name is Oren Eini
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

+972 52-548-6969

, @ Q c

Posts: 5,953 | Comments: 44,410

filter by tags archive

Message passing, performance


I got some replies about the async event loop post, mentioning LMAX Disruptor and performance. I decided to see for myself what the fuss was all about.

You can read about the LMAX Disruptor, but basically, it is a very fast single process messaging library.

I wondered what that meant, so I wrote my own messaging library:

public class Bus<T>
{
Queue<T> q = new Queue<T>();

public void Enqueue(T msg)
{
lock (q)
{
q.Enqueue(msg);
}
}

public bool TryDequeue(out T msg)
{
lock (q)
{
if (q.Count == 0)
{
msg = default(T);
return false;
}
msg = q.Dequeue();
return true;
}
}
}

I think that you’ll agree that this is a thing of beauty and elegant coding. I then tested this with the following code:

public static void Read(Bus<string> bus)
{
int count = 0;
var sp = Stopwatch.StartNew();
while (sp.Elapsed.TotalSeconds < 10)
{
string msg;
while (bus.TryDequeue(out msg))
{
count++;
}
}
sp.Stop();

Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec", count, sp.Elapsed, (count / sp.Elapsed.TotalSeconds));
}

public static void Send(Bus<string> bus)
{
var sp = Stopwatch.StartNew();
while (sp.Elapsed.TotalSeconds < 10)
{
for (int i = 0; i < 1000; i++)
{
bus.Enqueue("test");
}
}
}

var bus = new Bus<string>();

ThreadPool.QueueUserWorkItem(state => Send(bus));

ThreadPool.QueueUserWorkItem(state => Read(bus));

The result of this code?

145,271,000 msgs in 00:00:10.4597977 for 13,888,510 ops/sec

Now, what happens when we use the DataFlow’s BufferBlock as the bus?

public static async Task ReadAsync(BufferBlock<string> bus)
{
int count = 0;
var sp = Stopwatch.StartNew();
while (sp.Elapsed.TotalSeconds < 10)
{
try
{
await bus.ReceiveAsync(TimeSpan.FromMilliseconds(5));
count++;
}
catch (TaskCanceledException e)
{
}
}
sp.Stop();

Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec", count, sp.Elapsed, (count / sp.Elapsed.TotalSeconds));
}

public static async Task SendAsync(BufferBlock<string> bus)
{
var sp = Stopwatch.StartNew();
while (sp.Elapsed.TotalSeconds < 10)
{
for (int i = 0; i < 1000; i++)
{
await bus.SendAsync("test");
}
}
}

What we get is:

43,268,149 msgs in 00:00:10 for 4,326,815 ops/sec.

I then decided to check what happens with the .NET port of the LMAX Disruptor. Here is the code:

public class Holder
{
public string Val;
}

internal class CounterHandler : IEventHandler<Holder>
{
public int Count;
public void OnNext(Holder data, long sequence, bool endOfBatch)
{
Count++;
}
}

static void Main(string[] args)
{
var disruptor = new Disruptor.Dsl.Disruptor<Holder>(() => new Holder(), 1024, TaskScheduler.Default);
var counterHandler = new CounterHandler();
disruptor.HandleEventsWith(counterHandler);

var ringBuffer = disruptor.Start();


var sp = Stopwatch.StartNew();
while (sp.Elapsed.TotalSeconds < 10)
{
for (var i = 0; i < 1000; i++)
{
long sequenceNo = ringBuffer.Next();

ringBuffer[sequenceNo].Val = "test";

ringBuffer.Publish(sequenceNo);
}
}
Console.WriteLine("{0:#,#;;0} msgs in {1} for {2:#,#} ops/sec", counterHandler.Count, sp.Elapsed, (counterHandler.Count / sp.Elapsed.TotalSeconds));
}

And the resulting performance is:

29,791,996 msgs in 00:00:10.0003334 for 2,979,100 ops/sec

Now, I’ll be the first to agree that this is really and absolutely not even close to be a fair benchmark. It is testing wildly different things. Distruptor is using a ring buffer, and the BlockBuffer didn’t, and the original Bus implementation just used an unbounded queue.

But that is a very telling benchmark as well. Pretty much because it doesn’t matter. What I need this for is for network protocol handling. As such, even assuming that every single byte is a message, we would have to go far beyond what any reasonable pipe can be expected to be handle.


Comments

Romain Verdier

Just a remark: instanciating the Disruptor with a SingleThreadedClaimStrategy and a YieldingWaitStrategy gives me the following results:

Queue: 142 708 000 msgs in 00:00:10.0045894 for 14 264 254 ops/sec Disruptor: 328 055 000 msgs in 00:00:10.0009212 for 32 802 477 ops/sec

Scooletz

@Romain, @Ayende, exactly. You used monitor based wait strategy instead of a better/faster one.

Joseph Daigle

The LMAX Disruptor is a very interesting piece of software designed for a fairly specialized use case. That is, high concurrency with lock-free "queues". The non-blocking and lock-free nature Disruptor is all about reducing CPU context switching and keeping data cached as close to the CPU has possible at all times. But is this appropriate for all projects? Is it appropriate for projects that don't need 100k or 1000k TPS throughput? Perhaps not. It does not surprise me that you can find ways to outperform the Disruptor with naive benchmarks.

Here's a really neat video: http://www.infoq.com/presentations/LMAX

Craig

The Disruptor's goals are two fold: low latency and high throughput. They are related, but you can still reap the advantages of low latency even if you don't have huge demands on throughput.

Peter

A test with ConcurrentQueue would be interesting it seems it was written for exactly this type of case...

Bartosz Adamczewski

@Peter

ConcurrentQueue will give you poor locality so many cache misses so the perf will not beat a concurrent ring buffer but it still will be better then a lock with Queue.

chris mckelt

Wonder if the F# MailboxProcessor would perform better under this scenario? http://msdn.microsoft.com/en-us/library/ee370357.aspx

Ayende Rahien

Chris, Give it a try, report the results.

Jordan Terrell

Setup a test harness to try out difference scenarios. More scenarios to come...

https://github.com/iSynaptic/MessagingShootout

Comment preview

Comments have been closed on this topic.

FUTURE POSTS

No future posts left, oh my!

RECENT SERIES

  1. The RavenDB Comic Strip (3):
    28 May 2015 - Part III – High availability & sleeping soundly
  2. Special Offer (2):
    27 May 2015 - 29% discount for all our products
  3. RavenDB Sharding (3):
    22 May 2015 - Adding a new shard to an existing cluster, splitting the shard
  4. Challenge (45):
    28 Apr 2015 - What is the meaning of this change?
  5. Interview question (2):
    30 Mar 2015 - fix the index
View all series

Syndication

Main feed Feed Stats
Comments feed   Comments Feed Stats