Running a function in a PHP CLI script after a period of inactivity

I used Symfony2 with RabbitMqBundle to create a worker that sends documents to ElasticSearch. Indexing documents one at a time is much slower than using the ElasticSearch bulk API, so I created a buffer that flushes documents to ES in batches of 1,000. The code looks (slightly simplified) as follows:

```php
class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this number of documents, the buffer's content
    // is sent to ElasticSearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when the maximum buffer size has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;
    }
}
```

All of this works well, but there is one small problem. The queue is filled with messages at an unpredictable rate: sometimes 100,000 in 5 minutes, sometimes not a single one in several hours. If, for example, 82,671 documents are queued, the last 671 documents are not indexed until another 329 documents arrive, which can take several hours. I would like to be able to do the following:

Warning: Sci-Fi Code! This clearly won't work:

```php
class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;
    protected $flushTimer;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this number of documents, the buffer's content
    // is sent to ElasticSearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;

        // Highly sci-fi code
        $this->flushTimer = new Timer();
        // Flush buffer after 5 minutes of inactivity.
        $this->flushTimer->setTimeout(5 * 60);
        $this->flushTimer->setCallback([$this, 'flush']);
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when the maximum buffer size has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        } else {
            // Start a timer that will flush the buffer after a timeout.
            $this->initTimer();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;

        // There are no more messages to be sent, stop the timer.
        $this->flushTimer->stop();
    }

    protected function initTimer()
    {
        // Start or restart timer
        $this->flushTimer->isRunning()
            ? $this->flushTimer->reset()
            : $this->flushTimer->start();
    }
}
```

Now, I know that PHP is not event-driven and has limitations in this respect. But this is 2015, and there are solutions like ReactPHP, so shouldn't this be possible? For ØMQ there is this function. What solution would work for RabbitMQ, or independently of which message queue extension is used?

Solutions I am skeptical about:

  • crysalead/code. It simulates a timer using declare(ticks = 1);. I am not sure whether this is a robust and reliable approach. Any ideas?
  • I could run a cron job that publishes a "FLUSH" message to the queue every 5 minutes, and then explicitly flush the buffer when that message is received, but that would be a hack.
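For comparison, the cron-based "FLUSH" idea from the second bullet could be sketched roughly as below. This is a hypothetical illustration: the class name `ControlMessageBuffer`, the plain-string "FLUSH" message body, and the `$sentBatches` array (standing in for the ElasticSearch call) are assumptions, not part of the question's code.

```php
<?php
// Hypothetical sketch of the cron-driven "FLUSH" control message idea.
// Assumption: the cron job publishes the plain string "FLUSH"; every other
// message is a document array.
final class ControlMessageBuffer
{
    /** @var list<array> */
    private array $buffer = [];

    /** @var list<list<array>> Stands in for ElasticSearch in this sketch. */
    public array $sentBatches = [];

    public function __construct(private int $maxSize = 1000)
    {
    }

    public function onMessage(array|string $message): void
    {
        // The control message only triggers a flush; it is never buffered.
        if ($message === 'FLUSH') {
            $this->flush();
            return;
        }

        $this->buffer[] = $message;
        if (count($this->buffer) >= $this->maxSize) {
            $this->flush();
        }
    }

    public function flush(): void
    {
        // Only send a batch when there is something buffered.
        if ($this->buffer !== []) {
            $this->sentBatches[] = $this->buffer;
            $this->buffer = [];
        }
    }
}
```

The drawback is that the flush follows the cron schedule rather than actual inactivity, so a partial batch can wait up to 5 minutes, or be cut off in the middle of a burst.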
1 answer

As I mentioned in my comment, you can use signals. PHP lets you register handlers for signals sent to your script (e.g. SIGINT, SIGTERM, etc.; SIGKILL and SIGSTOP cannot be caught).

For your use case, you can use the SIGALRM signal. It is delivered to your script once a timeout that you set has expired. The advantage of signals is that they are non-blocking: they do not interfere with the normal operation of your script. PHP only runs a registered handler when pending signals are dispatched, which is why the loop below calls pcntl_signal_dispatch() on every iteration.

Corrected solution (ticks are deprecated since PHP 5.3):

```php
<?php
// Called when the SIGALRM signal is delivered.
function signal_handler($signal)
{
    // You would flush here
    print "Caught SIGALRM\n";

    // Set the SIGALRM timer again or it won't trigger again
    pcntl_alarm(300);
}

// Register your handler with the SIGALRM signal
pcntl_signal(SIGALRM, "signal_handler", true);

// Set the timeout for the SIGALRM signal to 300 seconds
pcntl_alarm(300);

$keepRunning = true; // replace with your own loop condition

// Start the loop and check for pending signals on every iteration
while (pcntl_signal_dispatch() && $keepRunning) {
    // Execute your code here
}
```
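Applied to the question's use case, an inactivity flush might look like the minimal sketch below. Assumptions: PHP 8 CLI with ext-pcntl; `IdleFlushBuffer` and its `$onFlush` callback are hypothetical names, with the callback standing in for `ElasticaService::addDocuments()`; and `pcntl_async_signals(true)` (PHP 7.1+) is used so the worker's loop does not have to call `pcntl_signal_dispatch()` itself. Instead of re-arming the alarm inside the handler, the sketch re-arms it on every message, so it only fires after `$idleSeconds` without new documents.

```php
<?php
// Hypothetical sketch (PHP 8 CLI, ext-pcntl): buffer items and flush them
// when the buffer is full OR after $idleSeconds without a new item.
final class IdleFlushBuffer
{
    /** @var list<mixed> */
    private array $buffer = [];

    /** @var callable Stands in for ElasticaService::addDocuments(). */
    private $onFlush;

    public function __construct(
        callable $onFlush,
        private int $maxSize = 1000,
        private int $idleSeconds = 300,
    ) {
        $this->onFlush = $onFlush;

        // Run PHP signal handlers as soon as signals arrive (PHP >= 7.1),
        // instead of waiting for an explicit pcntl_signal_dispatch() call.
        pcntl_async_signals(true);

        // On SIGALRM, flush whatever is buffered (the signal number is ignored).
        pcntl_signal(SIGALRM, function (int $signo): void {
            $this->flush();
        });
    }

    public function add(mixed $item): void
    {
        $this->buffer[] = $item;

        if (count($this->buffer) >= $this->maxSize) {
            $this->flush();
            return;
        }

        // (Re)arm the single per-process alarm. A new call replaces the
        // pending one, so it only fires after $idleSeconds of inactivity.
        pcntl_alarm($this->idleSeconds);
    }

    public function flush(): void
    {
        // Cancel any pending alarm first; nothing is buffered after this flush.
        pcntl_alarm(0);

        if ($this->buffer === []) {
            return;
        }

        // Swap the buffer out before handing it to the callback.
        $batch = $this->buffer;
        $this->buffer = [];
        ($this->onFlush)($batch);
    }
}
```

Because an asynchronous handler can run between any two statements, `flush()` cancels the alarm and swaps the buffer out before calling the callback. Whether the alarm is handled promptly while the consumer is blocked waiting on the broker depends on how the queue library waits, which this sketch does not cover.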

Note: a script can only have one SIGALRM timer pending at a time. If you call pcntl_alarm again, the existing timer is reset to the new value without the signal being fired.
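A quick way to observe this behaviour, as a sketch assuming a CLI build with ext-pcntl: pcntl_alarm() returns the number of seconds that were left on the alarm it replaces (0 if none was pending), and pcntl_alarm(0) cancels the pending alarm without firing it.

```php
<?php
// Sketch (CLI with ext-pcntl): a process has a single SIGALRM timer.
// pcntl_alarm() replaces it and returns the seconds that were left on the
// previous alarm, or 0 if none was pending.
$first  = pcntl_alarm(300); // nothing pending yet
$second = pcntl_alarm(10);  // replaces the 300-second alarm
$third  = pcntl_alarm(0);   // 0 cancels the pending alarm; no signal is sent

printf("first=%d second=%d third=%d\n", $first, $second, $third);
```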

Source: https://habr.com/ru/post/1238101/

