(PECL pthreads >= 0.34)
The static methods contained in the Cond class provide direct access to Posix Condition Variables.
jan (2013-05-25 13:11:45)
As neither Cond nor Mutex is good for queuing tasks, and arrays/objects are stored "serialized" (so they can't be used as expected for signals from outside run()), here is a simple task queue trait:
<?php
// One second expressed in microseconds; the usleep() calls in this example scale it (e.g. 0.7*T).
define('T',1000000);
/**
 * Minimal FIFO task queue stored entirely in properties of the host object.
 *
 * Every queued task occupies one numbered slot made of dynamic properties:
 *   queue_func_<n>       name of the method to invoke
 *   queue_params_<n>     number of arguments stored for that task
 *   queue_param_<n>_<i>  the i-th argument
 *
 * Slots are written by addToQueue() and removed again by getNextQueued().
 *
 * NOTE(review): the counters are updated with plain ++/-- and no locking.
 * Confirm whether the host object serialises access when tasks are added and
 * consumed from different threads.
 */
trait TaskQueue
{
    /** Number of tasks believed to be waiting. */
    private $queue_len = 0;

    /** Index of the slot the next addToQueue() call writes to. */
    private $queue_pointer = 0;

    /** Index of the slot the next getNextQueued() call starts reading from. */
    private $queue_curr = 0;

    /**
     * Resets all queue counters to the empty state.
     */
    protected function queueInit()
    {
        $this->queue_len = 0;
        $this->queue_pointer = 0;
        $this->queue_curr = 0;
    }

    /**
     * Removes and returns the oldest queued task.
     *
     * Scans forward from the read cursor up to the write cursor, so a missing
     * slot is skipped instead of stalling the queue. When the length counter
     * claims pending work but no slot can be found, the queue is reported as
     * corrupt and fully reset so later addToQueue() calls work again.
     *
     * @return array|null ['func' => method name, 'params' => argument list],
     *                    or null when nothing is queued.
     */
    protected function getNextQueued()
    {
        // Loose comparison on purpose: a counter that was never initialised
        // (null) is treated the same as an empty queue.
        if ($this->queue_len == 0) {
            return null;
        }

        for ($slot = $this->queue_curr; $slot < $this->queue_pointer; $slot++) {
            $funcKey = "queue_func_{$slot}";
            if (!isset($this->$funcKey)) {
                continue; // slot lost or already consumed: try the next one
            }

            $task = [];
            $task['func'] = $this->$funcKey;
            unset($this->$funcKey);

            $countKey = "queue_params_{$slot}";
            $count = isset($this->$countKey) ? (int) $this->$countKey : 0;
            unset($this->$countKey);

            $task['params'] = [];
            for ($i = 0; $i < $count; $i++) {
                $paramKey = "queue_param_{$slot}_{$i}";
                // isset() is false for a stored null, which yields null here as well.
                $task['params'][] = isset($this->$paramKey) ? $this->$paramKey : null;
                unset($this->$paramKey);
            }

            $this->queue_len--;
            // Continue after the slot actually consumed, not merely one step
            // past the previous cursor.
            $this->queue_curr = $slot + 1;

            return $task;
        }

        // The length counter says there is work, yet no slot exists.
        echo "\n\t\t task queue is corrupt for ".get_called_class()."\n";
        $this->queueInit(); // reset read AND write cursors together

        return null;
    }

    /**
     * Appends a task to the queue.
     *
     * @param string $func   Name of a method existing on the host object.
     * @param array  $params Arguments for that method; array keys are ignored.
     *
     * @return bool True when queued; false when $params is not an array or
     *              $func is not the name of an existing method.
     */
    protected function addToQueue($func, $params = [])
    {
        // is_string() replaces the former error suppression: a non-string
        // name is rejected before method_exists() can complain about it.
        if (!is_array($params) || !is_string($func) || !method_exists($this, $func)) {
            return false;
        }

        $slot = $this->queue_pointer;

        $funcKey = "queue_func_{$slot}";
        $this->$funcKey = $func;

        $countKey = "queue_params_{$slot}";
        $this->$countKey = count($params);

        $i = 0;
        foreach ($params as $value) {
            $paramKey = "queue_param_{$slot}_{$i}";
            $this->$paramKey = $value;
            $i++;
        }

        // Counters are bumped last, after the slot has been written in full.
        $this->queue_pointer++;
        $this->queue_len++;

        return true;
    }
}
/**
 * Example worker thread: repeatedly drains its task queue, then performs a
 * simulated "MAIN" job, until term() has been called.
 *
 * doA()/doB() only enqueue work; the private reallyDoA()/reallyDoB() methods
 * are invoked from run() through the queue.
 */
class A extends Thread
{
    use TaskQueue;

    /** Set to true by term() to make run() leave its loop. */
    private $sig_term;

    /**
     * Creates the condition handle, clears the queue and starts the thread.
     */
    public function __construct()
    {
        // Handle is not used for signalling in this example; it is released
        // again at the end of run().
        $this->cond = Cond::create();
        $this->sig_term = false;
        $this->queueInit(); // threads don't like default parameters
        $this->start();
    }

    /**
     * Thread body: runs every queued task, then one MAIN cycle, and repeats
     * until the termination flag is set.
     */
    public function run()
    {
        while (!$this->sig_term) {
            while (($qi = $this->getNextQueued())) {
                call_user_func_array([$this, $qi['func']], $qi['params']);
            }
            echo "now doing MAIN -- ";
            usleep(1*T);
            echo "finished MAIN\n";
        }
        // Release the condition created in the constructor. The previous
        // Mutex::destroy($m) call referenced a variable that was never defined.
        Cond::destroy($this->cond);
    }

    /**
     * Task body for doA(): prints its argument and sleeps for 0.7*T microseconds.
     */
    private function reallyDoA($a)
    {
        echo "doing A: {$a} -- ";
        usleep(0.7*T);
        echo "finished A\n";
    }

    /**
     * Queues a reallyDoA() call with the given argument.
     */
    public function doA($param1)
    {
        $this->addToQueue('reallyDoA', [$param1]);
    }

    /**
     * Task body for doB(): prints both arguments and sleeps for 0.1*T microseconds.
     */
    private function reallyDoB($a, $b)
    {
        echo "doing B: {$a},{$b} -- ";
        usleep(0.1*T);
        echo "finished B\n";
    }

    /**
     * Queues a reallyDoB() call with the given arguments.
     */
    public function doB($param1, $param2)
    {
        $this->addToQueue('reallyDoB', [$param1, $param2]);
    }

    /**
     * Requests termination; run() checks the flag at the top of each outer loop pass.
     */
    public function term()
    {
        $this->sig_term = true;
    }
}
// Demo driver: feed the worker from the main context while it is running.
$worker = new A(); // the constructor already starts the thread

usleep(1.2 * T);

// First batch of tasks.
$worker->doA(2);
$worker->doA(3);
$worker->doB("now b", "something");

usleep(5.4 * T);

// Second batch.
$worker->doA(5);
$worker->doA(7);

usleep(3 * T);

// Ask the worker to stop, then wait for it.
$worker->term();
$worker->join();
?>
Very useful if you need synchronization from an outside event (i.e. async socket read/write: your run() takes care of reading with a timeout and signaling on_packet events, but writing may occur at any time (e.g. async heartbeating), and calling fwrite($sock) while stream_select($sock) is waiting for its timeout causes segfaults...)