MyServerV2.php 11 KB


  1. <?php
  2. namespace app\logic;
  3. /**
  4. * Created by PhpStorm.
  5. * User: Administrator
  6. * Date: 2019/5/20
  7. * Time: 11:22
  8. */
  9. use app\lib\DataPack;
  10. use app\lib\GlobConfigs;
  11. use Illuminate\Database\Capsule\Manager as Capsule;
  12. use Illuminate\Database\Capsule\Manager as DB;
  13. use app\lib\Wlog;
  /**
   * Swoole WebSocket gateway.
   *
   * Responsibilities visible in this class:
   *  - boots a \swoole_websocket_server from the 'swoole' config section;
   *  - authenticates each new connection by looking its token up in Redis;
   *  - keeps a two-way uid <-> fd mapping both in Redis hashes
   *    (MAPS_UID_FID / MAPS_FID_UID) and in two shared swoole tables;
   *  - forwards every non-ping frame to a task worker (CmdProxy);
   *  - keeps one blocking Redis client and one database Capsule per worker
   *    alive via periodic ping timers.
   *
   * The MAPS_* constants, GlobConfigs, DataPack, Wlog, TimeRsync and CmdProxy
   * are defined elsewhere in the project.
   */
  class MyServerV2
  {
      /** @var \swoole_websocket_server the underlying Swoole server */
      public $serv;

      // Per-worker connection instances (null until the worker has connected).
      private $workRedis = null;
      private $workPgsql = null;

      /**
       * Builds the server, creates the shared tables, registers the event
       * handlers and starts the server. start() blocks, so constructing this
       * object does not return until the server shuts down.
       */
      public function __construct()
      {
          $config = GlobConfigs::getKey('swoole');
          $this->serv = new \swoole_websocket_server($config['host'], $config['port']);
          $this->serv->set($config['sets']);

          // Shared-memory tables: two-way mapping between user uid and connection fd.
          $fd_table = new \swoole_table($config['maxUsers']);
          $fd_table->column("uid", \swoole_table::TYPE_INT, 4);
          $fd_table->create();

          $user_table = new \swoole_table($config['maxUsers']);
          $user_table->column("fid", \swoole_table::TYPE_INT, 4);
          $user_table->create();

          $this->serv->ftable = $fd_table;
          $this->serv->utable = $user_table;

          $this->serv->on('Start', [$this, 'onStart']);
          $this->serv->on('WorkerStart', [$this, 'onWorkerStart']);
          // onWorkerStop existed but was never registered, so the per-worker
          // Redis connection was never closed on shutdown/reload.
          $this->serv->on('WorkerStop', [$this, 'onWorkerStop']);
          $this->serv->on('message', [$this, 'onMessage']);
          $this->serv->on('close', [$this, 'onClose']);
          $this->serv->on('open', [$this, 'onOpen']);
          $this->serv->on('task', [$this, 'onTask']);
          $this->serv->on('Finish', [$this, 'onFinish']);

          $this->serv->start();
      }

      /**
       * 'Start' event: runs once-per-boot initialisation.
       *
       * @param mixed $serv the Swoole server
       */
      public function onStart($serv)
      {
          echo "Main on Start " . "\n";
          // NOTE(review): startInit() clears the uid/fd hashes; if workers can
          // already accept connections at this point the reset may race with
          // the first logins - confirm against the Swoole start-up order.
          $this->startInit();
      }

      /**
       * 'WorkerStart' event. Event workers (not task workers) open their Redis
       * and database connections and install the keep-alive / sync timers.
       *
       * @param mixed $serv      the Swoole server
       * @param int   $worker_id id of the starting worker
       */
      public function onWorkerStart($serv, $worker_id)
      {
          $type = $serv->taskworker ? 'Task' : 'Work';
          echo "onWorkerStart.. " . $type . '-' . $worker_id . "\n";

          if ($serv->taskworker) {
              return;
          }

          // Redis: connect now, then re-check every 8 seconds.
          $this->PingRedis($serv, $worker_id);
          $serv->tick(8000, function () use ($serv, $worker_id) {
              $this->PingRedis($serv, $worker_id);
          });

          // Database: connect now, then re-check every 5 seconds.
          $this->PingPgsql($serv, $worker_id);
          $serv->tick(5000, function () use ($serv, $worker_id) {
              $this->PingPgsql($serv, $worker_id);
          });

          // Periodic sync job: once immediately, then every 60 seconds. The
          // current $this->workRedis is read on every run so that a client
          // replaced by a reconnect is picked up.
          TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
          $serv->tick(60000, function () use ($serv, $worker_id) {
              TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
          });
      }

      /**
       * 'WorkerStop' event: closes this worker's Redis connection, if any.
       *
       * The server parameter is intentionally untyped: inside this namespace
       * the previous `swoole_server` hint resolved to a class in the local
       * namespace rather than the global Swoole class.
       *
       * @param mixed $server    the Swoole server
       * @param int   $worker_id id of the stopping worker
       */
      public function onWorkerStop($server, int $worker_id)
      {
          echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
          if ($this->workRedis) {
              $this->workRedis->close();
              $this->workRedis = null;
          }
      }

      /**
       * 'message' event: answers application-level pings directly and hands
       * every other frame to a task worker.
       *
       * @param mixed $serv  the Swoole server
       * @param mixed $frame incoming WebSocket frame (fd, data)
       */
      public function onMessage($serv, $frame)
      {
          if (strtolower($frame->data) === '{"type":"ping"}') {
              $serv->push($frame->fd, '{"type":"pong"}');
              return;
          }
          Wlog::getInstance()->WriteLog($frame);
          $serv->task($frame);
      }

      /**
       * 'open' event: authenticates a new connection and records its uid/fd
       * mapping.
       *
       * Flow: capacity check -> token lookup in Redis -> for ordinary users,
       * kick any previous connection of the same uid and store both mapping
       * directions; for the admin uid, enforce the IP whitelist and store only
       * fd -> uid (the admin account may hold several connections).
       *
       * @param mixed $serv    the Swoole server
       * @param mixed $request handshake request (fd, get, server)
       */
      public function onOpen($serv, $request)
      {
          $fd = $request->fd;

          $config = GlobConfigs::getKey('swoole');
          if (count($serv->connections) > $config['maxUsers']) {
              $this->rejectConnection($serv, $fd, 'maxUsers', '超过人数上限');
              return;
          }

          $token = isset($request->get['token']) ? $request->get['token'] : '';
          if (!is_string($token)) {
              // e.g. ?token[]=x - md5() must not be fed an array.
              $token = '';
          }

          $redis = $this->workRedis;
          if (!$redis) {
              $this->rejectConnection($serv, $fd, 'error', ['msg' => '服务端错误!-01']);
              return;
          }

          try {
              $uid = empty($token) ? 0 : intval($redis->hget(MAPS_TOKEN_UID, md5($token)));
              echo '新请求uid-' . $uid . ' fd-' . $fd . ' 接入....' . "\n";
              if ($uid === 0) {
                  $this->rejectConnection($serv, $fd, 'invalid_token', ['msg' => '无效的token']);
                  return;
              }

              $remoteAddr = $request->server['remote_addr'];
              $adminconfig = GlobConfigs::getKey('admin_conf');
              $isAdmin = intval($adminconfig['admin_uid']) === $uid;

              if (!$isAdmin) {
                  // Single session per ordinary user: kick the previous fd.
                  $oldfid = $redis->hget(MAPS_UID_FID, $uid);
                  if ($oldfid !== false && $oldfid !== '' && intval($oldfid) !== intval($fd) && $serv->exist(intval($oldfid))) {
                      $this->rejectConnection($serv, intval($oldfid), 'force_logout', ['msg' => '你已在其它地方登陆,本次退出!']);
                  }
                  $redis->hset(MAPS_UID_FID, $uid, $fd);
                  $redis->hset(MAPS_FID_UID, $fd, $uid);
                  $serv->ftable->set($fd, ['uid' => $uid]);
                  $serv->utable->set($uid, ['fid' => $fd]);
              } else {
                  // The admin account may connect several times, but only
                  // from whitelisted addresses.
                  if (!$this->admin_ip_Check($remoteAddr, $adminconfig['whiteips'])) {
                      $this->rejectConnection($serv, $fd, 'force_logout', ['msg' => '登陆被限!' . $remoteAddr]);
                      return;
                  }
                  $redis->hset(MAPS_FID_UID, $fd, $uid);
                  $serv->ftable->set($fd, ['uid' => $uid]);
              }
          } catch (\Exception $e) {
              // Redis dropped between two pings: refuse the connection rather
              // than let the exception escape the event callback.
              Wlog::getInstance()->WriteLog(['onOpen error:', $e->getCode() . ' ' . $e->getMessage()]);
              $this->rejectConnection($serv, $fd, 'error', ['msg' => '服务端错误!-01']);
              return;
          }

          $msg = '成功接入 [' . $remoteAddr . ' - ' . $fd . ' - ' . $uid . "]";
          $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => 'well_come', 'data' => ['msg' => $msg]]));
      }

      /**
       * 'task' event: logs the task and dispatches it to CmdProxy. Any
       * throwable is logged instead of escaping the task worker.
       *
       * @param mixed               $serv the Swoole server
       * @param \Swoole\Server\Task $task the task object
       */
      public function onTask($serv, \Swoole\Server\Task $task)
      {
          try {
              Wlog::getInstance()->WriteLog($task, 1, $serv->worker_id);
              CmdProxy::getInstance()->ParaCMD($serv, $task);
          } catch (\Throwable $e) {
              Wlog::getInstance()->WriteLog(['onTask error:', $task, $e->getCode() . ' ' . $e->getMessage()], 3, $serv->worker_id);
              echo "发生异常." . $e->getCode() . ' ' . $e->getMessage() . "\n";
          }
      }

      /**
       * 'Finish' event: intentionally empty, task results are not used here.
       */
      public function onFinish($serv, $task_id, $data)
      {
      }

      /**
       * 'close' event: removes the mappings that belong to the closed fd.
       *
       * The uid -> fd direction is only removed while it still points at the
       * closing fd. When a duplicate login kicks an older connection, onOpen
       * has already stored the NEW fd under that uid; deleting it blindly
       * would leave the freshly logged-in user without a mapping.
       * (The check and the delete are two separate calls, so a very small
       * window remains.)
       *
       * @param mixed $serv    the Swoole server
       * @param int   $fd      the closed connection
       * @param int   $from_id reactor id supplied by Swoole (unused)
       */
      public function onClose($serv, $fd, $from_id)
      {
          $redis = $this->workRedis;
          $uid = false;

          try {
              if ($redis) {
                  // The constant, not the literal string "MAPS_FID_UID".
                  $uid = $redis->hget(MAPS_FID_UID, $fd);
              }
              if ($uid === false || $uid === null || $uid === '') {
                  // Redis unavailable or no entry: fall back to the shared table.
                  $row = $serv->ftable->get($fd);
                  $uid = ($row !== false && isset($row['uid'])) ? $row['uid'] : false;
              }

              $adminconfig = GlobConfigs::getKey('admin_conf');
              $known = $uid !== false && intval($uid) !== 0;

              if ($known && intval($adminconfig['admin_uid']) !== intval($uid)) {
                  if ($redis) {
                      $mapped = $redis->hget(MAPS_UID_FID, $uid);
                      if ($mapped !== false && intval($mapped) === intval($fd)) {
                          $redis->hdel(MAPS_UID_FID, $uid);
                      }
                  }
                  $urow = $serv->utable->get($uid);
                  if ($urow !== false && intval($urow['fid']) === intval($fd)) {
                      $serv->utable->del($uid);
                  }
              }

              if ($redis) {
                  $redis->hdel(MAPS_FID_UID, $fd);
              }
          } catch (\Exception $e) {
              Wlog::getInstance()->WriteLog(['onClose error:', $e->getCode() . ' ' . $e->getMessage()]);
          }

          $serv->ftable->del($fd);
          echo "ClientFd:{$fd} -- uid:{$uid} close connection!\n";
      }

      /**
       * Sends one system message to a connection and then disconnects it.
       *
       * @param mixed  $serv  the Swoole server
       * @param int    $fd    target connection
       * @param string $stype value of the 'stype' field
       * @param mixed  $data  value of the 'data' field
       */
      private function rejectConnection($serv, $fd, $stype, $data)
      {
          $serv->push($fd, DataPack::toJson(['mtype' => 'system_msg', 'stype' => $stype, 'data' => $data]));
          $serv->disconnect($fd);
      }

      /**
       * Redis keep-alive: connects when there is no client, otherwise pings
       * and reconnects when the ping fails.
       *
       * ping() is accepted when it returns boolean true or any string
       * containing "pong", because the reply format differs between phpredis
       * releases. A throwing ping is treated as a dead connection.
       *
       * @return bool true when a usable client is in place afterwards
       */
      private function PingRedis($serv, $worker_id)
      {
          $redis = $this->workRedis;
          if ($redis) {
              try {
                  $reply = $redis->ping();
                  if ($reply === true || (is_string($reply) && stripos($reply, 'pong') !== false)) {
                      return true;
                  }
              } catch (\Exception $e) {
                  Wlog::getInstance()->WriteLog("error:redis ping " . $e->getMessage() . ' ' . $worker_id);
              }

              // Dead connection: release it before replacing it.
              try {
                  $redis->close();
              } catch (\Exception $e) {
                  // Best effort only - the socket is already unusable.
              }
              $this->workRedis = null;
          }

          return $this->ConectToRedis($serv, $worker_id) !== false;
      }

      /**
       * Opens this worker's synchronous (blocking) Redis client and stores it
       * in $this->workRedis; on failure $this->workRedis is reset to null.
       *
       * @return \Redis|false the connected client, or false on failure
       */
      private function ConectToRedis($serv, $worker_id)
      {
          $conf = GlobConfigs::getKey('redis');
          $redis = new \Redis();
          try {
              $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
          } catch (\Exception $e) {
              $ret = false;
          }

          if (!$ret) {
              Wlog::getInstance()->WriteLog("error:建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
              $this->workRedis = null;
              return false;
          }

          Wlog::getInstance()->WriteLog("success:成功建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
          $this->workRedis = $redis;
          return $redis;
      }

      /**
       * Database keep-alive: sets the Capsule up when missing, otherwise runs
       * a trivial query and rebuilds the Capsule when the query fails, throws
       * or returns nothing.
       *
       * @return bool true when a Capsule is in place afterwards
       */
      private function PingPgsql($serv, $worker_id)
      {
          if (!empty($this->workPgsql)) {
              try {
                  $rows = DB::select("select version() as v");
                  if (!empty($rows)) {
                      return true;
                  }
              } catch (\Throwable $e) {
                  Wlog::getInstance()->WriteLog("error:Pgsql ping " . $e->getMessage() . ' ' . $worker_id);
              }
          }

          return (bool) $this->ConectToPgsql($serv, $worker_id);
      }

      /**
       * Creates this worker's synchronous database Capsule from the 'pgsql'
       * config section, installs it as the global instance and boots Eloquent.
       *
       * NOTE(review): presumably the Capsule opens the real connection lazily
       * on first query, so the "success" log line may precede the actual
       * connect - confirm.
       *
       * @return Capsule the new Capsule instance
       */
      private function ConectToPgsql($serv, $worker_id)
      {
          $conf = GlobConfigs::getKey('pgsql');
          $pgsql = new Capsule();
          $pgsql->addConnection($conf);
          $pgsql->setAsGlobal();
          $pgsql->bootEloquent();
          Wlog::getInstance()->WriteLog("success:成功建立Pgsql连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
          $this->workPgsql = $pgsql;
          return $pgsql;
      }

      /**
       * One-off initialisation at server start: clears both uid/fd hashes and
       * (re)registers the admin token mapping, all in one Redis pipeline.
       * A Redis failure is reported and otherwise ignored.
       */
      private function startInit()
      {
          $conf = GlobConfigs::getKey('redis');
          $adminconf = GlobConfigs::getKey('admin_conf');
          $redis = new \Redis();

          try {
              if (!$redis->connect($conf['host'], $conf['port'], $conf['overtime'])) {
                  echo "startInit: redis connect failed\n";
                  return;
              }
              $redis->pipeline();
              $redis->del(MAPS_UID_FID);
              $redis->del(MAPS_FID_UID);
              $redis->hset(MAPS_UID_TOKEN, $adminconf['admin_uid'], $adminconf['md5']);
              $redis->hset(MAPS_TOKEN_UID, $adminconf['md5'], $adminconf['admin_uid']);
              $redis->exec();
              $redis->close();
          } catch (\Exception $e) {
              echo "startInit: redis error " . $e->getMessage() . "\n";
          }
      }

      /**
       * Checks an address against the admin IP whitelist.
       *
       * An entry matches when it equals the address exactly, or when it
       * contains '*' and the address starts with everything before the first
       * '*' (so "10.0.0.*" matches "10.0.0.7", and a bare "*" matches all).
       *
       * The earlier loop used `continue` inside the inner for-loop, which only
       * skipped one character comparison; every wildcard entry therefore
       * accepted every address.
       *
       * @param string $ip        remote address to test
       * @param array  $okipArray whitelist entries
       * @return bool true when the address is allowed
       */
      private function admin_ip_Check($ip, $okipArray = [])
      {
          if (empty($okipArray)) {
              return false;
          }

          $ip = (string) $ip;
          foreach ((array) $okipArray as $aip) {
              $aip = (string) $aip;
              if ($ip === $aip) {
                  return true;
              }

              $spos = strpos($aip, "*");
              if ($spos === false) {
                  continue;
              }

              // Compare only the literal prefix in front of the wildcard.
              if (strncmp($ip, $aip, $spos) === 0) {
                  return true;
              }
          }

          return false;
      }
  }