| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- <?php
- namespace app\logic;
- /**
- * Created by PhpStorm.
- * User: Administrator
- * Date: 2019/5/20
- * Time: 11:22
- */
- use app\lib\DataPack;
- use app\lib\GlobConfigs;
- use Illuminate\Database\Capsule\Manager as Capsule;
- use Illuminate\Database\Capsule\Manager as DB;
- use app\lib\Wlog;
- use app\logic\MyPgsql as MyPgsql;
- class MyServerV2
- {
- public $serv;
- //线程连接实例
- private $workRedis = null;
- private $workMysql = null;
- private $workPgsql = null;
/**
 * Create the WebSocket server, attach the event handlers and start it.
 *
 * The server listens on 0.0.0.0:9090 and is configured from the 'swoolev2'
 * entry of GlobConfigs. start() is invoked from inside the constructor, so
 * constructing this object also runs the server.
 */
public function __construct()
{
    $server = new \swoole_websocket_server("0.0.0.0", 9090);
    $this->serv = $server;
    $server->set(GlobConfigs::getKey('swoolev2'));

    // Swoole event name => name of the method on this object that handles it.
    $handlers = [
        'Start'       => 'onStart',
        'WorkerStart' => 'onWorkerStart',
        'message'     => 'onMessage',
        'close'       => 'onClose',
        'open'        => 'onOpen',
        'task'        => 'onTask',
        'Finish'      => 'onFinish',
    ];
    foreach ($handlers as $event => $method) {
        $server->on($event, [$this, $method]);
    }

    return $server->start();
}
/**
 * 'Start' event handler: print a banner and run the start-up initialisation.
 *
 * @param mixed $serv Server instance supplied by Swoole (not used here).
 */
public function onStart($serv)
{
    echo "Main on Start \n";
    $this->startInit();
}
/**
 * 'WorkerStart' event handler.
 *
 * For non-task workers this opens the per-worker Redis and Pgsql connections,
 * schedules periodic health checks for both, schedules the TimeRsync job and,
 * on worker 0 only, starts the Redis pub/sub listener.
 *
 * Previously the pub/sub listener sat behind `if (!$worker_id) return;` nested
 * inside `if ($worker_id == 0)`, which always returned and made the listener
 * unreachable; the contradictory guard has been removed.
 *
 * @param mixed $serv      Server instance supplied by Swoole.
 * @param int   $worker_id Id of the worker being started.
 */
public function onWorkerStart($serv, $worker_id)
{
    $type = $serv->taskworker ? 'Task' : 'Work';
    echo "onWorkerStart.. " . $type . '-' . $worker_id . "\n";

    // Task workers do not own connections or timers.
    if ($serv->taskworker) {
        return;
    }

    // Redis: connect now, then re-check every 8 seconds.
    $this->PingRedis($serv, $worker_id);
    $serv->tick(8000, function () use ($serv, $worker_id) {
        $this->PingRedis($serv, $worker_id);
    });

    // The MySQL ping (PingMysql) is intentionally not scheduled; Pgsql is used instead.

    // Pgsql: connect now, then re-check every 5 seconds.
    $this->PingPgsql($serv, $worker_id);
    $serv->tick(5000, function () use ($serv, $worker_id) {
        $this->PingPgsql($serv, $worker_id);
    });

    // Run the time sync job once and then every 60 seconds. The closure reads
    // $this->workRedis at call time so it always sees the current connection.
    TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
    $serv->tick(60000, function () use ($serv, $worker_id) {
        TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
    });

    MyPgsql::getInstance();

    // Message subscription: only worker 0 listens.
    if ($worker_id == 0) {
        $this->subscribeRedisChannels();
    }
}

/**
 * Start a coroutine that subscribes to MSG_REDIS_SUBSCRIBE and 'TEST' and
 * reads messages until every channel has been unsubscribed.
 */
private function subscribeRedisChannels()
{
    go(function () {
        $config = GlobConfigs::getKey('redis');
        $redis = new \Swoole\Coroutine\Redis();
        $redis->setOptions(['compatibility_mode' => true]);
        if (!$redis->connect($config['host'], $config['port'])) {
            return;
        }
        if (!$redis->SUBSCRIBE([MSG_REDIS_SUBSCRIBE, 'TEST'])) {
            return;
        }

        while ($msg = $redis->recv()) {
            print_r($msg);
            // $msg is an array of:
            //   $type - kind of reply (e.g. subscribe / unsubscribe / message)
            //   $name - channel subscribed to, or channel the message came from
            //   $info - number of channels still subscribed, or the message payload
            list($type, $name, $info) = $msg;

            if ($type == 'unsubscribe' && $info == 0) {
                // Last channel unsubscribed: stop receiving.
                break;
            }
            // 'subscribe' confirmations (one per channel) and 'message'
            // payloads currently need no further handling.
        }
    });
}
/**
 * 'WorkerStop' handler: release the connections held by this worker.
 *
 * NOTE(review): the constructor visible in this file does not register this
 * method via on('WorkerStop', ...) — confirm it is wired up somewhere.
 *
 * @param \Swoole\Server $server    Server instance supplied by Swoole.
 * @param int            $worker_id Id of the worker being stopped.
 */
public function onWorkerStop(\Swoole\Server $server, int $worker_id)
{
    echo "onWorkerStop.. " . $worker_id . ' ' . "\n";

    if ($this->workRedis) {
        $this->workRedis->close();
        $this->workRedis = null;
    }

    // Capsule has no close() method; drop the underlying connection through
    // its DatabaseManager, which is a no-op when nothing was ever connected.
    if ($this->workMysql) {
        $this->workMysql->getDatabaseManager()->disconnect();
        $this->workMysql = null;
    }
    if ($this->workPgsql) {
        $this->workPgsql->getDatabaseManager()->disconnect();
        $this->workPgsql = null;
    }
}
/**
 * 'message' event handler.
 *
 * Heartbeat frames ({"type":"ping"}, case-insensitive) are answered directly
 * with {"type":"pong"}; every other frame is logged and handed to a task worker.
 *
 * @param mixed $serv  Server instance supplied by Swoole.
 * @param mixed $frame Incoming WebSocket frame; its payload is in $frame->data.
 */
public function onMessage($serv, $frame)
{
    // The payload is the string $frame->data itself (there is no nested ->data).
    $payload = is_string($frame->data) ? strtolower(trim($frame->data)) : '';

    if ($payload === '{"type":"ping"}') {
        // push() wraps the reply in a WebSocket frame; send() would write raw bytes.
        $serv->push($frame->fd, '{"type":"pong"}');
        return;
    }

    Wlog::getInstance()->WriteLog($frame);
    $serv->task($frame);
}
/**
 * 'open' event handler: authenticate a new WebSocket connection by token.
 *
 * The md5 of the `token` query parameter is looked up in MAPS_TOKEN_UID. On
 * failure the client gets an 'invalid_token' message and is disconnected. On
 * success any previous live connection of the same uid is told to log out and
 * disconnected, the uid<->fd mappings are stored, and a welcome message is sent.
 *
 * @param mixed $serv    Server instance supplied by Swoole.
 * @param mixed $request Handshake request; reads ->fd and ->get['token'].
 */
public function onOpen($serv, $request)
{
    $fd = $request->fd;
    $redis = $this->workRedis;
    if (!$redis) {
        // NOTE(review): without Redis the connection is left open and
        // unauthenticated, as before — confirm this is the intended fallback.
        return;
    }

    $token = isset($request->get['token']) ? $request->get['token'] : '';

    // Validate before hashing: the query value is client-controlled and may be
    // empty or an array (?token[]=x), neither of which should reach md5()/Redis.
    $uid = 0;
    if (is_string($token) && !empty($token)) {
        $uid = intval($redis->hget(MAPS_TOKEN_UID, md5($token)));
    }
    if (!$uid) {
        $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'invalid_token', 'data' => ['msg' => '无效的token']]));
        $serv->disconnect($fd);
        return;
    }

    // Kick the previous connection of this uid, if it is still alive.
    $oldfid = $redis->hget(MAPS_UID_FID, $uid);
    if ($oldfid != '' && $oldfid != $fd && $serv->exist($oldfid)) {
        $serv->push($oldfid, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'force_logout', 'data' => ['msg' => '你已在其它地方登陆,本次退出!']]));
        $serv->disconnect($oldfid);
    }

    $redis->hset(MAPS_UID_FID, $uid, $fd);
    $redis->hset(MAPS_FID_UID, $fd, $uid);
    $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'well_come', 'data' => ['msg' => '成功接入']]));
}
/**
 * 'task' event handler: log the task and dispatch it through CmdProxy.
 *
 * Any failure raised while handling the task is logged and echoed instead of
 * propagating out of the handler.
 *
 * @param mixed               $serv Server instance supplied by Swoole.
 * @param \Swoole\Server\Task $task Task object delivered to this task worker.
 */
public function onTask($serv, \Swoole\Server\Task $task)
{
    try {
        Wlog::getInstance()->WriteLog($task, 1, $serv->worker_id);
        CmdProxy::getInstance()->ParaCMD($serv, $task);
    } catch (\Throwable $e) {
        // \Throwable also covers PHP 7 \Error (e.g. TypeError), which
        // `catch (\Exception)` would let escape unlogged.
        Wlog::getInstance()->WriteLog(['onTask error:', $task, $e->getCode() . ' ' . $e->getMessage()], 3, $serv->worker_id);
        echo "发生异常." . $e->getCode() . ' ' . $e->getMessage() . "\n";
    }
}
/**
 * 'Finish' event handler. Registered with the server but intentionally does
 * nothing with the task result.
 *
 * @param mixed $serv    Server instance supplied by Swoole.
 * @param mixed $task_id Id of the finished task.
 * @param mixed $data    Result data of the finished task.
 */
public function onFinish($serv, $task_id, $data)
{
    // No post-processing of task results.
}
/**
 * 'close' event handler: remove the uid<->fd mappings of a closed connection.
 *
 * The uid->fd entry is only removed when it still points at the closing fd, so
 * that closing a connection that was kicked by a newer login does not wipe the
 * newer connection's mapping.
 *
 * @param mixed $serv    Server instance supplied by Swoole.
 * @param mixed $fd      File descriptor of the closed connection.
 * @param mixed $from_id Supplied by Swoole; not used here.
 */
public function onClose($serv, $fd, $from_id)
{
    $uid = false;
    $redis = $this->workRedis;

    if ($redis) {
        // Use the MAPS_FID_UID constant, matching every other access to this hash.
        $uid = $redis->hget(MAPS_FID_UID, $fd);

        if ($uid !== false && $uid !== null && $uid !== '') {
            $mappedFd = $redis->hget(MAPS_UID_FID, $uid);
            if ((string)$mappedFd === (string)$fd) {
                $redis->hdel(MAPS_UID_FID, $uid);
            }
        }
        $redis->hdel(MAPS_FID_UID, $fd);
    }

    echo "ClientFd:{$fd} -- uid:{$uid} close connection!\n";
}
- //redis Ping检测
/**
 * Health-check the worker's Redis connection and reconnect when it is down.
 *
 * @param mixed $serv      Server instance, forwarded to ConectToRedis().
 * @param mixed $worker_id Worker id, forwarded to ConectToRedis() for logging.
 *
 * @return mixed true when no connection existed yet (a connect attempt is made)
 *               or when the ping succeeded; otherwise the result of
 *               ConectToRedis() (the new \Redis instance, or false).
 */
private function PingRedis($serv, $worker_id)
{
    if (empty($this->workRedis)) {
        $this->ConectToRedis($serv, $worker_id);
        return true;
    }

    try {
        $reply = $this->workRedis->ping();
    } catch (\Exception $e) {
        // phpredis throws when the connection has been lost.
        $reply = false;
    }

    // Depending on the phpredis version ping() yields true or the string "+PONG".
    $alive = $reply === true
        || (is_string($reply) && strtolower(ltrim($reply, '+')) === 'pong');

    if ($alive) {
        return true;
    }

    return $this->ConectToRedis($serv, $worker_id);
}
- //工作线程 同步阻塞 redis客户端
/**
 * Open a new \Redis connection for this worker using the 'redis' config entry
 * (host, port, and 'overtime' as the connect timeout) and store it in
 * $this->workRedis.
 *
 * @param mixed $serv      Server instance (not used here).
 * @param mixed $worker_id Worker id, included in the log line.
 *
 * @return \Redis|false The connected client, or false when connecting failed
 *                      (in which case $this->workRedis is reset to null).
 */
private function ConectToRedis($serv, $worker_id)
{
    $settings = GlobConfigs::getKey('redis');
    $client = new \Redis();

    try {
        $connected = $client->connect($settings['host'], $settings['port'], $settings['overtime']);
    } catch (\Exception $e) {
        $connected = false;
    }

    if (!$connected) {
        Wlog::getInstance()->WriteLog("error:建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
        $this->workRedis = null;
        return false;
    }

    Wlog::getInstance()->WriteLog("success:成功建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
    $this->workRedis = $client;
    return $client;
}
- //Mysql Ping检测
/**
 * Health-check the worker's MySQL capsule and rebuild it when the probe fails.
 *
 * The probe runs on this worker's own capsule rather than the global DB facade,
 * because the facade points at whichever capsule last called setAsGlobal().
 *
 * @param mixed $serv      Server instance, forwarded to ConectToMysql().
 * @param mixed $worker_id Worker id, forwarded to ConectToMysql() for logging.
 *
 * @return mixed true when no capsule existed yet (one is created) or when the
 *               probe succeeded; otherwise the capsule returned by ConectToMysql().
 */
private function PingMysql($serv, $worker_id)
{
    if (empty($this->workMysql)) {
        $this->ConectToMysql($serv, $worker_id);
        return true;
    }

    try {
        $rows = $this->workMysql->getConnection()->select("select version() as v");
        $alive = !empty($rows);
    } catch (\Exception $e) {
        // A failed query means the connection is unusable: fall through to reconnect.
        $alive = false;
    }

    if ($alive) {
        return true;
    }

    return $this->ConectToMysql($serv, $worker_id);
}
- //工作线程 同步阻塞 Mysql 客户端
/**
 * Build a Capsule from the 'mysql' config entry, make it the global instance,
 * boot Eloquent and store it in $this->workMysql.
 *
 * @param mixed $serv      Server instance (not used here).
 * @param mixed $worker_id Worker id, included in the log line.
 *
 * @return Capsule The newly configured capsule.
 */
private function ConectToMysql($serv, $worker_id)
{
    $capsule = new Capsule();
    $capsule->addConnection(GlobConfigs::getKey('mysql'));
    $capsule->setAsGlobal();
    $capsule->bootEloquent();

    $stamp = date("Y-m-d H:i:s");
    Wlog::getInstance()->WriteLog("success:成功建立Mysql连接 " . $stamp . ' ' . $worker_id);

    $this->workMysql = $capsule;
    return $this->workMysql;
}
// Pgsql ping check
/**
 * Health-check the worker's Pgsql capsule and rebuild it when the probe fails.
 *
 * The probe runs on this worker's own capsule rather than the global DB facade,
 * and a failing query now triggers the reconnect path instead of escaping as an
 * exception from the timer callback.
 *
 * @param mixed $serv      Server instance, forwarded to ConectToPgsql().
 * @param mixed $worker_id Worker id, forwarded to ConectToPgsql() for logging.
 *
 * @return mixed true when no capsule existed yet (one is created) or when the
 *               probe succeeded; otherwise the capsule returned by ConectToPgsql().
 */
private function PingPgsql($serv, $worker_id)
{
    if (empty($this->workPgsql)) {
        $this->ConectToPgsql($serv, $worker_id);
        return true;
    }

    try {
        $rows = $this->workPgsql->getConnection()->select("select version() as v");
        $alive = !empty($rows);
    } catch (\Exception $e) {
        // A failed query means the connection is unusable: fall through to reconnect.
        $alive = false;
    }

    if ($alive) {
        return true;
    }

    return $this->ConectToPgsql($serv, $worker_id);
}
// Worker process: synchronous (blocking) Pgsql client
/**
 * Build a Capsule from the 'pgsql' config entry, make it the global instance,
 * boot Eloquent and store it in $this->workPgsql.
 *
 * @param mixed $serv      Server instance (not used here).
 * @param mixed $worker_id Worker id, included in the log line.
 *
 * @return Capsule The newly configured capsule.
 */
private function ConectToPgsql($serv, $worker_id)
{
    $capsule = new Capsule();
    $capsule->addConnection(GlobConfigs::getKey('pgsql'));
    $capsule->setAsGlobal();
    $capsule->bootEloquent();

    $stamp = date("Y-m-d H:i:s");
    Wlog::getInstance()->WriteLog("success:成功建立Pgsql连接 " . $stamp . ' ' . $worker_id);

    $this->workPgsql = $capsule;
    return $this->workPgsql;
}
- //服务启动作做一些初始化操作
/**
 * One-off initialisation run from onStart().
 *
 * Clears the uid<->fd hashes left over from a previous run and seeds the admin
 * token so that it can be resolved the same way onOpen() resolves tokens:
 * MAPS_TOKEN_UID is keyed by the token md5 and holds the uid, MAPS_UID_TOKEN is
 * the reverse. (The fields and values used to be written the other way round.)
 *
 * NOTE(review): assumes $adminconf['md5'] is the md5 of the admin token —
 * confirm against the 'admin_conf' configuration.
 */
private function startInit()
{
    $conf = GlobConfigs::getKey('redis');
    $adminconf = GlobConfigs::getKey('admin_conf');

    $redis = new \Redis();
    try {
        $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
    } catch (\Exception $e) {
        // Same handling as ConectToRedis(): a refused connection is not fatal.
        $ret = false;
    }
    if (!$ret) {
        echo "startInit: redis connect failed\n";
        return;
    }

    // Queue everything and send it in a single round trip.
    $redis->pipeline();
    $redis->del(MAPS_UID_FID);
    $redis->del(MAPS_FID_UID);
    // MAPS_TOKEN_UID / MAPS_UID_TOKEN are deliberately not cleared here.
    $redis->hset(MAPS_TOKEN_UID, $adminconf['md5'], $adminconf['admin_uid']);
    $redis->hset(MAPS_UID_TOKEN, $adminconf['admin_uid'], $adminconf['md5']);
    $redis->exec();
    $redis->close();
}
- //redis推送消息
/**
 * Placeholder for pushing messages from Redis; currently has no implementation.
 *
 * @param mixed $serv      Server instance (unused).
 * @param mixed $worker_id Worker id (unused).
 */
private function MsgRedisList($serv, $worker_id)
{
    // Not implemented.
}
- }
|