Rhino Service Bus: Concurrency Violations are Business Logic
Concurrency is a tough topic, fraught with problems, pitfalls and nasty issues. This is especially the case when you try to build distributed, inherently parallel systems. I have been dealing with the topic quite a lot recently, and I have created several solutions (none of them are originally mine, mind you).
There aren’t that many good solutions out there; most of them boil down to: “suck it up and deal with the complexity.” In this case, I want to deal with the complexity in a consistent fashion (no one-off solutions) and in a way that I can handle without first meditating on the import of socks.
Let us see if I can come up with a good example. We have a saga that we use to check whether a particular user has acceptable credit to buy something from us. The logic is that we need to verify with at least 2 credit card bureaus, and the average score must be over 700. (This logic has nothing to do with the real world, since I just dreamt it up, by the way.) Here is a simple implementation of a saga that can deal with those requirements:
    public class AccpetableCreditSaga : ISaga<AccpetableCreditState>,
        InitiatedBy<IsAcceptableAsCustomer>,
        Orchestrates<CreditCardScore>,
        Orchestrates<MergeSagaState>
    {
        IServiceBus bus;
        public bool IsCompleted { get; set; }
        public Guid Id { get; set; }
        // The saga's state, as required by ISaga<AccpetableCreditState>
        public AccpetableCreditState State { get; set; }

        public AccpetableCreditSaga(IServiceBus bus)
        {
            this.bus = bus;
        }

        public void Consume(IsAcceptableAsCustomer message)
        {
            // Ask all three bureaus; two replies are enough to decide
            bus.Send(
                new Equifax.CheckCreditFor { Card = message.Card },
                new Experian.CheckCreditFor { Card = message.Card },
                new TransUnion.CheckCreditFor { Card = message.Card }
            );
        }

        public void Consume(CreditCardScore message)
        {
            State.Scores.Add(message);

            TryCompleteSaga();
        }

        public void Consume(MergeSagaState message)
        {
            // The state has already been merged; re-check the end condition
            TryCompleteSaga();
        }

        public void TryCompleteSaga()
        {
            // Wait until at least two bureaus have replied
            if (State.Scores.Count < 2)
                return;

            bus.Publish(new CreditScoreAcceptable
            {
                CorrelationId = Id,
                IsAcceptable = State.Scores.Average(x => x.Score) > 700
            });
            IsCompleted = true;
        }
    }
We have this strange MergeSagaState message, but other than that, it should be pretty obvious what is going on in here.
It should be equally obvious that we have a serious problem here. Let us say that we get two reply messages with credit card scores at the same time. We will create two instances of the saga that will run in parallel, each of them getting its own copy of the saga’s state. Each instance adds a single score to its copy, so each one sees a count of 1 and neither meets the end condition for the saga. Even though in practice we have gotten all the messages we need, because we handled them in parallel, we had no chance to actually see both changes at the same time. This means that any logic that requires a full picture of what is going on isn’t going to work.
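The race can be reproduced without a bus at all. What follows is a minimal sketch in plain C#, not Rhino Service Bus code: `CreditState`, `Copy`, `HandleScore` and the sample scores are hypothetical stand-ins, and it assumes each parallel handler works on its own copy of the persisted state.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the saga state; not Rhino Service Bus types.
public class CreditCardScore
{
    public string Bureau { get; set; }
    public int Score { get; set; }
}

public class CreditState
{
    public List<CreditCardScore> Scores { get; set; } = new List<CreditCardScore>();

    // Each handler gets its own copy of the persisted state.
    public CreditState Copy()
    {
        return new CreditState { Scores = Scores.ToList() };
    }
}

public static class ParallelHandlingDemo
{
    // Mirrors Consume(CreditCardScore) followed by TryCompleteSaga:
    // returns true if this handler saw enough scores to complete the saga.
    public static bool HandleScore(CreditState state, CreditCardScore score)
    {
        state.Scores.Add(score);
        return state.Scores.Count >= 2;
    }

    public static bool[] Run()
    {
        var persisted = new CreditState(); // no scores recorded yet

        // Two replies arrive at the same time; both handlers load the
        // state before either one has saved its change.
        var copyA = persisted.Copy();
        var copyB = persisted.Copy();

        bool completedA = HandleScore(copyA, new CreditCardScore { Bureau = "Equifax", Score = 720 });
        bool completedB = HandleScore(copyB, new CreditCardScore { Bureau = "Experian", Score = 690 });

        // Each copy holds exactly one score, so neither handler completes.
        return new[] { completedA, completedB };
    }
}
```

Both entries in the returned array are false: two scores were received, but no single copy of the state ever contained both.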
Rhino Service Bus solves the issue by putting the saga’s state into Rhino DHT. This means that a single saga may have several versions of its state at the same time. Merging them together is also something that the bus will take care of, but merging the different parts is inherently an issue that cannot be solved generically. There is no generic merge algorithm that you can use. Instead, Rhino Service Bus defines an interface that allows you to deal with this issue in a clean manner and supply whatever business logic is required to merge the different versions.
Here is an example of how we can merge the different versions together:
    public class AccpetableCreditStateMerger : ISagaStateMerger<AccpetableCreditState>
    {
        public AccpetableCreditState Merge(AccpetableCreditState[] states)
        {
            return new AccpetableCreditState
            {
                // Collect the scores from every version and keep
                // the highest score reported by each bureau
                Scores = states.SelectMany(x => x.Scores)
                    .GroupBy(x => x.Bureau)
                    .Select(x => new CreditCardScore
                    {
                        Bureau = x.Key,
                        Score = x.Max(y => y.Score)
                    }).ToList()
            };
        }
    }
Note that this is notepad code, so it may contain errors, but the actual intention should be clear. We accept an array of states that need to be merged, find the highest score from each bureau and return the merged state.
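To see what the merge produces, here is the same logic as a standalone sketch run against two conflicting versions. The shapes of `CreditCardScore` and `AccpetableCreditState` are assumptions (the post does not show them), `MergeDemo` is a hypothetical name, and the Rhino Service Bus interface is left out so the sample compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed shapes for the types used in the post; the real ones are not shown.
public class CreditCardScore
{
    public string Bureau { get; set; }
    public int Score { get; set; }
}

public class AccpetableCreditState
{
    public List<CreditCardScore> Scores { get; set; } = new List<CreditCardScore>();
}

public static class MergeDemo
{
    // Same merge rule as described above: highest score per bureau wins.
    public static AccpetableCreditState Merge(AccpetableCreditState[] states)
    {
        return new AccpetableCreditState
        {
            Scores = states.SelectMany(s => s.Scores)
                .GroupBy(s => s.Bureau)
                .Select(g => new CreditCardScore
                {
                    Bureau = g.Key,
                    Score = g.Max(s => s.Score)
                })
                .ToList()
        };
    }

    public static AccpetableCreditState Run()
    {
        // Two versions of the same saga's state, each holding one reply.
        var versionA = new AccpetableCreditState();
        versionA.Scores.Add(new CreditCardScore { Bureau = "Equifax", Score = 720 });

        var versionB = new AccpetableCreditState();
        versionB.Scores.Add(new CreditCardScore { Bureau = "Experian", Score = 690 });

        // The merged state holds both scores, which is the full picture
        // that neither version had on its own.
        return Merge(new[] { versionA, versionB });
    }
}
```

With these made-up inputs the merged state contains two scores averaging 705, so a saga inspecting it would now be able to reach its end condition.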
Whenever Rhino Service Bus detects that the saga is in a conflicted state, it will post a MergeSagaState message to the saga. This will merge the saga’s state and call Consume(MergeSagaState), in which the saga gets to decide what it wants to do about this (usually inspect the state to see if we missed anything). This also works for completing a saga, by the way: you cannot complete a saga in an inconsistent state; you will get called again with Consume(MergeSagaState) to deal with that.
The state merger is also a good place to deal with concurrency compensating actions, for example if we notice in the merger that we performed some action twice and need to revert one of them. In general, it is better to avoid having to do so, but that is the place for this logic.
More posts in "Rhino Service Bus" series:
- (08 Aug 2009) DHT Saga Sate Persisters Options
- (21 Jan 2009) Concurrency Violations are Business Logic
- (19 Jan 2009) Concurrency in a distributed world
- (16 Jan 2009) Saga and State
- (15 Jan 2009) Field Level Security
- (14 Jan 2009) Understanding a Distributed System
- (14 Jan 2009) The Starbucks example
- (14 Jan 2009) Locality and Independence
- (14 Jan 2009) Managing Timeouts
Comments
My recollection of another implementation from the NSB group is for the Saga State to be retrieved and updated transactionally. First simultaneous message updates the state, TryCompleteSaga returns, second simultaneous message updates the state, and fails because of a "stale state" (excuse the pun) exception, so the message is put back in the queue and processes successfully on the second try.
This assumes some sort of timestamp/version on the state and something like NH, but removes the dependency on DHT and state merges.
I have no implementation experience but I would presume the DHT and state merging would enable higher throughput than the transaction state.
If this is dealing with complexity, I don't want to know how you deal with simplicity :)
I've got one question: suppose this is a real system and you're checking the credit of millions of users (I assume we're dealing with millions - maybe it's Amazon.com?). Do you really need to keep multiple versions of a saga? Usually you'll have sagas related to different customers, so if you kept a single version of each saga and locked it properly to prevent concurrent updates it wouldn't have a negative impact on overall system performance. Or would it?
<nitpicking
You have misaligned brackets in:
new Equifax.CheckCreditFor{Card = message.Card),
etc.
Rafal,
Are you saying that this is complex or not?
And I would have a saga per customer yes, but locking is a very expensive operation.
If I lock I am holding a thread captive. I don't have that many threads.
And that is leaving aside the problem of trying to lock in a distributed env.
I said it was notepad code.
I was joking, but anyway, it's complex, especially when you start considering some real world cases. What I was trying to say was that when you have millions of different sagas you get the parallelism from the fact that they are independent objects, not independent versions of the same object. And thus the probability that two threads will be modifying the same object is low, so we can use pessimistic locking without taking too much risk.
BTW, do you have to group by bureau name when merging? Is it possible that one bureau sends two scores?
Rafal,
The problem with pessimistic locking is that it doesn't scale very well.
If I have 10 machines, and each machine can handle 8 threads, I have 80 processing threads.
Pessimistic locking takes a thread out for a while, even in the case where we have no contention.
Using this approach, I am able to avoid any locking scenarios.
That is leaving aside the issue of this actually happening in the real world.
If we take the amazon sample, it is pretty common for me to browse in several tabs at the same time, and order at the same time or nearly so.
I think I understand what you want to achieve: a 100% distributed environment with independent nodes processing incoming messages and with no centralized elements. It works nicely when messages are 'additive', but merging becomes too complex when messages are 'exclusive' (logic depends on processing order). I'm sure that with careful design, your approach would work very well for many scenarios, like workflow engines or billing data flows.
BTW, what is your approach to distributing incoming messages between processing nodes?
Rafal,
The distribution is based on this model:
ayende.com/.../NServiceBus-Distributor-Review.aspx
I like the idea behind Rhino DHT, but what do you do if 2nd and 3rd score messages arrive simultaneously? Both instances of the saga will send the "accepted" or "not accepted" message (depending on the average scores in both instances).
You say that the saga cannot complete in an inconsistent state, but what do you do with messages (possibly with opposite results) that were already sent?
That is why you have compensating actions.
Some things are easier to avoid rather than to compensate their consequences. Imagine you've sent two messages: "customer accepted" and "customer not accepted". Somebody might have reacted already.
A possible solution might be to delay delivery of outgoing messages from a saga until the saga is persisted. And if it's inconsistent at that point, just throw the delayed messages away and give the saga an opportunity to send the right message. What do you think?
Sergey, if saga persistence is transactional and sending messages is transactional, you can wrap everything in a distributed transaction and enjoy consistent behavior.
Rafal, the joy of distributed transactions was the original reason to use DHT and versioning for sagas, afaik.