SubServer.php 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. <?php
  2. namespace app\logic;
  3. /**
  4. * Created by PhpStorm.
  5. * User: Administrator
  6. * Date: 2019/5/20
  7. * Time: 11:22
  8. */
  9. use app\lib\GlobConfigs;
  10. use app\lib\Wlog;
  11. use WebSocket\Client as Client;
  12. class SubServer
  13. {
  14. public $serv;
  15. //线程连接实例
  16. private $workRedis = null;
  17. private $configs = null;
  18. public function __construct()
  19. {
  20. $this->configs = GlobConfigs::getKey('swoole_subserv');
  21. $this->serv = new \swoole_websocket_server($this->configs['host'], $this->configs['port']);
  22. $this->serv->set($this->configs['sets']);
  23. $this->serv->on('Start', array($this, 'onStart'));
  24. $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
  25. $this->serv->on('message', array($this, 'onMessage'));
  26. $this->serv->on('close', array($this, 'onClose'));
  27. $this->serv->on('open', array($this, 'onOpen'));
  28. return $this->serv->start();
  29. }
  30. public function onStart($serv)
  31. {
  32. echo "Sub on Start " . "\n";
  33. }
  34. public function onWorkerStart($serv, $worker_id)
  35. {
  36. if ($serv->taskworker) {
  37. return;
  38. }
  39. $this->PingRedis($serv, $worker_id);
  40. $serv->tick(8000, function () use ($serv, $worker_id) {
  41. $this->PingRedis($serv, $worker_id);
  42. });
  43. if ($worker_id == 0) {
  44. $serv->tick(1000, function () use ($serv, $worker_id) {
  45. $msg_index_aray = $this->getBoardMsg();
  46. if (!empty($msg_index_aray)) {
  47. $this->SendMsgComm('msg_board', $msg_index_aray);
  48. }
  49. });
  50. }
  51. if ($worker_id == 1) {
  52. $serv->tick(1000, function () use ($serv, $worker_id) {
  53. $msg_user_array = $this->getSingMsg();
  54. if (!empty($msg_user_array)) {
  55. $this->SendMsgComm('msg_to_one_user', $msg_user_array);
  56. //$this->sendSingMsg($msg_user_array);
  57. }
  58. });
  59. }
  60. if ($worker_id == 2) {
  61. $serv->tick(5000, function () use ($serv, $worker_id) {
  62. $msg_tick_array = $this->getDoTick();
  63. if (!empty($msg_tick_array)) {
  64. $this->SendMsgComm('do_tick', $msg_tick_array);
  65. }
  66. });
  67. }
  68. }
  69. function onWorkerStop(swoole_server $server, int $worker_id)
  70. {
  71. echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
  72. if ($this->workRedis) {
  73. $this->workRedis->close();
  74. }
  75. }
  76. public function onMessage($serv, $frame)
  77. {
  78. }
  79. public function onOpen($serv, $request)
  80. {
  81. }
  82. public function onClose($serv, $fd, $from_id)
  83. {
  84. }
  85. //redis Ping检测
  86. private function PingRedis($serv, $worker_id)
  87. {
  88. if (empty($this->workRedis)) {
  89. $this->ConectToRedis($serv, $worker_id);
  90. return true;
  91. }
  92. $redis = $this->workRedis;
  93. if (!$redis) {
  94. $ping_ret = false;
  95. } else {
  96. $ping_ret_s = $redis->ping();
  97. $ping_ret = "+pong" === strtolower($ping_ret_s) ? true : false;
  98. }
  99. if (!$ping_ret) {
  100. return $this->ConectToRedis($serv, $worker_id);
  101. }
  102. return true;
  103. }
  104. //工作线程 同步阻塞 redis客户端
  105. private function ConectToRedis($serv, $worker_id)
  106. {
  107. $conf = GlobConfigs::getKey('redis');
  108. $redis = new \Redis();
  109. try {
  110. $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
  111. } catch (\Exception $e) {
  112. $ret = false;
  113. }
  114. if ($ret) {
  115. Wlog::getInstance()->WriteLog("success:成功建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  116. $this->workRedis = $redis;
  117. return $redis;
  118. } else {
  119. Wlog::getInstance()->WriteLog("error:建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  120. $this->workRedis = null;
  121. }
  122. return false;
  123. }
  124. ////////////////////////
  125. //去redis里找 msg_index 是否有数据,有取出来,
  126. private function getBoardMsg()
  127. {
  128. $redis = $this->workRedis;
  129. $len = $redis->LLEN(MSG_BOARD);
  130. if ($len <= 0) {
  131. return;
  132. }
  133. $datas = [];
  134. while ($now = $redis->rpop(MSG_BOARD)) {
  135. if (empty($now)) {
  136. break;
  137. }
  138. $now = json_decode($now, true);
  139. if (!isset($now['stype']) || empty($now['stype'])) {
  140. Wlog::getInstance()->WriteLog(['数据格式误,没有stype字段[subserver]', $now]);
  141. continue;
  142. }
  143. if (!isset($now['data'])) {
  144. Wlog::getInstance()->WriteLog(['数据格式误,没有data字段[subserver]', $now]);
  145. continue;
  146. }
  147. $datas[] = $now;
  148. }
  149. return $datas;
  150. }
  151. private function getSingMsg()
  152. {
  153. $redis = $this->workRedis;
  154. $len = $redis->LLEN(MSG_TO_ONE_USER);
  155. if ($len <= 0) {
  156. return;
  157. }
  158. $datas = [];
  159. while ($now = $redis->rpop(MSG_TO_ONE_USER)) {
  160. if (empty($now)) {
  161. break;
  162. }
  163. $now = json_decode($now, true);
  164. if (!isset($now['mtype']) || empty($now['mtype'])) {
  165. Wlog::getInstance()->WriteLog(['数据格式误,没有mtype字段[subserver]', $now]);
  166. continue;
  167. }
  168. if (!isset($now['stype']) || empty($now['stype'])) {
  169. Wlog::getInstance()->WriteLog(['数据格式误,没有stype字段[subserver]', $now]);
  170. continue;
  171. }
  172. if (!isset($now['data'])) {
  173. Wlog::getInstance()->WriteLog(['数据格式误,没有data字段[subserver]', $now]);
  174. continue;
  175. }
  176. if (!isset($now['to']) || empty($now['to'])) {
  177. Wlog::getInstance()->WriteLog(['数据格式误,没有to字段或为空[subserver]', $now]);
  178. continue;
  179. }
  180. if (!isset($now['from'])) {
  181. Wlog::getInstance()->WriteLog(['数据格式误,没有from字段[subserver]', $now]);
  182. continue;
  183. }
  184. $now['confirm'] = (isset($now['confirm']) && $now['confirm'] == 1) ? 1 : 0;
  185. $datas[] = $now;
  186. }
  187. return $datas;
  188. }
  189. private function getDoTick()
  190. {
  191. $redis = $this->workRedis;
  192. $len = $redis->LLEN(DO_TICK_USER);
  193. if ($len <= 0) {
  194. return;
  195. }
  196. $datas = [];
  197. while ($now = $redis->rpop(DO_TICK_USER)) {
  198. if (empty($now)) {
  199. break;
  200. }
  201. $allArray = json_decode($now, true);
  202. if (!is_array($allArray)) {
  203. Wlog::getInstance()->WriteLog(['数据格式误,不是全法数组[subserver-getDoTick]', $now]);
  204. }
  205. foreach ($allArray as $one) {
  206. if (!isset($one['uid']) || empty($one['uid'])) {
  207. Wlog::getInstance()->WriteLog(['数据格式误,没有uid字段[subserver-getDoTick]', $now]);
  208. continue;
  209. }
  210. if (!isset($one['msg'])) {
  211. Wlog::getInstance()->WriteLog(['数据格式误,没有msg字段[subserver-getDoTick]', $now]);
  212. continue;
  213. }
  214. $datas[] = $one;
  215. }
  216. }
  217. return $datas;
  218. }
  219. private function SendMsgComm($act, $msgArray)
  220. {
  221. $conf = GlobConfigs::getKey('admin_conf');
  222. $token = $conf['admin_token'];
  223. $url = $conf['url'];
  224. $client = new Client("$url?token=" . $token);
  225. $arr = [
  226. 'cmd' => 'subserv',
  227. 'act' => $act,
  228. 'data' => $msgArray,
  229. 'time' => time(),
  230. 'token' => '',
  231. ];
  232. $str = json_encode($arr, 256);
  233. $client->send($str);
  234. $client->receive();
  235. $client->close();
  236. unset($client);
  237. }
  238. }