I use Symfony2 with the RabbitMqBundle to create a worker that sends documents to ElasticSearch. Indexing documents one at a time is much slower than using the ElasticSearch bulk API, so I created a buffer that flushes documents to ES in batches of a thousand. The code looks (slightly simplified) as follows:

    class SearchIndexator
    {
        protected $elasticaService;
        protected $buffer = [];
        protected $bufferSize = 0;

        // The maximum number of documents to keep in the buffer.
        // If the buffer reaches this number of documents, then the buffer's
        // content is sent to ElasticSearch for indexation.
        const MAX_BUFFER_SIZE = 1000;

        public function __construct(ElasticaService $elasticaService)
        {
            $this->elasticaService = $elasticaService;
        }

        /**
         * Destructor
         *
         * Flush any documents that remain in the buffer.
         */
        public function __destruct()
        {
            $this->flush();
        }

        /**
         * Add a document to the indexation buffer.
         */
        public function onMessage(array $document)
        {
            // Prepare the document for indexation.
            $this->doHeavyWeightStuff($document);

            // Create an Elastica document.
            $document = new \Elastica\Document(
                $document['key'],
                $document
            );

            // Add the document to the buffer.
            $this->buffer[] = $document;

            // Flush the buffer when the maximum buffer size has been reached.
            if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
                $this->flush();
            }
        }

        /**
         * Send the current buffer to ElasticSearch for indexation.
         */
        public function flush()
        {
            // Send documents to ElasticSearch for indexation.
            if (1 <= $this->bufferSize) {
                $this->elasticaService->addDocuments($this->buffer);
            }

            // Clear the buffer.
            $this->buffer = [];
            $this->bufferSize = 0;
        }
    }

All this works well, but there is a small problem. The queue is populated with messages at an unpredictable rate. Sometimes 100,000 in 5 minutes, sometimes not one in a few hours. If, for example, 82,671 documents are queued, the last 671 documents are not indexed until another 329 documents are received, which can take several hours. I would like to be able to do the following:
Warning: Sci-Fi Code! This clearly won't work:

    class SearchIndexator
    {
        protected $elasticaService;
        protected $buffer = [];
        protected $bufferSize = 0;
        protected $flushTimer;

        // The maximum number of documents to keep in the buffer.
        // If the buffer reaches this number of documents, then the buffer's
        // content is sent to ElasticSearch for indexation.
        const MAX_BUFFER_SIZE = 1000;

        public function __construct(ElasticaService $elasticaService)
        {
            $this->elasticaService = $elasticaService;

            // Highly Sci-fi code
            $this->flushTimer = new Timer();

            // Flush buffer after 5 minutes of inactivity.
            $this->flushTimer->setTimeout(5 * 60);
            $this->flushTimer->setCallback([$this, 'flush']);
        }

        /**
         * Destructor
         *
         * Flush any documents that remain in the buffer.
         */
        public function __destruct()
        {
            $this->flush();
        }

        /**
         * Add a document to the indexation buffer.
         */
        public function onMessage(array $document)
        {
            // Prepare the document for indexation.
            $this->doHeavyWeightStuff($document);

            // Create an Elastica document.
            $document = new \Elastica\Document(
                $document['key'],
                $document
            );

            // Add the document to the buffer.
            $this->buffer[] = $document;

            // Flush the buffer when the maximum buffer size has been reached.
            if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
                $this->flush();
            } else {
                // Start a timer that will flush the buffer after a timeout.
                $this->initTimer();
            }
        }

        /**
         * Send the current buffer to ElasticSearch for indexation.
         */
        public function flush()
        {
            // Send documents to ElasticSearch for indexation.
            if (1 <= $this->bufferSize) {
                $this->elasticaService->addDocuments($this->buffer);
            }

            // Clear the buffer.
            $this->buffer = [];
            $this->bufferSize = 0;

            // There are no more messages to be sent, so stop the timer.
            $this->flushTimer->stop();
        }

        protected function initTimer()
        {
            // Start or restart the timer.
            $this->flushTimer->isRunning()
                ? $this->flushTimer->reset()
                : $this->flushTimer->start();
        }
    }

Now, I know about PHP's limitations when it comes to event-driven code. But this is 2015, and there are solutions like ReactPHP, so this should be possible, right? For ØMQ there is this function. What would be a solution that works for RabbitMQ, or one that is independent of any message queue extension?
Solutions I am skeptical about:
- crysalead/code. It simulates a timer using declare(ticks = 1);. I am not sure whether this is a robust and stable approach. Any ideas?
- I could run a cron job that publishes a "FLUSH" message to the same queue every 5 minutes and then explicitly flush the buffer when that message is received, but that would be cheating.
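To make the second option concrete, here is a minimal sketch of how the consumer side of the "FLUSH" message idea might look. It is not tied to RabbitMqBundle: the class name SentinelBuffer, the 'command' key, and the $sink callable (standing in for ElasticaService::addDocuments()) are all assumptions made for illustration.

```php
<?php

/**
 * Hypothetical buffer that flushes when it is full OR when a control
 * message arrives. Nothing here comes from a real library.
 */
class SentinelBuffer
{
    // Assumed payload of the control message published by the cron job.
    const FLUSH_COMMAND = 'FLUSH';

    protected $buffer = [];
    protected $sink;
    protected $maxSize;

    public function __construct(callable $sink, $maxSize = 1000)
    {
        // $sink stands in for ElasticaService::addDocuments().
        $this->sink = $sink;
        $this->maxSize = $maxSize;
    }

    public function onMessage(array $message)
    {
        // A control message carries no document; it only triggers a flush.
        if (isset($message['command'])
            && $message['command'] === self::FLUSH_COMMAND
        ) {
            $this->flush();
            return;
        }

        $this->buffer[] = $message;

        if (count($this->buffer) >= $this->maxSize) {
            $this->flush();
        }
    }

    public function flush()
    {
        // Only call the sink when there is something to send.
        if (count($this->buffer) > 0) {
            call_user_func($this->sink, $this->buffer);
        }
        $this->buffer = [];
    }

    public function pending()
    {
        return count($this->buffer);
    }
}

// Usage: record the size of every batch that reaches the sink.
$batches = [];
$buffer = new SentinelBuffer(function (array $docs) use (&$batches) {
    $batches[] = count($docs);
}, 1000);

for ($i = 0; $i < 2671; $i++) {
    $buffer->onMessage(['key' => $i]);
}

// Without this control message the last 671 documents would stay buffered.
$buffer->onMessage(['command' => SentinelBuffer::FLUSH_COMMAND]);
// $batches is now [1000, 1000, 671]
```

With several workers consuming the same queue, a single "FLUSH" message would presumably reach only one of them, so this sketch only covers the single-consumer case.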
Xatoo