configs = GlobConfigs::getKey('swoolev_subserv'); $this->serv = new \swoole_websocket_server($this->configs['host'], $this->configs['port']); unset($this->configs['host']); unset($this->configs['port']); $this->serv->set($this->configs); $this->serv->on('Start', array($this, 'onStart')); $this->serv->on('WorkerStart', array($this, 'onWorkerStart')); $this->serv->on('message', array($this, 'onMessage')); $this->serv->on('close', array($this, 'onClose')); $this->serv->on('open', array($this, 'onOpen')); $this->serv->on('task', array($this, 'onTask')); $this->serv->on('Finish', array($this, 'onFinish')); return $this->serv->start(); } public function onStart($serv) { echo "Sub on Start " . "\n"; } public function onWorkerStart($serv, $worker_id) { if ($serv->taskworker) { return; } $this->PingRedis($serv, $worker_id); $serv->tick(8000, function () use ($serv, $worker_id) { $this->PingRedis($serv, $worker_id); }); if ($worker_id == 0) { $serv->tick(1000, function () use ($serv, $worker_id) { $msg_index_aray = $this->getIndexMsg(); if (!empty($msg_index_aray)) { $this->sendIndexMsg($msg_index_aray); } }); } } function onWorkerStop(swoole_server $server, int $worker_id) { echo "onWorkerStop.. " . $worker_id . ' ' . "\n"; if ($this->workRedis) { $this->workRedis->close(); } } public function onMessage($serv, $frame) { } public function onOpen($serv, $request) { } public function onTask($serv, \Swoole\Server\Task $task) { } public function onFinish($serv, $task_id, $data) { } public function onClose($serv, $fd, $from_id) { } //redis Ping检测 private function PingRedis($serv, $worker_id) { if (empty($this->workRedis)) { $this->ConectToRedis($serv, $worker_id); return true; } $redis = $this->workRedis; if (!$redis) { $ping_ret = false; } else { $ping_ret_s = $redis->ping(); $ping_ret = "+pong" === strtolower($ping_ret_s) ? true : false; } if (!$ping_ret) { return $this->ConectToRedis($serv, $worker_id); } return true; } //工作线程 同步阻塞 redis客户端 private function ConectToRedis($serv, $worker_id) { $conf = GlobConfigs::getKey('redis'); $redis = new \Redis(); try { $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']); } catch (\Exception $e) { $ret = false; } if ($ret) { Wlog::getInstance()->WriteLog("success:成功建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id); $this->workRedis = $redis; return $redis; } else { Wlog::getInstance()->WriteLog("error:建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id); $this->workRedis = null; } return false; } //////////////////////// //去redis里找 msg_index 是否有数据,有取出来, private function getIndexMsg() { $redis = $this->workRedis; $len = $redis->LLEN(MSG_INDEX); if ($len <=0 ) { return; } $datas = []; while ($now = $redis->rpop(MSG_INDEX)) { if (empty($now)) { break; } $datas[] = $now; } return $datas; } ///如果有msg_index数据,就通知服务端下发 private function sendIndexMsg($msg_index_aray) { $conf = GlobConfigs::getKey('admin_conf'); $token = $conf['admin_token']; $url = $conf['url']; $client = new Client("$url?token=" . $token); $arr = [ 'cmd' => 'subserv', 'act' => 'index_msg', 'data' => $msg_index_aray, 'time' => time(), 'token' => '', ]; $str = json_encode($arr, 256); $r = $client->send($str); $reciv = $client->receive(); $client->close(); unset($client); } ///////////////////////// }