serv = new \swoole_websocket_server("0.0.0.0", 9090); $this->serv->set(GlobConfigs::getKey('swoolev2')); $this->serv->on('Start', array($this, 'onStart')); $this->serv->on('WorkerStart', array($this, 'onWorkerStart')); $this->serv->on('message', array($this, 'onMessage')); $this->serv->on('close', array($this, 'onClose')); $this->serv->on('open', array($this, 'onOpen')); $this->serv->on('task', array($this, 'onTask')); $this->serv->on('Finish', array($this, 'onFinish')); return $this->serv->start(); } public function onStart($serv) { echo "Main on Start " . "\n"; $this->startInit(); } public function onWorkerStart($serv, $worker_id) { $type = $serv->taskworker ? 'Task' : 'Work'; echo "onWorkerStart.. " . $type . '-' . $worker_id . "\n"; if (!$serv->taskworker) { $this->PingRedis($serv, $worker_id); $serv->tick(8000, function () use ($serv, $worker_id) { $this->PingRedis($serv, $worker_id); }); /* $this->PingMysql($serv, $worker_id); $serv->tick(5000, function () use ($serv, $worker_id) { $this->PingMysql($serv, $worker_id); }); */ $this->PingPgsql($serv, $worker_id); $serv->tick(5000, function () use ($serv, $worker_id) { $this->PingPgsql($serv, $worker_id); }); $nowRedis = $this->workRedis; TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis); $serv->tick(60000, function () use ($serv, $worker_id, $nowRedis) { TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis); }); MyPgsql::getInstance(); //消息订阅 if ($worker_id == 0) { if (!$worker_id) { return; } go(function () use ($serv, $worker_id) { $config = GlobConfigs::getKey('redis'); $redis = new \Swoole\Coroutine\Redis(); $redis->setOptions(['compatibility_mode' => true]); $redis->connect($config['host'], $config['port']); $r = $redis->SUBSCRIBE([MSG_REDIS_SUBSCRIBE, 'TEST']); if ($r) { while ($msg = $redis->recv()) { print_r($msg); // msg是一个数组, 包含以下信息 // $type # 返回值的类型:显示订阅成功 // $name # 订阅的频道名字 或 来源频道名字 // $info # 目前已订阅的频道数量 或 信息内容 list($type, $name, $info) = $msg; if ($type == 'subscribe') // 或psubscribe { // 频道订阅成功消息,订阅几个频道就有几条 } else if ($type == 'unsubscribe' && $info == 0) // 或punsubscribe { break; // 收到取消订阅消息,并且剩余订阅的频道数为0,不再接收,结束循环 } else if ($type == 'message') // 若为psubscribe,此处为pmessage { /* if ($need_unsubscribe) // 某个情况下需要退订 { $redis->unsubscribe(); // 继续recv等待退订完成 } */ } } } }); } } } function onWorkerStop(swoole_server $server, int $worker_id) { echo "onWorkerStop.. " . $worker_id . ' ' . "\n"; if ($this->workRedis) { $this->workRedis->close(); } if ($this->workMysql) { $this->workMysql->close(); } } public function onMessage($serv, $frame) { if ( strtolower($frame->data->data) == '{"type":"ping"}'){ $serv->send($frame->fd,'{"type":"pong"}'); return ; } Wlog::getInstance()->WriteLog($frame); $serv->task($frame); } public function onOpen($serv, $request) { Wlog::getInstance()->WriteLog(['onOpenData',$request]); $token = isset($request->get['token']) ? $request->get['token'] : ''; //$uid = isset($request->get['uid']) ? $request->get['uid'] : ''; $fd = $request->fd; $redis = $this->workRedis; if (!$redis) { return; } $token_uid = $uid = intval($this->workRedis->hget(MAPS_TOKEN_UID, md5($token))); if (empty($token) || !$token_uid ) { $serv->push($request->fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'invalid_token', 'data' => ['msg' => '无效的token']])); $serv->disconnect($request->fd); return; } $oldfid = $this->workRedis->hget(MAPS_UID_FID, $uid); if ($oldfid != '' && $oldfid != $fd && $serv->exist($oldfid)) { $serv->push($oldfid, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'force_logout', 'data' => [ 'msg' => '你已在其它地方登陆,本次退出!']])); $serv->disconnect($oldfid); } $this->workRedis->hset(MAPS_UID_FID, $uid, $fd); $this->workRedis->hset(MAPS_FID_UID, $fd, $uid); $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'well_come', 'data' => [ 'msg' => '成功接入']])); } public function onTask($serv, \Swoole\Server\Task $task) { try { Wlog::getInstance()->WriteLog($task, 1, $serv->worker_id); CmdProxy::getInstance()->ParaCMD($serv, $task); } catch (\Exception $e) { Wlog::getInstance()->WriteLog(['onTask error:',$task,$e->getCode() . ' ' . $e->getMessage()], 3, $serv->worker_id); echo "发生异常." . $e->getCode() . ' ' . $e->getMessage() . "\n"; } } public function onFinish($serv, $task_id, $data) { } public function onClose($serv, $fd, $from_id) { $uid = $this->workRedis->hget("MAPS_FID_UID", $fd); $this->workRedis->hdel(MAPS_UID_FID, $uid); $this->workRedis->hdel(MAPS_FID_UID, $fd); echo "ClientFd:{$fd} -- uid:{$uid} close connection!\n"; } //redis Ping检测 private function PingRedis($serv, $worker_id) { if (empty($this->workRedis)) { $this->ConectToRedis($serv, $worker_id); return true; } $redis = $this->workRedis; if (!$redis) { $ping_ret = false; } else { $ping_ret_s = $redis->ping(); $ping_ret = "+pong" === strtolower($ping_ret_s) ? true : false; } if (!$ping_ret) { return $this->ConectToRedis($serv, $worker_id); } return true; } //工作线程 同步阻塞 redis客户端 private function ConectToRedis($serv, $worker_id) { $conf = GlobConfigs::getKey('redis'); $redis = new \Redis(); try { $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']); } catch (\Exception $e) { $ret = false; } if ($ret) { Wlog::getInstance()->WriteLog("success:成功建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id); $this->workRedis = $redis; return $redis; } else { Wlog::getInstance()->WriteLog("error:建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id); $this->workRedis = null; } return false; } //Mysql Ping检测 private function PingMysql($serv, $worker_id) { if (empty($this->workMysql)) { $this->ConectToMysql($serv, $worker_id); return true; } $mysql = $this->workMysql; if (!$mysql) { $ping_ret = false; } else { $ping_ret_s = DB::select("select version() as v"); $ping_ret = !empty($ping_ret_s) ? true : false; } if (!$ping_ret) { return $this->ConectToMysql($serv, $worker_id); } return true; } //工作线程 同步阻塞 Mysql 客户端 private function ConectToMysql($serv, $worker_id) { $conf = GlobConfigs::getKey('mysql'); $mysql = new Capsule(); $mysql->addConnection($conf); $mysql->setAsGlobal(); $mysql->bootEloquent(); Wlog::getInstance()->WriteLog("success:成功建立Mysql连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id); $this->workMysql = $mysql; return $mysql; } //Mysql Ping检测 private function PingPgsql($serv, $worker_id) { if (empty($this->workPgsql)) { $this->ConectToPgsql($serv, $worker_id); return true; } $pgsql = $this->workPgsql; if (!$pgsql) { $ping_ret = false; } else { $ping_ret_s = DB::select("select version() as v"); $ping_ret = !empty($ping_ret_s) ? true : false; } if (!$ping_ret) { return $this->ConectToPgsql($serv, $worker_id); } return true; } //工作线程 同步阻塞 Mysql 客户端 private function ConectToPgsql($serv, $worker_id) { $conf = GlobConfigs::getKey('pgsql'); $pgsql = new Capsule(); $pgsql->addConnection($conf); $pgsql->setAsGlobal(); $pgsql->bootEloquent(); Wlog::getInstance()->WriteLog("success:成功建立Pgsql连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id); $this->workPgsql = $pgsql; return $pgsql; } //服务启动作做一些初始化操作 private function startInit() { $conf = GlobConfigs::getKey('redis'); $adminconf = GlobConfigs::getKey('admin_conf'); $redis = new \Redis(); $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']); if ($ret) { $redis->pipeline(); $redis->del(MAPS_UID_FID); $redis->del(MAPS_FID_UID); //$redis->del(MAPS_TOKEN_UID); //$redis->del(MAPS_UID_TOKEN); $redis->hset(MAPS_UID_TOKEN,$adminconf['admin_uid'],$adminconf['md5']); $redis->hset(MAPS_TOKEN_UID,$adminconf['md5'],$adminconf['admin_uid']); $redis->exec(); $redis->close(); } } //redis推送消息 private function MsgRedisList($serv, $worker_id) { } }