Azure: How to transfer messages from the poison queue back to the main queue?

I am wondering if there is a tool or library that can move messages between queues? I'm currently doing something like below

public static void ProcessQueueMessage([QueueTrigger("myqueue-poison")] string message, TextWriter log)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("myqueue");
    queue.CreateIfNotExists();

    var messageData = JsonConvert.SerializeObject(data, new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() });
    queue.AddMessage(new CloudQueueMessage(messageData));
}
+12
source share
5 answers

In essence, Azure Storage does not support moving messages from one queue to another. You will need to do it yourself.

One way to implement moving messages from one queue to another is to delete messages from the original queue (by calling GetMessages), read the contents of the message, and create a new message in the target queue. You can do this with the Storage Client Library.

, , Cerebrata Azure Management Studio. .

(2018-09-11) 1.4.1 Microsoft Azure .

+6

(2018-09-11) 1.4.1 Microsoft Azure Azure .

, - . , , - !

NuGet Microsoft.NET.Sdk.Functions:

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

void Main()
{
    const string queuename = "MyQueueName";

    string storageAccountString = "xxxxxx";

    RetryPoisonMesssages(storageAccountString, queuename);
}

private static int RetryPoisonMesssages(string storageAccountString, string queuename)
{
    CloudQueue targetqueue = GetCloudQueueRef(storageAccountString, queuename);
    CloudQueue poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

    int count = 0;
    while (true)
    {
        var msg = poisonqueue.GetMessage();
        if (msg == null)
            break;

        poisonqueue.DeleteMessage(msg);
        targetqueue.AddMessage(msg);
        count++;
    }

    return count;
}

private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
{
    CloudStorageAccount storageAccount = CloudStorageAccount.Parse(storageAccountString);
    CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference(queuename);

    return queue;
}
+4

Python . azure-storage-queue

queueService = QueueService(connection_string = "YOUR CONNECTION STRING")
for queue in queueService.list_queues():
  if "poison" in queue.name:
    print(queue.name)
    targetQueueName = queue.name.replace("-poison", "")
    while queueService.peek_messages(queue.name):
      for message in queueService.get_messages(queue.name, 32):
        print(".", end="", flush=True)
        queueService.put_message(targetQueueName, message.content)
        queueService.delete_message(queue.name, message.id, message.pop_receipt)
0

, , Node- @MitchWheats Azure.

import AzureStorage from 'azure-storage'
import { Context, HttpRequest } from '@azure/functions'
import util from 'util'

const queueService = AzureStorage.createQueueService()
queueService.messageEncoder = new AzureStorage.QueueMessageEncoder.TextBase64QueueMessageEncoder()

const deleteMessage = util.promisify(queueService.deleteMessage).bind(queueService)
const createMessage = util.promisify(queueService.createMessage).bind(queueService)
const getMessage = util.promisify(queueService.getMessage).bind(queueService)

export async function run (context: Context, req: HttpRequest): Promise<void> {
  try {
    const poisonQueue = (req.query.queue || (req.body && req.body.queue));
    const targetQueue = poisonQueue.split('-')[0]

    let count = 0

    while (true) {
      const message = await getMessage(poisonQueue)
      if (!message) { break; }
      if (message.messageText && message.messageId && message.popReceipt) {
        await createMessage(targetQueue, message.messageText)
        await deleteMessage(poisonQueue, message.messageId, message.popReceipt)
      }
      count++
    }

    context.res = {
      body: 'Replayed ${count} messages from ${poisonQueue} on ${targetQueue}'
    };
  } catch (e) {
    context.res = { status: 500 }
  }
}

, , . . AZURE_STORAGE_ACCOUNT AZURE_STORAGE_ACCESS_KEY, AZURE_STORAGE_CONNECTION_STRING. Azure Storage SDK.

0

, Microsoft.Azure.Storage.Queue. .NET, Program.cs :

using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
using System.Threading.Tasks;

namespace PoisonMessageDequeuer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            const string queuename = "MyQueueName";

            string storageAccountString = "xxx";

            await RetryPoisonMesssages(storageAccountString, queuename);
        }

        private static async Task<int> RetryPoisonMesssages(string storageAccountString, string queuename)
        {
            var targetqueue = GetCloudQueueRef(storageAccountString, queuename);
            var poisonqueue = GetCloudQueueRef(storageAccountString, queuename + "-poison");

            var count = 0;
            while (true)
            {
                var msg = await poisonqueue.GetMessageAsync();
                if (msg == null)
                    break;

                await poisonqueue.DeleteMessageAsync(msg);
                await targetqueue.AddMessageAsync(msg);
                count++;
            }

            return count;
        }

        private static CloudQueue GetCloudQueueRef(string storageAccountString, string queuename)
        {
            var storageAccount = CloudStorageAccount.Parse(storageAccountString);
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference(queuename);

            return queue;
        }
    }
}

It's still pretty slow if you work with> 1000 messages, so I recommend that you study the batch APIs for large numbers.

0
source

Source: https://habr.com/ru/post/1612530/


All Articles