workerman源码解读
首先最简单的server端代码是
require_once './Autoloader.php';
// 创建一个Worker监听2346端口,使用websocket协议通讯
$ws_worker = new Worker("tcp://0.0.0.0:2346");
// 启动4个进程对外提供服务
$ws_worker->count = 4;
// 当收到客户端发来的数据后返回hello $data给客户端
$ws_worker->onMessage = function($connection, $data)
{
// 向客户端发送hello $data
$connection->send('hello ' . $data);
};
// 运行
Worker::runAll();
然后看看new worker的时候都在做什么
* worker构造函数
*
* @param string $socket_name
* @param array $context_option
*/
public function __construct($socket_name = '', $context_option = array())
{
// 保存worker实例
$this->workerId = spl_object_hash($this);
self::$_workers[$this->workerId] = $this;
self::$_pidMap[$this->workerId] = array();
// 获得实例化文件路径,用于自动加载设置根目录
$backrace = debug_backtrace();
$this->_appInitPath = dirname($backrace[0]['file']);
// 设置socket上下文
if($socket_name)
{
$this->_socketName = $socket_name;
if(!isset($context_option['socket']['backlog']))
{
$context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;
}
$this->_context = stream_context_create($context_option);
}
// 设置一个空的onMessage,当onMessage未设置时用来消费socket数据
$this->onMessage = function(){};
}
这段代码保存当前对象到静态属性$_workers数组,然后初始化了自动加载根目录、设置socket上下文、初始化$this->onMessage属性
接着是这段代码
$ws_worker->count = 4;
设置ws_worker对象的公共属性
然后就是绑定一个匿名函数到$this->onMessage属性上
$ws_worker->onMessage = function($connection, $data)
{
// 向客户端发送hello $data
$connection->send('hello ' . $data);
};
函数定义了当客户端发来消息时的响应
再接着就是运行
Worker::runAll();
现在看看runAll方法都在干什么
* 运行所有worker实例
* @return void
*/
public static function runAll()
{
// 初始化环境变量
self::init();
// 解析命令
self::parseCommand();
// 尝试以守护进程模式运行
self::daemonize();
// 初始化所有worker实例,主要是监听端口
self::initWorkers();
// 初始化所有信号处理函数
self::installSignal();
// 保存主进程pid
self::saveMasterPid();
// 创建子进程(worker进程)并运行
self::forkWorkers();
// 展示启动界面
self::displayUI();
// 尝试重定向标准输入输出
self::resetStd();
// 监控所有子进程(worker进程)
self::monitorWorkers();
}
runAll方法中做了调用了很多方法,都有注释,可以帮助我们了解方法的功能
逐个看看每个方法都是怎么工作的
self::init();
* 初始化一些环境变量
* @return void
*/
protected static function init()
{
// 如果没设置$pidFile,则生成默认值
if(empty(self::$pidFile))
{
$backtrace = debug_backtrace();
self::$_startFile = $backtrace[count($backtrace)-1]['file'];
self::$pidFile = __DIR__ . "/../".str_replace('/', '_', self::$_startFile).".pid";
}
// 没有设置日志文件,则生成一个默认值
if(empty(self::$logFile))
{
self::$logFile = __DIR__ . '/../workerman.log';
}
touch(self::$logFile);
chmod(self::$logFile, 0622);
// 标记状态为启动中
self::$_status = self::STATUS_STARTING;
// 启动时间戳
self::$_globalStatistics['start_timestamp'] = time();
// 设置status文件位置
self::$_statisticsFile = sys_get_temp_dir().'/workerman.status';
// 尝试设置进程名称(需要php>=5.5或者安装了proctitle扩展)
self::setProcessTitle('WorkerMan: master process start_file=' . self::$_startFile);
// 初始化id
self::initId();
// 初始化定时器
Timer::init();
}
这个方法里面就是一些环节的初始化,嵌套的函数有setProcessTitle()、initId()、Timer::init()
尝试设置进程名称 setProcessTitle方法
* 设置当前进程的名称,在ps aux命令中有用
* 注意 需要php>=5.5或者安装了protitle扩展
* @param string $title
* @return void
*/
protected static function setProcessTitle($title)
{
// >=php 5.5
if (function_exists('cli_set_process_title'))
{
@cli_set_process_title($title);
}
// 需要扩展
elseif(extension_loaded('proctitle') && function_exists('setproctitle'))
{
@setproctitle($title);
}
}
逻辑相对简单、就是判断有没有可用的函数,有的话就设置
初始化Worker::initId()方法
* 初始化idMap
* return void
*/
protected static function initId()
{
foreach(self::$_workers as $worker_id=>$worker)
{
self::$_idMap[$worker_id] = array_fill(0, $worker->count, 0);
}
}
这个函数初始化另一个属性静态属性$_idMap,也比较简单,注意这块的self::$_workers只有一个主进程对象
初始化定时器 Timer::init()方法
* 初始化
* @return void
*/
public static function init($event = null)
{
if($event)
{
self::$_event = $event;
}
else
{
pcntl_signal(SIGALRM, array('\Workerman\Lib\Timer', 'signalHandle'), false);
}
}
这个方法安装了一个信号处理器,其中信号是SIGALRM,处理函数是signalHandle,在这里的signalHandle函数是没有执行的
Timer::signalHandle()
* 信号处理函数,只处理ALARM事件
* @return void
*/
public static function signalHandle()
{
if(!self::$_event)
{
pcntl_alarm(1);
self::tick();
}
}
pcntl_alarm的作用是创建一个计时器,在指定的秒数后向进程发送一个SIGALRM信号,每次对 pcntl_alarm()的调用都会取消之前设置的alarm信号
其中Timer::tick函数原型是
* 尝试触发定时回调
* @return void
*/
public static function tick()
{
if(empty(self::$_tasks))//初始化时运行这段代码
{
pcntl_alarm(0);
return;
}
$time_now = time();
foreach (self::$_tasks as $run_time=>$task_data)
{
if($time_now >= $run_time)
{
foreach($task_data as $index=>$one_task)
{
$task_func = $one_task[0];
$task_args = $one_task[1];
$persistent = $one_task[2];
$time_interval = $one_task[3];
try
{
call_user_func_array($task_func, $task_args);
}
catch(\Exception $e)
{
echo $e;
}
if($persistent)
{
self::add($time_interval, $task_func, $task_args);
}
}
unset(self::$_tasks[$run_time]);
}
}
}
Worker::parseCommand()解析命令函数
这个函数解析用户的命令参数,根据命令参数执行动作
Worker::daemonize()函数根据用户参数尝试以守护进程模式运行
Worker::initWorkers()函数初始化所有worker实例,主要是监听端口
当前也只有一个进程
其中重要的一段代码是,该段代码选择性监听端口
if(!$worker->reusePort)
{
// 监听端口
$worker->listen();
}
Worker::listen()方法
* 监听端口
* @throws Exception
*/
public function listen()
{
//已设置_socketName属性或者未设置_mainSocket才可以执行下面代码
if(!$this->_socketName || $this->_mainSocket)
{
return;
}
// 设置自动加载根目录
Autoloader::setRootPath($this->_appInitPath);
$local_socket = $this->_socketName;
// 获得应用层通讯协议以及监听的地址
list($scheme, $address) = explode(':', $this->_socketName, 2);
// 如果有指定应用层协议,则检查对应的协议类是否存在
if(!isset(self::$_builtinTransports[$scheme]))
{
$scheme = ucfirst($scheme);
$this->protocol = "\\Protocols\\$scheme";//此处有改动,源代码('\\Protocols\'.$scheme)显示会有问题
if(!class_exists($this->protocol))
{
$this->protocol = "\\Workerman\\Protocols\\$scheme";
if(!class_exists($this->protocol))//协议不存在
{
throw new Exception("class \\Protocols\\$scheme not exist");
}
}
$local_socket = $this->transport.":".$address;//替换用户定义协议为tcp
}
else
{
$this->transport = self::$_builtinTransports[$scheme];
}
// flag
$flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$errno = 0;
$errmsg = '';
// 如果设置了端口复用,则设置SO_REUSEPORT选项为1
if($this->reusePort)
{
stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
}
if($this->transport === 'unix')
{
umask(0);
if(!is_file($address))
{
register_shutdown_function(function()use($address){@unlink($address);});//定义执行结束时运行的匿名函数
}
}
// 创建监听
$this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
if(!$this->_mainSocket)
{
throw new Exception($errmsg);
}
// 尝试打开tcp的keepalive,关闭TCP Nagle算法
if(function_exists('socket_import_stream') && $this->transport === 'tcp')
{
$socket = socket_import_stream($this->_mainSocket );
@socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
@socket_set_option($socket, SOL_SOCKET, TCP_NODELAY, 1);
}
// 设置非阻塞
stream_set_blocking($this->_mainSocket, 0);
// 放到全局事件轮询中监听_mainSocket可读事件(客户端连接事件)
if(self::$globalEvent)//现在self::$globalEvent还是null,所以下面代码还没执行
{
if($this->transport !== 'udp')
{
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
}
else
{
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
}
}
}
Worker::installSignal()初始化所有信号处理函数
pcntl_signal(SIGPIPE, SIG_IGN, false);//忽视SIGPIPE信号
这里对SIGINT、SIGUSR、SIGUSR2信号安装处理函数
* 信号处理函数
* @param int $signal
*/
public static function signalHandler($signal)
{
switch($signal)
{
// stop
case SIGINT:
self::stopAll();
break;
// reload
case SIGUSR1:
self::$_pidsToRestart = self::getAllWorkerPids();
self::reload();
break;
// show status
case SIGUSR2:
self::writeStatisticsToStatusFile();
break;
}
}
Worker::saveMasterPid()保存主进程pid
Worker::forkWorkers()创建子进程(worker进程)并运行
* 创建子进程
* @return void
*/
protected static function forkWorkers()
{
/** @var static $worker */
foreach(self::$_workers as $worker)
{
// 启动过程中需要得到运行用户名的最大长度,在status时格式化展示
if(self::$_status === self::STATUS_STARTING)
{
if(empty($worker->name))
{
$worker->name = $worker->getSocketName();
}
$worker_name_length = strlen($worker->name);
if(self::$_maxWorkerNameLength < $worker_name_length)
{
self::$_maxWorkerNameLength = $worker_name_length;
}
}
// 创建子进程
while(count(self::$_pidMap[$worker->workerId]) < $worker->count)
{
static::forkOneWorker($worker);
}
}
}
/**
* 创建一个子进程
* @param Worker $worker
* @throws Exception
*/
protected static function forkOneWorker($worker)
{
$pid = pcntl_fork();
// 获得可用的id
$id = self::getId($worker->workerId, 0);
// 主进程记录子进程pid
if($pid > 0)
{
self::$_pidMap[$worker->workerId][$pid] = $pid;
self::$_idMap[$worker->workerId][$id] = $pid;
}
// 子进程运行
elseif(0 === $pid)
{
// 如果设置了端口复用,则在子进程执行监听
if($worker->reusePort)
{
$worker->listen();
}
// 启动过程中尝试重定向标准输出
if(self::$_status === self::STATUS_STARTING)
{
self::resetStd();
}
self::$_pidMap = array();
self::$_workers = array($worker->workerId => $worker);
Timer::delAll();
self::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName());
$worker->setUserAndGroup();
$worker->id = $id;
$worker->run();
exit(250);
}
else
{
throw new Exception("forkOneWorker fail");
}
}
Timer::delAll()函数用来删除现在所有的定时,目测当前没有执行中的定时,这段代码暂时未明白缘由
Worker::run()方法运行子进程执行时的逻辑
* 运行worker实例
*/
public function run()
{
//更新 Worker 状态
self::$_status = self::STATUS_RUNNING;
// 注册进程退出回调,用来检查是否有错误
register_shutdown_function(array("\\Workerman\\Worker", 'checkErrors'));
// 设置自动加载根目录
Autoloader::setRootPath($this->_appInitPath);
// 如果没有全局事件轮询,则创建一个
if(!self::$globalEvent)
{
if(extension_loaded('libevent'))
{
self::$globalEvent = new Libevent();
}
else
{
self::$globalEvent = new Select();
}
// 监听_mainSocket上的可读事件(客户端连接事件)
if($this->_socketName)
{
if($this->transport !== 'udp')
{
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
}
else
{
self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
}
}
}
// 重新安装事件处理函数,使用全局事件轮询监听信号事件
self::reinstallSignal();
// 用全局事件轮询初始化定时器
Timer::init(self::$globalEvent);
// 如果有设置进程启动回调,则执行
if($this->onWorkerStart)
{
call_user_func($this->onWorkerStart, $this);
}
// 子进程主循环
self::$globalEvent->loop();
}
Select::add()函数添加事件及处理函数
* 添加事件及处理函数
* @see Events\EventInterface::add()
*/
public function add($fd, $flag, $func, $args = array())
{
switch ($flag)
{
case self::EV_READ:
$fd_key = (int)$fd;
$this->_allEvents[$fd_key][$flag] = array($func, $fd);
$this->_readFds[$fd_key] = $fd;
break;
case self::EV_WRITE:
$fd_key = (int)$fd;
$this->_allEvents[$fd_key][$flag] = array($func, $fd);
$this->_writeFds[$fd_key] = $fd;
break;
case self::EV_SIGNAL:
$fd_key = (int)$fd;
$this->_signalEvents[$fd_key][$flag] = array($func, $fd);
pcntl_signal($fd, array($this, 'signalHandler'));
break;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
// $fd 为 定时的时间间隔,单位为秒,支持小数,能精确到0.001秒
$run_time = microtime(true)+$fd;
$this->_scheduler->insert($this->_timerId, -$run_time);
$this->_task[$this->_timerId] = array($func, (array)$args, $flag, $fd);
$this->tick();
return $this->_timerId++;
}
return true;
}
Worker::acceptConnection()读事件处理函数
* 接收一个客户端连接
* @param resource $socket
* @return void
*/
public function acceptConnection($socket)
{
// 获得客户端连接
$new_socket = @stream_socket_accept($socket, 0, $remote_address);
// 惊群现象,忽略
if(false === $new_socket)
{
return;
}
// 初始化连接对象
$connection = new TcpConnection($new_socket, $remote_address);
$this->connections[$connection->id] = $connection;
$connection->worker = $this;
$connection->protocol = $this->protocol;
$connection->onMessage = $this->onMessage;
$connection->onClose = $this->onClose;
$connection->onError = $this->onError;
$connection->onBufferDrain = $this->onBufferDrain;
$connection->onBufferFull = $this->onBufferFull;
// 如果有设置连接回调,则执行
if($this->onConnect)
{
call_user_func($this->onConnect, $connection);
}
}
Worker::reinstallSignal()重新安装子进程事件处理函数,使用全局事件轮询监听信号事件
Timer::init(self::$globalEvent)用全局事件轮询初始化定时器
self::$globalEvent->loop()子进程主循环,Select::loop()或者libevent::loop();
Select::loop()函数
* 主循环
* @see Events\EventInterface::loop()
*/
public function loop()
{
$e = null;
while (1)
{
// 如果有信号,尝试执行信号处理函数
pcntl_signal_dispatch();
$read = $this->_readFds;
$write = $this->_writeFds;
// 等待可读或者可写事件
@stream_select($read, $write, $e, 0, $this->_selectTimeout);
// 尝试执行定时任务
if(!$this->_scheduler->isEmpty())
{
$this->tick();
}
// 这些描述符可读,执行对应描述符的读回调函数
if($read)
{
foreach($read as $fd)
{
$fd_key = (int) $fd;
if(isset($this->_allEvents[$fd_key][self::EV_READ]))
{
call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1]));
}
}
}
// 这些描述符可写,执行对应描述符的写回调函数
if($write)
{
foreach($write as $fd)
{
$fd_key = (int) $fd;
if(isset($this->_allEvents[$fd_key][self::EV_WRITE]))
{
call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
}
}
}
}
}
Worker::displayUI()展示启动界面
Worker::resetStd()尝试重定向标准输入输出
Worker::monitorWorkers()监控所有子进程(worker进程)父进程轮询
* 监控所有子进程的退出事件及退出码
* @return void
*/
protected static function monitorWorkers()
{
self::$_status = self::STATUS_RUNNING;
while(1)
{
// 如果有信号到来,尝试触发信号处理函数
pcntl_signal_dispatch();
// 挂起进程,直到有子进程退出或者被信号打断
$status = 0;
$pid = pcntl_wait($status, WUNTRACED);
// 如果有信号到来,尝试触发信号处理函数
pcntl_signal_dispatch();
// 有子进程退出
if($pid > 0)
{
// 查找是哪个进程组的,然后再启动新的进程补上
foreach(self::$_pidMap as $worker_id => $worker_pid_array)
{
if(isset($worker_pid_array[$pid]))
{
$worker = self::$_workers[$worker_id];
// 检查退出状态
if($status !== 0)
{
self::log("worker[".$worker->name.":$pid] exit with status $status");
}
// 统计,运行status命令时使用
if(!isset(self::$_globalStatistics['worker_exit_info'][$worker_id][$status]))
{
self::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0;
}
self::$_globalStatistics['worker_exit_info'][$worker_id][$status]++;
// 清除子进程信息
unset(self::$_pidMap[$worker_id][$pid]);
// 标记$id为可用id
$id = self::getId($worker_id, $pid);
self::$_idMap[$worker_id][$id] = 0;
break;
}
}
// 如果不是关闭状态,则补充新的进程
if(self::$_status !== self::STATUS_SHUTDOWN)
{
self::forkWorkers();
// 如果该进程是因为运行reload命令退出,则继续执行reload流程
if(isset(self::$_pidsToRestart[$pid]))
{
unset(self::$_pidsToRestart[$pid]);
self::reload();
}
}
else
{
// 如果是关闭状态,并且所有进程退出完毕,则主进程退出
if(!self::getAllWorkerPids())
{
self::exitAndClearAll();
}
}
}
else
{
// 如果是关闭状态,并且所有进程退出完毕,则主进程退出
if(self::$_status === self::STATUS_SHUTDOWN && !self::getAllWorkerPids())
{
self::exitAndClearAll();
}
}
}
}
转载请注明:小Y » workerman源码学习