MyServerV2.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. <?php
  2. namespace app\logic;
  3. /**
  4. * Created by PhpStorm.
  5. * User: Administrator
  6. * Date: 2019/5/20
  7. * Time: 11:22
  8. */
  9. use app\lib\GlobConfigs;
  10. use Illuminate\Database\Capsule\Manager as Capsule;
  11. use Illuminate\Database\Capsule\Manager as DB;
  12. use app\lib\Wlog;
  13. use app\logic\MyPgsql as MyPgsql;
  14. class MyServerV2
  15. {
  16. public $serv;
  17. //线程连接实例
  18. private $workRedis = null;
  19. private $workMysql = null;
  20. private $workPgsql = null ;
  21. public function __construct()
  22. {
  23. $this->serv = new \swoole_websocket_server("0.0.0.0", 9090);
  24. $this->serv->set(GlobConfigs::getKey('swoolev2'));
  25. $this->serv->on('Start', array($this, 'onStart'));
  26. $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
  27. $this->serv->on('message', array($this, 'onMessage'));
  28. $this->serv->on('close', array($this, 'onClose'));
  29. $this->serv->on('open', array($this, 'onOpen'));
  30. $this->serv->on('task', array($this, 'onTask'));
  31. $this->serv->on('Finish', array($this, 'onFinish'));
  32. return $this->serv->start();
  33. }
  34. public function onStart($serv)
  35. {
  36. echo "Main on Start " . "\n";
  37. $this->startInit();
  38. }
  39. public function onWorkerStart($serv, $worker_id)
  40. {
  41. $type = $serv->taskworker ? 'Task' : 'Work';
  42. echo "onWorkerStart.. " . $type . '-' . $worker_id . "\n";
  43. if (!$serv->taskworker) {
  44. $this->PingRedis($serv, $worker_id);
  45. $serv->tick(8000, function () use ($serv, $worker_id) {
  46. $this->PingRedis($serv, $worker_id);
  47. });
  48. /*
  49. $this->PingMysql($serv, $worker_id);
  50. $serv->tick(5000, function () use ($serv, $worker_id) {
  51. $this->PingMysql($serv, $worker_id);
  52. });
  53. */
  54. $this->PingPgsql($serv, $worker_id);
  55. $serv->tick(5000, function () use ($serv, $worker_id) {
  56. $this->PingPgsql($serv, $worker_id);
  57. });
  58. $nowRedis = $this->workRedis;
  59. TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
  60. $serv->tick(60000, function () use ($serv, $worker_id, $nowRedis) {
  61. TimeRsync::getInstance()->DoTimePublic($serv, $worker_id, $this->workRedis);
  62. });
  63. MyPgsql::getInstance();
  64. //消息订阅
  65. if ($worker_id == 0){
  66. if ( !$worker_id ){ return ; }
  67. go(function() use ($serv,$worker_id){
  68. $config = GlobConfigs::getKey('redis');
  69. $redis = new \Swoole\Coroutine\Redis();
  70. $redis->setOptions(['compatibility_mode' => true]);
  71. $redis->connect($config['host'], $config['port']);
  72. $r = $redis->SUBSCRIBE([MSG_REDIS_SUBSCRIBE,'TEST']);
  73. if ($r){
  74. while ($msg = $redis->recv()) {
  75. print_r($msg);
  76. // msg是一个数组, 包含以下信息
  77. // $type # 返回值的类型:显示订阅成功
  78. // $name # 订阅的频道名字 或 来源频道名字
  79. // $info # 目前已订阅的频道数量 或 信息内容
  80. list($type, $name, $info) = $msg;
  81. if ($type == 'subscribe') // 或psubscribe
  82. {
  83. // 频道订阅成功消息,订阅几个频道就有几条
  84. }
  85. else if ($type == 'unsubscribe' && $info == 0) // 或punsubscribe
  86. {
  87. break; // 收到取消订阅消息,并且剩余订阅的频道数为0,不再接收,结束循环
  88. }
  89. else if ($type == 'message') // 若为psubscribe,此处为pmessage
  90. {
  91. /*
  92. if ($need_unsubscribe) // 某个情况下需要退订
  93. {
  94. $redis->unsubscribe(); // 继续recv等待退订完成
  95. }
  96. */
  97. }
  98. }
  99. }
  100. });
  101. }
  102. }
  103. }
  104. function onWorkerStop(swoole_server $server, int $worker_id)
  105. {
  106. echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
  107. if ($this->workRedis) {
  108. $this->workRedis->close();
  109. }
  110. if ($this->workMysql) {
  111. $this->workMysql->close();
  112. }
  113. }
  114. public function onMessage($serv, $frame)
  115. {
  116. Wlog::getInstance()->WriteLog($frame);
  117. $serv->task($frame);
  118. }
  119. public function onOpen($serv, $request)
  120. {
  121. $token = isset($request->get['token']) ? $request->get['token'] : '';
  122. $uid = isset($request->get['uid']) ? $request->get['uid'] : '';
  123. $fd = $request->fd;
  124. $redis = $this->workRedis;
  125. if (!$redis) {
  126. return;
  127. }
  128. $redis->hset(ON_OPEN, $fd, json_encode(['fd' => $fd, 'token' => $token, 'uid' => $uid], JSON_UNESCAPED_UNICODE));
  129. $str = '你好 ! fd=' . $fd . ' ,ip=' . $request->server['remote_addr'] . ' ,port=' . $request->server['remote_port'] . ' ,time=' . date('Y-m-d H:i:s');
  130. echo $str ."\n" ;
  131. $serv->push($fd, $str);
  132. /*
  133. $serv->after(120000,function() use($serv,$fd,$redis){
  134. if ($serv->exist($fd)){
  135. $serv->disconnect($fd,1000,json_encode('未验证用户关闭',JSON_UNESCAPED_UNICODE));
  136. $redis->hdel('onOpen',$fd);
  137. }
  138. });
  139. */
  140. }
  141. public function onTask($serv, \Swoole\Server\Task $task)
  142. {
  143. try {
  144. Wlog::getInstance()->WriteLog($task, 1, $serv->worker_id);
  145. CmdProxy::getInstance()->ParaCMD($serv, $task);
  146. } catch (\Exception $e) {
  147. echo "发生异常." . $e->getCode() . ' ' . $e->getMessage() . "\n";
  148. }
  149. }
  150. public function onFinish($serv, $task_id, $data)
  151. {
  152. }
  153. public function onClose($serv, $fd, $from_id)
  154. {
  155. $this->workRedis->hdel(ON_OPEN, $fd);
  156. echo "Client {$fd} close connection!\n";
  157. }
  158. //redis Ping检测
  159. private function PingRedis($serv, $worker_id)
  160. {
  161. if (empty($this->workRedis)) {
  162. $this->ConectToRedis($serv, $worker_id);
  163. return true;
  164. }
  165. $redis = $this->workRedis;
  166. if (!$redis) {
  167. $ping_ret = false;
  168. } else {
  169. $ping_ret_s = $redis->ping();
  170. $ping_ret = "+pong" === strtolower($ping_ret_s) ? true : false;
  171. }
  172. if (!$ping_ret) {
  173. return $this->ConectToRedis($serv, $worker_id);
  174. }
  175. return true;
  176. }
  177. //工作线程 同步阻塞 redis客户端
  178. private function ConectToRedis($serv, $worker_id)
  179. {
  180. $conf = GlobConfigs::getKey('redis');
  181. $redis = new \Redis();
  182. try {
  183. $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
  184. } catch (\Exception $e) {
  185. $ret = false;
  186. }
  187. if ($ret) {
  188. Wlog::getInstance()->WriteLog("success:成功建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  189. $this->workRedis = $redis;
  190. return $redis;
  191. } else {
  192. Wlog::getInstance()->WriteLog("error:建立redis连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  193. $this->workRedis = null;
  194. }
  195. return false;
  196. }
  197. //Mysql Ping检测
  198. private function PingMysql($serv, $worker_id)
  199. {
  200. if (empty($this->workMysql)) {
  201. $this->ConectToMysql($serv, $worker_id);
  202. return true;
  203. }
  204. $mysql = $this->workMysql;
  205. if (!$mysql) {
  206. $ping_ret = false;
  207. } else {
  208. $ping_ret_s = DB::select("select version() as v");
  209. $ping_ret = !empty($ping_ret_s) ? true : false;
  210. }
  211. if (!$ping_ret) {
  212. return $this->ConectToMysql($serv, $worker_id);
  213. }
  214. return true;
  215. }
  216. //工作线程 同步阻塞 Mysql 客户端
  217. private function ConectToMysql($serv, $worker_id)
  218. {
  219. $conf = GlobConfigs::getKey('mysql');
  220. $mysql = new Capsule();
  221. $mysql->addConnection($conf);
  222. $mysql->setAsGlobal();
  223. $mysql->bootEloquent();
  224. Wlog::getInstance()->WriteLog("success:成功建立Mysql连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  225. $this->workMysql = $mysql;
  226. return $mysql;
  227. }
  228. //Mysql Ping检测
  229. private function PingPgsql($serv, $worker_id)
  230. {
  231. if (empty($this->workPgsql)) {
  232. $this->ConectToPgsql($serv, $worker_id);
  233. return true;
  234. }
  235. $pgsql = $this->workPgsql;
  236. if (!$pgsql) {
  237. $ping_ret = false;
  238. } else {
  239. $ping_ret_s = DB::select("select version() as v");
  240. $ping_ret = !empty($ping_ret_s) ? true : false;
  241. }
  242. if (!$ping_ret) {
  243. return $this->ConectToPgsql($serv, $worker_id);
  244. }
  245. return true;
  246. }
  247. //工作线程 同步阻塞 Mysql 客户端
  248. private function ConectToPgsql($serv, $worker_id)
  249. {
  250. $conf = GlobConfigs::getKey('pgsql');
  251. $pgsql = new Capsule();
  252. $pgsql->addConnection($conf);
  253. $pgsql->setAsGlobal();
  254. $pgsql->bootEloquent();
  255. Wlog::getInstance()->WriteLog("success:成功建立Pgsql连接 " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  256. $this->workPgsql = $pgsql;
  257. return $pgsql;
  258. }
  259. //服务启动作做一些初始化操作
  260. private function startInit()
  261. {
  262. $conf = GlobConfigs::getKey('redis');
  263. $redis = new \Redis();
  264. $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
  265. if ($ret) {
  266. $redis->pipeline();
  267. $redis->del(MAPS_UID_FID);
  268. $redis->del(MAPS_FID_UID);
  269. $redis->del(MAPS_USER_TOKEN);
  270. $redis->del(ON_OPEN);
  271. $redis->exec();
  272. $redis->close();
  273. }
  274. }
  275. //redis推送消息
  276. private function MsgRedisList($serv, $worker_id)
  277. {
  278. }
  279. }