How to implement an asynchronous queue to run a method in Symfony 3

First of all, some basic information about my project: I have a website created using Symfony 3. For some tasks, I am thinking of an implementation for running asynchronous PHP methods. Some events are time consuming, but their results need not be obvious.

For example: in the method newOrder, I have the addUserLTV function, which takes several steps. The customer does not need to wait for the completion of all steps, only to immediately receive confirmation after the main operation - "newOrder" will add addUserLTVto the queue and immediately show confirmation (completed launch). Queue tasks will be executed when the server has time to do this.

public function addUserLTV( $userID, $addLTV )
{ //same code
}

How to do it? Is this possible in symphony 3?

+4
source share
1 answer

This is something you can easily do with a queue package . A few words about why you should choose:

  • It supports many transports from the simplest (file system) to corporate (RabbitMQ or Amazon SQS).
  • It comes with a very powerful bundle .
  • It has a top-level abstraction that can be used with maximum ease.
  • There are many more that may come in handy.

Regarding your question. Here you can do this with a queue package. Follow the installation instructions from the document .

addUserLTV :

<?php
namespace Acme;

use Enqueue\Client\ProducerInterface;

class AddUserLTVService
{
    /**
     * @var ProducerInterface
     */
    private $producer;

    /**
     * @param ProducerInterface $producer
     */
    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function addUserLTV( $userID, $addLTV )
    {
        $this->producer->sendCommand('add_user_ltv', [
            'userId' => $userID, 
            'ltv' => $addLTV]
        );
    }
}

( , ). Symfony:

services:
    Acme\AddUserLTVService: 
        arguments: ['@enqueue.producer']

. , :

<?php
namespace Acme;

use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
use Enqueue\Util\JSON;

class AddUserTVAProcessor implements PsrProcessor, CommandSubscriberInterface
{
    public function process(PsrMessage $message, PsrContext $context)
    {
        $data = JSON::decode($message->getBody());

        $userID = $data['userID'];
        $addLTV = $data['ltv'];

        // do job

        return self::ACK;
    }

    public static function getSubscribedCommand()
    {
        return 'add_user_ltv';
    }
}

enqueue.client.processor:

services:
    Acme\AddUserTVAProcessor: 
        tags:
            - {name: 'enqueue.client.processor'}

. , :

./bin/console enqueue:consume --setup-broker -vvv
+7

Source: https://habr.com/ru/post/1680229/


All Articles