The Cond class

(PECL pthreads >= 2.0.0)

Introduction

The static methods contained in the Cond class provide direct access to Posix Condition Variables.

Class synopsis

Cond {
/* Methods */
final public static boolean broadcast ( long $condition )
final public static long create ( void )
final public static boolean destroy ( long $condition )
final public static boolean signal ( long $condition )
final public static boolean wait ( long $condition , long $mutex [, long $timeout ] )
}

Table of Contents

Коментарии

Автор:
As Cond (nor Mutex) is good for queuing tasks, and arrays/objects stored a "serialized" (can't use it as excepted for signals outside the run()) here is a simple task queue trait:
<?php
define
('T',1000000);
trait TaskQueue{
    private 
$queue_len 0;
    private 
$queue_pointer 0;
    private 
$queue_curr 0;
    protected function 
queueInit(){$this->queue_len 0$this->queue_pointer 0$this->queue_curr 0;}
    protected function 
getNextQueued(){
        if (
$this->queue_len == 0) return null;
       
$p $this->queue_curr;
       
$x 0;
       
$ret = [];
        do{
           
$key "queue_func_{$p}";
            if (
$this->$key){
               
$ret['func'] = $this->$key;
                unset(
$this->$key);
               
$key "queue_params_{$p}";
               
$prs $this->$key;
                unset(
$this->$key);
               
$ret['params'] = [];
                for (
$i 0$i $prs$i++){
                   
$key "queue_param_{$p}_{$i}";
                   
$ret['params'][] =  $this->$key;
                    unset(
$this->$key);
                }
               
$this->queue_len--;
               
$this->queue_curr++;
                return 
$ret;
            }
            else 
$p++;
            if (
$x++ > 1000){   // the queue is corrupt
               
echo "\n\t\t task queue is corrupt for ".get_called_class()."\n";
               
$this->queue_len 0;
               
$this->queue_pointer 0;
                return 
null;
            }
        }while(isset(
$this->$key));
        return 
null;

    }
    protected function 
addToQueue($func,$params = []){
        if (!
is_array($params) || !@method_exists($this,$func)) return false;
       
$p $this->queue_pointer;
       
$key "queue_func_{$p}";
       
$this->$key $func;
       
$key "queue_params_{$p}";
       
$this->$key count($params);
       
$i 0;
        foreach (
$params as $pa){
           
$key "queue_param_{$p}_{$i}";
           
$this->$key $pa;
           
$i++;
        }
       
$this->queue_pointer++;
       
$this->queue_len++;
        return 
true;
    }
}

class 
extends Thread{
    use 
TaskQueue;
    private 
$sig_term;
    public function 
__construct(){
       
$this->cond Cond::create();
       
$this->sig_term false;
       
$this->queueInit(); // threads don't like default parameters
       
$this->start();
    }

    public function 
run(){
        while(!
$this->sig_term){
            while((
$qi $this->getNextQueued())){
               
call_user_func_array([$this,$qi['func']],$qi['params']);
            }
            echo 
"now doing MAIN -- ";
           
usleep(1*T);
            echo 
"finished MAIN\n";
        }
       
Mutex::destroy($m);
    }

    private function 
reallyDoA($a){
        echo 
"doing A: {$a} -- ";
       
usleep(0.7*T);
        echo 
"finished A\n";

    }
    public function 
doA($param1){
       
$this->addToQueue('reallyDoA',[$param1]);
    }
    private function 
reallyDoB($a,$b){
        echo 
"doing B: {$a},{$b} -- ";
       
usleep(0.1*T);
        echo 
"finished B\n";

    }
    public function 
doB($param1$param2){
       
$this->addToQueue('reallyDoB',[$param1,$param2]);
    }
    public function 
term(){$this->sig_term true;}
}

$a = new A();
usleep(1.2*T);
$a->doA(2);
$a->doA(3);
$a->doB("now b","something");
usleep(5.4*T);
$a->doA(5);
$a->doA(7);

usleep(3*T);
$a->term();
$a->join();
?>
very useful if you need syncronization from an outside event (ie: async socket read/write - your run() takes care of reading with timeout / signalig on_packet events, but writing may occur at any time (ie async heartbeeting) and fwrite($sock) while stream_select($sock) is waiting for timeout causes segfaults...)
2013-05-25 16:11:45
http://php5.kiev.ua/manual/ru/class.cond.html
public function run(){
        Mutex::lock($this->mutex);
        if($this->account->getBalance() >= drawCost){
            usleep ( 500 );
            $this->account->draw(drawCost);
            echo \Thread::getCurrentThreadId () . "__get__" . drawCost . ",The current Balance is:{$this->account->getBalance()}<br/>";
            Cond::broadcast($this->cond);
        }
        else {
            echo \Thread::getCurrentThreadId () . "__get fail__,the current Balance is:{$this->account->getBalance()}<br/>";

//Use Wait Can be not Running......Why ???

            Cond::wait($this->cond,$this->mutex);
        }
        Mutex::unlock($this->mutex);
  }
2015-12-20 20:39:24
http://php5.kiev.ua/manual/ru/class.cond.html

    Поддержать сайт на родительском проекте КГБ