serv = new \swoole_websocket_server($config['host'], $config['port']);
        $this->serv->set($config['sets']);

        // In-memory tables (bidirectional mapping between user uid and fd).
        $fd_table = new \swoole_table($config['maxUsers']);
        $fd_table->column("uid", \swoole_table::TYPE_INT, 4);
        $fd_table->create();
        $user_table = new \swoole_table($config['maxUsers']);
        $user_table->column("fid", \swoole_table::TYPE_INT, 4);
        $user_table->create();
        $this->serv->ftable = $fd_table;
        $this->serv->utable = $user_table;

        $this->serv->on('Start', array($this, 'onStart'));
        $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
        $this->serv->on('message', array($this, 'onMessage'));
        $this->serv->on('close', array($this, 'onClose'));
        $this->serv->on('open', array($this, 'onOpen'));
        $this->serv->on('task', array($this, 'onTask'));
        $this->serv->on('Finish', array($this, 'onFinish'));

        return $this->serv->start();
    }

    /**
     * "Start" callback: prints a banner and runs the one-off Redis initialisation.
     *
     * @param mixed $serv Server instance handed in by Swoole.
     */
    public function onStart($serv)
    {
        echo "Main on Start " . "\n";
        $this->startInit();
    }

    /**
     * "WorkerStart" callback.
     *
     * For non-task workers this connects to Redis and Pgsql, then installs
     * timers that re-check Redis every 8000 ms, Pgsql every 5000 ms and run
     * TimeRsync::DoTimePublic every 60000 ms. Task workers only print the banner.
     *
     * @param mixed $serv      Server instance.
     * @param int   $worker_id Id of the worker being started.
     */
    public function onWorkerStart($serv, $worker_id)
    {
        $type = $serv->taskworker ? 'Task' : 'Work';
        echo "onWorkerStart.. " . $type . '-' . $worker_id . "\n";

        if ($serv->taskworker) {
            return;
        }

        $this->PingRedis($serv, $worker_id);
        $serv->tick(8000, function () use ($serv, $worker_id) {
            $this->PingRedis($serv, $worker_id);
        });

        $this->PingPgsql($serv, $worker_id);
        $serv->tick(5000, function () use ($serv, $worker_id) {
            $this->PingPgsql($serv, $worker_id);
        });

        // $this->workRedis is read at call time so that a connection replaced
        // by PingRedis() is picked up by later ticks.
        TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
        $serv->tick(60000, function () use ($serv, $worker_id) {
            TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
        });
    }

    /**
     * Worker shutdown hook: closes the worker's Redis connection if one exists.
     *
     * NOTE(review): no `WorkerStop` registration is visible in this part of the
     * file - confirm the callback is actually wired up.
     *
     * @param \swoole_server $server    Server instance.
     * @param int            $worker_id Id of the worker being stopped.
     */
    public function onWorkerStop(\swoole_server $server, int $worker_id)
    {
        echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
        if ($this->workRedis) {
            $this->workRedis->close();
        }
    }

    /**
     * "message" callback: answers application-level pings directly, otherwise
     * logs the frame and hands it to a task worker.
     *
     * @param mixed $serv  Server instance.
     * @param mixed $frame Incoming frame; `data` and `fd` are read from it.
     */
    public function onMessage($serv, $frame)
    {
        // Case-insensitive match against the ping payload.
        if (strtolower($frame->data) === '{"type":"ping"}') {
            $serv->push($frame->fd, '{"type":"pong"}');
            return;
        }

        Wlog::getInstance()->WriteLog($frame);
        $serv->task($frame);
    }

    /**
     * "open" callback: authenticates a new connection by its `token` query
     * parameter and records the uid <-> fd mapping.
     *
     * Rejects the connection (system message followed by disconnect) when the
     * connection count exceeds `maxUsers`, when no Redis connection is
     * available, when the token does not resolve to a uid, or when the admin
     * account connects from an address that admin_ip_Check() refuses.
     * A non-admin uid that is already connected on another fd gets that older
     * connection closed; the admin uid may hold several connections.
     *
     * @param mixed $serv    Server instance.
     * @param mixed $request Handshake request; `fd`, `get` and `server` are read.
     */
    public function onOpen($serv, $request)
    {
        $config = GlobConfigs::getKey('swoole');
        $fd = $request->fd;

        if (count($serv->connections) > $config['maxUsers']) {
            $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'maxUsers', 'data' => '超过人数上限']));
            $serv->disconnect($fd);
            return;
        }

        $token = $request->get['token'] ?? '';

        $redis = $this->workRedis;
        if (!$redis) {
            $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'error', 'data' => ['msg' => '服务端错误!-01']]));
            $serv->disconnect($fd);
            return;
        }

        // Tokens are stored under their md5; a miss yields false -> intval() 0.
        $token_uid = $uid = intval($redis->hget(MAPS_TOKEN_UID, md5($token)));
        // The literal below contains a line break, exactly as it appears in the original source.
        echo '新请求uid-'.$token_uid.' fd-'.$fd.' 
接入....'."\n";

        if (empty($token) || $token_uid === 0) {
            $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'invalid_token', 'data' => ['msg' => '无效的token']]));
            $serv->disconnect($fd);
            return;
        }

        // The admin account is allowed to hold several connections at once.
        $adminconfig = GlobConfigs::getKey('admin_conf');
        if ($adminconfig['admin_uid'] != $uid) {
            $oldfid = $redis->hget(MAPS_UID_FID, $uid);
            if ($oldfid !== false && $oldfid !== null && $oldfid !== ''
                && (int) $oldfid !== (int) $fd
                && $serv->exist((int) $oldfid)
            ) {
                $serv->push((int) $oldfid, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'force_logout', 'data' => ['msg' => '你已在其它地方登陆,本次退出!']]));
                $serv->disconnect((int) $oldfid);
            }

            $redis->hset(MAPS_UID_FID, $uid, $fd);
            $redis->hset(MAPS_FID_UID, $fd, $uid);
            $serv->ftable->set($fd, ['uid' => $uid]);
            $serv->utable->set($uid, ['fid' => $fd]);
        } else {
            if (!$this->admin_ip_Check($request->server['remote_addr'], $adminconfig['whiteips'])) {
                $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'force_logout', 'data' => ['msg' => '登陆被限!' . $request->server['remote_addr']]]));
                $serv->disconnect($fd);
                return;
            }

            // Only the fd -> uid direction is stored for the admin account.
            $redis->hset(MAPS_FID_UID, $fd, $uid);
            $serv->ftable->set($fd, ['uid' => $uid]);
        }

        $msg = '成功接入 [' . $request->server['remote_addr'] . ' - ' . $fd . ' - ' . $uid . "]";
        $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'well_come', 'data' => ['msg' => $msg]]));
    }

    /**
     * "task" callback: logs the task and dispatches it through CmdProxy.
     * Exceptions are logged and echoed instead of being propagated.
     *
     * @param mixed               $serv Server instance.
     * @param \Swoole\Server\Task $task Task object forwarded from onMessage().
     */
    public function onTask($serv, \Swoole\Server\Task $task)
    {
        try {
            Wlog::getInstance()->WriteLog($task, 1, $serv->worker_id);
            CmdProxy::getInstance()->ParaCMD($serv, $task);
        } catch (\Exception $e) {
            Wlog::getInstance()->WriteLog(['onTask error:', $task, $e->getCode() . ' ' . $e->getMessage()], 3, $serv->worker_id);
            echo "发生异常." . $e->getCode() . ' ' . $e->getMessage() . "\n";
        }
    }

    /**
     * "Finish" callback. Intentionally empty.
     */
    public function onFinish($serv, $task_id, $data)
    {
    }

    /**
     * "close" callback: removes the bookkeeping of a closed connection.
     *
     * The fd -> uid entries are always removed. The uid -> fd entries are only
     * removed when they still point at the closing fd: onOpen() force-closes a
     * user's previous connection while registering the new one, and that
     * previous connection's close event must not erase the new mapping.
     *
     * @param mixed $serv    Server instance.
     * @param int   $fd      Descriptor of the closed connection.
     * @param int   $from_id Unused.
     */
    public function onClose($serv, $fd, $from_id)
    {
        $redis = $this->workRedis;

        // Use the MAPS_FID_UID constant, like every other access to this hash.
        $uid = $redis ? $redis->hget(MAPS_FID_UID, $fd) : false;
        if ($uid === false || $uid === null) {
            // Redis unavailable or entry missing: fall back to the memory table.
            $row = $serv->ftable->get($fd);
            $uid = $row ? $row['uid'] : false;
        }

        if ($redis) {
            $redis->hdel(MAPS_FID_UID, $fd);
        }
        $serv->ftable->del($fd);

        $adminconfig = GlobConfigs::getKey('admin_conf');
        // $uid is false for connections that were rejected before registration.
        if ($uid !== false && $adminconfig['admin_uid'] != $uid) {
            if ($redis) {
                $currentFd = $redis->hget(MAPS_UID_FID, $uid);
                if ($currentFd !== false && $currentFd !== null && (int) $currentFd === (int) $fd) {
                    $redis->hdel(MAPS_UID_FID, $uid);
                }
            }

            $userRow = $serv->utable->get($uid);
            if ($userRow && (int) $userRow['fid'] === (int) $fd) {
                $serv->utable->del($uid);
            }
        }

        echo "ClientFd:{$fd} -- uid:{$uid} close connection!\n";
    }

    /**
     * Redis health check: connects when there is no connection yet, otherwise
     * pings and reconnects when the ping fails.
     *
     * @param mixed $serv      Server instance.
     * @param int   $worker_id Worker id, used for logging.
     *
     * @return mixed true when the connection is usable (or was just created on
     *               first use), otherwise the result of ConectToRedis().
     */
    private function PingRedis($serv, $worker_id)
    {
        if (empty($this->workRedis)) {
            $this->ConectToRedis($serv, $worker_id);
            return true;
        }

        try {
            $reply = $this->workRedis->ping();
            // Depending on the phpredis version the reply is true or "+PONG".
            $alive = $reply === true
                || (is_string($reply) && in_array(strtolower($reply), ['+pong', 'pong'], true));
        } catch (\Exception $e) {
            // A broken link raises instead of returning; treat it as a failed ping.
            $alive = false;
        }

        if (!$alive) {
            return $this->ConectToRedis($serv, $worker_id);
        }

        return true;
    }

    /**
     * Opens the worker's blocking Redis client and stores it in $this->workRedis
     * (null when the connection could not be established).
     *
     * @param mixed $serv      Server instance.
     * @param int   $worker_id Worker id, used for logging.
     *
     * @return \Redis|false The connected client, or false on failure.
     */
    private function ConectToRedis($serv, $worker_id)
    {
        $conf = GlobConfigs::getKey('redis');
        $redis = new \Redis();

        try {
            $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
        } catch (\Exception $e) {
            $ret = false;
        }

        if (!$ret) {
            Wlog::getInstance()->WriteLog("error:建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
            $this->workRedis = null;
            return false;
        }

        Wlog::getInstance()->WriteLog("success:成功建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
        $this->workRedis = $redis;

        return $redis;
    }

    /**
     * Pgsql health check: sets the connection up when missing, otherwise runs a
     * trivial query and rebuilds the connection when that query fails.
     *
     * @param mixed $serv      Server instance.
     * @param int   $worker_id Worker id, used for logging.
     *
     * @return mixed true when the connection is usable (or was just created on
     *               first use), otherwise the result of ConectToPgsql().
     */
    private function PingPgsql($serv, $worker_id)
    {
        if (empty($this->workPgsql)) {
            $this->ConectToPgsql($serv, $worker_id);
            return true;
        }

        try {
            $alive = !empty(DB::select("select version() as v"));
        } catch (\Exception $e) {
            // A failing probe query means the connection must be rebuilt.
            $alive = false;
        }

        if (!$alive) {
            return $this->ConectToPgsql($serv, $worker_id);
        }

        return true;
    }

    /**
     * Creates the worker's Pgsql connection manager, registers it globally,
     * boots Eloquent and stores the manager in $this->workPgsql.
     *
     * @param mixed $serv      Server instance.
     * @param int   $worker_id Worker id, used for logging.
     *
     * @return Capsule The configured manager.
     */
    private function ConectToPgsql($serv, $worker_id)
    {
        $conf = GlobConfigs::getKey('pgsql');

        $pgsql = new Capsule();
        $pgsql->addConnection($conf);
        $pgsql->setAsGlobal();
        $pgsql->bootEloquent();

        Wlog::getInstance()->WriteLog("success:成功建立Pgsql连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
        $this->workPgsql = $pgsql;

        return $pgsql;
    }

    /**
     * Start-up initialisation: clears the uid <-> fd hashes in Redis and
     * (re)registers the admin token from the `admin_conf` configuration.
     * Does nothing when the Redis connection cannot be established.
     */
    private function startInit()
    {
        $conf = GlobConfigs::getKey('redis');
        $adminconf = GlobConfigs::getKey('admin_conf');

        $redis = new \Redis();
        if (!$redis->connect($conf['host'], $conf['port'], $conf['overtime'])) {
            return;
        }

        // Queue all commands and send them in one round trip.
        $redis->pipeline();
        $redis->del(MAPS_UID_FID);
        $redis->del(MAPS_FID_UID);
        $redis->hset(MAPS_UID_TOKEN, $adminconf['admin_uid'], $adminconf['md5']);
        $redis->hset(MAPS_TOKEN_UID, $adminconf['md5'], $adminconf['admin_uid']);
        $redis->exec();
        $redis->close();
    }

    /**
     * Checks an address against the admin whitelist.
     *
     * An entry matches when it equals the address exactly, or when it contains
     * a `*` and the address starts with everything that precedes that `*`
     * (so "10.0.*" matches "10.0.3.7", and "*" matches any address).
     *
     * @param string   $ip        Remote address to check.
     * @param string[] $okipArray Whitelist entries.
     *
     * @return bool true when the address is allowed; false otherwise or when
     *              the whitelist is empty.
     */
    private function admin_ip_Check($ip, $okipArray = [])
    {
        if (empty($okipArray)) {
            return false;
        }

        $ip = (string) $ip;
        foreach ($okipArray as $aip) {
            $aip = (string) $aip;
            if ($ip === $aip) {
                return true;
            }

            $spos = strpos($aip, "*");
            if ($spos === false) {
                continue;
            }

            // The part of the pattern before the wildcard must be a prefix of the address.
            if (strncmp($ip, $aip, $spos) === 0) {
                return true;
            }
        }

        return false;
    }
}