Ayende @ Rahien

It's a girl

Rhino Service Bus & RavenDB integration

One of the interesting things about Rhino Service Bus is that I explicitly designed it to work nicely with Unit of Work style data access libraries. When I did that, I worked mainly with NHibernate, but it turns out that this is really easy to integrate RavenDB as well, all you need is the following message module:

public class RavenDbMessageModule : IMessageModule
{
    private readonly IDocumentStore documentStore;

    [ThreadStatic]
    private static IDocumentSession currentSession;

    public static IDocumentSession CurrentSession
    {
        get { return currentSession; }
    }

    public RavenDbMessageModule(IDocumentStore documentStore)
    {
        this.documentStore = documentStore;
    }

    public void Init(ITransport transport, IServiceBus serviceBus)
    {
        transport.MessageArrived += TransportOnMessageArrived;
        transport.MessageProcessingCompleted += TransportOnMessageProcessingCompleted;
    }

    public void Stop(ITransport transport, IServiceBus serviceBus)
    {
        transport.MessageArrived -= TransportOnMessageArrived;
        transport.MessageProcessingCompleted -= TransportOnMessageProcessingCompleted;
    }

    private static void TransportOnMessageProcessingCompleted(CurrentMessageInformation currentMessageInformation, Exception exception)
    {
        if (currentSession != null)
        {
            if (exception == null)
                currentSession.SaveChanges();
            currentSession.Dispose();
        }
        currentSession = null;
    }

    private bool TransportOnMessageArrived(CurrentMessageInformation currentMessageInformation)
    {
        if (currentSession == null)
            currentSession = documentStore.OpenSession();
        return false;
    }
}

This is fairly simple. Register to the message arrive and message processing completed events. When a message arrive, create a new session for the consumers. When the message processing is completed, and there hasn’t been any error, we call SaveChanges, and then dispose.

The rest of it is pretty simple as well, we need to provide a BootStrapper:

public class BootStrapper : CastleBootStrapper
{
    IDocumentStore store;

    protected override void ConfigureContainer()
    {
        store = new DocumentStore
        {
            ConnectionStringName = "RavenDB"
        }.Initialize();

        IndexCreation.CreateIndexes(typeof(BootStrapper).Assembly, store);

        Container.Register(
            Component.For<IDocumentStore>()
                .Instance(store),
            Component.For<IMessageModule>()
                .ImplementedBy<RavenDbMessageModule>(),
            Component.For<IDocumentSession>()
                .UsingFactoryMethod(() => RavenDbMessageModule.CurrentSession)
            );

        base.ConfigureContainer();
    }
}

Which basically simply need to create the document store and expose it to the container. We get the document session from the current one (the one managed by the module).

All in all, it is quite a thing, and it takes very little time / complexity to setup.

Comments

Jason Meckley
12/07/2011 01:18 PM by
Jason Meckley

what about Windsor's component tracking and potential memory leaks?

I thought that UsingFactoryMethod would mean Windsor would apply concerns to the components if necessary. In this case the DisposableDecommission concern. Since the session is never released from the container there would be a DisposableDecommission concern for each session. Thus a memory leak.

And wouldn't you want to register IDocumentSession as transient. Or is this covered by the threadstatic attribute within the module itself?

The only other thought I have is that resolving the current session directly from the module () => RavenDbMessageModule.CurrentSession and not the container kernel => kernel.ResolveRavenDbMessageModule.CurrentSession would mean Windsor would not apply the decommission concern.

Carlos Mendes
12/07/2011 03:55 PM by
Carlos Mendes

Ayende, why do we have to use the [ThreadStatic] attribute in the current session?

Jason Meckley
12/07/2011 06:27 PM by
Jason Meckley

if you didn't use thread static then you would have a singleton of ISession, which was overwritten with each message consumed. thread static allows you to treat the singleton as a singleton per thread.

much the same way the session can be stored in Context.Items in a web environment.

Ayende Rahien
12/08/2011 08:51 AM by
Ayende Rahien

Jason, I am using the no tracking policy.

I am registering it as transient, yes, although this is actually manage by the thread static.

The reason that I go directly through the module is mostly because it is already static

Comments have been closed on this topic.