Ayende @ Rahien

Refunds available at head office

Testing Rhino Service Bus applications

One of the really nice things about Rhino Service Bus applications is that we have created a structured way to handle inputs and outputs. You have messages coming in and out, as well as the endpoint local state to deal with. You don’t have to worry about how to deal with external integration points, because those are already going over messages.

And when you have basic input/output figured out, you are pretty much done.

For example, let us see the code that handles extending trial licenses in our ordering system:

public class ExtendTrialLicenseConsumer : ConsumerOf<ExtendTrialLicense>
{
    public IDocumentSession Session { get; set; }
    public IServiceBus Bus { get; set; }

    public void Consume(ExtendTrialLicense message)
    {
        var productId = message.ProductId ?? "products/" + message.Profile;
        var trial = Session.Query<Trial>()
            .Where(x => x.Email == message.Email && x.ProductId == productId)
            .FirstOrDefault();

        if (trial == null)
            return;
        
        trial.EndsAt = DateTime.Today.AddDays(message.Days);
        Bus.Send(new NewTrial
        {
            ProductId = productId,
            Email = trial.Email,
            Company = trial.Company,
            FullName = trial.Name,
            TrackingId = trial.TrackingId
        });
    }
}

How do we test something like this? As it turns out, quite easily:

public class TrailTesting : ConsumersTests
{
    protected override void PrepareData(IDocumentSession session)
    {
        session.Store(new Trial
        {
            Email = "you@there.gov",
            EndsAt = DateTime.Today,
            ProductId = "products/nhprof"
        });
    }

    [Fact]
    public void Will_update_trial_date()
    {
        Consume<ExtendTrialLicenseConsumer, ExtendTrialLicense>(new ExtendTrialLicense
        {
            ProductId = "products/nhprof",
            Days = 30,
            Email = "you@there.gov",
        });

        using (var session = documentStore.OpenSession())
        {
            var trial = session.Load<Trial>(1);
            Assert.Equal(DateTime.Today.AddDays(30), trial.EndsAt);
        }
    }

    // more tests here
}

All the magic happens in the base class, though:

public abstract class ConsumersTests : IDisposable
{
    protected IDocumentStore documentStore;
    private IServiceBus Bus = new FakeBus();

    protected ConsumersTests()
    {
        documentStore = new EmbeddableDocumentStore
        {
            RunInMemory = true,
            Conventions =
                {
                    DefaultQueryingConsistency = ConsistencyOptions.QueryYourWrites
                }
        }.Initialize();

        IndexCreation.CreateIndexes(typeof(Products_Stats).Assembly, documentStore);

        Products.Create(documentStore);
        using (var session = documentStore.OpenSession())
        {
            PrepareData(session);
            session.SaveChanges();
        }
    }

    protected T ConsumeSentMessage<T>()
    {
        var fakeBus = ((FakeBus)Bus);
        object o = fakeBus.Messages.Where(x => x.GetType() == typeof(T)).First();

        fakeBus.Messages.Remove(o);
        return (T) o;
    }

    protected void Consume<TConsumer, TMsg>(TMsg msg)
        where TConsumer : ConsumerOf<TMsg>, new()
    {
        var foo = new TConsumer();

        using (var documentSession = documentStore.OpenSession())
        {
            Set(foo, documentSession);
            Set(foo, Bus);
            Set(foo, documentStore);

            foo.Consume(msg);

            documentSession.SaveChanges();
        }
    }

    private void Set<T,TValue>(T foo, TValue value)
    {
        PropertyInfo firstOrDefault = typeof(T).GetProperties().FirstOrDefault(x=>x.PropertyType==typeof(TValue));
        if (firstOrDefault == null) return;
        firstOrDefault.SetValue(foo, value, null);
    }

    protected virtual void PrepareData(IDocumentSession session)
    {
    }

    public void Dispose()
    {
        documentStore.Dispose();
    }
}

And here are the relevant details for the FakeBus implementation:

public class FakeBus : IServiceBus
{
    public List<object>  Messages = new List<object>();

    public void Send(params object[] messages)
    {
        Messages.AddRange(messages);
    }
}

Now, admittedly, this is a fairly raw approach and we can probably do better. This is basically hand crafted auto mocking for consumers, and I don’t like the Consume<TConsumer,TMsg>() syntax very much. But it works, it is simple and it doesn’t really get in the way.

I won’t say it is the way to go about it, but it is certainly easier than many other efforts that I have seen. We just need to handle the inputs & outputs and have a way to look at the local state, and you are pretty much done.
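To make the "handle the inputs & outputs" idea concrete outside of our ordering system, here is a minimal self-contained sketch of the same pattern. Everything in it (IBus, RecordingBus, ExtendTrial, TrialExtended, ExtendTrialHandler) is a hypothetical stand-in that I made up for illustration, not a Rhino Service Bus type:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the bus interface; not the Rhino Service Bus IServiceBus.
public interface IBus
{
    void Send(params object[] messages);
}

// Records every outgoing message so a test can inspect it later.
public class RecordingBus : IBus
{
    public readonly List<object> Messages = new List<object>();

    public void Send(params object[] messages)
    {
        Messages.AddRange(messages);
    }

    // Removes and returns the first recorded message of type T.
    public T TakeSent<T>()
    {
        var message = Messages.OfType<T>().First();
        Messages.Remove(message);
        return message;
    }
}

// Hypothetical input and output messages.
public class ExtendTrial
{
    public string Email;
    public int Days;
}

public class TrialExtended
{
    public string Email;
    public DateTime EndsAt;
}

// Hypothetical consumer: one message in, one message out.
public class ExtendTrialHandler
{
    public IBus Bus { get; set; }

    public void Consume(ExtendTrial message)
    {
        Bus.Send(new TrialExtended
        {
            Email = message.Email,
            EndsAt = DateTime.Today.AddDays(message.Days)
        });
    }
}
```

A test hands the handler a RecordingBus, calls Consume, and then asserts on whatever TakeSent<TrialExtended>() returns. The output of the consumer is just data sitting in a list, which is the whole point.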

Rhino Service Bus & RavenDB integration

One of the interesting things about Rhino Service Bus is that I explicitly designed it to work nicely with Unit of Work style data access libraries. When I did that, I worked mainly with NHibernate, but it turns out that it is really easy to integrate RavenDB as well. All you need is the following message module:

public class RavenDbMessageModule : IMessageModule
{
    private readonly IDocumentStore documentStore;

    [ThreadStatic]
    private static IDocumentSession currentSession;

    public static IDocumentSession CurrentSession
    {
        get { return currentSession; }
    }

    public RavenDbMessageModule(IDocumentStore documentStore)
    {
        this.documentStore = documentStore;
    }

    public void Init(ITransport transport, IServiceBus serviceBus)
    {
        transport.MessageArrived += TransportOnMessageArrived;
        transport.MessageProcessingCompleted += TransportOnMessageProcessingCompleted;
    }

    public void Stop(ITransport transport, IServiceBus serviceBus)
    {
        transport.MessageArrived -= TransportOnMessageArrived;
        transport.MessageProcessingCompleted -= TransportOnMessageProcessingCompleted;
    }

    private static void TransportOnMessageProcessingCompleted(CurrentMessageInformation currentMessageInformation, Exception exception)
    {
        if (currentSession != null)
        {
            if (exception == null)
                currentSession.SaveChanges();
            currentSession.Dispose();
        }
        currentSession = null;
    }

    private bool TransportOnMessageArrived(CurrentMessageInformation currentMessageInformation)
    {
        if (currentSession == null)
            currentSession = documentStore.OpenSession();
        return false;
    }
}

This is fairly simple. We register for the message arrived and message processing completed events. When a message arrives, we create a new session for the consumers. When the message processing is completed, we call SaveChanges if there hasn’t been any error, and then dispose of the session either way.
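Stripped of the transport events, the lifecycle described above is a session per message: open, run the consumer, save only on success, always dispose. Here is a minimal sketch of just that idea. ISession, MessagePipeline and CountingSession are hypothetical names of mine, not RavenDB or Rhino Service Bus types:

```csharp
using System;

// Hypothetical stand-in for a unit of work; not the RavenDB IDocumentSession.
public interface ISession : IDisposable
{
    void SaveChanges();
}

public class MessagePipeline
{
    private readonly Func<ISession> openSession;

    public MessagePipeline(Func<ISession> openSession)
    {
        this.openSession = openSession;
    }

    // Runs one handler inside its own session: save on success, discard on failure.
    public void Process(object message, Action<ISession, object> handler)
    {
        using (var session = openSession())
        {
            handler(session, message);
            session.SaveChanges(); // never reached if the handler throws
        }
    }
}

// Test double that counts what happened to it.
public class CountingSession : ISession
{
    public int Saved;
    public bool Disposed;

    public void SaveChanges() { Saved++; }
    public void Dispose() { Disposed = true; }
}
```

The difference from the real module is that the sketch drives the handler itself, while the module only reacts to events raised by the transport. The save-or-discard decision is the same.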

The rest of it is pretty simple as well, we need to provide a BootStrapper:

public class BootStrapper : CastleBootStrapper
{
    IDocumentStore store;

    protected override void ConfigureContainer()
    {
        store = new DocumentStore
        {
            ConnectionStringName = "RavenDB"
        }.Initialize();

        IndexCreation.CreateIndexes(typeof(BootStrapper).Assembly, store);

        Container.Register(
            Component.For<IDocumentStore>()
                .Instance(store),
            Component.For<IMessageModule>()
                .ImplementedBy<RavenDbMessageModule>(),
            Component.For<IDocumentSession>()
                .UsingFactoryMethod(() => RavenDbMessageModule.CurrentSession)
            );

        base.ConfigureContainer();
    }
}

All it basically needs to do is create the document store and expose it to the container. We get the document session from the current one (the one managed by the module).

All in all, it is quite a thing, and it takes very little time / complexity to set up.

Setting up a Rhino Service Bus Application: Part II–One way bus

One really nice feature of Rhino Service Bus is the notion of the one way bus. What is that? It is a miniature implementation of the bus that supports only sending messages, not receiving them. In what world is this useful?

It turns out, in quite a few. A one way bus is usually used for web apps that just send commands / events to another system and have no need to interact with the bus other than that, or for command line tools that just send a message, etc. The advantage of the one way bus is that you don’t need your own endpoint to use it; you can just start it, send some messages, and go away.

Here is how you set it up. As usual, we start from the configuration (this assumes you have the Rhino.ServiceBus.Castle nuget package):

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="rhino.esb" type="Rhino.ServiceBus.Config.BusConfigurationSection, Rhino.ServiceBus"/>
  </configSections>
  <rhino.esb>
    <messages>
      <add name="HibernatingRhinos.Orders.Backend.Messages"
           endpoint="msmq://localhost/Orders.Backend"/>
    </messages>
  </rhino.esb>
</configuration>

You might note that we don’t have any bus/endpoint configuration, only the list of message owners.

Now, the next step is just to create the actual one way bus:

var container = new WindsorContainer();
new OnewayRhinoServiceBusConfiguration()
    .UseCastleWindsor(container)
    .Configure();

var onewayBus = container.Resolve<IOnewayBus>();
onewayBus.Send(new TestMsg
{
    Name = "ayende"
});

And that is all…

Setting up a Rhino Service Bus Application: Part I

Part of what we are currently doing at Hibernating Rhinos is work on some internal applications. In particular, we are slowly updating our existing applications to do two things:

  • Make sure that we are running a maintainable software package.
  • Dog food our own stuff in real world scenarios.

In this case, I want to talk about Rhino Service Bus. Corey has been maintaining the project and I haven’t really followed very closely on what was going on. Today we started doing major work on our order processing backend system, so I decided that we might as well ditch the 2.5+ years old version of Rhino Service Bus that we were running and get working with the latest.

I was so lazy about that that I actually didn’t bother to get the sources, and just got the nuget package.

Install-Package Rhino.ServiceBus.Castle

Install-Package Rhino.ServiceBus.Host

That got me half way there. No need to worry about adding references, etc, what an awesome idea.

Next, I had to deal with the configuration. Here is how you configure a Rhino Service Bus application (app.config):

<configuration>
  <configSections>
    <section name="rhino.esb" type="Rhino.ServiceBus.Config.BusConfigurationSection, Rhino.ServiceBus"/>
  </configSections>
  <rhino.esb>
    <bus threadCount="1"
         numberOfRetries="5"
         endpoint="msmq://localhost/Orders.Backend"/>
    <messages>
      <add name="HibernatingRhinos.Orders.Backend.Messages"
           endpoint="msmq://localhost/Orders.Backend"/>
    </messages>
  </rhino.esb>
</configuration>

As you can see, it is fairly simple. Specify the endpoint, and the ownerships of the messages, and you are pretty much done.

The next step is to set up the actual application. In order to do that, we need to define a boot strapper. Think about it like a constructor, but for the entire application. The minimal boot strapper needed is:

public class BootStrapper : CastleBootStrapper
{
    
}

Again, it doesn’t really get any simpler.

The next step is somewhat of a pain. A Rhino Service Bus project is usually a class library (dll). So if we want to run it, we need to host it somewhere. RSB comes with Rhino.ServiceBus.Host, but the command line interface sucks. You need to do this to get it running:

Rhino.ServiceBus.Host.exe /Action:Debug /Name:HibernatingRhinos.Orders.Backend /Assembly:.\HibernatingRhinos.Orders.Backend.dll

And honestly, whoever thought about making it so complicated? Give me something to make this simpler, for crying out loud. Moron developer who has no consideration of actual system usability.

Oh, yeah, that would be me. Maybe I ought to fix that.

Anyway, that is pretty much everything you need to do to get RSB up & running. Here is a sample test consumer:

public class TestConsumer : ConsumerOf<TestMsg>
{
    public void Consume(TestMsg message)
    {
        Console.WriteLine(message.Name);
    }
}

And that is enough for now. See you in the next post…

Public Service Announcement: Git master repositories for the Rhino Tools projects

There have been some changes, and it seems that it is hard to track them. Here is where you can find the master repositories for the Rhino Tools projects:

Handling production errors in a messaging environment

So, today I got the first L2S Prof order. As you can imagine, I was pretty excited about that. However, it turned out that I had actually missed something when I built the backend for handling L2S Prof ordering. The details about what actually went wrong aren’t important (and are embarrassing).

But I logged into the server and checked out what was going on:

image

One of the major design criteria that I had with Rhino Service Bus is that it should be dead easy to handle in production. As you can see in the screen shots, I have two messages of interest here: the first one is the actual message, and the second is the error information, which includes the full stack trace. Using that, it was a piece of cake to isolate the problem, have the head slapping moment, and deploy a new version. Once that was done, all I had to do was move the message back to the processing queue, and I was done.

Just to give you an idea, here is what it looks like on my timeline:

image

I guarantee you that the customer in question didn’t have any idea that there was something wrong with his order processing.

I am loving it.

Using a service bus for queries

Yep, another forum question. Unfortunately, in this case all I have is the title. Even more unfortunately, I already used the stripper metaphor before.

There are some questions that I am really not sure how to answer, because there are several underlying premises that are flat out wrong in the mere asking of the question.

“Can you design a square wheel carriage?” is a good example of that, and using a service bus for queries is another.

The short answer is that you don’t do that.

The longer answer is that you still don’t do that, but it also explains why the question itself is wrong. One of the things that goes along with a service bus is the concept of services.

Services are autonomous.

Does this ring a bell?

You don’t query a service for its state, because that would violate the autonomy tenet.

But I need the user’s data from the Personalization service to show the home page, I can hear you say. Well, sure, but you don’t perform queries across a service boundary.

 

Notice the terminology here. You don’t perform queries across a service boundary.

But you can perform queries inside a service boundary. The image on the right shows one such example of that.

We have several services in a single application, and they communicate with each other using a service bus.

But a service isn’t just something that is running in a server somewhere. The personalization service also has a user interface, business logic that needs to run on the UI, etc.

That isn’t just some other part of the application that is accessing the personalization service operations. It is a part of the personalization service.

And inside a service boundary, there is no limitation on how you get the data you need to perform some operation.

You can perform a query using whatever method you like (a synchronous web service call, hitting the service database, using local state).

Personally, I would either access the data store directly (which usually means querying the service database) or use local state. I spoke about building a system where all queries are handled using local state here.

On PSake

James Kovacs introduced psake (a PowerShell based build system) over a year ago, and at the time, I gave it a glance and decided that it was interesting, but not worth further investigation.

This weekend, as I was restructuring my Rhino Tools project, I realized that I needed to touch the build system as well. The Rhino Tools build system has been through several projects, and was originally ported from Hibernate. It is NAnt based, complex, and can do just about everything that you want except be easily understandable.

It became clear to me very quickly that it ain’t going to be easy to change the way it works, nor would it be easy to modify it to reflect the new structure. There are other issues with complex build systems: they tend to create zones of “there be dragons”, where only the initiated go, and even they go with trepidation. I decided to take advantage of the changes that I am already making to get a simpler build system.

I had a couple of options open to me: Rake and Bake.

Bake seemed natural, until I remembered that no one had touched it in a year or two. Besides, I can only stretch NIH so far :-). And while I know that people rave about rake, I did not want to introduce a Ruby dependency into my build system. I know that it was an annoyance when I had to build Fluent NHibernate.

One thing that I knew I was not willing to go back to was editing XML, so I started looking at other build systems, and ended up running into PSake.

There are a few interesting things that reading about it brought to mind. First, NAnt doesn’t cut it anymore. It can’t build WPF applications nor handle multi targeting well. Second, I am already managing the compilation part of the build using MSBuild, thanks to Visual Studio.

That leaves the build system with executing msbuild, setting up directories, executing tests, running post build tools, etc.

PSake handles those well, since the execution environment is the command line. The syntax is nice, just enough to specify tasks and dependencies, but everything else is just pure command line. The following is the Rhino Mocks build script, using PSake:

properties { 
  $base_dir  = resolve-path .
  $lib_dir = "$base_dir\SharedLibs"
  $build_dir = "$base_dir\build" 
  $buildartifacts_dir = "$build_dir\" 
  $sln_file = "$base_dir\Rhino.Mocks-vs2008.sln" 
  $version = "3.6.0.0"
  $tools_dir = "$base_dir\Tools"
  $release_dir = "$base_dir\Release"
} 

task default -depends Release

task Clean { 
  remove-item -force -recurse $buildartifacts_dir -ErrorAction SilentlyContinue 
  remove-item -force -recurse $release_dir -ErrorAction SilentlyContinue 
} 

task Init -depends Clean { 
    . .\psake_ext.ps1
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks $version" `
        -version $version `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks.Tests\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks Tests $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks Tests $version" `
        -version $version `
        -clsCompliant "false" `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    Generate-Assembly-Info `
        -file "$base_dir\Rhino.Mocks.Tests.Model\Properties\AssemblyInfo.cs" `
        -title "Rhino Mocks Tests Model $version" `
        -description "Mocking Framework for .NET" `
        -company "Hibernating Rhinos" `
        -product "Rhino Mocks Tests Model $version" `
        -version $version `
        -clsCompliant "false" `
        -copyright "Hibernating Rhinos & Ayende Rahien 2004 - 2009"
        
    new-item $release_dir -itemType directory 
    new-item $buildartifacts_dir -itemType directory 
    cp $tools_dir\MbUnit\*.* $build_dir
} 

task Compile -depends Init { 
  exec msbuild "/p:OutDir=""$buildartifacts_dir "" $sln_file"
} 

task Test -depends Compile {
  $old = pwd
  cd $build_dir
  exec ".\MbUnit.Cons.exe" "$build_dir\Rhino.Mocks.Tests.dll"
  cd $old        
}

task Merge {
    $old = pwd
    cd $build_dir
    
    Remove-Item Rhino.Mocks.Partial.dll -ErrorAction SilentlyContinue 
    Rename-Item $build_dir\Rhino.Mocks.dll Rhino.Mocks.Partial.dll
    
    & $tools_dir\ILMerge.exe Rhino.Mocks.Partial.dll `
        Castle.DynamicProxy2.dll `
        Castle.Core.dll `
        /out:Rhino.Mocks.dll `
        /t:library `
        "/keyfile:$base_dir\ayende-open-source.snk" `
        "/internalize:$base_dir\ilmerge.exclude"
    if ($lastExitCode -ne 0) {
        throw "Error: Failed to merge assemblies!"
    }
    cd $old
}

task Release -depends Test, Merge {
    & $tools_dir\zip.exe -9 -A -j `
        $release_dir\Rhino.Mocks.zip `
        $build_dir\Rhino.Mocks.dll `
        $build_dir\Rhino.Mocks.xml `
        license.txt `
        acknowledgements.txt
    if ($lastExitCode -ne 0) {
        throw "Error: Failed to execute ZIP command"
    }
}

It is about 50 lines, all told, with a lot of spaces and is quite readable.

This handles the same tasks as the old set of scripts did, and it does this without undue complexity. I like it.

The complexity of unity

This post is about the Rhino Tools project. It has been running for a long time now, over 5 years, and amassed quite a few projects in it.

I really like the codebase in the projects in Rhino Tools, but secondary aspects have been creeping in that made managing the project harder. In particular, putting all the projects in a single repository made things easy, far too easy. Projects had an easy time taking dependencies that they shouldn’t, and the entire build process was… complex, to say the least.

I have been somewhat unhappily tolerant of this because, while it was annoying, it didn’t actively create problems for me so far. The problems started creeping in when I wanted to move Rhino Tools to use NHibernate 2.1. That is when I realized that this is going to be a very painful process, since I have to take on the entire Rhino Tools set of projects in one go, instead of dealing with each of them independently. The fact that so many of the dependencies were in Rhino Commons, for which I have a profound dislike, helped increase my frustration.

There are other things that I find annoying now. Rhino Security is a general purpose library for NHibernate, but it makes a lot of assumptions about how it is going to be used, which is wrong. Rhino ETL had a dependency on Rhino Commons because of three classes.

To resolve that, I decided to make a few other changes. Taking dependencies is supposed to be a hard process; it is supposed to make you think.

I have been working on splitting the Rhino Tools project into its sub projects, so each of them is independent of all the others. That increases the effort of managing all of them as a unit, but decreases the effort of managing them independently.

The current goals are to:

  • Make it simpler to treat each project independently
  • Make it easier to deal with the management of each project (dependencies, build scripts)

There is a side line in which I am also learning to use Git, and there is a high likelihood that the separate Rhino Tools projects will move to github. Subversion’s patching & tracking capabilities annoyed me for the very last time about a week ago.

Rhino Service Bus: DHT Saga Sate Persisters Options

Rhino Service Bus comes with two default implementations of saga persistence (OptimisticDistributedHashTableSagaSatePersister  & DistributedHashTableSagaSatePersister). Both of them rely on the Rhino DHT to actually handle the persistence, but they have quite a different behavior when you start looking at them.

The persistence mechanism is the same, but the difference between them is how they handle concurrency. OptimisticDistributedHashTableSagaSatePersister is very simple to reason about, it uses optimistic concurrency. If you have a concurrency conflict, it will throw and force re-evaluation of the message. You need to opt in to the OptimisticDistributedHashTableSagaSatePersister by extending the marker interface SupportsOptimisticConcurrency.

DistributedHashTableSagaSatePersister is a more complex concurrency solution, which will never lose a write, even if there is a conflict. This requires you to implement merge conflict resolution. It is significantly more complex, and is probably only worth it where you really care about writes always succeeding.

In order to use DistributedHashTableSagaSatePersister properly, your saga needs to implement Orchestrates<MergeSagaState>, which allows the saga to take action upon merges. In addition to that, you need to implement ISagaStateMerger for your particular saga state. This is where the logic for doing the merge is located.
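The optimistic option is the easier of the two to picture: every write says which version it was based on, and a stale write throws so the message gets processed again. The following is only a sketch of that idea using an in-memory dictionary. VersionedStore and ConcurrencyConflictException are hypothetical names of mine, and this is not how the actual persisters or Rhino DHT are implemented:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type used to signal a stale write.
public class ConcurrencyConflictException : Exception
{
    public ConcurrencyConflictException(string message) : base(message) { }
}

// Hypothetical in-memory store; Rhino DHT is not involved here.
public class VersionedStore<TState>
{
    private readonly Dictionary<Guid, KeyValuePair<int, TState>> items =
        new Dictionary<Guid, KeyValuePair<int, TState>>();

    // Returns (version, state); version 0 means nothing has been stored yet.
    public KeyValuePair<int, TState> Load(Guid id)
    {
        KeyValuePair<int, TState> found;
        return items.TryGetValue(id, out found)
            ? found
            : new KeyValuePair<int, TState>(0, default(TState));
    }

    // Writes only if the caller saw the latest version; otherwise throws,
    // so the caller can reload the state and process the message again.
    public void Save(Guid id, int expectedVersion, TState state)
    {
        var currentVersion = Load(id).Key;
        if (currentVersion != expectedVersion)
            throw new ConcurrencyConflictException(
                "Expected version " + expectedVersion + " but found " + currentVersion);
        items[id] = new KeyValuePair<int, TState>(currentVersion + 1, state);
    }
}
```

If two handlers load the same version and both try to save, the second Save throws and nothing is overwritten. The merging persister takes the opposite trade-off: it keeps both writes and asks you to reconcile them.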

Rhino Queues, Take 6

As I stated before, I started writing a queuing system a few days ago, loosely based on my previous efforts in this area, but aimed to create a production ready queuing infrastructure that I can use in my own applications. The decision to build this was not made lightly, but I wanted a queuing system that met my needs, and was flexible enough to extend as needed.

The design goals stated for Rhino Queues were:

  • XCopy deployable
  • Zero configuration
  • Durable
  • Supports System.Transactions
  • Works well with Load Balancing hardware
  • Supports sub queues
  • Support arbitrarily large messages

It is now publicly available, and I think it deserves some discussion.

Here is a very simple example of using it:

var queueManager = new QueueManager(new IPEndPoint(IPAddress.Loopback, 2200), "queues.esent");
queueManager.CreateQueues("Web");
var queue = queueManager.GetQueue("Web");

while(ContinueProcessing)
{
    using(var tx = new TransactionScope())
    {
        var msg = queue.Receive();
        Console.WriteLine("Message from {0}:", msg.Headers["source"]);
        Console.WriteLine(Encoding.Unicode.GetString(msg.Data));
        tx.Complete();
    }
}

This example is merely to show the API.

The actual software is pretty interesting. Communication between different queues is done using TCP, with a protocol that works well with load balancing hardware, making load balancing a queued application as easy as balancing any HTTP based app.

You cannot see it in the API example, but the system fully supports System.Transactions, so sending a message is delayed until the transaction is committed, and receipt of a message is rolled back on transaction rollback. We even support recovery after a hard crash, by plugging into the recovery mechanism that MSDTC offers us.
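To show what "delayed until a transaction is committed" can look like in code, here is a small sketch that uses a volatile enlistment from System.Transactions. TransactionalOutbox is a hypothetical class of mine and has nothing to do with how Rhino Queues is actually implemented. In particular, it keeps everything in memory, so it has none of the durability or crash recovery discussed in this post:

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical sender: buffers messages and releases them only when the
// ambient transaction commits. Not the Rhino Queues implementation.
public class TransactionalOutbox : IEnlistmentNotification
{
    private readonly List<string> pending = new List<string>();
    public readonly List<string> Delivered = new List<string>();
    private bool enlisted;

    public void Send(string message)
    {
        var transaction = Transaction.Current;
        if (transaction == null)
        {
            Delivered.Add(message); // no ambient transaction: deliver right away
            return;
        }
        if (!enlisted)
        {
            // Ask to be notified about the outcome of this transaction.
            transaction.EnlistVolatile(this, EnlistmentOptions.None);
            enlisted = true;
        }
        pending.Add(message);
    }

    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared(); // vote to commit
    }

    public void Commit(Enlistment enlistment)
    {
        Delivered.AddRange(pending); // only now do the messages go out
        Reset();
        enlistment.Done();
    }

    public void Rollback(Enlistment enlistment)
    {
        Reset(); // drop everything that was "sent" inside the failed transaction
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {
        Reset();
        enlistment.Done();
    }

    private void Reset()
    {
        pending.Clear();
        enlisted = false;
    }
}
```

Calling Send inside a TransactionScope only buffers the message. It shows up in Delivered after tx.Complete() and the end of the using block, and it is discarded if the scope is disposed without Complete(). The sketch assumes one outbox per thread of work; it is not thread safe.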

All messages are durable, so a system reboot will not remove them. While the default mode for Rhino Queues is an embedded component in your application (XCopy deployable), we still support the ability to deliver a message to a server that is down, by implementing a fairly flexible message retry mechanism.

Because Rhino Service Bus makes such heavy use of this, we also support sub queues, and we have no hard limit on the size of messages that we can deliver.

I delayed announcing this until I could finish integrating this completely into Rhino Service Bus, but this is now done, and should just work. Rhino Service Bus will still support MSMQ, but I think that a lot of the work that we are going to do now on Rhino Service Bus will be with the new queuing infrastructure.

Understanding Code: Static vs Dynamic Dependencies

Patrick Smacchia (NDepend) has a good post about this topic. I have an issue with his assertion that "Static Structure is the Key":

The idea I would like to defend now is that when it comes to understand and maintain a program, one need to focus mostly on the static dependencies, the ones found in the source code. Knowing dynamic dependencies (who calls who at runtime) can make sense for example to profile performance or to fix some particular bugs. But most of the time invested to understand a program is generally spent in browsing static dependencies.

The problem is that this doesn't really hold for applications that were written with a container in place. Let us take Rhino Service Bus as an example of that. If you try to follow the static dependencies in the code, you will not be able to understand much. There aren't many.

If we look at things like error handling, administration tasks or time outs, all of which are key parts of the way certain aspects of RSB behave, we will see that there is never a static reference to them. Instead, they are pulled in as a set of generic components that know how to integrate into the bus.

A significant part of the actual behavior in RSB is dynamically configured by RhinoServiceBusFacility, and even here, I felt that this class was getting too many responsibilities, so we broke that apart to more dynamically composed parts. The configuration syntax, for example, is driven by the facility and by the BusConfigurationAware components (of which we currently have three).

Focusing on the static dependencies in RSB wouldn't be very useful; not a lot is happening there.
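A tiny sketch may help show why static analysis sees so little in this style of code. In the example, nothing references ErrorHandlingModule or TimeoutModule by name; they are found at runtime through the interface they implement. All of the names are hypothetical, and plain reflection stands in for the container here, which is a simplification of what a Windsor facility does:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical module contract; the core only knows about this interface.
public interface IBusModule
{
    string Name { get; }
}

// No other code refers to these two classes by name.
public class ErrorHandlingModule : IBusModule
{
    public string Name { get { return "errors"; } }
}

public class TimeoutModule : IBusModule
{
    public string Name { get { return "timeouts"; } }
}

public static class ModuleLoader
{
    // Finds every concrete IBusModule in the given assembly at runtime
    // and creates an instance of each one.
    public static List<IBusModule> LoadFrom(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(t => typeof(IBusModule).IsAssignableFrom(t) && t.IsClass && !t.IsAbstract)
            .OrderBy(t => t.Name)
            .Select(t => (IBusModule)Activator.CreateInstance(t))
            .ToList();
    }
}
```

A static dependency graph of this code shows ModuleLoader depending on IBusModule and nothing else, even though both modules end up running.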

More disposal subtleties and framework bugs that stalks me

This one was a real pain to figure out. Can you imagine what would be the result of this code?

var queue = new MessageQueue(queuePath);
queue.Dispose();
var peekAsyncResult = queue.BeginPeek();
peekAsyncResult.AsyncWaitHandle.WaitOne();

If you guessed that we would get ObjectDisposedException, you are sadly mistaken. If you guessed that this would lead to a deadlock, you won.

Figuring out the behavior in a multi threaded system where one thread was beginning to listen and another was disposing the queue and waiting for pending operations to complete is… not fun.

Update: For some strange reason, I am not able to reproduce the problem shown above. I know that I did before I posted this, but I posted it as one of the last things that I did that day. I think that this is somehow related to the actual queue used and whether or not it has messages.

Rhino Service Bus: Concurrency Violations are Business Logic

Concurrency is a tough topic, fraught with problems, pitfalls and nasty issues. This is especially the case when you try to build distributed, inherently parallel systems. I have been dealing with the topic quite a lot recently and I have created several solutions (none of them are originally mine, mind you).

There aren’t that many good solutions out there; most of them boil down to: “suck it up and deal with the complexity.” In this case, I want to try to deal with the complexity in a consistent fashion (no one off solutions) and in a way that I can deal with without first meditating on the import of socks.

Let us see if I can come up with a good example. We have a saga that we use to check whether a particular user has acceptable credit to buy something from us. The logic is that we need to verify with at least 2 credit card bureaus, and the average must be over 700. (This logic has nothing to do with the real world, since I just dreamt it up, by the way). Here is a simple implementation of a saga that can deal with those requirements:

public class AccpetableCreditSaga : ISaga<AccpetableCreditState>,
  InitiatedBy<IsAcceptableAsCustomer>,
  Orchestrates<CreditCardScore>,
  Orchestrates<MergeSagaState>
{
  IServiceBus bus;
  public bool IsCompleted {get;set;}
  public Guid Id {get;set;}
  // The saga's state; the methods below read it through State.Scores
  public AccpetableCreditState State {get;set;}

  public AccpetableCreditSaga (IServiceBus bus)
  {
    this.bus = bus;
  }

  public void Consume(IsAcceptableAsCustomer message)
  {
    // Ask all three bureaus; we only need two replies to decide
    bus.Send(
      new Equifax.CheckCreditFor{Card = message.Card},
      new Experian.CheckCreditFor{Card = message.Card},
      new TransUnion.CheckCreditFor{Card = message.Card}
      );
  }

  public void Consume(CreditCardScore message)
  {
    State.Scores.Add(message);

    TryCompleteSaga();
  }

  public void Consume(MergeSagaState message)
  {
    TryCompleteSaga();
  }

  public void TryCompleteSaga()
  {
    if(State.Scores.Count < 2)
      return;

    bus.Publish(new CreditScoreAcceptable
    {
      CorrelationId = Id,
      IsAcceptable = State.Scores.Average(x=>x.Score) > 700
    });
    IsCompleted = true;
  }
}

We have this strange MergeSagaState message, but other than that, it should be pretty obvious what is going on in here. It should be equally obvious that we have a serious problem here. Let us say that we get two reply messages with credit card scores at the same time. We will create two instances of the saga that will run in parallel, each of them getting a copy of the saga’s state. The end result is that neither instance sees a state that matches the end condition for the saga. So even though in practice we have gotten all the messages we need, because we handled them in parallel, we had no chance to actually see both changes at the same time. This means that any logic that requires us to have a full picture of what is going on isn’t going to work.
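To make the failure concrete, here is a minimal sketch that simulates two handlers, each working on its own copy of the stored state. CreditState and ParallelSagaDemo are stand-ins invented for this illustration; they are not Rhino Service Bus types, and no real threading is involved.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the saga state; not a Rhino Service Bus type.
public class CreditState
{
    public List<int> Scores = new List<int>();

    public CreditState Copy()
    {
        return new CreditState { Scores = new List<int>(Scores) };
    }
}

public static class ParallelSagaDemo
{
    // Mirrors the guard in TryCompleteSaga: we need at least two scores.
    public static bool CanComplete(CreditState state)
    {
        return state.Scores.Count >= 2;
    }

    // Each handler loads its own copy of the stored state, adds one score,
    // and checks the end condition against that copy only.
    public static bool[] HandleInParallel(CreditState stored, int scoreA, int scoreB)
    {
        var copyA = stored.Copy();
        var copyB = stored.Copy();

        copyA.Scores.Add(scoreA);
        copyB.Scores.Add(scoreB);

        return new[] { CanComplete(copyA), CanComplete(copyB) };
    }
}
```

Calling ParallelSagaDemo.HandleInParallel(new CreditState(), 720, 690) gives two false values: two scores have arrived in total, but neither copy ever holds both of them.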

Rhino Service Bus solves the issue by putting the saga’s state into Rhino DHT. This means that a single saga may have several states at the same time. Merging them together is also something that the bus will take care of. Merging the different parts is inherently an issue that cannot be solved generically. There is no generic merge algorithm that you can use. Rhino Service Bus defines an interface that allows you to deal with this issue in a clean manner and supply whatever business logic is required to merge the different versions.

Here is an example of how we can merge the different versions together:

public class AccpetableCreditStateMerger : ISagaStateMerger<AccpetableCreditState>
{
  public AccpetableCreditState Merge(AccpetableCreditState[] states)
  {
    return new AccpetableCreditState
    {
      // Keep a single entry per bureau, taking the highest score we have seen
      Scores = states.SelectMany(x => x.Scores)
        .GroupBy(x => x.Bureau)
        .Select(x => new CreditCardScore
        {
          Bureau = x.Key,
          Score = x.Max(y => y.Score)
        }).ToList()
    };
  }
}

Note that this is notepad code, so it may contain errors, but the actual intention should be clear. We accept an array of states that need to be merged, find the highest score from each bureau and return the merged state.

Whenever Rhino Service Bus detects that the saga is in a conflicted state, it will post a MergeSagaState message to the saga. This will merge the saga’s state and call Consume(MergeSagaState), in which the saga gets to decide what it wants to do about this (usually inspect the state to see if we missed anything). This also works for completing a saga, by the way. You cannot complete a saga in an inconsistent state; you will get called again with Consume(MergeSagaState) to deal with that.
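The merge-then-decide flow can be tried out in isolation. The following is a self-contained sketch, assuming stand-in types (BureauScore, CreditCheckState, ConflictResolution) that I made up for the purpose; it does not touch the bus and only mirrors the logic of the merger and of TryCompleteSaga.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in types for illustration only; not Rhino Service Bus types.
public class BureauScore
{
    public string Bureau { get; set; }
    public int Value { get; set; }
}

public class CreditCheckState
{
    public List<BureauScore> Scores { get; set; } = new List<BureauScore>();
}

public static class ConflictResolution
{
    // Same idea as the merger above: keep the highest score per bureau.
    public static CreditCheckState Merge(params CreditCheckState[] states)
    {
        return new CreditCheckState
        {
            Scores = states.SelectMany(s => s.Scores)
                .GroupBy(s => s.Bureau)
                .Select(g => new BureauScore
                {
                    Bureau = g.Key,
                    Value = g.Max(s => s.Value)
                })
                .ToList()
        };
    }

    // What the saga does once it is handed the merged state: re-check the
    // end condition. Returns null while fewer than two bureaus have answered.
    public static bool? Decide(CreditCheckState merged)
    {
        if (merged.Scores.Count < 2)
            return null;
        return merged.Scores.Average(s => s.Value) > 700;
    }
}
```

Merging one state that holds an Equifax score of 720 with another that holds an Experian score of 690 yields two scores averaging 705, so Decide returns true. Two conflicting states that both hold only an Equifax score collapse into a single entry, and Decide returns null.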

The state merger is also a good place to try to deal with concurrency compensating actions. For example, we may notice in the merger that we performed some action twice and need to revert one of them. In general, it is better to avoid having to do so, but that is the place for this logic.

Rhino Service Bus: Concurrency in a distributed world

I have talked about Rhino DHT at length, and the versioning story that exists there. What I haven’t talked about is why I built it. Or, to be rather more exact, the actual use case that I had in mind.

Jason Diamond had pointed out a problem with the way sagas work with Rhino Service Bus.

Are BaristaSaga objects instantiated per message? If so, can two different instances be consuming different messages concurrently?

The reason I ask is because it looks like handling the PrepareDrink message could take some time. Is it possible that a PaymentComplete message could come in before the PrepareDrink message is finished being handled?

If the two instances of BaristaSaga have their own instance of BaristaState, I can see the GotPayment value set by handling the PaymentComplete message getting lost.

If the two instances of BaristaSaga share the same instance of BaristaState, do I now have to worry about synchronizing changes to the state across all of the sagas? Also, wouldn't this prevent having multiple barista "servers" handling messages since they wouldn't be able to share instances across processes/machines.

The answer to that is that yes, a saga can execute concurrently. Not only that, but it can execute concurrently on different machines. That puts us in somewhat of a problem regarding consistent state.

There are several options that we can use to resolve the issue. One of them is to ensure that this cannot happen by locking on a shared resource when executing the saga (commonly done by opening a transaction on the saga’s row). That can significantly limit the system's scalability. Another option is to persist the saga’s state in a way that ensures that we have no conflicts. One way of doing that is to persist the actual state change itself, which allows us to replay the object to a consistent state. Concurrent updates don’t bother us because we aren’t actually modifying the data.
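As a rough sketch of the second option, here is an append-only log of state changes that is replayed to rebuild the state. The names (BaristaChange, ReplayedBaristaState, ChangeLog) and the string-based change kinds are assumptions made for this example, not anything Rhino Service Bus provides.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical change record; Kind is either "DrinkReady" or "GotPayment".
public class BaristaChange
{
    public string Kind { get; set; }
}

public class ReplayedBaristaState
{
    public bool DrinkIsReady { get; set; }
    public bool GotPayment { get; set; }
}

public class ChangeLog
{
    // Appends never overwrite each other, so concurrent writers cannot
    // lose one another's changes.
    private readonly ConcurrentQueue<BaristaChange> changes =
        new ConcurrentQueue<BaristaChange>();

    public void Append(BaristaChange change)
    {
        changes.Enqueue(change);
    }

    // Rebuild the current state by applying every recorded change in order.
    public ReplayedBaristaState Replay()
    {
        var state = new ReplayedBaristaState();
        foreach (var change in changes)
        {
            switch (change.Kind)
            {
                case "DrinkReady":
                    state.DrinkIsReady = true;
                    break;
                case "GotPayment":
                    state.GotPayment = true;
                    break;
            }
        }
        return state;
    }
}
```

Two handlers can each append their own change without reading the other's, and a later Replay() still sees both flags set.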

That might require some careful thinking, however, to avoid a case where a saga that is concurrently executing steps on its own feet without paying attention. I strongly dislike anything that requires careful thinking. It is like saying that C++ has no memory leak issues, it just requires some careful thinking.

For RSB, I wanted to be able to do better than that. I selected Rhino DHT as the default persistence store for the saga’s state (you can still do other things, of course). That means that concurrency is very explicit. If you get to a point where there are two concurrently executing instances of the saga, their state is going to go to Rhino DHT. Since they are both going to be based on the same version, Rhino DHT is going to keep both state changes around.
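The "keep both state changes around" behavior can be illustrated with a tiny in-memory store. This is only a sketch of the idea under my own assumptions: ConflictKeepingStore and VersionedValue are hypothetical names, the store holds a single key, and none of this is the Rhino DHT API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class VersionedValue<T>
{
    public int Version { get; set; }
    public T Value { get; set; }
}

// Hypothetical single-key store that keeps sibling writes instead of
// letting the last writer win.
public class ConflictKeepingStore<T>
{
    private readonly object sync = new object();
    private readonly List<VersionedValue<T>> current = new List<VersionedValue<T>>();
    private int nextVersion = 1;

    // Returns every live value; more than one entry means a conflict.
    public VersionedValue<T>[] Get()
    {
        lock (sync)
        {
            return current.ToArray();
        }
    }

    // Writes a value derived from the given parent versions. The parents are
    // replaced; values written by someone else from the same parent are kept.
    public int Put(T value, params int[] parentVersions)
    {
        lock (sync)
        {
            current.RemoveAll(v => parentVersions.Contains(v.Version));
            var version = nextVersion++;
            current.Add(new VersionedValue<T> { Version = version, Value = value });
            return version;
        }
    }
}
```

If two saga instances both read version 1 and each write back their own change, Get() returns two values. Writing a merged value that names both of those versions as parents brings the store back to a single value.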

The next time that we need the state for that particular saga, we are actually going to get both states. At that point, we introduce the ISagaStateMerger:

public interface ISagaStateMerger<TState>
    where TState : IVersionedSagaState
{
    TState Merge(TState[] states);
}

This allows us to handle the notion of concurrency resolution in a very explicit manner. We get the appropriate state merger from the container and use that to merge the states back to a consistent state, which we then pass to the saga to continue its execution.

There is just one additional twist. A saga cannot complete until it is in a consistent state, so if the saga completes while it is in an inconsistent state, we will call the saga again (after resolving the conflict) and let it handle the final state conflict before performing the actual completion.

Rhino Service Bus: Saga and State

In a messaging system, a saga orchestrates a set of messages. The main benefit of using a saga is that it allows us to manage the interaction in a stateful manner (easy to think and reason about) while actually working in a distributed and asynchronous environment.

In Rhino Service Bus, I built in the notion of sagas from the beginning. I initially went with the approach that mixes the saga’s behavior and the saga's state in the same class. That did not turn out so well. While it works for simple matters, anything of sufficient complexity started to cause issues. Mainly, it was an issue of managing dependencies and managing state. It is possible to get this worked out, but I decided to follow in Udi’s footsteps and create an explicit separation between the two.

Here is how it works, we have the state class:

public class BaristaState
{
    public bool DrinkIsReady { get; set; }

    public bool GotPayment { get; set; }

    public string Drink { get; set; }
}

This is just a standard class, nothing special here. But here is the actual saga class. This contains the behavior for the saga, with the state being maintained in the state class.

public class BaristaSaga :
    ISaga<BaristaState>,
    InitiatedBy<PrepareDrink>,
    Orchestrates<PaymentComplete>
{
    private readonly IServiceBus bus;

    public BaristaState State { get; set; }

    public Guid Id { get; set; }

    public bool IsCompleted { get; set; }

    public BaristaSaga(IServiceBus bus)
    {
        this.bus = bus;
        State = new BaristaState();
    }

    public void Consume(PrepareDrink message)
    {
        State.Drink = message.DrinkName;

        // Simulate the time it takes to prepare the drink
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("Barista: preparing drink: " + State.Drink);
            Thread.Sleep(500);
        }
        State.DrinkIsReady = true;
        SubmitOrderIfDone();
    }

    public void Consume(PaymentComplete message)
    {
        Console.WriteLine("Barista: got payment notification");
        State.GotPayment = true;
        SubmitOrderIfDone();
    }

    private void SubmitOrderIfDone()
    {
        // Both messages may arrive in either order; only finish when both have
        if (State.GotPayment && State.DrinkIsReady)
        {
            Console.WriteLine("Barista: drink is ready");
            bus.Publish(new DrinkReady
            {
                CorrelationId = Id,
                Drink = State.Drink
            });
            IsCompleted = true;
        }
    }
}

There are a few things to notice in here. The saga class is a standard component, we use DI to inject dependencies to the class so it can perform whatever it is that it wants. The state property is used to maintain the state of the saga between message invocations.

This results in a simpler design for some parts of the code, and I think that overall it is a very simple model to talk and reason about.

Rhino Service Bus: Field Level Security

One of the requirements that came up on my current project was the need to secure specific fields in a message during transit. I thought about it a while before I decided that this is something that should be made explicit in the message contract.

Here is an example from the tests:

public class ClassWithSecretField
{
    public WireEcryptedString ShouldBeEncrypted
    {
        get; set;
    }
}

WireEncryptedString is a type that would be encrypted on the wire, as the name suggests.

And defining the keys in the configuration is done in this way:

<facility id="rhino.esb" >
  <bus threadCount="1"
       numberOfRetries="5"
       endpoint="msmq://localhost/test_queue2"
         />
  <messages>
    <add name="Rhino.ServiceBus.Tests"
         endpoint="msmq://localhost/test_queue"/>
    <add name="Rhino.ServiceBus.Tests"
         endpoint="msmq://localhost/test_queue2"/>
  </messages>
  <security>
    <key>f/gdDWbDqHRvpqdRbTs3mxhGdZh9qCaDrasxJGXl+5s=</key>
  </security>
</facility>

On the wire, it has the following format:

<?xml version='1.0' encoding='utf-8'?>
<esb:messages 
  xmlns:esb='http://servicebus.hibernatingrhinos.com/2008/12/20/esb' 
  xmlns:tests.classwithsecretfield='Rhino.ServiceBus.Tests.When_Security_Is_Specified_In_Config+ClassWithSecretField, Rhino.ServiceBus.Tests'
  xmlns:datastructures.wireecryptedstring='Rhino.ServiceBus.DataStructures.WireEcryptedString, Rhino.ServiceBus' xmlns:string='string'>
  <tests.classwithsecretfield:ClassWithSecretField>
    <datastructures.wireecryptedstring:ShouldBeEncrypted>
      <string:Value iv='0yL9+t0uyDy9NeP7CU1Wow=='>q9a10IFuRxrzFoZewfdOyg==</string:Value>
    </datastructures.wireecryptedstring:ShouldBeEncrypted>
  </tests.classwithsecretfield:ClassWithSecretField>
</esb:messages>

Following the Rhino Service Bus philosophy, it is quite a neat solution.

The actual encryption is done using a 256-bit key with Rijndael (AES). I considered other approaches, but all of them had quite a big overhead from a manageability perspective.
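For readers who want to see what encrypting a single field with a 256-bit key and a per-value IV looks like, here is a minimal sketch using the framework's Aes class. FieldCrypto is a name I made up, and the defaults used here (CBC mode, PKCS7 padding, UTF-8 text) are assumptions; the actual Rhino Service Bus implementation may well differ.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class FieldCrypto
{
    // Encrypts one value with a fresh random IV and returns the ciphertext
    // as Base64. The IV is returned separately so it can travel next to the
    // value, much like the iv attribute in the wire format.
    public static string Encrypt(string plainText, byte[] key, out string ivBase64)
    {
        using (var aes = Aes.Create())
        {
            aes.Key = key; // 32 bytes = 256 bits
            aes.GenerateIV();
            ivBase64 = Convert.ToBase64String(aes.IV);

            using (var encryptor = aes.CreateEncryptor())
            {
                var plain = Encoding.UTF8.GetBytes(plainText);
                var cipher = encryptor.TransformFinalBlock(plain, 0, plain.Length);
                return Convert.ToBase64String(cipher);
            }
        }
    }

    public static string Decrypt(string cipherBase64, byte[] key, string ivBase64)
    {
        using (var aes = Aes.Create())
        {
            aes.Key = key;
            aes.IV = Convert.FromBase64String(ivBase64);

            using (var decryptor = aes.CreateDecryptor())
            {
                var cipher = Convert.FromBase64String(cipherBase64);
                var plain = decryptor.TransformFinalBlock(cipher, 0, cipher.Length);
                return Encoding.UTF8.GetString(plain);
            }
        }
    }
}
```

The key from the configuration sample decodes to 32 bytes via Convert.FromBase64String, so it can be passed in directly. Decrypting with a different key will usually fail with a CryptographicException because the padding no longer checks out.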

There are some interesting implications for the implementation that deserve some discussion. Let us assume that you send such a message to another endpoint.

If the endpoint…

  • has the same key as us, the message will be decrypted and everything works.
  • doesn’t have any security defined. At that point, the message will successfully deserialize. Any WireEncryptedString field will contain the encrypted value.
  • has a different key defined. Message deserialization will fail.

Trying to send a message that contains WireEncryptedString will throw, we do not allow such an action.

And now you can tell me how many holes there are in my system :-)

System.Security.SecureString annoyances

SecureString has exactly one purpose: take information from the user and pass it to an unmanaged function. Anything else that you would try to do with it seems to be incredibly hard to do.
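To show the kind of ceremony involved, here is a small sketch of the one thing SecureString is good at: handing its contents to code that works with an unmanaged pointer. The helper name SecureStringInterop is mine; the Marshal calls are the standard framework ones.

```csharp
using System;
using System.Runtime.InteropServices;
using System.Security;

public static class SecureStringInterop
{
    // Copies the secret into unmanaged memory, lets the caller use the
    // pointer, then zeroes and frees the copy no matter what happens.
    public static T WithUnmanagedCopy<T>(SecureString secret, Func<IntPtr, T> use)
    {
        IntPtr ptr = IntPtr.Zero;
        try
        {
            ptr = Marshal.SecureStringToGlobalAllocUnicode(secret);
            return use(ptr);
        }
        finally
        {
            if (ptr != IntPtr.Zero)
                Marshal.ZeroFreeGlobalAllocUnicode(ptr);
        }
    }
}
```

Anything that needs the text as a managed string, serialization included, has to call something like Marshal.PtrToStringUni inside the callback, at which point the secret sits in ordinary managed memory and most of the benefit is gone.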

I wanted to extend Rhino Service Bus so a message that contained SecureString members would be automatically encrypted on the wire. It seemed like a nice & easy option to provide field level security for messages. However, it doesn’t seem to be a viable option, because working with SecureString is such an awkward task. I have created a WireEncryptedString class and I’ll just use that instead. Grr….

Rhino Service Bus: Understanding a Distributed System

One of the major hurdles in distributed systems is trying to understand how they work. Different parts are running at different places and sometimes at different times. Standard debugging usually breaks down at this point, because no one has yet invented a non-sequential debugger that would make sense to humans.

We are left with trying to understand what is going on in the system based on a pretty old notion, the system logs. With Rhino Service Bus, this was one of the things that I really cared about, so I made this into a first class concept. And no, you don’t get to hunt through a 3GB text file. The idea is that each message (and message interaction) in the system can be captured.

The configuration for this is quite simple:

<bus threadCount="1"
         numberOfRetries="5"
         logEndpoint="msmq://localhost/starbucks.log"
         endpoint="msmq://localhost/starbucks.backend"
         />

And once we have done that, we copy each message to the log queue. But it is not just the arrived messages. It is also when a message arrived, how long it took to process it, why it failed, etc.

Using this approach, you can build tools that listen to the log queue and display the information in ways that make sense to humans. For example, we can create a flow of a saga or conversation, or start getting input about the time it takes to process certain messages or detect SLA violations.
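As a sketch of what such a tool might do with the captured data, the following reports which message types exceeded a processing time limit. LoggedMessage is a hypothetical shape for a record read from the log queue, not the format Rhino Service Bus actually writes, and reading from the queue itself is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape of a record read from the log queue.
public class LoggedMessage
{
    public string MessageType { get; set; }
    public TimeSpan ProcessingTime { get; set; }
    public string Error { get; set; } // null when processing succeeded
}

public static class LogQueueReport
{
    // Message types that took longer than the agreed limit at least once.
    public static List<string> SlaViolations(IEnumerable<LoggedMessage> log, TimeSpan limit)
    {
        return log
            .Where(m => m.ProcessingTime > limit)
            .Select(m => m.MessageType)
            .Distinct()
            .OrderBy(t => t)
            .ToList();
    }

    // Number of failed messages per message type.
    public static Dictionary<string, int> FailuresByType(IEnumerable<LoggedMessage> log)
    {
        return log
            .Where(m => m.Error != null)
            .GroupBy(m => m.MessageType)
            .ToDictionary(g => g.Key, g => g.Count());
    }
}
```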

Rhino Service Bus: The Starbucks example

Yesterday I finally completed the Starbucks sample for Rhino Service Bus. It is surprising to see how many aspects that little sample required.

There are several highlights in the sample.

  • There are three actors in the application:
    • Barista
    • Cashier
    • Customer
  • There is zero setup necessary, Rhino Service Bus will create the queues if they don’t already exist. Again, the idea of reducing moving parts.
  • All three actors are running in the same process – but each is running in a different AppDomain.
    Note that this is a common deployment choice for development, but not one that I would use for production.
    The idea is that this makes it significantly easier to debug & develop a distributed application.
  • There is very little configuration whatsoever. And a lot of conventions about how to figure out what consumers to use and how to build it.
  • The use of sagas & conversations is demoed. The entire buying process is a single conversation composed of several sagas.
  • The customer actor is showing how we can create instance & temporary subscriptions.

Rhino Service Bus: Locality and Independence

One of the main design goals for Rhino Service Bus was locality and independence of the end point. What do I mean by that?

Well, a Rhino ESB endpoint should have no dependency on any external service in order to operate. Examples of common external services are things like subscription services, timeout managers and error reporting. I already discussed how I handle errors, timeouts and subscriptions with Rhino Service Bus. So the question is why independence is such an important concept.

The answer is quite simple: reduce the number of moving parts. If I need to have a timeout service running to have my timeouts running, this is a problem. If subscriptions don’t work because I didn’t start the subscription service, that is a problem. Those are problems because those things will happen, and figuring out what exactly is going on is hard, painful and not really recommended. Well, the first time you run into this, at least. As such, I sought to make sure that there would be only a single place that you would have to look in order to get things working.

There are other aspects of this as well, frankly. Fewer moving parts usually mean less complexity, but the most important issue is that we have much better options for debugging.

The only place that you need to check for anything is the endpoint.

We currently don’t have tooling (because we rely on the default MSMQ ones to do a fairly adequate job for now), but a few spikes that I ran showed that building some really nice tooling is quite simple, and the approach is the same for everything.

Rhino Service Bus: Managing Timeouts

I was first introduced to the notion of time as an explicit concept in Udi’s writing. The notion of being able to send a message in the future was, like most of the things that Udi talks about, simple in terms of implementation, but profound in conceptual terms.

Rhino Service Bus supports this explicitly by specifying a time in which a message will be received. The implementation for that is quite interesting, I think.

Again, we are making use of a subqueue to minimize the additional requirements that RSB has. So when we send a message to RSB with a delay in it, we move it to the timeout subqueue and record that fact internally. We have a timer going off every second that checks for expiry of the messages and dispatches them when their time arrives. If the system crashes at any point, all the timeouts are stored in the subqueue, and on startup, we are going to read the timeout messages and reconstruct our internal representation of them.

The API is quite simple:

bus.DelaySend(bus.Endpoint, 
    DateTime.Now.AddMilliseconds(250), 
    msg);

We might need to make some changes to support extremely large queues and extremely long duration for delayed messages, but for now, it is doing quite well.
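The bookkeeping behind the timeouts can be sketched as follows. This is my own in-memory approximation with invented names (DelayedMessage, TimeoutTracker); the real implementation keeps the messages in an MSMQ subqueue, which is only hinted at here by the constructor that accepts recovered entries.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class DelayedMessage
{
    public DateTime DueAt { get; set; }
    public object Body { get; set; }
}

// Hypothetical in-memory index over the messages parked in the timeout subqueue.
public class TimeoutTracker
{
    private readonly object sync = new object();
    private readonly List<DelayedMessage> pending = new List<DelayedMessage>();

    // On startup, rebuild the index from whatever is still in the subqueue.
    public TimeoutTracker(IEnumerable<DelayedMessage> recovered)
    {
        pending.AddRange(recovered);
    }

    public void Register(DelayedMessage message)
    {
        lock (sync)
        {
            pending.Add(message);
        }
    }

    // Called by the timer: removes and returns everything that is due,
    // earliest first, so the caller can dispatch it.
    public List<DelayedMessage> TakeDue(DateTime now)
    {
        lock (sync)
        {
            var due = pending
                .Where(m => m.DueAt <= now)
                .OrderBy(m => m.DueAt)
                .ToList();
            pending.RemoveAll(m => m.DueAt <= now);
            return due;
        }
    }
}
```

A System.Threading.Timer firing once a second would call TakeDue(DateTime.Now) and move each returned message back to the main queue. Scanning the whole list on every tick is also why very large numbers of delayed messages would need a smarter structure.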

Errors are part of your experience

The NH Prof website is running on Rhino Service Bus. I decided that this is a great time to test Rhino Service Bus for real, and I built the website on top of it. Even the order processing is done on top of RSB.

But that isn't what I wanted to talk about today. I wanted to talk about errors, and how important they are. In this case, I screwed up when I built the error reporting capability for NH Prof, which means that if you tried to report an error back, you would get a nasty 404 and I would get nada as feedback.

The actual fault is not relevant, and it is fixed already, so hopefully you'll forgive me for having bugs in a beta product.

What I really wanted to talk about was how I fixed the issue. Remember, this is the production machine, and I have no debugger there. What I did was go to the MSMQ interface, go to the back end queue and take a peek in the errors sub queue, where I got to see the following:

[Screenshot: the contents of the errors subqueue]

The first message is the original one, the second is the error description for it. When I opened it, I could see that the error was "AyendeIsStupidException", which explained exactly what went wrong, and I was able to quickly resolve the issue by buying more IQ on eBay. I then moved the message back to the main queue, and got the email that I expected about the problem with NH Prof.

A few important points about this:

  • No debugging
  • No hunting through the logs
  • No custom tools. Rhino Service Bus Profiler doesn't exist (yet).
  • In your face, explicit and very quick troubleshooting experience.

Error handling was an explicit design goal when I set out to build Rhino Service Bus, and I am happy to be able to say that so far it is proving that it works, and is definitely worth the time I spent on it.

Subscriptions: Mass Transit vs. NServiceBus vs. Rhino ServiceBus

For a pub / sub system, managing subscriptions could be classified as important. All three service buses have some notion of subscriptions. I find it interesting to trace the way that each of them handles this scenario differently.

NServiceBus deals with this as a pluggable strategy. NServiceBus tends to do things that way :-)

The two options that come out of the box with it are database backed subscription storage or a queue backed subscription storage. Just to make things interesting, for queue backed subscription storage, you now have several deployment options at your hand (private subscription storage per endpoint, shared subscription queue, etc).

Mass Transit handles this by treating subscriptions as a service. Again, we have the option to choose between either a subscription storage based on distributed cache (memcached) or a local service running it. In both cases, we are talking about having a well known place to go looking for it.

I don't think that I like this very much, mainly because if you miss that, you get into some very strange situations. (Nothing works, and the first time that this happens, you don't know why).

For Rhino Service Bus, I chose to go with a private subscription storage. That is, each endpoint stores its own subscriptions directly on the queue that it is using to accept messages. That means that I have only a single place to look in order to understand how this endpoint is behaving. It also means that I literally have to do nothing in order to start getting messages flowing.