Why aren't all threads completed?

I tried an example from this answer by Joe https://stackoverflow.com/a/21243232/32 and it works fine, but when I tried to modify this code a bit:

$pool = new Pool(4);

while (@$i++<10) {
    $pool->submit(new class($i) extends Collectable {
        public function __construct($id) {
            $this->id = $id;
        }

        public function run() {
            printf(
                "Hello World from %d\n", $this->id);
        $this->html = file_get_contents('http://google.fr?q=' . $this->query);
            $this->setGarbage();
        }

        public $id;
public $html;
    });
}

while ($pool->collect(function(Collectable $work){
    printf(
        "Collecting %d\n", $work->id);
var_dump($work->html);
    return $work->isGarbage();
})) continue;

$pool->shutdown();

The "Hello world" graph is different from the "Collect" count. Documents are out of date. How about this problem?

+4
source share
2 answers

Worker::collectnot intended to produce results; It is not deterministic.

Worker::collectintended only to start garbage collection on objects referenced in the object stack Worker.

If the intention is to process each result as it appears, the code might look something like this:

<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;
$found = 0;

while (@$i++ < $expected) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;
                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

do {
    $next = $results->synchronized(function() use(&$found, $results) {
        while (!count($results)) {
            $results->wait();
        }

        $found++;

        return $results->shift();
    });

    var_dump($next);
} while ($found < $expected);

while ($pool->collect()) continue;

$pool->shutdown();
?>

, , , , Volatile, , .

, , , , , , , , :

<?php
$pool = new Pool(4);
$results = new Volatile();
$expected = 10;

while (@$i++ < $expected) {
    $pool->submit(new class($i, $results) extends Threaded {

        public function __construct($id, Volatile $results) {
            $this->id = $id;
            $this->results = $results;
        }

        public function run() {
            $result = file_get_contents('http://google.fr?q=' . $this->id);

            $this->results->synchronized(function($results, $result){
                $results[$this->id] = $result;
                $results->notify();
            }, $this->results, $result);
        }

        private $id;
        private $results;
    });
}

$results->synchronized(function() use($expected, $results) {
    while (count($results) != $expected) {
        $results->wait();
    }
});

var_dump(count($results));

while ($pool->collect()) continue;

$pool->shutdown();
?>

, Collectable Threaded pthreads - , ... ...

, ... ...

+3

Pthreads V3 , V2. V3.

№ 1: , . V2, V3. . .

№ 2: , , . , , , , , . - , , , , php. , .

№ 3: (array), , .

, 3 , :

  • .

  • .

  • nb , .

  • , , .

  • .

:

    define("SQLHOST", "127.0.0.1");
    define("SQLUSER", "root");
    define("SQLPASS", "password");
    define("SQLDBTA", "mydatabase");

    $Nb_of_th=12; // (6 cpu cores in this example)
    $queries = array_chunk($queries, ($Nb_of_th));// whatever list of queries you want to pass to the workers
    $global_data=array();// all results from all pool cycles

    // first we set the main loops
    foreach ($queries as $key => $chunks) {
    $pool = new Pool($Nb_of_th, Worker::class);// 12 pools max
    $workCount = count($chunks);

    // second we launch the submits 
    foreach (range(1, $workCount) as $i) {
        $chunck = $chunks[$i - 1];
        $pool->submit(new MyWorkers($chunck));
    }

    $data = [];// pool cycle result array
    $collector = function (\Collectable $work) use (&$data) {
        $isGarbage = $work->isGarbage();
        if ($isGarbage) {
            $data[] = $work->result; // thread result
        }
        return $isGarbage;
    };

    do {
        $count = $pool->collect($collector);
        $isComplete = count($data) === $workCount;
    } while (!$isComplete);

    array_push($global_data, $data);// push pool results into main

    //complete purge
    unset($data);
    $pool->shutdown();
    unset($pool);
    gc_collect_cycles();// force garbage collector before new pool cycle
    }

    Var_dump($global_data); // results for all pool cycles

    class MyWorkers extends \Threaded implements \Collectable {

    private $isGarbage;
    public $result;
    private $process;

    public function __construct($process) {
        $this->process = $process;
    }

    public function run() {

        $con = new PDO('mysql:host=' . SQLHOST . ';dbname=' . SQLDBTA . ';charset=UTF8', SQLUSER, SQLPASS);
        $proc = (array) $this->process; // important ! avoid volatile destruction in V3
        $stmt = $con->prepare($proc);
        $stmt->execute();
        $obj = $stmt1->fetchall(PDO::FETCH_ASSOC);

        /* do whatever you want to do here */
        $this->result = (array) $obj; // important ! avoid volatile destruction in V3
        $this->isGarbage = true;
    }

    public function isGarbage() : bool
    {
    return $this->isGarbage;
    }
}
+1

Source: https://habr.com/ru/post/1621422/


All Articles