PHP pThreads - How do you do garbage collection?

Given the following code, how can you guarantee that completed MyWorker objects will be destroyed / their memory freed?

Due to the fact that I need a script, I need ~ 50 threads that constantly receive data from cURL and process it.

I tried both of those whose threads never leave run(), or as shown in this code example, where they exit startup mode, and the collection function generates a new copy.

But it doesn’t matter that I hit the limits of memory in a minute or so. Could you tell me what I am doing wrong?

class MyWorker extends Threaded
{
    public $complete;
    public function __construct() {$this->complete = false;}
    public function run() {$this->complete = true;}
}

$pool = new Pool(50);
for($i=0; $i<50; $i++)
    $pool->submit(new MyWorker());
$pool->collect(function($worker)
{
    global $pool;
    if($worker->complete == true)
        $pool->submit(new MyWorker());
    return $worker->complete;
});
$pool->shutdown();
+2
source share
1 answer

?

Worker, pthreads, , Threaded , . , pthreads Pool Workers, .

pthreads, , , Pool::collect. Pool::collect , Threaded boolean true, .

...

, Pool

:

<?php

define("LOG", Mutex::create());
/* thread safe log to stdout */
function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf(
            "{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {
    public function __construct($url) {
        $this->url = $url;
    }

    public function run() {
        $response = @file_get_contents($this->url);

        slog("%s returned %d bytes",
            $this->url, strlen($response));

        $this->reQueue();
    }

    public function getURL()        { return $this->url; }

    public function isQueued()      { return $this->queued; }
    public function reQueue()       { $this->queued = true; }

    protected $url;
    protected $queued = false;
}

/* create a pool of 50 threads */
$pool = new Pool(50);

/* submit 50 requests for execution */
while (@$i++<50) {
    $pool->submit(new Request(sprintf(
        "http://google.com/?q=%s", md5($i))));
}

do {
    $queue = array();

    $pool->collect(function($request) use ($pool, &$queue) {
        /* check for items to requeue */
        if ($request->isQueued()) {
            /* get the url for the request, insert into queue */
            $queue[] = 
                $request->getURL();
            /* allow this job to be collected */
            return true;
        }
    });

    /* resubmit completed tasks to pool */
    if (count($queue)) {
        foreach ($queue as $queued)
            $pool->submit(new Request($queued));
    }

    /* sleep for a couple of seconds here ... because, be nice ! */
    usleep(2.5 * 1000000);
} while (true);
?>
+8

Source: https://habr.com/ru/post/1621444/


All Articles