Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

Get in touch with me:

oren@ravendb.net +972 52-548-6969

time to read 1 min | 192 words

Rhino Service Bus comes with two default implementations of saga persistence (OptimisticDistributedHashTableSagaSatePersister & DistributedHashTableSagaSatePersister). Both of them rely on the Rhino DHT to actually handle the persistence, but they behave quite differently once you start looking at them.

The persistence mechanism is the same; the difference between them is how they handle concurrency. OptimisticDistributedHashTableSagaSatePersister is very simple to reason about: it uses optimistic concurrency. If there is a concurrency conflict, it will throw and force re-evaluation of the message. You opt in to the OptimisticDistributedHashTableSagaSatePersister by implementing the marker interface SupportsOptimisticConcurrency.
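A minimal sketch of the optimistic approach, assuming a simple in-memory store: every save must state the version it read, and a stale version causes an exception, which is the point where the message would be re-evaluated. `OptimisticStateStore` and `ConcurrencyConflictException` are hypothetical names for illustration, not the Rhino DHT or Rhino Service Bus API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception type for this sketch; not the Rhino Service Bus API.
public class ConcurrencyConflictException : Exception
{
    public ConcurrencyConflictException(string message) : base(message) { }
}

// Hypothetical in-memory store illustrating optimistic concurrency:
// each write must name the version it read, and a stale version is rejected.
public class OptimisticStateStore<TState>
{
    private readonly Dictionary<Guid, (int Version, TState State)> items =
        new Dictionary<Guid, (int Version, TState State)>();

    // Returns version 0 and a default state when nothing has been saved yet.
    public (int Version, TState State) Load(Guid id) =>
        items.TryGetValue(id, out var entry) ? entry : (0, default(TState));

    public int Save(Guid id, int expectedVersion, TState state)
    {
        var current = items.TryGetValue(id, out var entry) ? entry.Version : 0;
        if (current != expectedVersion)
            throw new ConcurrencyConflictException(
                $"Expected version {expectedVersion} but found {current}");
        items[id] = (current + 1, state);
        return current + 1;
    }
}
```

If two handlers both load version 1, the first save wins and the second throws, so its message has to be handled again against the fresh state.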

DistributedHashTableSagaSatePersister is a more complex concurrency solution, which will never lose a write, even if there is a conflict. This requires you to implement merge conflict resolution. It is significantly more complex, and is probably only worth it where you really care about writes always succeeding.

In order to use DistributedHashTableSagaSatePersister properly, your saga needs to implement Orchestrates<MergeSagaState>, which allows the saga to take action upon merges. In addition to that, you need to implement ISagaStateMerger for your particular saga state. This is where the logic for doing the merge is located.

time to read 3 min | 521 words

As I stated before, I started writing a queuing system a few days ago, loosely based on my previous efforts in this area, but aimed at creating a production-ready queuing infrastructure that I can use in my own applications. The decision to build this was not made lightly, but I wanted a queuing system that met my needs and was flexible enough to extend as needed.

The design goals stated for Rhino Queues were:

  • XCopy deployable
  • Zero configuration
  • Durable
  • Supports System.Transactions
  • Works well with Load Balancing hardware
  • Supports sub queues
  • Support arbitrarily large messages

It is now publicly available, and I think it deserves some discussion.

Here is a very simple example of using it:

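using System;
using System.Net;
using System.Text;
using System.Transactions;
using Rhino.Queues;

// Listen on port 2200 and keep the durable queue storage at "queues.esent"
var queueManager = new QueueManager(new IPEndPoint(IPAddress.Loopback, 2200), "queues.esent");
queueManager.CreateQueues("Web");
var queue = queueManager.GetQueue("Web");

// ContinueProcessing is an application-defined flag controlling the receive loop
while(ContinueProcessing)
{
    using(var tx = new TransactionScope())
    {
        // If tx.Complete() is not reached, the receive is rolled back
        var msg = queue.Receive();
        Console.WriteLine("Message from {0}:", msg.Headers["source"]);
        Console.WriteLine(Encoding.Unicode.GetString(msg.Data));
        tx.Complete();
    }
}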

This example is merely to show the API.

The actual software is pretty interesting. Communication between different queues is done using TCP, with a protocol that works well with load balancing hardware, making load balancing a queued application as easy as balancing any HTTP-based app.

You cannot see it in the API example, but the system fully supports System.Transactions, so sending a message is delayed until the transaction is committed, and receiving a message is rolled back if the transaction rolls back. We even support recovery after a hard crash, by plugging into the recovery mechanism that MSDTC offers us.
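Rhino Queues gets these semantics by plugging into System.Transactions. As a rough, self-contained illustration of the same idea (send deferred until commit, receive undone on rollback), here is a toy queue where a hypothetical `ToyTransaction` class stands in for the ambient transaction. None of these types are Rhino Queues' API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an ambient transaction; Rhino Queues uses System.Transactions instead.
public class ToyTransaction
{
    private readonly List<Action> onCommit = new List<Action>();
    private readonly List<Action> onRollback = new List<Action>();

    public void OnCommit(Action action) => onCommit.Add(action);
    public void OnRollback(Action action) => onRollback.Add(action);

    public void Commit()
    {
        foreach (var action in onCommit) action();
    }

    public void Rollback()
    {
        // Undo in reverse order so messages return to the queue in their original order.
        for (int i = onRollback.Count - 1; i >= 0; i--) onRollback[i]();
    }
}

// Hypothetical toy queue showing the transactional semantics described above.
public class ToyQueue
{
    private readonly LinkedList<string> messages = new LinkedList<string>();

    public int Count => messages.Count;

    // The message only becomes visible to receivers once the transaction commits.
    public void Send(ToyTransaction tx, string message) =>
        tx.OnCommit(() => messages.AddLast(message));

    // The message is taken immediately, but goes back to the head of the queue on rollback.
    public string Receive(ToyTransaction tx)
    {
        if (messages.Count == 0) throw new InvalidOperationException("Queue is empty");
        var message = messages.First.Value;
        messages.RemoveFirst();
        tx.OnRollback(() => messages.AddFirst(message));
        return message;
    }
}
```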

All messages are durable, so a system reboot will not remove them. While the default mode for Rhino Queues is an embedded component in your application (XCopy deployable), we still support the ability to deliver a message to a server that is down, by implementing a fairly flexible message retry mechanism.
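The post does not describe the retry schedule Rhino Queues actually uses, so the following is only a sketch of one common choice, capped exponential backoff. `RetrySchedule` and its parameters (base delay, maximum delay, number of attempts) are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical retry policy: the base delay, cap and attempt count are assumptions,
// not Rhino Queues settings.
public static class RetrySchedule
{
    public static IReadOnlyList<TimeSpan> Delays(TimeSpan baseDelay, TimeSpan maxDelay, int attempts)
    {
        var delays = new List<TimeSpan>();
        for (int attempt = 0; attempt < attempts; attempt++)
        {
            // Double the delay after each failed delivery attempt, never exceeding maxDelay.
            // Computed as double so large attempt counts cannot overflow.
            double ticks = baseDelay.Ticks * Math.Pow(2, attempt);
            delays.Add(TimeSpan.FromTicks((long)Math.Min(ticks, maxDelay.Ticks)));
        }
        return delays;
    }
}
```

With a one second base delay and a ten second cap, five attempts wait 1, 2, 4, 8 and 10 seconds, so a server that is down briefly is retried quickly while a long outage does not cause a flood of connection attempts.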

Because Rhino Service Bus makes such heavy use of this, we also support sub queues, and we have no hard limit on the size of messages that we can deliver.

I delayed announcing this until I could finish integrating this completely into Rhino Service Bus, but this is now done, and should just work. Rhino Service Bus will still support MSMQ, but I think that a lot of the work that we are going to do on Rhino Service Bus from now on will be with the new queuing infrastructure.

time to read 2 min | 286 words

Patrick Smacchia (NDepend) has a good post about this topic. I have an issue with his assertion that "Static Structure is the Key":

The idea I would like to defend now is that when it comes to understand and maintain a program, one need to focus mostly on the static dependencies, the ones found in the source code. Knowing dynamic dependencies (who calls who at runtime) can make sense for example to profile performance or to fix some particular bugs. But most of the time invested to understand a program is generally spent in browsing static dependencies.

The problem is that this doesn't really hold for applications that were written with a container in place. Let us take Rhino Service Bus as an example of that. If you try to follow the static dependencies in the code, you will not be able to understand much. There aren't many.

If we look at things like error handling, administration tasks, or timeouts, all of which are key parts of the way certain aspects of RSB behave, we will see that there is never a static reference to them. Instead, they are pulled in as a set of generic components that know how to integrate into the bus.

A significant part of the actual behavior in RSB is dynamically configured by RhinoServiceBusFacility, and even here, I felt that this class was getting too many responsibilities, so we broke it apart into more dynamically composed parts. The configuration syntax, for example, is driven by the facility and by the BusConfigurationAware components (of which we currently have three).
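To make the "no static reference" point concrete, here is a small sketch in which the bootstrapper never names the components it uses; it discovers them at runtime. RSB relies on the container for this, while the sketch uses plain reflection instead. `IBusConfigurationAware`, its `Configure` signature and the two component classes are hypothetical; the real Rhino Service Bus interface may look different.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical interface modelled on the idea of "BusConfigurationAware" components.
public interface IBusConfigurationAware
{
    void Configure(IDictionary<string, string> settings);
}

public class ErrorHandlingConfiguration : IBusConfigurationAware
{
    public void Configure(IDictionary<string, string> settings) => settings["errorQueue"] = "errors";
}

public class TimeoutConfiguration : IBusConfigurationAware
{
    public void Configure(IDictionary<string, string> settings) => settings["timeouts"] = "enabled";
}

public static class BusBootstrapper
{
    // Nothing here references ErrorHandlingConfiguration or TimeoutConfiguration
    // statically; they are found by scanning the assembly at runtime.
    public static Dictionary<string, string> BuildSettings(Assembly assembly)
    {
        var settings = new Dictionary<string, string>();
        var components = assembly.GetTypes()
            .Where(t => typeof(IBusConfigurationAware).IsAssignableFrom(t)
                        && !t.IsInterface && !t.IsAbstract)
            .Select(t => (IBusConfigurationAware)Activator.CreateInstance(t));
        foreach (var component in components)
            component.Configure(settings);
        return settings;
    }
}
```

A static dependency browser looking at `BusBootstrapper` would see only the interface, which is exactly why static structure tells you little about such a system.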

Focusing on the static dependencies in RSB wouldn't be very useful; not a lot is happening there.

time to read 2 min | 383 words

This one was a real pain to figure out. Can you imagine what would be the result of this code?

using System.Messaging;

var queue = new MessageQueue(queuePath); // queuePath: path of an existing MSMQ queue
queue.Dispose();
var peekAsyncResult = queue.BeginPeek();
peekAsyncResult.AsyncWaitHandle.WaitOne();

If you guessed that we would get ObjectDisposedException, you are sadly mistaken. If you guessed that this would lead to a deadlock, you won.

Figuring out the behavior in a multi threaded system where one thread was beginning to listen and another was disposing the queue and waiting for pending operations to complete is… not fun.

Update: For some strange reason, I am not able to reproduce the problem shown above. I know that I did before I posted this, but I posted it as one of the last things that I did that day. I think that this is somehow related to the actual queue used and whether or not it has messages.
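Whether or not the hang reproduces, one possible defensive pattern (sketched here as an assumption, not the fix used in Rhino Service Bus) is to take the same lock for "start listening" and "dispose", and to wait with a timeout rather than forever. `GuardedListener` is a hypothetical wrapper, not part of System.Messaging; the delegates stand in for calls such as `queue.BeginPeek()` and `queue.Dispose()`.

```csharp
using System;

// Hypothetical wrapper, not part of System.Messaging. The shared lock means a listener
// can never begin an asynchronous peek on a queue another thread has already disposed.
public sealed class GuardedListener : IDisposable
{
    private readonly object sync = new object();
    private readonly Action disposeQueue;
    private bool disposed;

    public GuardedListener(Action disposeQueue) => this.disposeQueue = disposeQueue;

    // beginListen stands in for a call such as queue.BeginPeek().
    public bool TryBeginListen(Func<IAsyncResult> beginListen, out IAsyncResult result)
    {
        lock (sync)
        {
            if (disposed)
            {
                result = null;
                return false;
            }
            result = beginListen();
            return true;
        }
    }

    // Bounded wait: a handle that is never signaled returns false instead of hanging.
    public static bool WaitForCompletion(IAsyncResult result, TimeSpan timeout) =>
        result.AsyncWaitHandle.WaitOne(timeout);

    public void Dispose()
    {
        lock (sync)
        {
            if (disposed) return;
            disposed = true;
            disposeQueue();
        }
    }
}
```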

time to read 21 min | 4061 words

Concurrency is a tough topic, fraught with problems, pitfalls and nasty issues. This is especially the case when you try to build distributed, inherently parallel systems. I have been dealing with the topic quite a lot recently, and I have created several solutions (none of them are originally mine, mind you).

There aren’t that many good solutions out there; most of them boil down to: “suck it up and deal with the complexity.” In this case, I want to try to deal with the complexity in a consistent fashion (no one-off solutions) and in a way that I can deal with without first meditating on the import of socks.

Let us see if I can come up with a good example. We have a saga that we use to check whether a particular user has acceptable credit to buy something from us. The logic is that we need to verify with at least 2 credit card bureaus, and the average must be over 700. (This logic has nothing to do with the real world, since I just dreamt it up, by the way.) Here is a simple implementation of a saga that can deal with those requirements:

using System;
using System.Linq;

public class AccpetableCreditSaga : ISaga<AccpetableCreditState>,
  InitiatedBy<IsAcceptableAsCustomer>,
  Orchestrates<CreditCardScore>, 
  Orchestrates<MergeSagaState>
{
  IServiceBus bus;
  public AccpetableCreditState State {get;set;}
  public bool IsCompleted {get;set;}
  public Guid Id {get;set;}
  
  public AccpetableCreditSaga (IServiceBus bus)
  {
    this.bus = bus;
  }
  
  public void Consume(IsAcceptableAsCustomer message)
  {
    // Ask all three bureaus; the saga can complete once any two have replied
    bus.Send(
      new Equifax.CheckCreditFor{Card = message.Card},
      new Experian.CheckCreditFor{Card = message.Card},
      new TransUnion.CheckCreditFor{Card = message.Card}
      );
  }
  
  public void Consume(CreditCardScore message)
  {
    State.Scores.Add(message);
    
    TryCompleteSaga();
  }
  
  // Called after the bus has merged conflicting versions of the state
  public void Consume(MergeSagaState message)
  {
    TryCompleteSaga();
  }
  
  private void TryCompleteSaga()
  {
    if(State.Scores.Count < 2)
      return;
     
    bus.Publish(new CreditScoreAcceptable
    {
      CorrelationId = Id,
      IsAcceptable = State.Scores.Average(x => x.Score) > 700
    });
    IsCompleted = true;
  }
}

We have this strange MergeSagaState message, but other than that, it should be pretty obvious what is going on in here. It should be equally obvious that we have a serious problem here. Let us say that we get two reply messages with credit card scores at the same time. We will create two instances of the saga that will run in parallel, each of them getting its own copy of the saga’s state. Each instance adds one score to its copy, so neither copy ever holds the two scores that the completion condition requires. So even though in practice we have gotten all the messages we need, because we handled them in parallel, we had no chance to actually see both changes at the same time. This means that any logic that requires a full picture of what is going on isn’t going to work.
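The lost update can be simulated in a few lines. This is a hypothetical, simplified model: `LostUpdateScore` and `LostUpdateState` stand in for CreditCardScore and AccpetableCreditState, and "running in parallel" is modelled as two instances each working on its own copy of the stored state.

```csharp
using System.Collections.Generic;

// Simplified, hypothetical stand-ins for CreditCardScore and AccpetableCreditState.
public class LostUpdateScore
{
    public string Bureau { get; set; }
    public int Score { get; set; }
}

public class LostUpdateState
{
    public List<LostUpdateScore> Scores { get; set; } = new List<LostUpdateScore>();

    // Each concurrently running saga instance gets its own copy of the stored state.
    public LostUpdateState Copy() => new LostUpdateState { Scores = new List<LostUpdateScore>(Scores) };
}

public static class LostUpdateDemo
{
    public static (bool FirstCanComplete, bool SecondCanComplete) Run()
    {
        var stored = new LostUpdateState();     // state before either reply is handled
        var instanceA = stored.Copy();          // handling the Equifax reply
        var instanceB = stored.Copy();          // handling the Experian reply, in parallel
        instanceA.Scores.Add(new LostUpdateScore { Bureau = "Equifax", Score = 720 });
        instanceB.Scores.Add(new LostUpdateScore { Bureau = "Experian", Score = 710 });
        // Same completion check as TryCompleteSaga: at least two scores are needed.
        return (instanceA.Scores.Count >= 2, instanceB.Scores.Count >= 2);
    }
}
```

Both results are `false`: two scores arrived, yet neither instance ever sees more than one.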

Rhino Service Bus solves the issue by putting the saga’s state into Rhino DHT. This means that a single saga may have several states at the same time. The bus also takes care of triggering the merge. The merge itself, however, is inherently an issue that cannot be solved generically; there is no generic merge algorithm that you can use. Rhino Service Bus defines an interface that will allow you to deal with this issue in a clean manner and supply whatever business logic is required to merge different versions.

Here is an example of how we can merge the different versions together:

using System.Linq;

public class AccpetableCreditStateMerger : ISagaStateMerger<AccpetableCreditState>
{
  public AccpetableCreditState Merge(AccpetableCreditState[] states)
  {
    // Combine the scores from every conflicting version, keeping the highest score per bureau
    return new AccpetableCreditState
    {
      Scores = states.SelectMany(x => x.Scores)
        .GroupBy(x => x.Bureau)
        .Select(x => new CreditCardScore
        {
          Bureau = x.Key,
          Score = x.Max(y => y.Score)
        }).ToList()
    };
  }
}

Note that this is notepad code, so it may contain errors, but the actual intention should be clear. We accept an array of states that need to be merged, find the highest score from each bureau and return the merged state.
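To check the merge logic in isolation, here is a self-contained version that runs without Rhino Service Bus. `BureauScore`, `CreditStateVersion` and `CreditStateMerge` are hypothetical simplified stand-ins for the types above; the LINQ pipeline is the same union, group-by-bureau, take-the-maximum approach.

```csharp
using System.Collections.Generic;
using System.Linq;

// Simplified, hypothetical stand-ins for CreditCardScore and AccpetableCreditState.
public class BureauScore
{
    public string Bureau { get; set; }
    public int Score { get; set; }
}

public class CreditStateVersion
{
    public List<BureauScore> Scores { get; set; } = new List<BureauScore>();
}

public static class CreditStateMerge
{
    // Union all scores from the conflicting versions, keeping the highest score per bureau.
    public static CreditStateVersion Merge(CreditStateVersion[] states) =>
        new CreditStateVersion
        {
            Scores = states.SelectMany(s => s.Scores)
                .GroupBy(s => s.Bureau)
                .Select(g => new BureauScore { Bureau = g.Key, Score = g.Max(s => s.Score) })
                .ToList()
        };
}
```

Merging a version holding Equifax 720 with one holding Experian 710 and Equifax 700 yields two scores (Equifax 720, Experian 710) whose average, 715, passes the saga's threshold.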

Whenever Rhino Service Bus detects that the saga is in a conflicted state, it will post a MergeSagaState message to the saga. The bus merges the saga’s state and then calls Consume(MergeSagaState), in which the saga gets to decide what it wants to do about this (usually inspect the state to see if we missed anything). This also works for completing a saga, by the way: you cannot complete a saga in an inconsistent state; you will get called again with Consume(MergeSagaState) to deal with that.
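The two rules in that paragraph (merge when several versions exist and notify the saga; refuse completion while versions conflict) can be sketched as follows. `ConflictAwareLoader` is a hypothetical illustration of the rules, not Rhino Service Bus internals; the `merge` delegate plays the role of an ISagaStateMerger, and the `needsMergeNotification` flag marks where Consume(MergeSagaState) would be invoked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of the dispatch rules; not Rhino Service Bus internals.
public class ConflictAwareLoader<TState>
{
    private readonly Func<TState[], TState> merge;

    public ConflictAwareLoader(Func<TState[], TState> merge) => this.merge = merge;

    // One stored version: use it as-is. Several versions: merge them and signal that
    // the saga must get a chance to react (the Consume(MergeSagaState) step).
    public TState Load(IReadOnlyList<TState> versions, out bool needsMergeNotification)
    {
        if (versions.Count == 0) throw new InvalidOperationException("No saga state stored");
        needsMergeNotification = versions.Count > 1;
        return needsMergeNotification ? merge(versions.ToArray()) : versions[0];
    }

    // A saga may only be marked complete when the stored state is consistent.
    public static bool MayComplete(IReadOnlyList<TState> versionsAtSave) => versionsAtSave.Count == 1;
}
```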

The state merger is also a good place to deal with compensating actions caused by concurrency. For example, if we notice in the merger that we performed some action twice, we may need to revert one of them. In general, it is better to avoid having to do so, but that is the place for this logic.

time to read 5 min | 918 words

I have talked about Rhino DHT at length, and the versioning story that exists there. What I haven’t talked about is why I built it. Or, to be rather more exact, the actual use case that I had in mind.

Jason Diamond had pointed out a problem with the way sagas work with Rhino Service Bus.

Are BaristaSaga objects instantiated per message? If so, can two different instances be consuming different messages concurrently?

The reason I ask is because it looks like handling the PrepareDrink message could take some time. Is it possible that a PaymentComplete message could come in before the PrepareDrink message is finished being handled?

If the two instances of BaristaSaga have their own instance of BaristaState, I can see the GotPayment value set by handling the PaymentComplete message getting lost.

If the two instances of BaristaSaga share the same instance of BaristaState, do I now have to worry about synchronizing changes to the state across all of the sagas? Also, wouldn't this prevent having multiple barista "servers" handling messages since they wouldn't be able to share instances across processes/machines.

The answer to that is that yes, a saga can execute concurrently. Not only that, but it can execute concurrently on different machines. That leaves us with a problem regarding consistent state.

There are several options that we can use to resolve the issue. One of them is to ensure that this cannot happen by locking on a shared resource when executing the saga (commonly done by opening a transaction on the saga’s row). That can significantly limit the system's scalability. Another option is to persist the saga’s state in a way that ensures that we have no conflicts. One way of doing that is to persist the actual state change itself, which allows us to replay the object to a consistent state. Concurrent updates don’t bother us because we never modify existing data; we only append new changes.
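A minimal sketch of the "persist the change, not the state" option, using the barista example from Jason's question. The change types and `BaristaChangeLog` are hypothetical; the point is that appends never overwrite each other, so the GotPayment change cannot be lost, and the current state is obtained by replay.

```csharp
using System.Collections.Concurrent;

// Hypothetical change types for the barista saga; the post does not define these.
public class BaristaView
{
    public string Drink { get; set; }
    public bool DrinkIsReady { get; set; }
    public bool GotPayment { get; set; }
}

public interface IBaristaChange
{
    void ApplyTo(BaristaView state);
}

public class DrinkWasPrepared : IBaristaChange
{
    public string Drink { get; set; }

    public void ApplyTo(BaristaView state)
    {
        state.Drink = Drink;
        state.DrinkIsReady = true;
    }
}

public class PaymentWasReceived : IBaristaChange
{
    public void ApplyTo(BaristaView state) => state.GotPayment = true;
}

public class BaristaChangeLog
{
    // Appending never overwrites an earlier entry, so concurrent writers cannot lose each other's changes.
    private readonly ConcurrentQueue<IBaristaChange> changes = new ConcurrentQueue<IBaristaChange>();

    public void Append(IBaristaChange change) => changes.Enqueue(change);

    // The current state is rebuilt by replaying every recorded change in order.
    public BaristaView Replay()
    {
        var state = new BaristaView();
        foreach (var change in changes)
            change.ApplyTo(state);
        return state;
    }
}
```

This works here because the two changes touch different fields, so their order does not matter. Changes that do not commute are exactly where the careful thinking warned about in the next paragraph comes in.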

That might require some careful thinking, however, to avoid a case where a saga that is executing concurrently steps on its own toes without noticing. I strongly dislike anything that requires careful thinking. It is like saying that C++ has no memory leak issues, it just requires some careful thinking.

For RSB, I wanted to do better than that. I selected Rhino DHT as the default persistence store for the saga’s state (you can still do other things, of course). That means that concurrency is very explicit. If you get to a point where there are two concurrently executing instances of the saga, both of their states are going to go to Rhino DHT. Since both are based on the same version, Rhino DHT is going to keep both state changes around.
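The "keep both state changes around" behavior can be modelled with a small in-memory store. This is a hypothetical sketch, not the Rhino DHT API: each write lists the versions the writer had read, only those are replaced, and a write based on an already-superseded version survives as a sibling until a later write (the merge) names both.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of sibling versions; not the Rhino DHT API.
public class SiblingStore<TValue>
{
    private readonly Dictionary<string, List<(int Version, TValue Value)>> data =
        new Dictionary<string, List<(int Version, TValue Value)>>();
    private int nextVersion = 1;

    public IReadOnlyList<(int Version, TValue Value)> Get(string key) =>
        data.TryGetValue(key, out var versions)
            ? versions.ToList()
            : new List<(int Version, TValue Value)>();

    // basedOn: the versions the writer read before producing this value.
    public int Put(string key, IEnumerable<int> basedOn, TValue value)
    {
        if (!data.TryGetValue(key, out var versions))
        {
            versions = new List<(int Version, TValue Value)>();
            data[key] = versions;
        }
        // Replace only what this writer actually saw; anything else stays as a sibling.
        var seen = new HashSet<int>(basedOn);
        versions.RemoveAll(v => seen.Contains(v.Version));
        var version = nextVersion++;
        versions.Add((version, value));
        return version;
    }
}
```

Two saga instances that both read version 1 each write a new value; the second write finds version 1 already gone, so both values remain. Reading the key then returns two states, which is where a state merger takes over and writes a single value based on both versions.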

The next time that we need the state for that particular saga, we are actually going to get both states. At that point, we introduce the ISagaStateMerger:

public interface ISagaStateMerger<TState>
    where TState : IVersionedSagaState
{
    TState Merge(TState[] states);
}

This allows us to handle the notion of concurrency resolution in a very explicit manner. We get the appropriate state merger from the container and use that to merge the states back to a consistent state, which we then pass to the saga to continue its execution.

There is just one additional twist. A saga cannot complete until it is in a consistent state, so if the saga completes while it is in an inconsistent state, we will call the saga again (after resolving the conflict) and let it handle the final state conflict before performing the actual completion.

time to read 19 min | 3714 words

In a messaging system, a saga orchestrates a set of messages. The main benefit of using a saga is that it allows us to manage the interaction in a stateful manner (easy to think and reason about) while actually working in a distributed and asynchronous environment.

In Rhino Service Bus, I built the notion of sagas in from the beginning. And I initially went with the approach that mixes the saga’s behavior and the saga’s state in the same class. That did not turn out so well. While it works for simple cases, anything of sufficient complexity started to cause issues, mainly around managing dependencies and managing state. It is possible to get this worked out, but I decided to follow Udi’s footsteps and create an explicit separation between the two.

Here is how it works. We have the state class:

public class BaristaState
{
    public bool DrinkIsReady { get; set; }
 
    public bool GotPayment { get; set; }
 
    public string Drink { get; set; }
}

This is just a standard class, nothing special here. But here is the actual saga class, which contains the behavior for the saga, with the state being maintained in the state class.

using System;
using System.Threading;

public class BaristaSaga :
    ISaga<BaristaState>,
    InitiatedBy<PrepareDrink>,
    Orchestrates<PaymentComplete>
{
    private readonly IServiceBus bus;
 
    public BaristaState State { get; set; }
 
    public Guid Id { get; set; }
 
    public bool IsCompleted { get; set; }
 
    public BaristaSaga(IServiceBus bus)
    {
        this.bus = bus;
        State = new BaristaState();
    }
 
    public void Consume(PrepareDrink message)
    {
        State.Drink = message.DrinkName;
 
        // Simulate the (slow) work of preparing the drink
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine("Barista: preparing drink: " + State.Drink);
            Thread.Sleep(500);
        }
        State.DrinkIsReady = true;
        SubmitOrderIfDone();
    }
 
    public void Consume(PaymentComplete message)
    {
        Console.WriteLine("Barista: got payment notification");
        State.GotPayment = true;
        SubmitOrderIfDone();
    }
 
    // The saga completes only once both the drink and the payment are done
    private void SubmitOrderIfDone()
    {
        if (State.GotPayment && State.DrinkIsReady)
        {
            Console.WriteLine("Barista: drink is ready");
            bus.Publish(new DrinkReady
            {
                CorrelationId = Id,
                Drink = State.Drink
            });
            IsCompleted = true;
        }
    }
}

There are a few things to notice in here. The saga class is a standard component; we use DI to inject its dependencies so it can do whatever it needs to do. The State property is used to maintain the state of the saga between message invocations.
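One practical consequence of the split is that the state is plain data, so infrastructure can persist it without knowing anything about the saga's behavior or dependencies. As an illustration only (the post does not say how Rhino Service Bus serializes saga state), here is a round trip through `XmlSerializer`; `SagaStateSerializer` is a hypothetical helper, and BaristaState is repeated so the snippet compiles on its own.

```csharp
using System.IO;
using System.Xml.Serialization;

// Same shape as the BaristaState class above, repeated so this sketch compiles on its own.
public class BaristaState
{
    public bool DrinkIsReady { get; set; }
    public bool GotPayment { get; set; }
    public string Drink { get; set; }
}

// Hypothetical helper: persists only the state; the saga (and its IServiceBus
// dependency) is recreated by the container and handed the restored state.
public static class SagaStateSerializer
{
    private static readonly XmlSerializer Serializer = new XmlSerializer(typeof(BaristaState));

    public static string Save(BaristaState state)
    {
        using var writer = new StringWriter();
        Serializer.Serialize(writer, state);
        return writer.ToString();
    }

    public static BaristaState Load(string xml)
    {
        using var reader = new StringReader(xml);
        return (BaristaState)Serializer.Deserialize(reader);
    }
}
```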

This results in a simpler design for some parts of the code, and I think that overall it is a very simple model to talk and reason about.

time to read 12 min | 2264 words

One of the requirements that came up on my current project was the need to secure specific fields in a message during transit. I thought about it a while before I decided that this is something that should be made explicit in the message contract.

Here is an example from the tests:

public class ClassWithSecretField
{
    public WireEcryptedString ShouldBeEncrypted
    {
        get; set;
    }
}

WireEncryptedString is a type that would be encrypted on the wire, as the name suggests.

And defining the keys in the configuration is done in this way:

<facility id="rhino.esb" >
  <bus threadCount="1"
       numberOfRetries="5"
       endpoint="msmq://localhost/test_queue2"
         />
  <messages>
    <add name="Rhino.ServiceBus.Tests"
         endpoint="msmq://localhost/test_queue"/>
    <add name="Rhino.ServiceBus.Tests"
         endpoint="msmq://localhost/test_queue2"/>
  </messages>
  <security>
    <key>f/gdDWbDqHRvpqdRbTs3mxhGdZh9qCaDrasxJGXl+5s=</key>
  </security>
</facility>

On the wire, it has the following format:

<?xml version='1.0' encoding='utf-8'?>
<esb:messages 
  xmlns:esb='http://servicebus.hibernatingrhinos.com/2008/12/20/esb' 
  xmlns:tests.classwithsecretfield='Rhino.ServiceBus.Tests.When_Security_Is_Specified_In_Config+ClassWithSecretField, Rhino.ServiceBus.Tests'
  xmlns:datastructures.wireecryptedstring='Rhino.ServiceBus.DataStructures.WireEcryptedString, Rhino.ServiceBus' xmlns:string='string'>
  <tests.classwithsecretfield:ClassWithSecretField>
    <datastructures.wireecryptedstring:ShouldBeEncrypted>
      <string:Value iv='0yL9+t0uyDy9NeP7CU1Wow=='>q9a10IFuRxrzFoZewfdOyg==</string:Value>
    </datastructures.wireecryptedstring:ShouldBeEncrypted>
  </tests.classwithsecretfield:ClassWithSecretField>
</esb:messages>

Following the Rhino Service Bus philosophy, it is quite a neat solution.

The actual encryption is done using a 256-bit key with Rijndael (AES). I considered other approaches, but all of them had quite a big overhead from a manageability perspective.
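A minimal sketch of field-level encryption along these lines, assuming AES-256 in CBC mode (the .NET default) with a fresh random IV per value, which matches the shape of the wire format above (an `iv` attribute plus a base64 value). `WireFieldCrypto` is a hypothetical helper, not Rhino Service Bus code. It uses `Aes.Create()` rather than the older `RijndaelManaged` class; with a 128-bit block size the two are the same cipher.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not Rhino Service Bus code: encrypts a single string field.
public static class WireFieldCrypto
{
    public static (string Iv, string CipherText) Encrypt(string plainText, byte[] key)
    {
        using var aes = Aes.Create();
        aes.Key = key;          // a 32-byte key selects AES-256
        aes.GenerateIV();       // new 16-byte IV for every encrypted value
        using var encryptor = aes.CreateEncryptor();
        var plainBytes = Encoding.UTF8.GetBytes(plainText);
        var cipherBytes = encryptor.TransformFinalBlock(plainBytes, 0, plainBytes.Length);
        return (Convert.ToBase64String(aes.IV), Convert.ToBase64String(cipherBytes));
    }

    public static string Decrypt(string iv, string cipherText, byte[] key)
    {
        using var aes = Aes.Create();
        aes.Key = key;
        aes.IV = Convert.FromBase64String(iv);
        using var decryptor = aes.CreateDecryptor();
        var cipherBytes = Convert.FromBase64String(cipherText);
        var plainBytes = decryptor.TransformFinalBlock(cipherBytes, 0, cipherBytes.Length);
        return Encoding.UTF8.GetString(plainBytes);
    }
}
```

The key in the configuration above is a base64-encoded 32-byte value, so `Convert.FromBase64String` on it yields a key of the right size. Padding errors only detect a wrong key probabilistically; reliably rejecting a mis-keyed or tampered value would require adding a MAC (for example HMAC, or using AES-GCM).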

There are some interesting implications for the implementation that deserve some discussion. Let us assume that you send such a message to another endpoint.

If the endpoint…

  • has the same key as us, the message will be decrypted and everything works.
  • doesn’t have any security defined. At that point, the message will successfully deserialize. Any WireEncryptedString field will contain the encrypted value.
  • has a different key defined. Message deserialization will fail.

Trying to send a message that contains a WireEncryptedString from an endpoint that has no key defined will throw; we do not allow such an action.

And now you can tell me how many holes there are in my system :-)

time to read 1 min | 101 words

SecureString has exactly one purpose: to take information from the user and pass it to an unmanaged function. Anything else that you would try to do with it seems to be incredibly hard to do.

I wanted to extend Rhino Service Bus so a message that contained SecureString members would be automatically encrypted on the wire. It seemed like a nice & easy option to provide field level security for messages. However, it doesn’t seem to be a viable option, because working with SecureString is such an awkward task. I have created a WireEncryptedString class and I’ll just use that instead. Grr….

time to read 3 min | 575 words

One of the major hurdles in distributed systems is trying to understand how they work. Different parts are running in different places and sometimes at different times. Standard debugging usually breaks down at this point, because no one has ever invented a non-sequential debugger that would make sense to humans.

We are left with trying to understand what is going on in the system based on a pretty old notion, the system logs. With Rhino Service Bus, this was one of the things that I really cared about, so I made this into a first class concept. And no, you don’t get to hunt through a 3GB text file. The idea is that each message (and message interaction) in the system can be captured.

The configuration for this is quite simple:

<bus threadCount="1"
         numberOfRetries="5"
         logEndpoint="msmq://localhost/starbucks.log"
         endpoint="msmq://localhost/starbucks.backend"
         />

Once that is configured, each message is copied to the log queue. And it is not just the arrived messages: the log also records when a message arrived, how long it took to process it, why it failed, etc.

Using this approach, you can build tools that listen to the log queue and display the information in ways that make sense to humans. For example, we can create a flow of a saga or conversation, gather information about the time it takes to process certain messages, or detect SLA violations.
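As a sketch of such a tool, assume the entries read from the log queue have already been turned into simple records. `MessageLogEntry` and `LogAnalyzer` are hypothetical: the post says the log captures arrival time, processing time and failures, but not the exact format Rhino Service Bus writes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape for an entry read from the log queue.
public class MessageLogEntry
{
    public string MessageType { get; set; }
    public DateTime ArrivedAt { get; set; }
    public TimeSpan ProcessingTime { get; set; }
    public string FailureReason { get; set; }   // null when processing succeeded
}

public static class LogAnalyzer
{
    // Messages of the given type whose processing took longer than the SLA allows.
    public static List<MessageLogEntry> SlaViolations(IEnumerable<MessageLogEntry> entries, string messageType, TimeSpan sla) =>
        entries.Where(e => e.MessageType == messageType && e.ProcessingTime > sla).ToList();

    // Average processing time per message type, e.g. to spot slow handlers.
    public static Dictionary<string, TimeSpan> AverageProcessingTime(IEnumerable<MessageLogEntry> entries) =>
        entries.GroupBy(e => e.MessageType)
               .ToDictionary(g => g.Key, g => TimeSpan.FromTicks((long)g.Average(e => e.ProcessingTime.Ticks)));

    // Messages whose processing failed, with the recorded reason.
    public static List<MessageLogEntry> Failures(IEnumerable<MessageLogEntry> entries) =>
        entries.Where(e => e.FailureReason != null).ToList();
}
```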
