Rhino Service BusConcurrency Violations are Business Logic

time to read 21 min | 4061 words

Concurrency is a tough topic, fraught with problems, pitfalls and nasty issues. This is especially the case when you try to build distributed, inherently parallel systems. I am dealing with the topic quite a lot recently and I have create several solutions (none of them are originally mine, mind you).

There aren’t that many good solutions our there, most of them boil down to: “suck it up and deal with the complexity.” In this case, I want to try to deal with the complexity in a consistent fashion ( no one off solutions ) and in a way that I can deal without first meditating on the import of socks.

Let us see if I can come up with a good example. We have a saga that we use to check whatever a particular user has acceptable credit to buy something from us. The logic is that we need to verify with at least 2 credit card bureaus, and the average must be over 700. (This logic has nothing to do with the real world, since I just dreamt it up, by the way). Here is a simple implementation of a saga that can deal with those requirements:

   1: public class AccpetableCreditSaga : ISaga<AccpetableCreditState>,
   2:   InitiatedBy<IsAcceptableAsCustomer>,
   3:   Orchestrates<CreditCardScore>, 
   4:   Orchestrates<MergeSagaState>
   5: {
   6:   IServiceBus bus;
   7:   public bool IsCompleted {get;set;}
   8:   public Guid Id {get;set;}
   9:   
  10:   public AccpetableCreditSaga (IServiceBus bus)
  11:   {
  12:     this.bus = bus;
  13:   }
  14:   
  15:   public void Consume(IsAcceptableAsCustomer message)
  16:   {
  17:     bus.Send(
  18:       new Equifax.CheckCreditFor{Card = message.Card),
  19:       new Experian.CheckCreditFor{Card = message.Card),
  20:       new TransUnion.CheckCreditFor{Card = message.Card)
  21:       );
  22:   }
  23:   
  24:   public void Consume(CreditCardScore message)
  25:   {
  26:     State.Scores.Add(message);
  27:     
  28:     TryCompleteSaga();
  29:   }
  30:   
  31:   public void Consume(MergeSagaState message)
  32:   {
  33:     TryCompleteSaga();
  34:   }
  35:   
  36:   public void TryCompleteSaga()
  37:   {
  38:     if(State.Scores.Count <2)
  39:       return;
  40:      
  41:      bus.Publish(new CreditScoreAcceptable
  42:      {
  43:       CorrelationId = Id,
  44:       IsAcceptable = State.Scores.Average(x=>x.Score) > 700
  45:      });
  46:      IsCompleted = true;
  47:   }
  48: }

We have this strange MergeSagaState message, but other than that, it should be pretty obvious what is going on in here.It should be equally obvious that we have a serious problem here. Let us say that we get two reply messages with credit card scores, at the same time. We will create two instances of the saga that will run in parallel, each of them getting a copy of the saga’s state. But, the end result is that processing those messages doesn’t match the end condition for the saga. So even though in practice we have gotten all the messages we need, because we handled them in parallel, we had no chance to actually see both changes at the same time. This means that any logic that we have that requires us to have a full picture of what is going on isn’t going to work.

Rhino Service Bus solve the issue by putting the saga’s state into Rhino DHT. This means that a single saga may have several states at the same time. Merging them together is also something that the bus will take care off. Merging the different parts is inherently an issue that cannot be solved generically. There is no generic merge algorithm that you can use. Rhino Service Bus define an interface that will allow you to deal with this issue in a clean manner and supply whatever business logic is required to merge difference versions.

Here is an example of how we can merge the different versions together:

   1: public class AccpetableCreditStateMerger : ISagaStateMerger<AccpetableCreditState>
   2: {
   3:   public AccpetableCreditState Merge(AccpetableCreditState[] states)
   4:   {
   5:     return new AccpetableCreditState
   6:     {
   7:       SCores = states.SelectMany(x=>x.Scores)
   8:         .GroupBy(x=>x.Bureau)
   9:         .Select(x => new Score
  10:         {
  11:           Bureau = x.Key,
  12:           Score = x.Max(y=>y.Score)
  13:         }).ToList();
  14:     };
  15:   }
  16: }

Note that this is notepad code, so it may contain errors, but the actual intention should be clear. We accept an array of states that need to be merged, find the highest score from each bureau and return the merged state.

whenever Rhino Service Bus detects that the saga is in a conflicted state, it will post a MergeSagaState message to the saga. This will merge the saga’s state and call the Consume(MergeSagaState), in which the saga gets to decide what it wants to do about this (usually inspect the state to see if we missed anything). This also works for completing a saga, by the way, you cannot complete a saga in an inconsistent state, you will get called again with Consume(MergeSagaSate) to deal with that.

The state merger is also a good place to try to deal with concurrency compensating actions. If we notice in the merger that we perform some action twice and we need to revert one of them, for example. In general, it is better to be able to avoid having to do so, but that is the place for this logic.

More posts in "Rhino Service Bus" series:

  1. (08 Aug 2009) DHT Saga Sate Persisters Options
  2. (21 Jan 2009) Concurrency Violations are Business Logic
  3. (19 Jan 2009) Concurrency in a distributed world
  4. (16 Jan 2009) Saga and State
  5. (15 Jan 2009) Field Level Security
  6. (14 Jan 2009) Understanding a Distributed System
  7. (14 Jan 2009) The Starbucks example
  8. (14 Jan 2009) Locality and Independence
  9. (14 Jan 2009) Managing Timeouts