1 安装pthreads
检查php版本,不是TS版的需要重新编译php增加–enable-maintainer-zts选项,此处省略安装
2 献上主要业务代码
daemon/push/question/worker.php
<?php
/**
* 推送工作线程
* 实现线程长驻、重复使用线程,线程超时、避免空线程占用资源
* @author yunfei
*
*/
class PushWorker extends Thread{
public $runing = false;
public $name;
public $param;
public $interval = 10000;//线程初始等待间隔10ms
public $timeout = 10;//线程超时时间
public $retry = 3;//重试次数
public function __construct($name, $timeout){
$this->runing = true;
$this->name = $name;
$this->param = '';
$this->timeout = $timeout;
}
/**
* 执行线程
* @see Threaded::run()
*/
public function run(){
$interval = $this->interval;
$wait_time = 0;
$retry = $this->retry;
while ($this->runing){//线程长驻、重复使用线程
if ($this->param) {//线程执行的参数
//重试功能
while($retry){
echo "线程[{$this->name}] 任务参数:".json_encode($this->param)." 开始执行";
//业务逻辑
$ret = array('code'=>200, 'msg'=>'执行结果');
echo "线程[{$this->name}] 任务参数:".json_encode($this->param)." 执行结果:{".json_encode($ret)."}";
if($ret['code'] == 200)
$retry = 0;
else {
$retry--;
if($retry == 0){//线程停止
$this->runing = false;
//$this->repush($info['content']);//失败任务回收
}
}
}
//初始化线程
$wait_time = 0;
$interval = $this->interval;
$this->param = '';//释放线程
$retry = $this->retry;
} else {
$interval = $interval << 1;//等待时间翻倍
}
usleep($interval);//线程等待
$wait_time += $interval;
echo "线程[{$this->name}]等待任务.. interval:[".($interval/1000)."ms] time:[".($wait_time/1000)."ms]";
//echo $this->interval;
//检查线程等待超时、线程退出
if($wait_time >= $this->timeout*1000000) $this->runing = false;
}
}
}
/**
* 线程池管理
* 实现线程池,任务分配功能
* @author yunfei
*
*/
class PoolWorker{
public $max_pthread = 100;
public $timeout = 10;
public $pool;
public function __construct($max_pthread, $timeout){
$this->max_pthread = $max_pthread;
$this->timeout = $timeout;
}
public function push($param){
$wait_time = 0;
$flag = false;//任务分发标识
//任务分配,若无可用线程则循环等待
while (!$flag){
$flag = true;
$bind = false;
//释放死掉的线程
if($this->pool){
foreach ($this->pool as $key => $worker){
if(!$worker->isRunning() && !$worker->runing){
unset($this->pool[$key]);
$this->max_pthread++;
}
}
}
//从线程池找一个空闲线程来执行任务
if($this->pool){
foreach ($this->pool as $worker){
if($worker->runing && $worker->param == ''){
$worker->param = $param;
$bind = true;
break;
}
}
}
//线程池没有可用线程时创建线程
if(!$bind){
if($this->max_pthread > 0){
$name = rand(10000, 65536);
$push_worker = new PushWorker($name, $this->timeout);//新建线程
$push_worker->start();
$push_worker->param = $param;
$this->pool[$name] = $push_worker;
$this->max_pthread--;
}else{
$flag = false;//标识任务未分出去
echo "任务".json_encode($param)."等待空闲线程...";
usleep(100000);//等待空闲线程100ms
$wait_time += 100000;
if($wait_time/1000000 >= $this->timeout) {
echo "任务[".$param['content']."] 等待超时";
return false;
}
}
}
}
}
}
$pool_worker = new PoolWorker($max_pthread, $timeout);
//多线程推送
for($i=0; $i<10; $i++){
$pool_worker->push($info);
}
//等待线程释放
while (count($pool_worker->pool)){
//遍历检查线程组运行结束
foreach ($pool_worker->pool as $key => $worker){
if($worker->param == '') {
$worker->runing = false;
unset($pool_worker->pool[$key]);
}
}
Ygtoo::output_log("等待线程结束...");
sleep(1);
}
//end
/**
* 推送工作线程
* 实现线程长驻、重复使用线程,线程超时、避免空线程占用资源
* @author yunfei
*
*/
class PushWorker extends Thread{
public $runing = false;
public $name;
public $param;
public $interval = 10000;//线程初始等待间隔10ms
public $timeout = 10;//线程超时时间
public $retry = 3;//重试次数
public function __construct($name, $timeout){
$this->runing = true;
$this->name = $name;
$this->param = '';
$this->timeout = $timeout;
}
/**
* 执行线程
* @see Threaded::run()
*/
public function run(){
$interval = $this->interval;
$wait_time = 0;
$retry = $this->retry;
while ($this->runing){//线程长驻、重复使用线程
if ($this->param) {//线程执行的参数
//重试功能
while($retry){
echo "线程[{$this->name}] 任务参数:".json_encode($this->param)." 开始执行";
//业务逻辑
$ret = array('code'=>200, 'msg'=>'执行结果');
echo "线程[{$this->name}] 任务参数:".json_encode($this->param)." 执行结果:{".json_encode($ret)."}";
if($ret['code'] == 200)
$retry = 0;
else {
$retry--;
if($retry == 0){//线程停止
$this->runing = false;
//$this->repush($info['content']);//失败任务回收
}
}
}
//初始化线程
$wait_time = 0;
$interval = $this->interval;
$this->param = '';//释放线程
$retry = $this->retry;
} else {
$interval = $interval << 1;//等待时间翻倍
}
usleep($interval);//线程等待
$wait_time += $interval;
echo "线程[{$this->name}]等待任务.. interval:[".($interval/1000)."ms] time:[".($wait_time/1000)."ms]";
//echo $this->interval;
//检查线程等待超时、线程退出
if($wait_time >= $this->timeout*1000000) $this->runing = false;
}
}
}
/**
* 线程池管理
* 实现线程池,任务分配功能
* @author yunfei
*
*/
class PoolWorker{
public $max_pthread = 100;
public $timeout = 10;
public $pool;
public function __construct($max_pthread, $timeout){
$this->max_pthread = $max_pthread;
$this->timeout = $timeout;
}
public function push($param){
$wait_time = 0;
$flag = false;//任务分发标识
//任务分配,若无可用线程则循环等待
while (!$flag){
$flag = true;
$bind = false;
//释放死掉的线程
if($this->pool){
foreach ($this->pool as $key => $worker){
if(!$worker->isRunning() && !$worker->runing){
unset($this->pool[$key]);
$this->max_pthread++;
}
}
}
//从线程池找一个空闲线程来执行任务
if($this->pool){
foreach ($this->pool as $worker){
if($worker->runing && $worker->param == ''){
$worker->param = $param;
$bind = true;
break;
}
}
}
//线程池没有可用线程时创建线程
if(!$bind){
if($this->max_pthread > 0){
$name = rand(10000, 65536);
$push_worker = new PushWorker($name, $this->timeout);//新建线程
$push_worker->start();
$push_worker->param = $param;
$this->pool[$name] = $push_worker;
$this->max_pthread--;
}else{
$flag = false;//标识任务未分出去
echo "任务".json_encode($param)."等待空闲线程...";
usleep(100000);//等待空闲线程100ms
$wait_time += 100000;
if($wait_time/1000000 >= $this->timeout) {
echo "任务[".$param['content']."] 等待超时";
return false;
}
}
}
}
}
}
$pool_worker = new PoolWorker($max_pthread, $timeout);
//多线程推送
for($i=0; $i<10; $i++){
$pool_worker->push($info);
}
//等待线程释放
while (count($pool_worker->pool)){
//遍历检查线程组运行结束
foreach ($pool_worker->pool as $key => $worker){
if($worker->param == '') {
$worker->runing = false;
unset($pool_worker->pool[$key]);
}
}
Ygtoo::output_log("等待线程结束...");
sleep(1);
}
//end
转载请注明:小Y » php的pthreads拓展使用