Transactions with MSMQ and Azure Service Bus

If you read my previous post, A Tale of Two Queues you’ll remember that we’re implementing a system to de-queue messages from an on-premise MSMQ and route these in to a Windows Azure service Bus. We need to make sure that the interface between the MSMQ and the Service bus has transactional integrity – i.e. that if we take a message off the MSMQ we don’t loose it before it gets placed on the service bus. Fortunately .Net gives us a framework which we can use to roll the receive from MSMQ and send to the service bus in to a single transaction

System.Transactions

Introduced with .Net 2.0, System.Transactions allows a developer to “create and participate in a transaction (local or distributed) with one or multiple participants”.  System.Transaction supports transactions initiated in SQL Server, ADO.NET, MSMQ, the Microsoft Distributed Transaction Coordinator, and Windows Azure Service Bus. It can be extended to support other providers too.

Multiple Participants – sounds just like what we need doesn’t it?

Our main processing thread peeks messages from the MSMQ asynchronously. We’ve created an event handler which will get called when a new message is available on our queue, and we call BeginPeek():

mq.PeekCompleted += new PeekCompletedEventHandler(OnPeekCompleted);
mq.BeginPeek();

We’re using Peek rather than read here in order that we have the ability to wrap our read from the queue inside a transaction – if we use BeginRead then the message has effectively been read from the queue when our event handler code is called.

Our event handler needs to do the following:

  1. Start a transaction
  2. Read the message from MSMQ
  3. Put the message on the Service Bus
  4. Commit the transaction

We use System.Transactions.TransactionScope to provide our transactional functionality, one thing to note is that we must open our connection to the queue inside the scope otherwise we won’t be able to rollback.

private static void OnPeekCompleted(Object sender,PeekCompletedEventArgs e)
{
    using (var scope = new TransactionScope())
    {
        try
        {
            using (MessageQueue transactionQueue = new MessageQueue(queuePath, QueueAccessMode.SendAndReceive)) 
            {
                transactionQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(int) });
 
                Message msg = transactionQueue.Receive(MessageQueueTransactionType.Automatic);
 
                BrokeredMessage transmitMessage = new BrokeredMessage(msg.Body);
                QueueClient qc = QueueClient.CreateFromConnectionString(sbConnection, sbQueue);
 
                qc.Send(transmitMessage);
                // Display the message information on the screen.
                Console.WriteLine("Message body: {0}", (int)msg.Body);
                scope.Complete();
            }
        }
        catch (System.Exception ex)
        {
            Console.WriteLine("Excpetion: {0}", ex.ToString()); 
        }
 
    }
    mq.BeginPeek();
}

Also noteworthy is that the transaction type needs to be specified when we receive our message from the queue, in this case we require Automatic in order that we can rollback our transaction.

Rolling Back

If anything happens in our code which would cause an exception to be thrown, in the example above we’re concerned that we are not going to be able to connect to our service bus in Azure, the Complete method never gets called on our transaction. When the transaction gets disposed (at the end of our using statement) all pending transactions which were created in our handler are rolled back.

In our case this means that we can read a message of our MSMQ, and if we can’t forward it for whatever reason, it remains on the queue for processing in the future.

Leave a Reply