How does Pool :: collect work?

Help me understand how Pool :: collect works.

Pool :: collect - collect links to completed tasks

public void Pool::collect ( Callable $collector )

I assume that it Pool::collectregisters a function that will be called after each completes \Threaded $task. So I did:

<?php
$pool = new Pool(4);
$pool->collect($collector);
$pool->submit(new Task);

Does not work. But the following:

<?php
$pool = new Pool(4);
$pool->submit(new Task);
$pool->collect($collector);

So, I think that does Pool::collect: attaches $collectorto each previously presented \Threaded $task.

Now, when is it called exactly $collector? I guess it was called upon completion Threaded::run(). Wrong.

<?php
class Task extends Threaded {
    public function run () { echo "Task complete\n"; }
}

$collector = function (\Task $task) {
    echo "Collect task\n";
    return true;
};

$pool = new Pool(4);
$pool->submit(new Task);
$pool->collect($collector);
$pool->shutdown();

Outputs:

Collect task
Task complete

$collectorcalled before completion Threaded::run().


The documentation says a lot. Does not mean that the event $collectorshould return a boolean value. I did not know that .

Pool:: collect . , .

1. ?

+3
2

Pool::collect , $collector.

$collector true, Threaded .

PHP7

::collect Worker, - Pool .

, , ( ).

Pool::collect , , ( ).

Pool::collect , Worker Pool, .

PHP7

<?php
$pool = new Pool(4);

while (@$i++<10) {
    $pool->submit(new class($i) extends Threaded {
        public function __construct($id) {
            $this->id = $id;
        }

        public function run() {
            printf(
                "Hello World from %d\n", $this->id);
        }

        public $id;
    });
}

while ($pool->collect(function(Collectable $work){
    printf(
        "Collecting %d\n", $work->id);
    return $work->isGarbage();
})) continue;

$pool->shutdown();
?>

- :

Hello World from 1
Hello World from 2
Hello World from 3
Hello World from 4
Hello World from 8
Collecting 1
Hello World from 7
Hello World from 5
Collecting 5
Hello World from 6
Collecting 9
Collecting 2
Collecting 6
Collecting 10
Hello World from 9
Collecting 3
Collecting 7
Collecting 4
Hello World from 10
Collecting 8
Collecting 5
Collecting 10
+4

. , . , .

, , 4 . , (4), , .

<?php

class Task extends \Threaded
{

    protected $result;
    protected $completed = false;

    public function run()
    {
        echo "Task complete\n";
        $this->result = md5(rand(1, 2000));
        sleep(rand(1, 5));
        $this->completed = true;
    }

    public function isCompleted()
    {
        return $this->completed;
    }

    public function getResult()
    {
        return $this->result;
    }
}

$infinite = false;
$poolSize = 4;
$queue = array();

$pool = new \Pool($poolSize);
$pool->submit(new Task);
$pool->submit(new Task);
$pool->submit(new Task);
$pool->submit(new Task);


do {
    if($infinite === true){
        $queue = array();
    }

    $pool->collect(function (\Task $task) use (&$queue) {
        if ($task->isCompleted()) {
            echo "Collect task\n";
            $queue[] = $task->getResult();

            return true;
        } else {
            echo "task not complete\n";

            return false;
        }
    });

    $size = sizeof($queue);
    if ($size > 0) {

        echo $size . " result\n";
        print_r($queue);

        if($infinite === true) {
            for ($m = 0; $m < $size; $m++) {
                $pool->submit(new Task);
            }
        } else{
            if($size == $poolSize){
                break;
            }
        }
    }

    usleep(100000);

} while (true);
$pool->shutdown();
0

Source: https://habr.com/ru/post/1621432/


All Articles