Run a large loop with parallel threads in the PHP CLI

I have an expensive computation in Symfony2 / PHP that I would like to run multithreaded.

Since I iterate over thousands of objects, I don't think I should start one thread per object. I would like to have a $core variable that determines how many threads run in parallel, then iterate through the loop and keep that many threads busy. In other words, every time a thread finishes, a new one should start with the next object, until all objects have been processed.

Looking at the pthreads documentation and doing some google searches, I can't find a suitable example for this situation. All the examples I found have a fixed number of threads that they run once, none of them iterates through thousands of objects.

Can someone point me in the right direction to get started? I understand the basics of creating a thread and joining it, etc., but not how to do it in a loop with a wait condition.

2 answers

The answer to the question is to use the Pool and Worker abstractions.

The basic idea is that you Pool::submit Threaded objects to the Pool, which hands each one to the next Worker, distributing your Threaded objects round-robin across all Workers.

The following is super simple code for PHP7 (pthreads v3):

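    <?php
    // Build 2000 dummy jobs.
    $jobs = [];
    while (count($jobs) < 2000) {
        $jobs[] = mt_rand(0, 1999);
    }

    // A pool of 8 workers.
    $pool = new Pool(8);

    foreach ($jobs as $job) {
        // Each job is wrapped in an anonymous Threaded class.
        $pool->submit(new class($job) extends Threaded {
            public function __construct(int $job) {
                $this->job = $job;
            }

            public function run() {
                var_dump($this->job);
            }
        });
    }

    // Wait for all submitted work to finish, then stop the workers.
    $pool->shutdown();
    ?>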

The jobs do nothing useful, obviously. In the real world, I imagine your $jobs array keeps growing, so you can swap the foreach for a do {} while loop and keep calling Pool::submit for new jobs.

In the real world, you'll want to collect garbage in the same loop (just call Pool::collect with no parameters for the default behavior).

It is worth noting that none of this would be possible if PHP really were not intended to run in multi-threaded environments... it definitely is.

That is the answer to the question, but that does not make it the best solution to your problem.

You mentioned in the comments that you assume 8 threads running Symfony code take up less memory than 8 processes. This is not the case: PHP is shared-nothing, all the time. You can expect 8 Symfony threads to use as much memory as 8 Symfony processes, in fact a little more. The advantage of threads over processes is that they can communicate and synchronize with each other.

Just because you can does not mean you should. The best solution for this task is probably a ready-made package or piece of software designed to do what you need.

Learning this material well enough to implement a robust solution will take a long time, and you would not want to deploy your first attempt...

If you decide to ignore my advice and give it a go, you can find many examples in the GitHub repository for pthreads.

Joe has a good approach, but I found a different solution elsewhere that I am currently using. Basically, I have two commands, one control command and one worker command. The control command starts background processes and checks their results:

    protected function process($worker, $entity, $timeout=60) {
        $min = $this->em->createQuery('SELECT MIN(e.id) FROM BM2SiteBundle:'.$entity.' e')->getSingleScalarResult();
        $max = $this->em->createQuery('SELECT MAX(e.id) FROM BM2SiteBundle:'.$entity.' e')->getSingleScalarResult();

        // Split the ID range into one batch per parallel process.
        $batch_size = ceil((($max-$min)+1)/$this->parallel);
        $pool = array();
        for ($i=$min; $i<=$max; $i+=$batch_size) {
            $builder = new ProcessBuilder();
            $builder->setPrefix($this->getApplication()->getKernel()->getRootDir().'/console');
            $builder->setArguments(array(
                '--env='.$this->getApplication()->getKernel()->getEnvironment(),
                'maf:worker:'.$worker,
                $i,
                $i+$batch_size-1
            ));
            $builder->setTimeout($timeout);

            $process = $builder->getProcess();
            $process->start();
            $pool[] = $process;
        }
        $this->output->writeln($worker.": started ".count($pool)." jobs");

        // Poll until no process is running any more.
        $running = 99;
        while ($running > 0) {
            $running = 0;
            foreach ($pool as $p) {
                if ($p->isRunning()) {
                    $running++;
                }
            }
            usleep(250); // note: usleep() takes microseconds
        }

        // Report every process that did not exit successfully.
        foreach ($pool as $p) {
            if (!$p->isSuccessful()) {
                $this->output->writeln('fail: '.$p->getExitCode().' / '.$p->getCommandLine());
                $this->output->writeln($p->getOutput());
            }
        }
    }

where $this->parallel is a variable that I set to 6 on an 8-core machine; it is the number of processes to run. Note that this method requires that I iterate over a specific entity (the work is split by its IDs), which is always true in my use cases.

This is not ideal, but it launches completely new processes instead of threads, which I consider the best solution.
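For the behaviour asked about in the question (start the next job whenever one finishes, with a fixed cap on concurrency), a process-based variant could look like the following. This is only a minimal sketch using PHP's built-in proc_open rather than the Symfony ProcessBuilder shown above; the function name runBounded and the demo commands are hypothetical, and it assumes the PHP CLI on a system where child processes can be spawned.

```php
<?php
// Hypothetical helper: run shell commands with at most $limit of them alive
// at once, starting the next one as soon as a slot frees up.
// Returns the exit codes, indexed like $commands.
function runBounded(array $commands, int $limit): array
{
    $commands = array_values($commands);
    $next = 0;         // index of the next command to start
    $running = [];     // index => process resource
    $exitCodes = [];   // index => exit code

    while ($next < count($commands) || $running) {
        // Fill free slots.
        while ($next < count($commands) && count($running) < $limit) {
            // No descriptors are redirected, so children inherit stdio.
            $process = proc_open($commands[$next], [], $pipes);
            if ($process === false) {
                throw new RuntimeException('could not start: ' . $commands[$next]);
            }
            $running[$next] = $process;
            $next++;
        }

        // Collect finished processes.
        foreach ($running as $index => $process) {
            $status = proc_get_status($process);
            if (!$status['running']) {
                // Read the exit code on the first status call after exit.
                $exitCodes[$index] = $status['exitcode'];
                proc_close($process);
                unset($running[$index]);
            }
        }

        usleep(50000); // poll every 50 ms
    }

    ksort($exitCodes);
    return $exitCodes;
}

// Demo: five trivial child PHP processes, at most two at a time.
$php = escapeshellarg(PHP_BINARY);
$commands = [];
foreach ([0, 1, 2, 0, 3] as $code) {
    $commands[] = $php . ' -r ' . escapeshellarg("exit($code);");
}
$exitCodes = runBounded($commands, 2);
```

In a Symfony setup the commands would presumably be console invocations of the worker command, one per object or per small batch, which avoids the uneven load that fixed ID ranges can produce.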

The worker command accepts the minimum and maximum ID numbers and does the actual work for the records between the two.

This approach works as long as the dataset is fairly evenly distributed. If you have no data in the ID range 1-1000 but every ID from 1000 to 2000 is used, the first three processes will have nothing to do.
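To make the skew concrete, here is a small sketch of the same range arithmetic, pulled out into a standalone function. The name batchRanges is hypothetical, and unlike the method above it clamps the last batch to the maximum ID; the values 1, 2000 and 6 are just illustrative.

```php
<?php
// Hypothetical helper: split the inclusive ID range [$min, $max] into at most
// $parallel contiguous batches, using the same ceil() formula as above.
function batchRanges(int $min, int $max, int $parallel): array
{
    $batchSize = (int) ceil(($max - $min + 1) / $parallel);
    $ranges = [];
    for ($start = $min; $start <= $max; $start += $batchSize) {
        // Clamp the final batch so it never runs past $max.
        $ranges[] = [$start, min($start + $batchSize - 1, $max)];
    }
    return $ranges;
}

// IDs 1..2000 split for 6 processes.
$ranges = batchRanges(1, 2000, 6);
foreach ($ranges as [$from, $to]) {
    echo $from . '-' . $to . "\n";
}
```

With these numbers the batches are 334 IDs wide, so if only IDs 1000 to 2000 exist, the first two ranges are empty and the third contains just IDs 1000 to 1002, while the remaining three processes do nearly all of the work.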

Source: https://habr.com/ru/post/1237177/