MyServerV2.php 12 KB


  1. <?php
  2. namespace app\logic;
  3. /**
  4. * Created by PhpStorm.
  5. * User: Administrator
  6. * Date: 2019/5/20
  7. * Time: 11:22
  8. */
  9. use app\lib\DataPack;
  10. use app\lib\GlobConfigs;
  11. use Illuminate\Database\Capsule\Manager as Capsule;
  12. use Illuminate\Database\Capsule\Manager as DB;
  13. use app\lib\Wlog;
  14. use app\logic\MyPgsql as MyPgsql;
  15. class MyServerV2
  16. {
  17. public $serv;
  18. //线程连接实例
  19. private $workRedis = null;
  20. private $workMysql = null;
  21. private $workPgsql = null;
  22. public function __construct()
  23. {
  24. $config = GlobConfigs::getKey('swoole');
  25. $this->serv = new \swoole_websocket_server($config['host'], $config['port']);
  26. $this->serv->set($config['sets']);
  27. //内存表(用户uid和fd的双向映射表)
  28. $fd_table = new \swoole_table($config['maxUsers']);
  29. $fd_table->column("uid", \swoole_table::TYPE_INT, 4);
  30. $fd_table->create();
  31. $user_table = new \swoole_table($config['maxUsers']);
  32. $user_table->column("fid", \swoole_table::TYPE_INT, 4);
  33. $user_table->create();
  34. $this->serv->ftable = $fd_table;
  35. $this->serv->utable = $user_table;
  36. $this->serv->on('Start', array($this, 'onStart'));
  37. $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
  38. $this->serv->on('message', array($this, 'onMessage'));
  39. $this->serv->on('close', array($this, 'onClose'));
  40. $this->serv->on('open', array($this, 'onOpen'));
  41. $this->serv->on('task', array($this, 'onTask'));
  42. $this->serv->on('Finish', array($this, 'onFinish'));
  43. return $this->serv->start();
  44. }
  45. public function onStart($serv)
  46. {
  47. echo "Main on Start " . "\n";
  48. $this->startInit();
  49. }
  50. public function onWorkerStart($serv, $worker_id)
  51. {
  52. $type = $serv->taskworker ? 'Task' : 'Work';
  53. echo "onWorkerStart.. " . $type . '-' . $worker_id . "\n";
  54. if (!$serv->taskworker) {
  55. $this->PingRedis($serv, $worker_id);
  56. $serv->tick(8000, function () use ($serv, $worker_id) {
  57. $this->PingRedis($serv, $worker_id);
  58. });
  59. /*
  60. $this->PingMysql($serv, $worker_id);
  61. $serv->tick(5000, function () use ($serv, $worker_id) {
  62. $this->PingMysql($serv, $worker_id);
  63. });
  64. */
  65. $this->PingPgsql($serv, $worker_id);
  66. $serv->tick(5000, function () use ($serv, $worker_id) {
  67. $this->PingPgsql($serv, $worker_id);
  68. });
  69. $nowRedis = $this->workRedis;
  70. TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
  71. $serv->tick(60000, function () use ($serv, $worker_id, $nowRedis) {
  72. TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
  73. });
  74. MyPgsql::getInstance();
  75. //消息订阅
  76. if ($worker_id == 0) {
  77. if (!$worker_id) {
  78. return;
  79. }
  80. go(function () use ($serv, $worker_id) {
  81. $config = GlobConfigs::getKey('redis');
  82. $redis = new \Swoole\Coroutine\Redis();
  83. $redis->setOptions(['compatibility_mode' => true]);
  84. $redis->connect($config['host'], $config['port']);
  85. $r = $redis->SUBSCRIBE([MSG_REDIS_SUBSCRIBE, 'TEST']);
  86. if ($r) {
  87. while ($msg = $redis->recv()) {
  88. print_r($msg);
  89. // msg是一个数组, 包含以下信息
  90. // $type # 返回值的类型:显示订阅成功
  91. // $name # 订阅的频道名字 或 来源频道名字
  92. // $info # 目前已订阅的频道数量 或 信息内容
  93. list($type, $name, $info) = $msg;
  94. if ($type == 'subscribe') // 或psubscribe
  95. {
  96. // 频道订阅成功消息,订阅几个频道就有几条
  97. } else if ($type == 'unsubscribe' && $info == 0) // 或punsubscribe
  98. {
  99. break; // 收到取消订阅消息,并且剩余订阅的频道数为0,不再接收,结束循环
  100. } else if ($type == 'message') // 若为psubscribe,此处为pmessage
  101. {
  102. /*
  103. if ($need_unsubscribe) // 某个情况下需要退订
  104. {
  105. $redis->unsubscribe(); // 继续recv等待退订完成
  106. }
  107. */
  108. }
  109. }
  110. }
  111. });
  112. }
  113. }
  114. }
  115. function onWorkerStop(swoole_server $server, int $worker_id)
  116. {
  117. echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
  118. if ($this->workRedis) {
  119. $this->workRedis->close();
  120. }
  121. if ($this->workMysql) {
  122. $this->workMysql->close();
  123. }
  124. }
  125. public function onMessage($serv, $frame)
  126. {
  127. if (strtolower($frame->data->data) == '{"type":"ping"}') {
  128. $serv->send($frame->fd, '{"type":"pong"}');
  129. return;
  130. }
  131. Wlog::getInstance()->WriteLog($frame);
  132. $serv->task($frame);
  133. }
  134. public function onOpen($serv, $request)
  135. {
  136. //Wlog::getInstance()->WriteLog(['onOpenData', $request]);
  137. $token = isset($request->get['token']) ? $request->get['token'] : '';
  138. //$uid = isset($request->get['uid']) ? $request->get['uid'] : '';
  139. $fd = $request->fd;
  140. $redis = $this->workRedis;
  141. if (!$redis) {
  142. return;
  143. }
  144. $token_uid = $uid = intval($this->workRedis->hget(MAPS_TOKEN_UID, md5($token)));
  145. if (empty($token) || !$token_uid) {
  146. $serv->push($request->fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'invalid_token', 'data' => ['msg' => '无效的token']]));
  147. $serv->disconnect($request->fd);
  148. return;
  149. }
  150. //管理员账号可以多连
  151. $adminconfig = GlobConfigs::getKey('admin_conf');
  152. if ($adminconfig['admin_uid'] != $uid) {
  153. $oldfid = $this->workRedis->hget(MAPS_UID_FID, $uid);
  154. if ($oldfid != '' && $oldfid != $fd && $serv->exist($oldfid)) {
  155. $serv->push($oldfid, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'force_logout', 'data' => ['msg' => '你已在其它地方登陆,本次退出!']]));
  156. $serv->disconnect($oldfid);
  157. }
  158. $this->workRedis->hset(MAPS_UID_FID, $uid, $fd);
  159. $this->workRedis->hset(MAPS_FID_UID, $fd, $uid);
  160. $serv->ftable->set($fd, ['uid' => $uid]);
  161. $serv->utable->set($uid, ['fid' => $fd]);
  162. }
  163. $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'well_come', 'data' => ['msg' => '成功接入']]));
  164. }
  165. public function onTask($serv, \Swoole\Server\Task $task)
  166. {
  167. try {
  168. Wlog::getInstance()->WriteLog($task, 1, $serv->worker_id);
  169. CmdProxy::getInstance()->ParaCMD($serv, $task);
  170. } catch (\Exception $e) {
  171. Wlog::getInstance()->WriteLog(['onTask error:', $task, $e->getCode() . ' ' . $e->getMessage()], 3, $serv->worker_id);
  172. echo "发生异常." . $e->getCode() . ' ' . $e->getMessage() . "\n";
  173. }
  174. }
  175. public function onFinish($serv, $task_id, $data)
  176. {
  177. }
  178. public function onClose($serv, $fd, $from_id)
  179. {
  180. $uid = $this->workRedis->hget("MAPS_FID_UID", $fd);
  181. $adminconfig = GlobConfigs::getKey('admin_conf');
  182. if ($adminconfig['admin_uid'] != $uid) {
  183. $this->workRedis->hdel(MAPS_UID_FID, $uid);
  184. $this->workRedis->hdel(MAPS_FID_UID, $fd);
  185. $serv->ftable->del($fd);
  186. $serv->utable->del($uid);
  187. }
  188. echo "ClientFd:{$fd} -- uid:{$uid} close connection!\n";
  189. }
  190. //redis Ping检测
  191. private function PingRedis($serv, $worker_id)
  192. {
  193. if (empty($this->workRedis)) {
  194. $this->ConectToRedis($serv, $worker_id);
  195. return true;
  196. }
  197. $redis = $this->workRedis;
  198. if (!$redis) {
  199. $ping_ret = false;
  200. } else {
  201. $ping_ret_s = $redis->ping();
  202. $ping_ret = "+pong" === strtolower($ping_ret_s) ? true : false;
  203. }
  204. if (!$ping_ret) {
  205. return $this->ConectToRedis($serv, $worker_id);
  206. }
  207. return true;
  208. }
  209. //工作线程 同步阻塞 redis客户端
  210. private function ConectToRedis($serv, $worker_id)
  211. {
  212. $conf = GlobConfigs::getKey('redis');
  213. $redis = new \Redis();
  214. try {
  215. $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
  216. } catch (\Exception $e) {
  217. $ret = false;
  218. }
  219. if ($ret) {
  220. Wlog::getInstance()->WriteLog("success:成功建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  221. $this->workRedis = $redis;
  222. return $redis;
  223. } else {
  224. Wlog::getInstance()->WriteLog("error:建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  225. $this->workRedis = null;
  226. }
  227. return false;
  228. }
  229. //Mysql Ping检测
  230. private function PingMysql($serv, $worker_id)
  231. {
  232. if (empty($this->workMysql)) {
  233. $this->ConectToMysql($serv, $worker_id);
  234. return true;
  235. }
  236. $mysql = $this->workMysql;
  237. if (!$mysql) {
  238. $ping_ret = false;
  239. } else {
  240. $ping_ret_s = DB::select("select version() as v");
  241. $ping_ret = !empty($ping_ret_s) ? true : false;
  242. }
  243. if (!$ping_ret) {
  244. return $this->ConectToMysql($serv, $worker_id);
  245. }
  246. return true;
  247. }
  248. //工作线程 同步阻塞 Mysql 客户端
  249. private function ConectToMysql($serv, $worker_id)
  250. {
  251. $conf = GlobConfigs::getKey('mysql');
  252. $mysql = new Capsule();
  253. $mysql->addConnection($conf);
  254. $mysql->setAsGlobal();
  255. $mysql->bootEloquent();
  256. Wlog::getInstance()->WriteLog("success:成功建立Mysql连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  257. $this->workMysql = $mysql;
  258. return $mysql;
  259. }
  260. //Mysql Ping检测
  261. private function PingPgsql($serv, $worker_id)
  262. {
  263. if (empty($this->workPgsql)) {
  264. $this->ConectToPgsql($serv, $worker_id);
  265. return true;
  266. }
  267. $pgsql = $this->workPgsql;
  268. if (!$pgsql) {
  269. $ping_ret = false;
  270. } else {
  271. $ping_ret_s = DB::select("select version() as v");
  272. $ping_ret = !empty($ping_ret_s) ? true : false;
  273. }
  274. if (!$ping_ret) {
  275. return $this->ConectToPgsql($serv, $worker_id);
  276. }
  277. return true;
  278. }
  279. //工作线程 同步阻塞 Mysql 客户端
  280. private function ConectToPgsql($serv, $worker_id)
  281. {
  282. $conf = GlobConfigs::getKey('pgsql');
  283. $pgsql = new Capsule();
  284. $pgsql->addConnection($conf);
  285. $pgsql->setAsGlobal();
  286. $pgsql->bootEloquent();
  287. Wlog::getInstance()->WriteLog("success:成功建立Pgsql连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  288. $this->workPgsql = $pgsql;
  289. return $pgsql;
  290. }
  291. //服务启动作做一些初始化操作
  292. private function startInit()
  293. {
  294. $conf = GlobConfigs::getKey('redis');
  295. $adminconf = GlobConfigs::getKey('admin_conf');
  296. $redis = new \Redis();
  297. $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
  298. if ($ret) {
  299. $redis->pipeline();
  300. $redis->del(MAPS_UID_FID);
  301. $redis->del(MAPS_FID_UID);
  302. //$redis->del(MAPS_TOKEN_UID);
  303. //$redis->del(MAPS_UID_TOKEN);
  304. $redis->hset(MAPS_UID_TOKEN, $adminconf['admin_uid'], $adminconf['md5']);
  305. $redis->hset(MAPS_TOKEN_UID, $adminconf['md5'], $adminconf['admin_uid']);
  306. $redis->exec();
  307. $redis->close();
  308. }
  309. }
  310. //redis推送消息
  311. private function MsgRedisList($serv, $worker_id)
  312. {
  313. }
  314. }