I have a script that looks up the MX record for the domain of an email address. I have 300 thousand emails to check, so a single-threaded process would take a long time.
I have beanstalkd running with a queue, and a PHP script reads the emails from a file and puts them on the queue. However, only one worker ever processes the queue. So far I cannot get 10+ workers to process it.
I run do_job_mx.php, which opens a file containing one email per line and sends each email to the queue.
The PHP code that reads the emails from the file and puts them on the queue, do_job_mx.php:
    require_once('pheanstalk_init.php');

    $pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1:11300');

    $filename = '_blank.txt';
    $filename = dirname(__FILE__) . '/in/' . $filename;

    // FILE_SKIP_EMPTY_LINES only takes effect together with FILE_IGNORE_NEW_LINES.
    foreach (file($filename, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) {
        $json = json_encode(array("email" => trim($line)));
        $pheanstalk
            ->useTube('process_mx')
            ->put($json);
    }
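As a minimal sketch of how each line of the input file becomes a job payload, the conversion can be isolated in a small function. The name `build_payloads` is hypothetical and not part of Pheanstalk; it only assumes that every non-blank line holds exactly one email.

```php
<?php
// Hypothetical helper: turn raw file lines into JSON job payloads.
function build_payloads(array $lines): array
{
    $payloads = array();
    foreach ($lines as $line) {
        $email = trim($line);
        if ($email === '') {
            continue; // skip blank lines explicitly
        }
        $payloads[] = json_encode(array('email' => $email));
    }
    return $payloads;
}
```

Each returned string could then be passed to `put()` on the `process_mx` tube, so that blank lines never reach the workers.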
The PHP code for the worker, do_worker_process_mx.php:
    class Worker
    {
        private $pheanstalk;

        public function __construct()
        {
            $this->log('worker process - starting');
            require_once('pheanstalk_init.php');
            $this->pheanstalk = new Pheanstalk_Pheanstalk('127.0.0.1:11300');
        }

        public function __destruct()
        {
            $this->log('worker process - ending');
        }

        public function run()
        {
            $this->log('worker process - starting to run');
            while (1) {
                // Block until a job is available on the process_mx tube.
                $job = $this->pheanstalk
                    ->watch('process_mx')
                    ->ignore('default')
                    ->reserve();
                $data = json_decode($job->getData(), true);
                $this->process_mx($data);
                $this->pheanstalk->delete($job);
            }
        }

        private function process_mx($data)
        {
            $domain = explode("@", $data['email']);
            // Skip addresses without a domain part or without any MX record.
            if (count($domain) < 2 || !dns_get_mx($domain[1], $mx_records) || empty($mx_records)) {
                $this->log('no MX record for ' . $data['email']);
                return;
            }
            // Keep only the last two labels of the first MX host.
            $mx_array = explode(".", strtolower($mx_records[0]));
            $mx = array_slice($mx_array, -2, count($mx_array));
            $mx_domain = implode(".", $mx);
            echo $data['email'] . "\n";
            $this->write_file($mx_domain, $data['email']);
        }

        private function write_file($mx, $email)
        {
            // Append the email to the output file named after the MX domain.
            $handle = fopen(dirname(__FILE__) . "/out/" . $mx . ".txt", 'ab+');
            fwrite($handle, $email . "\n");
            fclose($handle);
        }

        private function log($txt)
        {
            echo $txt . "\n";
        }
    }

    $worker = new Worker();
    $worker->run();
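The string handling in the worker can be sketched separately from the DNS lookup, which makes it easy to check without network access. The function names `mx_base_domain` and `email_domain` are hypothetical helpers, not part of PHP or Pheanstalk. Keeping only the last two labels is the same simplification the worker uses, so it will group hosts under suffixes such as `co.uk` together.

```php
<?php
// Hypothetical helper: reduce an MX host name to its last two labels.
function mx_base_domain(string $mxHost): string
{
    // Lower-case the host and drop a trailing dot, if any, before splitting.
    $labels = explode('.', strtolower(rtrim($mxHost, '.')));
    return implode('.', array_slice($labels, -2));
}

// Hypothetical helper: return the domain part of an email, or null if absent.
function email_domain(string $email): ?string
{
    $pos = strrpos($email, '@');
    if ($pos === false || $pos === strlen($email) - 1) {
        return null;
    }
    return substr($email, $pos + 1);
}
```

In the worker, the result of `email_domain()` would be what gets passed to `dns_get_mx()`, and the first returned host would go through `mx_base_domain()` to pick the output file name.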
Supervisord conf:
    [program:do_worker_process]
    command=/usr/bin/php /srv/www/mydev/public_html/esp/do_worker_process_mx.php
    numprocs=10
    numprocs_start=10
    autostart=true
    autorestart=true
    stopsignal=QUIT
    log_stdout=true
    logfile=/var/log/supervisor/worker_process_mx.log
With this configuration I still do not get 10+ workers running for the queue.
Number of running processes:
    # supervisorctl status
    do_worker_process RUNNING pid 44343, uptime 1:46:11