How do I use Amazon SQS FIFO queues with the Laravel queue?

Amazon has announced its new FIFO SQS service, and I would like to use it with the Laravel queue to solve some concurrency issues.

I created several new queues and changed the configuration. However, I got a MissingParameter error saying:

The request must contain the parameter MessageGroupId.

So I changed the file vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php:

public function pushRaw($payload, $queue = null, array $options = [])
{
    $response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]);

    return $response->get('MessageId');
}

public function later($delay, $job, $data = '', $queue = null)
{
    $payload = $this->createPayload($job, $data);

    $delay = $this->getSeconds($delay);

    return $this->sqs->sendMessage([
        'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
        'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))
    ])->get('MessageId');
}

I use APP_ENV as the message group identifier (this is a dedicated message queue, so the value doesn't matter much; I just want everything to be FIFO).

But I still get the same error message. How could I fix this? Any help would be appreciated.

(By the way, where does the SDK define sendMessage? I can find a stub for it, but not a detailed implementation.)


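For anyone who runs into the same problem: although editing SqsQueue.php works, the change is easily reset by composer install or composer update. An alternative is to implement a new Illuminate\Queue\Connectors\ConnectorInterface for SQS FIFO and register it with Laravel's queue manager.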

The steps are:

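  • Create a SqsFifoQueue class that extends Illuminate\Queue\SqsQueue and supports SQS FIFO.
  • Create a SqsFifoConnector class that extends Illuminate\Queue\Connectors\SqsConnector and establishes the connection using SqsFifoQueue.
  • Create a SqsFifoServiceProvider that registers SqsFifoConnector with Laravel's queue manager.
  • Add SqsFifoServiceProvider to config/app.php.
  • Update config/queue.php to use the new SQS FIFO driver.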

Example:

  • Create a SqsFifoQueue class that extends Illuminate\Queue\SqsQueue and supports SQS FIFO.

    <?php
    
    class SqsFifoQueue extends \Illuminate\Queue\SqsQueue
    {
        public function pushRaw($payload, $queue = null, array $options = [])
        {
            $response = $this->sqs->sendMessage([
                'QueueUrl' => $this->getQueue($queue),
                'MessageBody' => $payload,
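                // A unique group ID per message satisfies the API, but SQS only
                // preserves order within a group; use a fixed ID for strict ordering.
                'MessageGroupId' => uniqid(),
                // A unique ID means no message is ever treated as a duplicate.
                'MessageDeduplicationId' => uniqid(),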
            ]);
    
            return $response->get('MessageId');
        }
    }
    
  • Create a SqsFifoConnector class that extends Illuminate\Queue\Connectors\SqsConnector and establishes the connection using SqsFifoQueue.

    <?php
    
    use Aws\Sqs\SqsClient;
    use Illuminate\Support\Arr;
    
    class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector
    {
        public function connect(array $config)
        {
            $config = $this->getDefaultConfiguration($config);
    
            if ($config['key'] && $config['secret']) {
                $config['credentials'] = Arr::only($config, ['key', 'secret']);
            }
    
            return new SqsFifoQueue(
                new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
            );
        }
    }
    
  • Create a SqsFifoServiceProvider that registers SqsFifoConnector with Laravel's queue manager.

    <?php
    
    class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider
    {
        public function register()
        {
            $this->app->afterResolving('queue', function ($manager) {
                $manager->addConnector('sqsfifo', function () {
                    return new SqsFifoConnector;
                });
            });
        }
    }
    
  • Add SqsFifoServiceProvider to config/app.php.

    <?php
    
    return [
        'providers'     => [
            // ...
            SqsFifoServiceProvider::class,
        ],
    ];
    
  • Update config/queue.php to use the new SQS FIFO driver.

    <?php
    
    return [
    
        'default' => 'sqsfifo',
    
        'connections' => [
            'sqsfifo' => [
                'driver' => 'sqsfifo',
                'key'    => 'my_key',
                'secret' => 'my_secret',
                'queue'  => 'my_queue_url',
                'region' => 'my_sqs_region',
            ],
        ],
    ];
    

You can then use Laravel's queue as usual, now with SQS FIFO support.

Update: this approach is also available as a package, laravel-sqs-fifo: https://github.com/maqe/laravel-sqs-fifo.


In addition to MessageGroupId, you also need to send MessageDeduplicationId (unless content-based deduplication is enabled on the queue).
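As a rough illustration of sending both parameters, here is a minimal sketch that builds the argument array for a FIFO sendMessage call. The helper name buildFifoMessageParams, the 'default' group ID, and the choice of a SHA-256 hash of the payload as the deduplication ID are assumptions made for this example, not something the SDK or Laravel requires.

```php
<?php

/**
 * Hypothetical helper: builds the parameters for an SQS FIFO sendMessage call.
 * Assumes one fixed message group and a deduplication ID derived from the payload.
 */
function buildFifoMessageParams(string $queueUrl, string $payload, string $groupId = 'default'): array
{
    return [
        'QueueUrl'               => $queueUrl,
        'MessageBody'            => $payload,
        // Messages that share a group ID are delivered in order.
        'MessageGroupId'         => $groupId,
        // Identical payloads yield the same ID, so a repeat sent within the
        // deduplication interval is treated as a duplicate.
        'MessageDeduplicationId' => hash('sha256', $payload),
    ];
}

$params = buildFifoMessageParams('my_queue_url', '{"job":"SendMail"}');
```

The resulting array could be passed to $this->sqs->sendMessage() inside pushRaw(). If identical payloads must each be delivered, a unique value per message would be needed instead of the hash.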


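FIFO queues work differently from standard AWS SQS queues.

You need a separate driver to handle FIFO queues.

The following package provides one: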

https://packagist.org/packages/shiftonelabs/laravel-sqs-fifo-queue

In queue.php:

    'sqs-fifo' => [
        'driver' => 'sqs-fifo',
        'key' => env('SQS_KEY'),
        'secret' => env('SQS_SECRET'),
        'prefix' => env('SQS_PREFIX'),
        'queue' => env('SQS_QUEUE'),
        'region' => env('SQS_REGION'),
        'group' => 'default',
        'deduplicator' => 'unique',
    ],

dispatch(new TestJob([]))->onQueue('My_Mail_Queue.fifo');

NB: the queue names are set in .env:

SQS_QUEUE=My_Mail_Queue.fifo,My_Message_Queue.fifo,My_Default_queue.fifo

Source: https://habr.com/ru/post/1661533/

