The Pool class
(PECL pthreads >= 2.0.0)
Введение
A Pool is a container for, and controller of, an adjustable number of Workers.
Pooling provides a higher level abstraction of the Worker functionality, including the management of references in the way required by pthreads.
Обзор классов
Свойства
- size
-
maximum number of Workers this Pool can use
- class
-
the class of the Worker
- ctor
-
the arguments for constructor of new Workers
- workers
-
references to Workers
- work
-
references to Threaded objects submitted to the Pool
- last
-
offset in workers of the last Worker used
Содержание
- Pool::collect — Collect references to completed tasks
- Pool::__construct — Creates a new Pool of Workers
- Pool::resize — Resize the Pool
- Pool::shutdown — Shutdown all Workers
- Pool::submit — Submits an object for execution
- Pool::submitTo — Submits an object for execution
Коментарии
Example class to demonstrate usage of Pool/Worker mechanism, also to show a few tricks & hints ;)
<?php
class Config extends Threaded{ // shared global object
protected $val=0, $val2=0;
protected function inc(){++$this->val;} // protected synchronizes by-object
public function inc2(){++$this->val2;} // no synchronization
}
class WorkerClass extends Worker{
protected static $worker_id_next = -1;
protected $worker_id;
protected $config;
public function __construct($config){
$this->worker_id = ++static::$worker_id_next; // static members are not avalable in thread but are in 'main thread'
$this->config = $config;
}
public function run(){
global $config;
$config = $this->config; // NOTE: setting by reference WON'T work
global $worker_id;
$worker_id = $this->worker_id;
echo "working context {$worker_id} is created!\n";
//$this->say_config(); // globally synchronized function.
}
protected function say_config(){ // 'protected' is synchronized by-object so WON'T work between multiple instances
global $config; // you can use the shared $config object as synchronization source.
$config->synchronized(function() use (&$config){ // NOTE: you can use Closures here, but if you attach a Closure to a Threaded object it will be destroyed as can't be serialized
var_dump($config);
});
}
}
class Task extends Stackable{ // Stackable still exists, it's just somehow dissappeared from docs (probably by mistake). See older version's docs for more details.
protected $set;
public function __construct($set){
$this->set = $set;
}
public function run(){
global $worker_id;
echo "task is running in {$worker_id}!\n";
usleep(mt_rand(1,100)*100);
$config = $this->getConfig();
$val = $config->arr->shift();
$config->arr[] = $this->set;
for ($i = 0 ; $i < 1000; ++$i){
$config->inc();
$config->inc2();
}
}
public function getConfig(){
global $config; // WorkerClass set this on thread's scope, can be reused by Tasks for additional asynch data source. (ie: connection pool or taskqueue to demultiplexer)
return $config;
}
}
$config = new Config;
$config->arr = new \Threaded();
$config->arr->merge(array(1,2,3,4,5,6));
class PoolClass extends Pool{
public function worker_list(){
if ($this->workers !== null)
return array_keys($this->workers);
return null;
}
}
$pool = new PoolClass(3, 'WorkerClass', [$config] );
$pool->worker_list();
//$pool->submitTo(0,new Task(-10)); // submitTo DOES NOT try to create worker
$spammed_id = -1;
for ($i = 1; $i <= 100; ++$i){ // add some jobs
if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){
$spammed_id = $x[2];
echo "spamming worker {$spammed_id} with lots of tasks from now on\n";
}
if ($spammed_id != -1 && ($i % 5) == 0) // every 5th job is routed to one worker, so it has 20% of the total jobs (with 3 workers it should do ~33%, not it has (33+20)%, so only delegate to worker if you plan to do balancing as well... )
$pool->submitTo($spammed_id,new Task(10*$i));
else
$pool->submit(new Task(10*$i));
}
$pool->shutdown();
var_dump($config); // "val" is exactly 100000, "val2" is probably a bit less
// also: if you disable the spammer, you'll that the order of the "arr" is random.
?>
Simple example with Collectable (basically Thread meant for Pool) and Pool
<?php
class job extends Collectable {
public $val;
public function __construct($val){
// init some properties
$this->val = $val;
}
public function run(){
// do some work
$this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
$this->setGarbage();
}
}
// At most 3 threads will work at once
$p = new Pool(3);
$tasks = array(
new job('0'),
new job('1'),
new job('2'),
new job('3'),
new job('4'),
new job('5'),
new job('6'),
new job('7'),
new job('8'),
new job('9'),
new job('10'),
);
// Add tasks to pool queue
foreach ($tasks as $task) {
$p->submit($task);
}
// shutdown will wait for current queue to be completed
$p->shutdown();
// garbage collection check / read results
$p->collect(function($checkingTask){
echo $checkingTask->val;
return $checkingTask->isGarbage();
});
?>
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.
<?php
class TestWork extends Threaded {
protected $complete;
//$pData is the data sent to your worker thread to do it's job.
public function __construct($pData){
//transfer all the variables to local variables
$this->complete = false;
$this->testData = $pData;
}
//This is where all of your work will be done.
public function run(){
usleep(2000000); //sleep 2 seconds to simulate a large job
$this->complete = true;
}
public function isGarbage() {
return $this->complete;
}
}
class ExamplePool extends Pool
{
public $data = array();
public function process()
{
// Run this loop as long as we have
// jobs in the pool
while (count($this->work)) {
$this->collect(function (TestWork $task) {
// If a task was marked as done
// collect its results
if ($task->isGarbage()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
//this is how you get your completed data back out [accessed by $pool->process()]
$this->data[] = $tmpObj;
}
return $task->isGarbage();
});
}
// All jobs are done
// we can shutdown the pool
$this->shutdown();
return $this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for($i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>
In this example, it shows how to use a pool to get an array of results, using pThreads v3.2.1 and php 7.3.23
<?php
class TestWork extends Threaded {
//updated version that works with pThreads v3.2.1 and php 7.3.23
protected $complete;
//$pData is the data sent to your worker thread to do it's job.
public function __construct($pData) {
//transfer all the variables to local variables
$this->complete = false;
$this->testData = $pData;
}
//This is where all of your work will be done.
public function run() {
usleep(2000000); //sleep 2 seconds to simulate a large job
$this->complete = true;
}
public function isDone() {
return $this->complete;
}
}
class ExamplePool extends Pool {
public $data = array(); // used to return data after we're done
private $numTasks = 0; // counter used to know when we're done
/**
* override the submit function from the parent
* to keep track of our jobs
*/
public function submit(Threaded $task) {
$this->numTasks++;
parent::submit($task);
}
/**
* used to wait until all workers are done
*/
public function process() {
// Run this loop as long as we have
// jobs in the pool
while (count($this->data) < $this->numTasks) {
$this->collect(function (TestWork $task) {
// If a task was marked as done, collect its results
if ($task->isDone()) {
$tmpObj = new stdclass();
$tmpObj->complete = $task->complete;
//this is how you get your completed data back out [accessed by $pool->process()]
$this->data[] = $tmpObj;
}
return $task->isDone();
});
}
// All jobs are done
// we can shutdown the pool
$this->shutdown();
return $this->data;
}
}
$pool = new ExamplePool(3);
$testData = 'asdf';
for($i=0;$i<5;$i++) {
$pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); //get all of the results
echo '<pre>';
print_r($retArr); //return the array of results (and maybe errors)
echo '</pre>';
?>