Pool::collect

(PECL pthreads >= 2.0.0)

Pool::collect — Collect references to completed tasks

Description

public void Pool::collect ( Callable $collector )

Allows the Pool to collect references determined to be garbage by the given collector

Parameters

collector

A Callable collector

Return Values

void

Examples

Example #1 Creating Pools

<?php
/**
 * Unit of work submitted to the Pool in this example.
 *
 * The task prints a greeting that includes the id of the thread executing it,
 * then flags itself as complete so the collector passed to Pool::collect()
 * can report it as garbage.
 */
class MyWork extends Stackable {
    /** @var bool Whether run() has finished; exposed through isComplete(). */
    protected $complete;

    /** Starts the task in the "not complete" state. */
    public function __construct() {
        $this->complete = false;
    }

    /** Prints the class name and the worker's thread id, then marks the task complete. */
    public function run() {
        printf(
            "Hello from %s in Thread #%lu\n",
            __CLASS__,
            $this->worker->getThreadId()
        );
        $this->complete = true;
    }

    /**
     * @return bool True once run() has finished.
     */
    public function isComplete() {
        return $this->complete;
    }
}

/**
 * Worker class whose name is handed to the Pool constructor in this example.
 */
class MyWorker extends Worker {
    /**
     * @param Something $something Dependency supplied through the Pool's constructor-argument array.
     */
    public function __construct(Something $something) {
        $this->something = $something;
    }

    /** Intentionally left empty in this example. */
    public function run() {
        /** ... **/
    }
}

// Build a pool of up to 8 MyWorker threads; each worker is constructed with a Something.
$pool = new Pool(8, \MyWorker::class, [new Something()]);
$pool->submit(new MyWork());

// Give the submitted task a moment to run before collecting.
usleep(1000);

// The collector reports a task as garbage once the task says it is complete.
$collector = function ($work) {
    return $work->isComplete();
};
$pool->collect($collector);

var_dump($pool);
?>

The above example will output:

Hello from MyWork in Thread #140222468777728
object(Pool)#1 (6) {
  ["size":protected]=>
  int(8)
  ["class":protected]=>
  string(8) "MyWorker"
  ["workers":protected]=>
  array(1) {
    [0]=>
    object(MyWorker)#4 (1) {
      ["something"]=>
      object(Something)#5 (0) {
      }
    }
  }
  ["work":protected]=>
  array(0) {
  }
  ["ctor":protected]=>
  array(1) {
    [0]=>
    object(Something)#2 (0) {
    }
  }
  ["last":protected]=>
  int(1)
}

Комментарии

The example code crashes and made me waste 2 working days
First of all, `Stackable` has no attribute named `$worker`, or its access method makes it inaccessible.

Secondly, `Stackable` also doesn't have `getThreadId()`. It's best practice to use the `Thread` class to implement a thread, since it has more control functions. It's better to use `Stackable` for object storage and use its `run()` as its initialization.

The working example is

<?php
   
/**
 * Task variant that extends Thread, so getThreadId() is called on the task itself.
 */
class MyWork extends Thread {
    /** @var bool Whether run() has finished; exposed through isComplete(). */
    protected $complete;

    /** Starts the task in the "not complete" state. */
    public function __construct() {
        $this->complete = false;
    }

    /** Prints the class name and this thread's id, then marks the task complete. */
    public function run() {
        printf(
            "Hello from %s in Thread #%lu\n",
            __CLASS__,
            $this->getThreadId()
        );
        $this->complete = true;
    }

    /**
     * @return bool True once run() has finished.
     */
    public function isComplete() {
        return $this->complete;
    }
}

    /** Empty placeholder type; MyWorker's constructor expects an instance of it. */
    class 
Something {}

    class 
MyWorker extends Worker {

        public function 
__construct(Something $something) {
           
$this->something $something;
        }

        public function 
run() {
           
/** ... **/
       
}
    }

   
// Build a pool of up to 8 MyWorker threads; each worker is constructed with a Something.
$pool = new Pool(8, \MyWorker::class, [new Something()]);
$pool->submit(new MyWork());

// Give the submitted task a moment to run before collecting.
usleep(1000);

// The collector reports a task as garbage once the task says it is complete.
$pool->collect(function ($work) {
    return $work->isComplete();
});

var_dump($pool);
?>
2014-12-29 11:33:16
http://php5.kiev.ua/manual/ru/pool.collect.html
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.

<?php
/**
 * Simulated long-running job that records when it has finished.
 */
class TestWork extends Threaded {
    /** @var bool Set to true at the end of run(). */
    protected $complete;

    /**
     * @param mixed $pData The data sent to your worker thread to do its job.
     */
    public function __construct($pData) {
        // transfer all the variables to local variables
        $this->complete = false;
        $this->testData = $pData;
    }

    /** This is where all of your work will be done. */
    public function run() {
        usleep(2000000); // sleep 2 seconds to simulate a large job
        $this->complete = true;
    }

    /**
     * @return bool True once run() has finished, i.e. the task may be collected.
     */
    public function isGarbage() {
        return $this->complete;
    }
}
/**
 * Pool subclass that keeps collecting until every submitted task has finished,
 * gathering one result object per finished task.
 */
class ExamplePool extends Pool
{
    /** @var array Result objects gathered by process(). */
    public $data = array();

    /**
     * Collects finished tasks until the pool has no work left, then shuts the pool down.
     *
     * @return array One stdClass per finished task, each carrying a `complete` property.
     */
    public function process()
    {
        // Run this loop as long as we have jobs in the pool
        while (count($this->work)) {
            $this->collect(function (TestWork $task) {
                // If a task was marked as done, collect its results
                if ($task->isGarbage()) {
                    $tmpObj = new stdclass();
                    $tmpObj->complete = $task->complete;
                    // this is how you get your completed data back out [accessed by $pool->process()]
                    $this->data[] = $tmpObj;
                }
                return $task->isGarbage();
            });
        }

        // All jobs are done, we can shutdown the pool
        $this->shutdown();
        return $this->data;
    }
}
// Three workers share five two-second jobs.
$pool = new ExamplePool(3);
$testData = 'asdf';
for ($i = 0; $i < 5; $i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); // get all of the results
echo '<pre>';
print_r($retArr); // return the array of results (and maybe errors)
echo '</pre>';
?>
2016-03-14 21:13:17
http://php5.kiev.ua/manual/ru/pool.collect.html
This example demonstrates various aspects of MTP with pthreads — especially worth noting is bidirectional communication with child threads.
I could not find anything about that, so I would like to present the results of my research.

<?php

/**
 * Plain data holder passed to a QueryTask.
 */
class Model
{
    /** @var mixed Job identifier; the example script assigns the loop index. */
    public $id;

    /** @var mixed Payload value; the example script assigns rand(). */
    public $value;
}

/**
 * Worker that lazily opens a database connection and hands it to the tasks it runs.
 *
 * The sample output shows one "Connecting to db" line per worker thread, so the
 * static link below is established once per thread and then reused.
 */
class Connection extends Worker
{
    /** @var \PDO|null Lazily created connection; see getConnection(). */
    protected static $link;

    /**
     * @param string $hostname Database host.
     * @param string $username Database user.
     * @param string $password Database password.
     * @param string $database Database (schema) name.
     * @param int    $port     Database port.
     */
    public function __construct($hostname, $username, $password, $database, $port = 3306)
    {
        $this->hostname = $hostname;
        $this->username = $username;
        $this->password = $password;
        $this->database = $database;
        $this->port = $port;
    }

    /**
     * Returns the connection, opening it on first use.
     *
     * @return \PDO
     */
    public function getConnection()
    {
        if (!self::$link) {
            echo 'Thread: ' . $this->getThreadId() . " Connecting to db\n";

            // NOTE(review): the original elided the PDO arguments. A MySQL DSN is
            // assumed here from the default port 3306 and the backtick-quoted SQL
            // used by QueryTask; adjust the DSN for another driver.
            self::$link = new \PDO(
                "mysql:host={$this->hostname};port={$this->port};dbname={$this->database}",
                $this->username,
                $this->password
            );
        }

        return self::$link;
    }
}

/**
 * Task that inserts one row describing itself through its worker's connection.
 *
 * @property Connection $worker
 */
class QueryTask extends Threaded
{
    /** @var Model Input handed to the task at construction time. */
    public $data;

    /** @var int|null Row count reported by the INSERT; set by run(). */
    public $result;

    /** @var bool True once run() has finished. */
    protected $_complete;

    /**
     * @param Model $data Payload whose id and value are written to the database.
     */
    public function __construct(Model $data)
    {
        $this->_complete = false;
        $this->data = $data;
    }

    /**
     * Inserts a descriptive row, prints the outcome with the execution time,
     * and stores the affected-row count in $result.
     */
    public function run()
    {
        /** @var \PDO $pdo */
        $pdo = $this->worker->getConnection();

        $text = 'Thread: ' . $this->worker->getThreadId()
            . ' Job: ' . $this->data->id
            . ' Data: ' . $this->data->value;

        $t = microtime(true);

        // Bind the text through a placeholder instead of concatenating it into the SQL.
        $stmt = $pdo->prepare('INSERT INTO `test` (`id`, `text`) VALUES (NULL, ?)');
        $stmt->execute([$text]);

        $dt = microtime(true) - $t;

        $result = (int) $stmt->rowCount();

        echo $text . ' Result: ' . $result . ' Exec time: ' . $dt . "s\n";

        $this->result = $result;
        $this->_complete = true;
    }

    /**
     * @return bool True once run() has finished, i.e. the task may be collected.
     */
    public function isGarbage() : bool
    {
        return $this->_complete;
    }
}

// Start-up: a pool of up to 5 Connection workers, each constructed with the DB settings below.
$pool = new Pool(5, 'Connection', ['localhost', 'root', 'password', 'test']);

// Tasks
$tasks = 10;

for ($i = 0; $i < $tasks; ++$i) {
    $object = new Model();
    $object->id = $i;
    $object->value = rand();

    $pool->submit(new QueryTask($object));
}

// Wait for completion
$data = [];

while (1) {
    $newData = [];

    $pool->collect(function (QueryTask $task) use (&$newData) {
        if ($task->isGarbage()) {
            // Key each finished job's value by its job id.
            $newData[$task->data->id] = $task->data->value;
        }

        return $task->isGarbage();
    });

    // Array union keeps the job ids as keys; array_merge() would renumber
    // integer keys and lose the id => value association.
    $data = $data + $newData;

    if (count($data) == $tasks)
        break;

    usleep(100000);
}

var_dump($data);
?>

Result:
Thread: 6796 Connecting to db
Thread: 3156 Connecting to db
Thread: 9040 Connecting to db
Thread: 7748 Connecting to db
Thread: 8836 Connecting to db
Job: 0 Done in: 0.0070011615753174s
Job: 4 Done in: 0.0069999694824219s
Job: 2 Done in: 0.0090010166168213s
Job: 3 Done in: 0.0090010166168213s
Job: 1 Done in: 0.003000020980835s
Job: 5 Done in: 0.0069999694824219s
Job: 7 Done in: 0.0079998970031738s
Job: 6 Done in: 0.0049998760223389s
Job: 9 Done in: 0.0079998970031738s
Job: 8 Done in: 0.0069999694824219s

array(10) {
  [0] =>
  int(17730)
  [1] =>
  int(18771)
  [2] =>
  int(12944)
  [3] =>
  int(6025)
  [4] =>
  int(29582)
  [5] =>
  int(10159)
  [6] =>
  int(26556)
  [7] =>
  int(9029)
  [8] =>
  int(15002)
  [9] =>
  int(4043)
}

Things worth noting here:
1. 5 workers are constructed for 10 tasks. The last 5 tasks are run on existing threads that already have a db connection set up.
2. You can "send" data to a thread by creating a new task and submitting it.
3. You can retrieve results with the collect function.
4. You can pass a simple object to the task constructor.
2017-12-02 06:05:54
http://php5.kiev.ua/manual/ru/pool.collect.html
This example shows how to use a Threaded with a Pool to get an array of results, using pthreads v3.2.1 and PHP 7.3.23.

<?php
/**
 * Simulated long-running job that records when it has finished.
 *
 * Updated version that works with pthreads v3.2.1 and PHP 7.3.23.
 */
class TestWork extends Threaded {
    /** @var bool Set to true at the end of run(). */
    protected $complete;

    /**
     * @param mixed $pData The data sent to your worker thread to do its job.
     */
    public function __construct($pData) {
        // transfer all the variables to local variables
        $this->complete = false;
        $this->testData = $pData;
    }

    /** This is where all of your work will be done. */
    public function run() {
        usleep(2000000); // sleep 2 seconds to simulate a large job
        $this->complete = true;
    }

    /**
     * @return bool True once run() has finished.
     */
    public function isDone() {
        return $this->complete;
    }
}
/**
 * Pool subclass that counts submitted tasks and keeps collecting until it has
 * gathered one result object per task.
 */
class ExamplePool extends Pool {
    /** @var array Used to return data after we're done. */
    public $data = array();

    /** @var int Counter used to know when we're done. */
    private $numTasks = 0;

    /**
     * Overrides the parent's submit() to keep track of our jobs.
     *
     * @param Threaded $task The task to queue.
     * @return mixed Whatever the parent's submit() returns, passed through unchanged.
     */
    public function submit(Threaded $task) {
        $this->numTasks++;
        // Return the parent's result so callers of submit() still receive it.
        return parent::submit($task);
    }

    /**
     * Waits until all workers are done, then shuts the pool down.
     *
     * @return array One stdClass per finished task, each carrying a `complete` property.
     */
    public function process() {
        // Run this loop as long as we have jobs in the pool
        while (count($this->data) < $this->numTasks) {
            $this->collect(function (TestWork $task) {
                // If a task was marked as done, collect its results
                if ($task->isDone()) {
                    $tmpObj = new stdclass();
                    $tmpObj->complete = $task->complete;
                    // this is how you get your completed data back out [accessed by $pool->process()]
                    $this->data[] = $tmpObj;
                }
                return $task->isDone();
            });
        }

        // All jobs are done, we can shutdown the pool
        $this->shutdown();
        return $this->data;
    }
}
// Three workers share five two-second jobs.
$pool = new ExamplePool(3);
$testData = 'asdf';
for ($i = 0; $i < 5; $i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); // get all of the results
echo '<pre>';
print_r($retArr); // return the array of results (and maybe errors)
echo '</pre>';
?>
2020-12-29 08:00:32
http://php5.kiev.ua/manual/ru/pool.collect.html

    Поддержать сайт на родительском проекте КГБ