Ayende @ Rahien

Hi!
My name is Oren Eini
Founder of Hibernating Rhinos LTD and RavenDB.
You can reach me by phone or email:

ayende@ayende.com

+972 52-548-6969


Posts: 5,953 | Comments: 44,410


Distributed transactions with remote queuing system


I got into an interesting discussion about this two days ago, so I thought I would put down some of my thoughts on the topic.

To set some context: I am NOT talking about solving the generic problem of DTC on top of a remote queuing system. I am talking specifically about sharing a transaction between a remote queuing system and a database. This is important if we want to enable "pull msg from queue, update db" scenarios. Without that, you are in danger of processing a message twice.

We will assume that the remote queuing system operates on the Dequeue/Timeout/Ack model: you dequeue a message from the queue, and then you have a timeout within which to acknowledge its processing before the message is put back into the queue. We will also assume a database that supports transactions.
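A toy in-memory model may make these semantics concrete. The RemoteQueue class below is a hypothetical sketch, not any real queuing API: a dequeued message stays invisible until its timeout expires, and it is redelivered unless it was acknowledged in time.

```python
import time

class RemoteQueue:
    """Toy in-memory model of the Dequeue/Timeout/Ack semantics
    described above (hypothetical API, for illustration only)."""

    def __init__(self, visibility_timeout=30.0):
        self.visibility_timeout = visibility_timeout
        self.messages = []   # (msg_id, payload) tuples awaiting delivery
        self.in_flight = {}  # msg_id -> (payload, ack deadline)

    def enqueue(self, msg_id, payload):
        self.messages.append((msg_id, payload))

    def dequeue(self, now=None):
        now = time.monotonic() if now is None else now
        # Messages whose timeout expired without an Ack become visible again.
        for msg_id, (payload, deadline) in list(self.in_flight.items()):
            if now >= deadline:
                del self.in_flight[msg_id]
                self.messages.append((msg_id, payload))
        if not self.messages:
            return None
        msg_id, payload = self.messages.pop(0)
        self.in_flight[msg_id] = (payload, now + self.visibility_timeout)
        return msg_id, payload

    def ack(self, msg_id):
        # Acknowledging removes the message for good.
        self.in_flight.pop(msg_id, None)
```

Note the consequence this model forces on us: if an Ack is lost or late, the same message will be delivered again, which is exactly why the consumer needs its own dedup record.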

Basically, what we need to do is to keep a record of all the messages we processed. We do that by storing that record in the database, where it is subject to the same transactional rules as the rest of our data. We would need a table similar to this:

CREATE TABLE Infrastructure.ProcessedMessages
(
   MsgId uniqueidentifier primary key,
   Timestamp DateTime not null
)

Given that, we can handle messages using the following code:

using(var tx = con.BeginTransaction())
{
    var msg = queue.Dequeue();
    try
    {
        InsertToProcessedMessages(msg);
    }
    catch(DuplicatePrimaryKey)
    {
        // We already processed this message in a committed transaction,
        // but the Ack was lost; acknowledge it and move on.
        queue.Ack(msg);
        tx.Rollback();
        return;
    }

    // process the msg using this transaction context
    tx.Commit();
    queue.Ack(msg);
}

Because we can rely on the database to ensure transactional consistency, we can be sure that:

  • we will only process a message once
  • an error while processing a message will roll back the transaction, removing its processing record from the database along with everything else (so the redelivered message can be retried)

We will probably need a daily job or something similar to clear out the table, but that would be about it.
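The cleanup job only needs to delete records old enough that the queue can no longer redeliver the corresponding messages. A minimal sketch, using sqlite as a stand-in for the real database; the retention period is an assumption and must exceed the queue's maximum redelivery window:

```python
import sqlite3  # stand-in for any DB-API connection; the post assumes SQL Server
from datetime import datetime, timedelta

def cleanup_processed_messages(con, retention=timedelta(days=7)):
    """Delete dedup records old enough that the queue can no longer
    redeliver the corresponding message. The schema prefix
    (Infrastructure.) is dropped here because sqlite has no schemas."""
    cutoff = datetime.utcnow() - retention
    with con:  # commits on success, rolls back on error
        con.execute(
            "DELETE FROM ProcessedMessages WHERE Timestamp < ?",
            (cutoff.isoformat(),),
        )
```

If the retention window is shorter than the queue's redelivery window, a redelivered message could sneak past the duplicate check after its record was purged, so the two settings have to be kept in sync.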

Thoughts?


Comments

Joseph Daigle

What about a transactional remote queue like SQL Server Service Broker? In this case you receive a message inside a transaction, the message is dequeued, and if the transaction is rolled back for any reason the message is re-queued.

The guidance is that your Service Broker database and application transaction database should be separate. Therefore you would employ the DTC to get transactional consistency between your application database and your remote queue.

There's no need to keep track of which messages were processed as it will never be handled more than once (such that data is written to the application database).

Ayende Rahien

Joseph,

SQL Server Service Broker is, conceptually, a local queue.

The problem is that Send / Receive calls to the queue are very likely to fail if it is stored on a remote machine.

It mostly depends on the deployment mode.

Alex Yakunin

"catch (DuplicatePrimaryKey)" does not ensure all errors are handled. E.g. a deadlock exception won't be caught by this code. As a result, Ack won't be invoked at all.

Another case is when tx.Commit() throws an exception, generally for the same reason; also, some DBs may check constraints with a delay, so a wide range of exceptions is actually possible there, and everything depends on the particular case. If an exception is thrown by tx.Commit(), Ack again won't be invoked.

Ayende Rahien

Alex,

You can't get a deadlock from a simple insert when the only thing it does is insert a row by PK.

Alex Yakunin

Oops, initially I didn't fully understand the logic of this code, so ignore my previous comment.

IMO, it has just one gap that will lead to an issue in this case:

  • SQL Server actually commits the transaction as a result of the tx.Commit() call

  • But something (e.g. a network issue) prevents the SQL client from receiving the correct completion code for the operation.

In this case your code will re-process a message that was actually already processed.

Alex Yakunin

"You can't GET a deadlock from a simple insert when the only things is an insert of a PK."

First of all, there was no exact code shown, so I assumed it could be anything.

In any case, I wouldn't ASSUME there is just an INSERT even if I knew this precisely. That's simply bad practice: DB-related code built on the implication that only one kind of exception is possible, with no deadlocks or version conflicts, can lead to completely unexpected issues in the future, since the author maintaining InsertToProcessedMessages might not be aware of these implications for its callers.

Ayende Rahien

Alex,

If that is the case, on the next try to read the message, you will hit the DupPK error and ack it then.
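That recovery path (the commit succeeded, the Ack was lost, and the redelivered message hits the duplicate key and is simply acknowledged) can be sketched as follows. This is a toy illustration using sqlite as a stand-in for the real database; handle, queue, and process are hypothetical names, not the post's actual code:

```python
import sqlite3

def handle(con, queue, msg_id, payload, process):
    """Process a message at most once: the PK on ProcessedMessages
    turns a redelivered, already-committed message into a no-op Ack."""
    try:
        with con:  # one transaction: dedup record + business changes
            con.execute(
                "INSERT INTO ProcessedMessages (MsgId) VALUES (?)", (msg_id,))
            process(con, payload)
    except sqlite3.IntegrityError:
        # Already processed in an earlier, committed transaction whose
        # Ack was lost: just acknowledge and move on.
        queue.ack(msg_id)
        return
    queue.ack(msg_id)
```

Running it twice with the same message id performs the business work once but acknowledges both deliveries, which is the behavior the post is after.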

Ayende Rahien

Alex,

You can pretty much assume that InsertToProcessedMessages translates to direct ADO code that calls INSERT INTO ProcessedMessages.

There is generally no option for a deadlock on insert, and there are certainly no version conflicts for pure inserts.

Alex Yakunin

Oren, I know you like to argue, so let's leave the talk about best practices aside.

If your goal is to safely import the external queue into the RDBMS as-is, the code seems almost absolutely safe. Btw, you don't need manual transactions at all to achieve this, because if an entry isn't imported, it's always safe to re-import it.

Why "almost"? Well, because there is a cleanup process that removes imported && processed entries from the DB. So there is a tiny chance that an entry will be imported and removed, but its Ack never sent (e.g. because the queue was down for long enough). But practically, this is hardly possible.

Ayende Rahien

Alex,

The point isn't to import the queue.

The point is to process the message just once, in a successful transaction.

Bill

I think the code seems to work. The downside is you have to store some state (processed messages) specific to this problem on the client side, but I can't think of a better way of doing it without DTC.

Maninder Batth

How about the following:

try {
    // 1. Dequeue
    try {
        // 2. Process message
    } catch (Exception) {
        // 3. Message error: ACK and do not queue it back
        // 4. Internal system error: retry or compensate
    }
    try {
        // 5. Commit message processing
    } catch (Exception) {
        // Error while committing: retry or compensate
    }
} catch (Exception) {
    // Error during Dequeue: retry or compensate
}

try {
    // ACK
} catch (Exception) {
    // perhaps retry ACK or compensate
}

Question: why do you need a table for persisting fully processed messages?

Ayende Rahien

Maninder,

Because I don't want to consume the message on error.

I want to put it back on the queue and let the queue's poison-message handling take care of it.

This way, I don't lose messages.


Comments have been closed on this topic.
