php的pthreads拓展使用

1 安装pthreads
检查php版本,不是TS版的需要重新编译php增加–enable-maintainer-zts选项,此处省略安装

2 献上主要业务代码
daemon/push/question/worker.php

<?php

/**
 * 推送工作线程
 * 实现线程长驻、重复使用线程,线程超时、避免空线程占用资源
 * @author yunfei
 *
 */

class PushWorker extends Thread{
   
    public $runing = false;
    public $name;
    public $param;
    public $interval = 10000;//线程初始等待间隔10ms
    public $timeout = 10;//线程超时时间
    public $retry = 3;//重试次数
   
    public function __construct($name, $timeout){
        $this->runing = true;
        $this->name   = $name;
        $this->param = '';
        $this->timeout = $timeout;
    }
   
    /**
     * 执行线程
     * @see Threaded::run()
     */

    public function run(){
        $interval = $this->interval;
        $wait_time = 0;
        $retry = $this->retry;
        while ($this->runing){//线程长驻、重复使用线程
            if ($this->param) {//线程执行的参数
                //重试功能
                while($retry){
                    echo "线程[{$this->name}] 任务参数:".json_encode($this->param)." 开始执行";
                    //业务逻辑
                    $ret = array('code'=>200, 'msg'=>'执行结果');
                    echo "线程[{$this->name}] 任务参数:".json_encode($this->param)." 执行结果:{".json_encode($ret)."}";
                    if($ret['code'] == 200)
                        $retry = 0;
                    else {
                        $retry--;
                        if($retry == 0){//线程停止
                            $this->runing = false;
                            //$this->repush($info['content']);//失败任务回收
                        }
                    }
                }

                //初始化线程
                $wait_time = 0;
                $interval = $this->interval;
                $this->param = '';//释放线程
                $retry = $this->retry;
            } else {
                $interval = $interval << 1;//等待时间翻倍
            }

            usleep($interval);//线程等待
            $wait_time += $interval;
            echo "线程[{$this->name}]等待任务.. interval:[".($interval/1000)."ms] time:[".($wait_time/1000)."ms]";
            //echo $this->interval;
            //检查线程等待超时、线程退出
            if($wait_time >= $this->timeout*1000000) $this->runing = false;
        }
    }
}

/**
 * 线程池管理
 * 实现线程池,任务分配功能
 * @author yunfei
 *
 */

class PoolWorker{

    public $max_pthread = 100;
    public $timeout = 10;
   
    public $pool;
   
    public function __construct($max_pthread, $timeout){
        $this->max_pthread = $max_pthread;
        $this->timeout = $timeout;
    }
   
    public function push($param){
        $wait_time = 0;
        $flag = false;//任务分发标识
        //任务分配,若无可用线程则循环等待
        while (!$flag){
            $flag = true;
            $bind = false;
            //释放死掉的线程
            if($this->pool){
                foreach ($this->pool as $key => $worker){
                    if(!$worker->isRunning() && !$worker->runing){
                        unset($this->pool[$key]);
                        $this->max_pthread++;
                    }
                }
            }

            //从线程池找一个空闲线程来执行任务
            if($this->pool){
                foreach ($this->pool as $worker){
                    if($worker->runing && $worker->param == ''){
                        $worker->param = $param;
                        $bind = true;
                        break;
                    }
                }
            }
           
            //线程池没有可用线程时创建线程
            if(!$bind){
                if($this->max_pthread > 0){
                    $name = rand(10000, 65536);
                    $push_worker = new PushWorker($name, $this->timeout);//新建线程
                    $push_worker->start();
                    $push_worker->param = $param;
                    $this->pool[$name] = $push_worker;
                    $this->max_pthread--;
                }else{
                    $flag = false;//标识任务未分出去
                    echo "任务".json_encode($param)."等待空闲线程...";
                    usleep(100000);//等待空闲线程100ms
                    $wait_time += 100000;
                    if($wait_time/1000000 >= $this->timeout) {
                        echo "任务[".$param['content']."] 等待超时";
                        return false;
                    }
                }
            }
        }
    }
}

$pool_worker = new PoolWorker($max_pthread, $timeout);
//多线程推送
for($i=0; $i<10; $i++){
    $pool_worker->push($info);
}

//等待线程释放
while (count($pool_worker->pool)){
    //遍历检查线程组运行结束
    foreach ($pool_worker->pool as $key => $worker){
        if($worker->param == '') {
            $worker->runing = false;
            unset($pool_worker->pool[$key]);
        }
    }
    Ygtoo::output_log("等待线程结束...");
    sleep(1);
}
//end

转载请注明:小Y » php的pthreads拓展使用

赞 (0) 评论 (0) 分享 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址