Ayende @ Rahien

Refunds available at head office

Distributed transactions with remote queuing system

I got into an interesting discussion about this two days ago, so I thought I would put down some of my thoughts on the topic.

Just to set some context here, I am NOT talking about solving the generic problem of running DTC on top of a remote queuing system. I am talking specifically about sharing a transaction between a remote queuing system and a database. This is important if we want to enable “pull msg from queue, update db” scenarios. Without that, you are in the danger zone of processing a message twice.

We will assume that the remote queuing system operates on the Dequeue/Timeout/Ack model: you dequeue a message from the queue, and then you have a timeout window in which to acknowledge its processing before it is put back into the queue. We will also assume a database that supports transactions.
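As a sketch of what that model means (not any particular product's API; all names here are made up for illustration), a visibility-timeout queue can be modeled in Python like this:

```python
import time
import uuid

class VisibilityTimeoutQueue:
    """Toy in-memory queue with Dequeue/Timeout/Ack semantics.

    A dequeued message becomes invisible for `timeout` seconds; if it is
    not acknowledged within that window, it goes back on the queue.
    """

    def __init__(self, timeout=30.0, clock=time.monotonic):
        self.timeout = timeout
        self.clock = clock          # injectable for testing
        self.pending = []           # messages available for dequeue
        self.in_flight = {}         # msg id -> (message, ack deadline)

    def enqueue(self, body):
        self.pending.append({"id": str(uuid.uuid4()), "body": body})

    def dequeue(self):
        self._requeue_expired()
        if not self.pending:
            return None
        msg = self.pending.pop(0)
        self.in_flight[msg["id"]] = (msg, self.clock() + self.timeout)
        return msg

    def ack(self, msg):
        # Acknowledge processing; the message is gone for good.
        self.in_flight.pop(msg["id"], None)

    def _requeue_expired(self):
        # Any in-flight message past its deadline becomes visible again.
        now = self.clock()
        for msg_id, (msg, deadline) in list(self.in_flight.items()):
            if now >= deadline:
                del self.in_flight[msg_id]
                self.pending.append(msg)
```

With a real queue the timeout is enforced server-side; the toy above only mimics the observable behavior: an unacked message is redelivered after the timeout, so any consumer must be prepared to see the same message more than once.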

Basically, what we need to do is to keep a record of all the messages we processed. We do that by storing that record in the database, where it is subject to the same transactional rules as the rest of our data. We would need a table similar to this:

CREATE TABLE Infrastructure.ProcessedMessages
(
   MsgId uniqueidentifier primary key,
   Timestamp DateTime not null
)

Given that, we can handle messages using the following code:

using(var tx = con.BeginTransaction())
{
    var msg = queue.Dequeue();
    try
    {
        InsertToProcessedMessages(msg);
    }
    catch(DuplicatePrimaryKey)
    {
        // Already processed in a committed transaction; just ack it.
        queue.Ack(msg);
        tx.Rollback();
        return;
    }

    // process the msg using this transaction context
    tx.Commit();
    queue.Ack(msg);
}

Because we can rely on the database to ensure transactional consistency, we can be sure that:

  • we will only process a message once
  • an error while processing a message will roll back the transaction, removing the processing record from the database

We will probably need a daily job or something similar to clear out the table, but that would be about it.

Thoughts?

Comments

Joseph Daigle
10/31/2010 03:21 PM by Joseph Daigle

What about a transactional remote queue like SQL Server Service Broker? In this case you receive a message inside a transaction, the message is dequeued, and if the transaction is rolled back for any reason the message is re-queued.

The guidance is that your Service Broker database and application transaction database should be separate. Therefore you would employ the DTC to get transactional consistency between your application database and your remote queue.

There's no need to keep track of which messages were processed as it will never be handled more than once (such that data is written to the application database).

Ayende Rahien
10/31/2010 04:29 PM by Ayende Rahien

Joseph,

Conceptually, SQL Server Service Broker is a local queue.

The problem is that Send / Receive operations on the queue are very likely to fail if it is stored on a remote machine.

It mostly depends on the deployment mode.

Alex Yakunin
10/31/2010 04:48 PM by Alex Yakunin

"catch (DuplicatePrimaryKey)" does not ensure all errors are handled. E.g., a deadlock exception won't be caught by this code, and as a result, Ack won't be invoked at all.

Another case is when tx.Commit() throws an exception, generally for the same reason; also, some DBs may check constraints with a delay, so a wide range of exceptions is actually possible there, and everything depends on the particular case. If an exception is thrown by tx.Commit(), Ack won't be invoked either.

Ayende Rahien
10/31/2010 04:51 PM by Ayende Rahien

Alex,

You can't GET a deadlock from a simple insert when the only thing is an insert of a PK.

Alex Yakunin
10/31/2010 04:53 PM by Alex Yakunin

Oops, initially I didn't fully understand the logic of this code, so ignore my previous comment.

IMO, it has just one flaw, which will lead to an issue in this case:

  • SQL Server actually commits the transaction as a result of the tx.Commit() call.

  • But something (e.g. a network issue) prevents the SQL client from receiving the correct completion code for this operation.

In this case your code will re-process a message that was actually already processed.

Alex Yakunin
10/31/2010 05:02 PM by Alex Yakunin

"You can't GET a deadlock from a simple insert when the only thing is an insert of a PK."

First of all, there is no exact code shown, so I assumed it could be anything.

In any case, I wouldn't ASSUME there is just an INSERT even if I knew this precisely. That's simply bad practice: DB-related code built on the assumption that there can be only one kind of exception, and no deadlocks or version conflicts, might lead to completely unexpected issues in the future, since the author maintaining InsertToProcessedMessages might not be aware of these implicit assumptions made by its callers.

Ayende Rahien
10/31/2010 05:46 PM by Ayende Rahien

Alex,

If that is the case, on the next try to read the message, you will hit the DupPK error and ack it then.

Ayende Rahien
10/31/2010 05:47 PM by Ayende Rahien

Alex,

You can pretty much assume that InsertToProcessedMessages translates to direct ADO code that calls INSERT INTO ProcessedMessages.

There is generally no possibility of a deadlock on insert, and there are certainly no version conflicts for pure inserts.

Alex Yakunin
10/31/2010 07:25 PM by Alex Yakunin

Oren, I know you like to argue, so let's leave the talk about best practices.

If your goal is to safely import the external queue into an RDBMS as-is, the code seems almost absolutely safe. Btw, you don't need manual transactions at all to achieve this, because if an entry isn't imported, it's always safe to re-import it.

Why "almost"? Well, because there is a cleanup process that removes imported and processed entries from the DB. So there is a tiny chance that an entry will be imported and removed, but no Ack sent for it (e.g. because the queue was down for long enough). But practically, this is hardly possible.

Ayende Rahien
11/01/2010 07:04 AM by Ayende Rahien

Alex,

The point isn't to import the queue.

The point is to process the message just once, in a successful transaction.

Bill
11/01/2010 09:54 AM by Bill

I think the code seems to work. The downside is that you have to store some state (processed messages) specific to this problem on the client side, but I can't think of a better way of doing it without DTC.

Maninder Batth
11/03/2010 04:06 PM by Maninder Batth

How about the following:

try {
    1. Dequeue

    try {
        2. Process message
    } catch (Exception) {
        3. Message error: ACK and do not queue it back
        4. Internal system error: retry or compensate
    }

    try {
        5. Commit message processing
    } catch (Exception) {
        6. Error while committing: retry or compensate
    }
} catch (Exception) {
    // Error while Dequeue: retry or compensate
}

try {
    ACK
} catch (Exception) {
    // perhaps retry ACK or compensate
}

Question: why do you need a table for persisting fully processed messages?

Ayende Rahien
11/03/2010 04:27 PM by Ayende Rahien

Maninder,

Because I don't want to consume the message on error.

I want to put it back on the queue and let the queue's poison message handling take care of it.

This way, I don't lose messages.

Comments have been closed on this topic.