workerman源码学习

workerman源码解读

首先最简单的server端代码是

use Workerman\Worker;
require_once './Autoloader.php';

// 创建一个Worker监听2346端口,使用websocket协议通讯
$ws_worker = new Worker("tcp://0.0.0.0:2346");

// 启动4个进程对外提供服务
$ws_worker->count = 4;

// 当收到客户端发来的数据后返回hello $data给客户端
$ws_worker->onMessage = function($connection, $data)
{
    // 向客户端发送hello $data
    $connection->send('hello ' . $data);
};

// 运行
Worker::runAll();

然后看看new worker的时候都在做什么

    /**
     * worker构造函数
     *
     * @param string $socket_name
     * @param array  $context_option
     */

    public function __construct($socket_name = '', $context_option = array())
    {
        // 保存worker实例
        $this->workerId = spl_object_hash($this);
        self::$_workers[$this->workerId] = $this;
        self::$_pidMap[$this->workerId] = array();
       
        // 获得实例化文件路径,用于自动加载设置根目录
        $backrace = debug_backtrace();
        $this->_appInitPath = dirname($backrace[0]['file']);
       
        // 设置socket上下文
        if($socket_name)
        {
            $this->_socketName = $socket_name;
            if(!isset($context_option['socket']['backlog']))
            {
                $context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;
            }
            $this->_context = stream_context_create($context_option);
        }
       
        // 设置一个空的onMessage,当onMessage未设置时用来消费socket数据
        $this->onMessage = function(){};
    }

这段代码保存当前对象到静态属性$_workers数组,然后初始化了自动加载根目录、设置socket上下文、初始化$this->onMessage属性

接着是这段代码

// 启动4个进程对外提供服务
$ws_worker->count = 4;

设置ws_worker对象的公共属性

然后就是绑定一个匿名函数到$this->onMessage属性上

// 当收到客户端发来的数据后返回hello $data给客户端
$ws_worker->onMessage = function($connection, $data)
{
    // 向客户端发送hello $data
    $connection->send('hello ' . $data);
};

函数定义了当客户端发来消息时的响应

再接着就是运行

// 运行
Worker::runAll();

现在看看runAll方法都在干什么

    /**
     * 运行所有worker实例
     * @return void
     */

    public static function runAll()
    {
        // 初始化环境变量
        self::init();
        // 解析命令
        self::parseCommand();
        // 尝试以守护进程模式运行
        self::daemonize();
        // 初始化所有worker实例,主要是监听端口
        self::initWorkers();
        //  初始化所有信号处理函数
        self::installSignal();
        // 保存主进程pid
        self::saveMasterPid();
        // 创建子进程(worker进程)并运行
        self::forkWorkers();
        // 展示启动界面
        self::displayUI();
        // 尝试重定向标准输入输出
        self::resetStd();
        // 监控所有子进程(worker进程)
        self::monitorWorkers();
    }

runAll方法中做了调用了很多方法,都有注释,可以帮助我们了解方法的功能

逐个看看每个方法都是怎么工作的

// 初始化环境变量
self::init();
    /**
     * 初始化一些环境变量
     * @return void
     */

    protected static function init()
    {
        // 如果没设置$pidFile,则生成默认值
        if(empty(self::$pidFile))
        {
            $backtrace = debug_backtrace();
            self::$_startFile = $backtrace[count($backtrace)-1]['file'];
            self::$pidFile = __DIR__ . "/../".str_replace('/', '_', self::$_startFile).".pid";
        }
        // 没有设置日志文件,则生成一个默认值
        if(empty(self::$logFile))
        {
            self::$logFile = __DIR__ . '/../workerman.log';
        }
        touch(self::$logFile);
        chmod(self::$logFile, 0622);
        // 标记状态为启动中
        self::$_status = self::STATUS_STARTING;
        // 启动时间戳
        self::$_globalStatistics['start_timestamp'] = time();
        // 设置status文件位置
        self::$_statisticsFile = sys_get_temp_dir().'/workerman.status';
        // 尝试设置进程名称(需要php>=5.5或者安装了proctitle扩展)
        self::setProcessTitle('WorkerMan: master process  start_file=' . self::$_startFile);
        // 初始化id
        self::initId();
        // 初始化定时器
        Timer::init();
    }

这个方法里面就是一些环节的初始化,嵌套的函数有setProcessTitle()、initId()、Timer::init()

尝试设置进程名称 setProcessTitle方法

    /**
     * 设置当前进程的名称,在ps aux命令中有用
     * 注意 需要php>=5.5或者安装了protitle扩展
     * @param string $title
     * @return void
     */

    protected static function setProcessTitle($title)
    {
        // >=php 5.5
        if (function_exists('cli_set_process_title'))
        {
            @cli_set_process_title($title);
        }
        // 需要扩展
        elseif(extension_loaded('proctitle') && function_exists('setproctitle'))
        {
            @setproctitle($title);
        }
    }

逻辑相对简单、就是判断有没有可用的函数,有的话就设置

初始化Worker::initId()方法

    /**
     * 初始化idMap
     * return void
     */

    protected static function initId()
    {
        foreach(self::$_workers as $worker_id=>$worker)
        {
            self::$_idMap[$worker_id] = array_fill(0, $worker->count, 0);
        }
    }

这个函数初始化另一个属性静态属性$_idMap,也比较简单,注意这块的self::$_workers只有一个主进程对象

初始化定时器 Timer::init()方法

    /**
     * 初始化
     * @return void
     */

    public static function init($event = null)
    {
        if($event)
        {
            self::$_event = $event;
        }
        else
        {
            pcntl_signal(SIGALRM, array('\Workerman\Lib\Timer', 'signalHandle'), false);
        }
    }

这个方法安装了一个信号处理器,其中信号是SIGALRM,处理函数是signalHandle,在这里的signalHandle函数是没有执行的

Timer::signalHandle()

    /**
     * 信号处理函数,只处理ALARM事件
     * @return void
     */

    public static function signalHandle()
    {
        if(!self::$_event)
        {
            pcntl_alarm(1);
            self::tick();
        }
    }

pcntl_alarm的作用是创建一个计时器,在指定的秒数后向进程发送一个SIGALRM信号,每次对 pcntl_alarm()的调用都会取消之前设置的alarm信号

其中Timer::tick函数原型是

    /**
     * 尝试触发定时回调
     * @return void
     */

    public static function tick()
    {
        if(empty(self::$_tasks))//初始化时运行这段代码
        {
            pcntl_alarm(0);
            return;
        }
       
        $time_now = time();
        foreach (self::$_tasks as $run_time=>$task_data)
        {
            if($time_now >= $run_time)
            {
                foreach($task_data as $index=>$one_task)
                {
                    $task_func = $one_task[0];
                    $task_args = $one_task[1];
                    $persistent = $one_task[2];
                    $time_interval = $one_task[3];
                    try
                    {
                        call_user_func_array($task_func, $task_args);
                    }
                    catch(\Exception $e)
                    {
                        echo $e;
                    }
                    if($persistent)
                    {
                        self::add($time_interval, $task_func, $task_args);
                    }
                }
                unset(self::$_tasks[$run_time]);
            }
        }
    }

Worker::parseCommand()解析命令函数
这个函数解析用户的命令参数,根据命令参数执行动作

Worker::daemonize()函数根据用户参数尝试以守护进程模式运行

Worker::initWorkers()函数初始化所有worker实例,主要是监听端口
当前也只有一个进程
其中重要的一段代码是,该段代码选择性监听端口

            // 如果端口不可复用,则直接在主进程就监听
            if(!$worker->reusePort)
            {
                // 监听端口
                $worker->listen();
            }

Worker::listen()方法

    /**
     * 监听端口
     * @throws Exception
     */

    public function listen()
    {
        //已设置_socketName属性或者未设置_mainSocket才可以执行下面代码
        if(!$this->_socketName || $this->_mainSocket)
        {
            return;
        }
 
        // 设置自动加载根目录  
        Autoloader::setRootPath($this->_appInitPath);

        $local_socket = $this->_socketName;
        // 获得应用层通讯协议以及监听的地址
        list($scheme, $address) = explode(':', $this->_socketName, 2);
        // 如果有指定应用层协议,则检查对应的协议类是否存在
        if(!isset(self::$_builtinTransports[$scheme]))
        {
            $scheme = ucfirst($scheme);
            $this->protocol = "\\Protocols\\$scheme";//此处有改动,源代码('\\Protocols\'.$scheme)显示会有问题
            if(!class_exists($this->protocol))
            {
                $this->protocol = "\\Workerman\\Protocols\\$scheme";
                if(!class_exists($this->protocol))//协议不存在
                {
                    throw new Exception("class \\Protocols\\$scheme not exist");
                }
            }
            $local_socket = $this->transport.":".$address;//替换用户定义协议为tcp
        }
        else
        {
            $this->transport = self::$_builtinTransports[$scheme];
        }
       
        // flag
        $flags =  $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
        $errno = 0;
        $errmsg = '';
        // 如果设置了端口复用,则设置SO_REUSEPORT选项为1
        if($this->reusePort)
        {
            stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
        }
        if($this->transport === 'unix')
        {
            umask(0);
            if(!is_file($address))
            {
                register_shutdown_function(function()use($address){@unlink($address);});//定义执行结束时运行的匿名函数
            }
        }
        // 创建监听
        $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
        if(!$this->_mainSocket)
        {
            throw new Exception($errmsg);
        }
       
        // 尝试打开tcp的keepalive,关闭TCP Nagle算法
        if(function_exists('socket_import_stream') && $this->transport === 'tcp')
        {
            $socket   = socket_import_stream($this->_mainSocket );
            @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
            @socket_set_option($socket, SOL_SOCKET, TCP_NODELAY, 1);
        }
       
        // 设置非阻塞
        stream_set_blocking($this->_mainSocket, 0);
       
        // 放到全局事件轮询中监听_mainSocket可读事件(客户端连接事件)
        if(self::$globalEvent)//现在self::$globalEvent还是null,所以下面代码还没执行
        {
            if($this->transport !== 'udp')
            {
                self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
            }
            else
            {
                self::$globalEvent->add($this->_mainSocket,  EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
            }
        }
    }

Worker::installSignal()初始化所有信号处理函数

        // ignore
        pcntl_signal(SIGPIPE, SIG_IGN, false);//忽视SIGPIPE信号

这里对SIGINT、SIGUSR、SIGUSR2信号安装处理函数

    /**
     * 信号处理函数
     * @param int $signal
     */

    public static function signalHandler($signal)
    {
        switch($signal)
        {
            // stop
            case SIGINT:
                self::stopAll();
                break;
            // reload
            case SIGUSR1:
                self::$_pidsToRestart = self::getAllWorkerPids();
                self::reload();
                break;
            // show status
            case SIGUSR2:
                self::writeStatisticsToStatusFile();
                break;
        }
    }

Worker::saveMasterPid()保存主进程pid

Worker::forkWorkers()创建子进程(worker进程)并运行

    /**
     * 创建子进程
     * @return void
     */

    protected static function forkWorkers()
    {
        /** @var static $worker */
        foreach(self::$_workers as $worker)
        {
            // 启动过程中需要得到运行用户名的最大长度,在status时格式化展示
            if(self::$_status === self::STATUS_STARTING)
            {
                if(empty($worker->name))
                {
                    $worker->name = $worker->getSocketName();
                }
                $worker_name_length = strlen($worker->name);
                if(self::$_maxWorkerNameLength < $worker_name_length)
                {
                    self::$_maxWorkerNameLength = $worker_name_length;
                }
            }
           
            // 创建子进程
            while(count(self::$_pidMap[$worker->workerId]) < $worker->count)
            {
                static::forkOneWorker($worker);
            }
        }
    }

    /**
     * 创建一个子进程
     * @param Worker $worker
     * @throws Exception
     */

    protected static function forkOneWorker($worker)
    {
        $pid = pcntl_fork();
        // 获得可用的id
        $id = self::getId($worker->workerId, 0);
        // 主进程记录子进程pid
        if($pid > 0)
        {
            self::$_pidMap[$worker->workerId][$pid] = $pid;
            self::$_idMap[$worker->workerId][$id] = $pid;
        }
        // 子进程运行
        elseif(0 === $pid)
        {
            // 如果设置了端口复用,则在子进程执行监听
            if($worker->reusePort)
            {
                $worker->listen();
            }
            // 启动过程中尝试重定向标准输出
            if(self::$_status === self::STATUS_STARTING)
            {
                self::resetStd();
            }
            self::$_pidMap = array();
            self::$_workers = array($worker->workerId => $worker);
            Timer::delAll();
            self::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());
            $worker->setUserAndGroup();
            $worker->id = $id;
            $worker->run();
            exit(250);
        }
        else
        {
            throw new Exception("forkOneWorker fail");
        }
    }

Timer::delAll()函数用来删除现在所有的定时,目测当前没有执行中的定时,这段代码暂时未明白缘由

Worker::run()方法运行子进程执行时的逻辑

    /**
     * 运行worker实例
     */

    public function run()
    {
        //更新 Worker 状态
        self::$_status = self::STATUS_RUNNING;
       
        // 注册进程退出回调,用来检查是否有错误
        register_shutdown_function(array("\\Workerman\\Worker", 'checkErrors'));
       
        // 设置自动加载根目录
        Autoloader::setRootPath($this->_appInitPath);
       
        // 如果没有全局事件轮询,则创建一个
        if(!self::$globalEvent)
        {
            if(extension_loaded('libevent'))
            {
                self::$globalEvent = new Libevent();
            }
            else
            {
                self::$globalEvent = new Select();
            }
            // 监听_mainSocket上的可读事件(客户端连接事件)
            if($this->_socketName)
            {
                if($this->transport !== 'udp')
                {
                    self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
                }
                else
                {
                    self::$globalEvent->add($this->_mainSocket,  EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
                }
            }
        }
       
        // 重新安装事件处理函数,使用全局事件轮询监听信号事件
        self::reinstallSignal();
       
        // 用全局事件轮询初始化定时器
        Timer::init(self::$globalEvent);
       
        // 如果有设置进程启动回调,则执行
        if($this->onWorkerStart)
        {
            call_user_func($this->onWorkerStart, $this);
        }
       
        // 子进程主循环
        self::$globalEvent->loop();
    }

Select::add()函数添加事件及处理函数

    /**
     * 添加事件及处理函数
     * @see Events\EventInterface::add()
     */

    public function add($fd, $flag, $func, $args = array())
    {
        switch ($flag)
        {
            case self::EV_READ:
                $fd_key = (int)$fd;
                $this->_allEvents[$fd_key][$flag] = array($func, $fd);
                $this->_readFds[$fd_key] = $fd;
                break;
            case self::EV_WRITE:
                $fd_key = (int)$fd;
                $this->_allEvents[$fd_key][$flag] = array($func, $fd);
                $this->_writeFds[$fd_key] = $fd;
                break;
            case self::EV_SIGNAL:
                $fd_key = (int)$fd;
                $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
                pcntl_signal($fd, array($this, 'signalHandler'));
                break;
            case self::EV_TIMER:
            case self::EV_TIMER_ONCE:
                // $fd 为 定时的时间间隔,单位为秒,支持小数,能精确到0.001秒
                $run_time = microtime(true)+$fd;
                $this->_scheduler->insert($this->_timerId, -$run_time);
                $this->_task[$this->_timerId] = array($func, (array)$args, $flag, $fd);
                $this->tick();
                return $this->_timerId++;
        }
       
        return true;
    }

Worker::acceptConnection()读事件处理函数

    /**
     * 接收一个客户端连接
     * @param resource $socket
     * @return void
     */

    public function acceptConnection($socket)
    {
        // 获得客户端连接
        $new_socket = @stream_socket_accept($socket, 0, $remote_address);
        // 惊群现象,忽略
        if(false === $new_socket)
        {
            return;
        }
       
        // 初始化连接对象
        $connection = new TcpConnection($new_socket, $remote_address);
        $this->connections[$connection->id] = $connection;
        $connection->worker = $this;
        $connection->protocol = $this->protocol;
        $connection->onMessage = $this->onMessage;
        $connection->onClose = $this->onClose;
        $connection->onError = $this->onError;
        $connection->onBufferDrain = $this->onBufferDrain;
        $connection->onBufferFull = $this->onBufferFull;
       
        // 如果有设置连接回调,则执行
        if($this->onConnect)
        {
            call_user_func($this->onConnect, $connection);
        }
    }

Worker::reinstallSignal()重新安装子进程事件处理函数,使用全局事件轮询监听信号事件

Timer::init(self::$globalEvent)用全局事件轮询初始化定时器

self::$globalEvent->loop()子进程主循环,Select::loop()或者libevent::loop();
Select::loop()函数

    /**
     * 主循环
     * @see Events\EventInterface::loop()
     */

    public function loop()
    {
        $e = null;
        while (1)
        {
            // 如果有信号,尝试执行信号处理函数
            pcntl_signal_dispatch();
           
            $read = $this->_readFds;
            $write = $this->_writeFds;
            // 等待可读或者可写事件
            @stream_select($read, $write, $e, 0, $this->_selectTimeout);
           
            // 尝试执行定时任务
            if(!$this->_scheduler->isEmpty())
            {
                $this->tick();
            }
           
            // 这些描述符可读,执行对应描述符的读回调函数
            if($read)
            {
                foreach($read as $fd)
                {
                    $fd_key = (int) $fd;
                    if(isset($this->_allEvents[$fd_key][self::EV_READ]))
                    {
                        call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1]));
                    }
                }
            }
           
            // 这些描述符可写,执行对应描述符的写回调函数
            if($write)
            {
                foreach($write as $fd)
                {
                    $fd_key = (int) $fd;
                    if(isset($this->_allEvents[$fd_key][self::EV_WRITE]))
                    {
                        call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
                    }
                }
            }
        }
    }

Worker::displayUI()展示启动界面

Worker::resetStd()尝试重定向标准输入输出

Worker::monitorWorkers()监控所有子进程(worker进程)父进程轮询

    /**
     * 监控所有子进程的退出事件及退出码
     * @return void
     */

    protected static function monitorWorkers()
    {
        self::$_status = self::STATUS_RUNNING;
        while(1)
        {
            // 如果有信号到来,尝试触发信号处理函数
            pcntl_signal_dispatch();
            // 挂起进程,直到有子进程退出或者被信号打断
            $status = 0;
            $pid = pcntl_wait($status, WUNTRACED);
            // 如果有信号到来,尝试触发信号处理函数
            pcntl_signal_dispatch();
            // 有子进程退出
            if($pid > 0)
            {
                // 查找是哪个进程组的,然后再启动新的进程补上
                foreach(self::$_pidMap as $worker_id => $worker_pid_array)
                {
                    if(isset($worker_pid_array[$pid]))
                    {
                        $worker = self::$_workers[$worker_id];
                        // 检查退出状态
                        if($status !== 0)
                        {
                            self::log("worker[".$worker->name.":$pid] exit with status $status");
                        }
                       
                        // 统计,运行status命令时使用
                        if(!isset(self::$_globalStatistics['worker_exit_info'][$worker_id][$status]))
                        {
                            self::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
                        }
                        self::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
                       
                        // 清除子进程信息
                        unset(self::$_pidMap[$worker_id][$pid]);
                       
                        // 标记$id为可用id
                        $id = self::getId($worker_id, $pid);
                        self::$_idMap[$worker_id][$id] = 0;
                       
                        break;
                    }
                }
                // 如果不是关闭状态,则补充新的进程
                if(self::$_status !== self::STATUS_SHUTDOWN)
                {
                    self::forkWorkers();
                    // 如果该进程是因为运行reload命令退出,则继续执行reload流程
                    if(isset(self::$_pidsToRestart[$pid]))
                    {
                        unset(self::$_pidsToRestart[$pid]);
                        self::reload();
                    }
                }
                else
                {
                    // 如果是关闭状态,并且所有进程退出完毕,则主进程退出
                    if(!self::getAllWorkerPids())
                    {
                        self::exitAndClearAll();
                    }
                }
            }
            else
            {
                // 如果是关闭状态,并且所有进程退出完毕,则主进程退出
                if(self::$_status === self::STATUS_SHUTDOWN && !self::getAllWorkerPids())
                {
                   self::exitAndClearAll();
                }
            }
        }
    }

转载请注明:小Y » workerman源码学习

赞 (0) 评论 (0) 分享 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址