Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

Get in touch with me:

oren@ravendb.net +972 52-548-6969

time to read 4 min | 768 words

There are only two topics that remain in the Raven MQ server (replication & point to point messaging), but I decided to stop for a while and focus on the client API. My experience has shown that it is far more important than anything else in gaining acceptance for the project.

One thing that I want to make clear is that this is the high level API, which has very little to do with how this is actually implemented.

The first thing to be aware of is that Raven MQ is transactional: all operations either complete successfully or fail as a single unit. That makes it very easy to work with for a whole class of scenarios. It is not an accident that the API is very similar to the one you get from Rhino Service Bus or NServiceBus, although the Raven MQ client API is drastically more modest in what it is trying to do.

Getting started:

var ravenMQEndpoint = new RavenMQEndpoint
{
    Url = "http://localhost:8181"
};
ravenMQEndpoint.Start();

Subscribing (methods):

ravenMQEndpoint.Subscribe("/streams/system/notifications", (ctx, untypedMsg) =>
{
    // do something with the msg
});

ravenMQEndpoint.Subscribe<LoginAboutToExpire>("/streams/user/1234", (ctx, msg) =>
{
    // do something with the msg
});

ravenMQEndpoint.Subscribe<LoginExpired>("/streams/user/1234", (ctx, msg) =>
{
    // do something with the msg
});

This allows you to handle untyped messages, or to select specific message types to handle from the stream (ignoring messages of other types). I’ll discuss the ctx parameter at a later stage; for now, you can ignore it. What you can’t see here is that Subscribe returns an IDisposable instance, which allows you to remove the subscription. That is useful for temporary subscriptions, which are pretty common in the scenarios where we see Raven MQ used.
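The post doesn’t show what is behind that IDisposable, but the pattern is easy to sketch. Everything below (ToyEndpoint, the in-memory dispatch) is invented for illustration; only the “Dispose removes the subscription” contract comes from the post:

```csharp
using System;
using System.Collections.Generic;

// Toy dispatcher, invented for illustration only; the real client
// wires subscriptions to the server, not to an in-memory list.
public class ToyEndpoint
{
    private readonly Dictionary<string, List<Action<object>>> handlers = new();

    public IDisposable Subscribe(string stream, Action<object> handler)
    {
        if (!handlers.TryGetValue(stream, out var list))
            handlers[stream] = list = new List<Action<object>>();
        list.Add(handler);
        // Disposing the returned token removes the handler, ending the subscription
        return new Subscription(() => list.Remove(handler));
    }

    public void Publish(string stream, object msg)
    {
        if (handlers.TryGetValue(stream, out var list))
            foreach (var handler in list.ToArray())
                handler(msg);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action unsubscribe;
        public Subscription(Action unsubscribe) { this.unsubscribe = unsubscribe; }
        public void Dispose() => unsubscribe();
    }
}
```

A temporary subscription then becomes a simple `using` block: subscribe on entry, and the subscription is gone when the block exits.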

Subscribing (classes):

ravenMQEndpoint.Subscribe("/streams/user/1234", () => new LoginExpiredConsumer());

ravenMQEndpoint.Subscribe("/streams/user/1234", mefContainer);

Instead of registering a single method, you can register a factory method, or a MEF container, both of which will create a consumer class for handling the messages.
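The shape of the consumer class isn’t shown in the post. Assuming a contract along the lines of Rhino Service Bus’s ConsumerOf&lt;T&gt; (the interface and type names below are guesses, not the real Raven MQ client API), a factory-based registration might pair with a consumer like this:

```csharp
using System;

// Assumed consumer contract; the actual interface in the Raven MQ
// client is not shown in the post.
public interface IConsumerOf<T>
{
    void Consume(T message);
}

public class LoginExpired
{
    public string UserId { get; set; }
}

public class LoginExpiredConsumer : IConsumerOf<LoginExpired>
{
    public static string LastUser; // for demonstration only
    public void Consume(LoginExpired message) => LastUser = message.UserId;
}

public static class FactoryDemo
{
    // A factory-based subscription creates a fresh consumer per message,
    // so consumers can safely keep per-message state.
    public static void Dispatch(LoginExpired msg, Func<IConsumerOf<LoginExpired>> factory)
        => factory().Consume(msg);
}
```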

Serialization:

Raven MQ doesn’t care about the serialization format; you can send messages using whatever format you like, but the client API uses JSON/BSON to store the data.

Sending messages:

Remember that I talked about the ctx parameter? The RavenMQEndpoint doesn’t offer a Send() method; that is handled by the ctx parameter, which stands for Context, obviously. The idea is quite simple: we want message sending to be transactional, so we always use a context to send messages, and only if the context completes successfully do we actually consume the incoming message and send all the outgoing messages to the server. You can think of the Context as the Raven MQ transaction.

For sending messages outside of processing an existing message, you can use:

ravenMQEndpoint.Transaction(ctx => ctx.Send("/queues/customers/1234", updateCustomerAddress));

This gives us a very easy way of scoping multiple messages in a single transaction without awkward APIs.
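A minimal sketch of that commit-on-success idea, with invented types (this is not the real client code): the context buffers outgoing messages and only hands them over if the transaction body completes without throwing:

```csharp
using System;
using System.Collections.Generic;

// Toy context: Send only buffers; nothing leaves until the transaction commits.
public class ToyContext
{
    public List<(string Queue, object Message)> Pending { get; } = new();
    public void Send(string queue, object message) => Pending.Add((queue, message));
}

public static class ToyTransactions
{
    // Stands in for the wire: messages that actually reached the server.
    public static List<(string Queue, object Message)> Sent { get; } = new();

    public static void Transaction(Action<ToyContext> body)
    {
        var ctx = new ToyContext();
        body(ctx);                  // if this throws, nothing below runs
        Sent.AddRange(ctx.Pending); // "commit": flush the buffered messages
    }
}
```

If the body throws, the buffered messages are simply dropped, which is the all-or-nothing behavior described above.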

Thoughts?

time to read 5 min | 826 words

Originally posted at 11/9/2010

Raven MQ is a new project that I am working on. As you can guess from the name, this is a queuing system, but one with a few twists. I already wrote a queuing system in the past (Rhino Queues), so why write another one?

Raven MQ builds upon the experience of building Rhino Queues, but it targets a different set of usage scenarios. Like Rhino Queues, Raven MQ can be xcopy deployed, but it is not usually used as a traditional point to point messaging system. Instead, Raven MQ is a queuing system for the web. What do I mean by that? Raven MQ makes a different set of design decisions, focused on making cheap some things that are traditionally expensive in queuing systems:

  • Unlike in most queuing systems, queues are cheap, which allows you to create an unlimited number of queues. A typical deployment of Raven MQ will have at least one queue per client.
  • Which leads to the next point: Raven MQ is designed to support literally thousands of clients.

The model isn’t the traditional queuing one you might be familiar with from MSMQ:

[diagram: the traditional queuing model, with a queuing system node running on each endpoint]

Instead, the model uses a central server to hold all the information:

[diagram: the Raven MQ model, with a single central server holding all the queues and streams]

 

The reasoning behind this is actually pretty simple. Unlike in traditional queuing systems, where we have a node of the queuing system running on each endpoint, Raven MQ makes the assumption that most of the clients connecting to it are actually web clients, using JavaScript on the page or maybe Silverlight applications.

The decision to directly support those clients is what makes Raven MQ unique.

Transport models

Raven MQ offers two distinct models for transporting messages. The first is the traditional queue model, where each message can only be consumed by a single consumer. This is not a very interesting model.

A much more interesting model is the message stream. A message stream in Raven MQ is a set of messages sent to a particular queue. But unlike a queue, reading a message from the stream does not consume it. That means that multiple consumers can read the messages on the stream. Moreover, clients that arrive after the message was sent can still read the message (as long as its time to live is in effect).
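The difference can be sketched in a few lines of C# (a toy model, invented for illustration, and ignoring time to live): reads from a stream never consume, so any number of consumers, including late joiners, see the same messages:

```csharp
using System;
using System.Collections.Generic;

// Toy message stream: reads never consume, and subscribers that arrive
// after a message was sent can still read it.
public class ToyStream
{
    private readonly List<object> messages = new();

    public void Send(object msg) => messages.Add(msg);

    // Every reader sees every message; nothing is dequeued.
    public IReadOnlyList<object> ReadAll() => messages;
}
```

Contrast this with a queue, where a successful read removes the message and the next consumer never sees it.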

Usage model

The previous section is probably hard to understand. As usual, an example will make all the difference in the world.

Let us imagine that we are building a CRM system, and we are currently viewing a customer screen. At that point, we are subscribed to the following streams:

  • /streams/system/notifications – Global system notifications
  • /streams/customers/1234 – Updates about customer 1234
  • /streams/users/4321 – Updates about our logged on user

And the following queue:

  • /queues/mailboxes/1234 – Replies to our particular client

The idea is pretty simple, actually. When we read the customer data, we load it from the view model store, but we also need to be able to efficiently get updates about changes that happen to the customer while we are looking at it. We do that by subscribing to the appropriate stream. Another user who is looking at the same customer is subscribed to the same stream. Even more importantly, a user who opened the customer after some changes had been made (but before they were written to the view model store) will also get those updates, and will be able to reconstruct the current state in a seamless manner.

This approach drastically simplifies the update problem in complex systems.

Why call them streams and not topics?

Topics are a routing mechanism, but with Raven MQ, streams aren’t used for routing; they are used to hold a set of messages, that is all. The problem with routing is that you can’t join in later and receive previously sent messages, and (much worse) you can’t really use routing on the web: when you have potentially thousands of clients, all coming & going at will, you can’t set up a queue for each of them, it is too expensive.

The stream/notification model solves that problem rather neatly, even if I say so myself.

What I did not discuss

Please note that I am discussing the system at a very high level right now. I didn’t talk about the API or the actual distribution model. That is intentional; I’ll cover those in future posts.

Raven.Munin

time to read 7 min | 1390 words

Raven.Munin is the actual implementation of RavenDB’s low level managed storage. I split it out of the RavenDB project because I intend to make use of it in additional projects.

At its core, Munin provides a high performance, transactional, non-relational data store written completely in managed code. The main point in writing it was to support the managed storage in RavenDB, but it is going to be used for Raven MQ as well, and probably a bunch of other stuff too. I’ll post about Raven MQ in the future, so don’t bother asking about it.

Let us look at a very simple API example. First, we need to define a database:

public class QueuesStorage : Database
{
    public QueuesStorage(IPersistentSource persistentSource) : base(persistentSource)
    {
        Messages = Add(new Table(key => key["MsgId"], "Messages")
        {
            {"ByQueueName", key => key.Value<string>("QueueName")},
            {"ByMsgId", key => new ComparableByteArray(key.Value<byte[]>("MsgId"))}
        });

        Details = Add(new Table("Details"));
    }

    public Table Details { get; set; }

    public Table Messages { get; set; }
}

This is a database with two tables, Messages and Details. The Messages table has a primary key of MsgId, and two secondary indexes by queue name and by message id. The Details table is sorted by the key itself.

It is important to understand one core concept about Munin: data stored in it is composed of two parts, the key and the data. It is easier to explain when you look at the API:

bool Remove(JToken key);
Put(JToken key, byte[] value);
ReadResult Read(JToken key);

public class ReadResult
{
    public int Size { get; set; }
    public long Position { get; set; }
    public JToken Key { get; set; }
    public Func<byte[]> Data { get; set; }
}

Munin doesn’t really care about the data; it just saves it. But the key is important. In the case of the Details table, the table is sorted by the full key. In the Messages table, things are different: we use a lambda to extract the primary key from the key of each item, and additional lambdas to extract the secondary indexes. Munin can build secondary indexes only from the key, not from the value. Only the secondary indexes allow range queries; the primary key allows only direct access, which is why we define both a primary key and a secondary index on MsgId.
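The kind of range query a secondary index enables can be sketched with a SortedSet standing in for Munin’s in-memory index (SkipAfter here is a stand-in mimicking the behavior, not Munin’s actual method):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class IndexDemo
{
    // Return keys strictly greater than 'after', in sorted order; this is
    // the shape of query a secondary index supports and a primary key
    // (direct lookup only) does not.
    public static IEnumerable<int> SkipAfter(SortedSet<int> index, int after)
        => index.GetViewBetween(after, int.MaxValue).Where(k => k > after);
}
```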

Let us see how we can save a new message:

public void Enqueue(string queue, byte[] data)
{
    messages.Put(new JObject
    {
        {"MsgId", uuidGenerator.CreateSequentialUuid().ToByteArray()},
        {"QueueName", queue},
    }, data);
}

And now we want to read it:

public Message Dequeue(Guid after)
{
    var key = new JObject { { "MsgId", after.ToByteArray()} };
    var result = messages["ByMsgId"].SkipAfter(key).FirstOrDefault();
    if (result == null)
        return null;

    var readResult = messages.Read(result);

    return new Message
    {
        Id = new Guid(readResult.Key.Value<byte[]>("MsgId")),
        Queue = readResult.Key.Value<string>("QueueName"),
        Data = readResult.Data(),
    };
}

We do a bunch of stuff here: we scan the secondary index for the appropriate key, read the matching item from the table, load it into a DTO and return it.

Transactions

Munin is fully transactional; it follows an append only, MVCC, multi reader single writer model. The previous methods run in the context of:

using (queuesStorage.BeginTransaction())
{
    // use the storage
    queuesStorage.Commit();
}

Storage on disk

Munin can work with either a file or in memory (which makes unit testing it a breeze). It uses an append only model, so on disk it looks like this:

[diagram: the data file as a sequence of appended transactions]

Each red rectangle represents a separate transaction.

In memory data

You might have noted that we don’t keep any complex data structures on disk. That is because all the actual indexing of the data is done in memory; the data on disk is used solely for rebuilding the index in memory. Do note that the actual values are not held in memory, only the keys. That means searching Munin’s indexes is lightning fast, since we never touch the disk for the search itself.

The data is actually held in an immutable binary tree, which gives us the ability to do MVCC reads without any locking.
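A minimal persistent binary search tree shows why this works: adding a key builds a new path and shares every unchanged node, so a reader holding the old root is never affected by the write. This is an illustration of the technique, not Munin’s actual tree:

```csharp
using System;

// Minimal persistent (immutable) binary search tree. Add returns a new
// root that shares unchanged subtrees with the old one, which is the
// essence of lock-free MVCC reads.
public sealed class ImmutableTree
{
    public readonly int Key;
    public readonly ImmutableTree Left, Right;

    private ImmutableTree(int key, ImmutableTree left, ImmutableTree right)
    { Key = key; Left = left; Right = right; }

    public static ImmutableTree Add(ImmutableTree node, int key)
    {
        if (node == null) return new ImmutableTree(key, null, null);
        if (key < node.Key) return new ImmutableTree(node.Key, Add(node.Left, key), node.Right);
        if (key > node.Key) return new ImmutableTree(node.Key, node.Left, Add(node.Right, key));
        return node; // key already present, nothing to copy
    }

    public static bool Contains(ImmutableTree node, int key)
    {
        while (node != null)
        {
            if (key == node.Key) return true;
            node = key < node.Key ? node.Left : node.Right;
        }
        return false;
    }
}
```

A reader that captured the old root simply keeps traversing its own snapshot; no locks are needed because nothing it can reach is ever mutated.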

Compaction

Because Munin uses the append only model, it requires periodic compaction. RavenDB does this automatically, waiting for periods of inactivity to do so.

Summary

Munin is a low level API, not something that you are likely to use directly. It was explicitly modeled to give me an interface similar in capability to what Esent gives me, but in purely managed code.

Please note that it is released under the same license as RavenDB, the AGPL.

time to read 2 min | 227 words

Originally posted at 11/4/2010

Recently I had an interesting support call for NHibernate. The problem was a bit complex when explained to me, but we got it simplified to something like:

When we have a component that contains a set, that component is not null, even when all the members are null.

The problem is actually an intersection of two separate rules in NHibernate:

  • When all the members of a component are null, the component itself will be null.
  • A set is never null, an empty set is still a valid instance of a set.

When you add a set to a component, you add a member that is never null. Hence the behavior of NHibernate in this case is also valid: since we have a non null member value, there will be an instance of the component.

The problem is that the users conceptually thought of the empty set as null as well. I had a hard time explaining that sets are never null, and that no, this isn’t a bug or an unexpected combination of the two features. Both behave exactly as expected, and the intersection of both works as expected. In fact, trying to make it not work in this fashion would have introduced a lot of work, complexity and additional queries.
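The intersection of the two rules can be shown with plain C# (this is an illustration of the semantics described above, not NHibernate code; the type and member names are invented):

```csharp
using System.Collections.Generic;

// Plays the role of the NHibernate component from the support call.
public class ContactInfo
{
    public string Phone;
    public ISet<string> Emails;
}

public static class ComponentRules
{
    // Rule 2: a mapped set is never null; you always get an (empty) instance.
    // Rule 1: the component materializes as null only when ALL members are null.
    // Because the set member is always non-null, rule 1 can never fire, so a
    // component that contains a set is always materialized.
    public static ContactInfo Materialize(string phone, ISet<string> emails)
    {
        emails ??= new HashSet<string>();                      // rule 2
        bool allMembersNull = phone == null && emails == null; // always false here
        return allMembersNull ? null : new ContactInfo { Phone = phone, Emails = emails };
    }
}
```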

time to read 2 min | 226 words

Chris points out something very important:

“A much better solution would have been to simply put the database on a compressed directory, which would slow down some IO ..."

I don't agree.
Compression needs CPU. We got a lot more IO by switching on compression (it's just less to write and read). Previously our CPU was about 40%; now it averages 70%. Compression saves us about 30% per file. After switching on compression, our IO bound application was about 20% faster.
We are currently planning to switch on compression on all our production servers over Christmas, because using CPU cores for compression is even cheaper than adding hard disks and RAID for performance.

In general, most operations today are IO bound, with the CPU mostly sitting there twiddling the same byte until that byte threatens to sue for harassment. It makes sense to trade IO for CPU time, because our systems are being starved for IO.

In fact, you can just turn on compression at the file system level in most OSes, and it is likely to result in a significant saving in application performance, assuming that the data does not already fit in memory.
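The same trade can be made in application code. For instance, compressing before writing is a few lines with GZipStream, and for redundant data (logs, JSON documents) the reduction in bytes written is large:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

public static class CompressionDemo
{
    // Trade CPU for IO: spend cycles compressing so there is less to
    // write to (and later read from) the disk.
    public static byte[] Compress(byte[] data)
    {
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress))
            gzip.Write(data, 0, data.Length);
        return output.ToArray();
    }
}
```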

Unstable

time to read 1 min | 50 words

Originally posted at 10/20/2010

Currently…

  • RavenDB has an unstable fork
  • Which has an unstable branch (too unstable to be the master branch of the unstable fork!)

And then there is what I am working on locally…

I hate making big changes.

time to read 2 min | 289 words

Originally posted at 10/17/2010

I got the following message in the rhino tools mailing list:

I am looking into Rhino-esb and NServiceBus for a smart client application. I was doing some stress testing lately and I noticed some very strange behavior with rhino-esb. I tried to send large numbers of requests at the same time (6000-10000) and the memory of my back-end when using rhino-esb was continuously rising.

Since RSB has been in production for the last two or three years, that seemed suspicious. Luckily, there was a reproduction that I could run. I tried it out, and indeed, memory usage seemed to grow in proportion to the number of messages sent. That had me worried, really worried.

I ran the application under memory profiling (using JetBrains dotTrace) and tried it. Which gave me this:

[dotTrace memory snapshot]

I went Ouch! and Huh?! at the same time. The next step was to find out what was holding those objects. Luckily, that was as easy as asking the profiler.

[dotTrace snapshot showing what was holding those objects]

And a short hop to the code explained what was actually going on.

There is an LRU buffer there to prevent duplicate messages from being sent, and the default limit for the buffer is 10,000. Since the buffer is swept only once every 3 minutes, it would look like a memory leak.
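Not the actual Rhino Service Bus code, but a sketch of the pattern: a dedup buffer that only trims itself on a periodic sweep will, between sweeps, grow in proportion to the messages sent, which is exactly what a profiler snapshot will flag as a leak:

```csharp
using System;
using System.Collections.Generic;

// Toy duplicate-message buffer: remembers message ids it has seen, and
// only sheds old entries when Sweep() runs (on a timer in a real bus).
public class DedupBuffer
{
    private readonly int limit;
    private readonly Queue<Guid> order = new();
    private readonly HashSet<Guid> seen = new();

    public DedupBuffer(int limit = 10_000) { this.limit = limit; }

    public int Count => seen.Count;

    // Returns true the first time a message id is seen.
    public bool Record(Guid messageId)
    {
        if (!seen.Add(messageId)) return false;
        order.Enqueue(messageId);
        return true;
    }

    // Until this runs, memory use is proportional to the messages sent.
    public void Sweep()
    {
        while (seen.Count > limit)
            seen.Remove(order.Dequeue());
    }
}
```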

But what really pleased me wasn’t so much the answer, but how easy it was to figure it out.
