php多进程类

使用介绍

<?php
/**
 * worker
 * @author xxx
 *
 */

 
class Worker
{
    //worker process number
    public $worker_num = 4;
    //if daemonize
    public $daemonize = false;
    //worker array
    public $arr_worker = array();
    //sysvmsg queue
    public $queue;
    public $onReceive;
   
    public $onStart;
    //log file
    public $log_file;
    //max message size
    public $max_size = 4096;
   
    public $block = true;
   
    public function __construct() {
        $this->init();
    }
   
    public function init(){
        $key = ftok(__FILE__, 'R');
        if(msg_queue_exists($key)){
            $this->queue = msg_get_queue($key, 0666);
            msg_remove_queue($this->queue);
        }
        $this->queue = msg_get_queue($key, 0666);
        //$this->log_file = 'Worker.log';
    }
   
    /**
     * 设置
     * @param unknown $config
     */

    public function set($config){
        foreach ($config as $key => $value){
            $this->{$key} = $value;
        }
    }
   
    /**
     * 运行
     */

    public function run(){
        $this->options();//parse options
        $this->daemonize();
        for ($i = 0; $i < $this->worker_num; $i++){
            $this->fork_child($i);
        }
        //$this->monitor();
        sleep(1);//睡一觉(进程同步)
    }
    /**
     * parse user options
     */

    public function options(){
        $short = "c:d::";
        $longopts = array('stop', 'help');
        $options = getopt($short, $longopts);
        if(isset($options['c']) && $options['c'] > 0){
            $this->worker_num = $options['c'];
        }
        if(isset($options['d'])){
            $this->daemonize = true;
        }
        if(isset($options['stop'])){
            global $argv;
            exec("kill `ps -ef|grep ".$argv[0]."|grep -v grep|grep -v vi|awk '{print $2}'`");
            exit();
        }
        if(isset($options['help'])){
            echo "if user set options and set config, options is first vaild!".PHP_EOL;
            echo "-c <number>    worker number".PHP_EOL;
            echo "-d             daemonize".PHP_EOL;
            echo "--stop         stop all worker".PHP_EOL;
            echo "--help         help".PHP_EOL;
            exit();
        }
    }
   
    /**
     * Run as deamon mode.
     * copy from workerman
     * @throws Exception
     */

    public function daemonize(){
        if(!$this->daemonize){
            return ;
        }
        umask(0);
        $pid = pcntl_fork();
        if($pid === -1){
            throw new Exception("fork error!");
        }else if($pid > 0){
            exit(0);
        }
        if (-1 === posix_setsid()) {
            throw new Exception("setsid fail");
        }
        // Fork again avoid SVR4 system regain the control of terminal.
        if($pid === -1){
            throw new Exception("fork error!");
        }else if($pid > 0){
            exit(0);
        }
    }
    /*
    public function monitor(){
        while (1){
            $pid = pcntl_wait($status);
            $this->arr_worker[$pid];
            $this->fork_child();
        }
    }
     */

   
    /**
     * fork a child process
     * @throws Exception
     */

    public function fork_child($i){
        $pid = pcntl_fork();
        if( $pid > 0 ){
            $this->arr_worker[$i] = $pid;
        }else if($pid == 0){
            if($this->onStart){
                call_user_func($this->onStart, $this);
            }
            $this->loop();
        }else{
            throw new Exception("fork error!");
        }
    }
   
    /**
     * 轮询
     */

    public function loop(){
        $pid = posix_getpid();
        $flags = 0;
        if(!$this->block){//设置非阻塞
            $flags = MSG_IPC_NOWAIT;
        }
        while (1){
            $msgtype = '';
            $message = '';
            $errcode = 0;
            //只接收msgtype=$pid的消息
            if(msg_receive($this->queue, $pid, $msgtype, $this->max_size, $message, false, $flags, $errcode)){
                //$this->log("receive $message strlen[".strlen($message).'] msgtype:'.$msgtype);
                if(trim($message) == "exit()"){
                    $this->log("process exit!");
                    exit();
                }else{
                    call_user_func($this->onReceive, $this, $message);
                }
            }
            if($errcode > 0){
                $this->log("receive errcode:".$errcode);
                sleep(1);
            }
            usleep(100);
        }
    }
   
    public $i = 0;
   
    /**
     * 发送数据
     * @param string $message
     * @param int    $id
     */

    public function send($message, $id = -1){
        //if($id == -1){
        //    $i = sprintf("%u", crc32($message)) % $this->worker_num;
        //}else{
        //    $i = $id % $this->worker_num;
        //}
        $i = $this->i % $this->worker_num;
        $this->i++;
        $j = 100;//最大重试次数
        do{
            //$this->log("send ".$message);
            $errcode = 0;
            $result = @msg_send($this->queue, $this->arr_worker[$i], $message, false, $this->block, $errcode);
            //send failed retry
            if(!$result){
                $this->log("send errcode:".$errcode);
                sleep(1);
                $j--;
            }
        } while(!$result && $j);
        return $result;
    }
    /**
     * 停止
     */

    public function stop(){
        foreach ($this->arr_worker as $pid){
            //发送消息类型为$pid的message
            if(msg_send($this->queue, $pid, "exit()", false)){
                $pid = pcntl_wait($status);
                $this->log("recover child process $pid");
                unset($this->arr_worker[array_search($pid, $this->arr_worker)]);
            }
        }
        msg_remove_queue($this->queue);//destory a message queue
    }
    private $index = 0;
   
    /**
     * log
     * @param unknown $message
     */

    public function log($message){
        $message = date("Y-m-d H:i:s").' '.posix_getpid().' '.$message.PHP_EOL;
        if(!$this->daemonize || !$this->log_file){
            echo $message;
        }else{
            $filename = $this->log_file.sprintf("%04d", $this->index);
            $stat = stat($filename);
            //日志文件大小大于2G则更换文件
            while($stat['size'] >= 2 * 1024 * 1024 * 1024){
                $this->index++;
                $filename = $this->log_file.sprintf("%04d", $this->index);
                $stat = stat($filename);
            }
            error_log($message, 3, $filename);
        }
    }
}

转载请注明:小Y » php多进程类

赞 (0) 评论 (0) 分享 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址