SubServer.php 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. <?php
  2. namespace app\logic;
  3. /**
  4. * Created by PhpStorm.
  5. * User: Administrator
  6. * Date: 2019/5/20
  7. * Time: 11:22
  8. */
  9. use app\lib\GlobConfigs;
  10. use app\lib\Wlog;
  11. use WebSocket\Client as Client;
  /**
   * Swoole WebSocket sub-server whose workers poll Redis lists on timers and
   * forward whatever they find to the admin WebSocket endpoint.
   *
   * Worker 0 polls MSG_BOARD, worker 1 polls MSG_TO_ONE_USER and worker 2 polls
   * DO_TICK_USER. Every non-task worker keeps its own Redis connection and
   * re-checks it every 8 seconds.
   *
   * NOTE(review): the three pollers only all run when the configured worker
   * count provides worker ids 0, 1 and 2 - confirm against 'swoole_subserv'.
   */
  class SubServer
  {
      /** @var \swoole_websocket_server Server instance created and started by the constructor. */
      public $serv;

      /** @var \Redis|null Redis connection owned by the current worker; null while disconnected. */
      private $workRedis = null;

      /** @var array|null The 'swoole_subserv' configuration (reads 'host', 'port' and 'sets'). */
      private $configs = null;

      /**
       * Builds the server from the 'swoole_subserv' configuration, registers the
       * event callbacks and starts the server.
       */
      public function __construct()
      {
          $this->configs = GlobConfigs::getKey('swoole_subserv');
          $this->serv = new \swoole_websocket_server($this->configs['host'], $this->configs['port']);
          $this->serv->set($this->configs['sets']);
          $this->serv->on('Start', [$this, 'onStart']);
          $this->serv->on('WorkerStart', [$this, 'onWorkerStart']);
          // onWorkerStop was defined but never registered, so it could not run.
          $this->serv->on('WorkerStop', [$this, 'onWorkerStop']);
          $this->serv->on('message', [$this, 'onMessage']);
          $this->serv->on('close', [$this, 'onClose']);
          $this->serv->on('open', [$this, 'onOpen']);
          // A constructor cannot return a value, so the result of start() is not returned.
          $this->serv->start();
      }

      /**
       * Server start callback: prints a start-up banner.
       *
       * @param mixed $serv Server instance passed by Swoole (unused).
       */
      public function onStart($serv)
      {
          echo "Sub on Start " . "\n";
      }

      /**
       * Worker start callback: connects to Redis, schedules the 8-second Redis
       * health check and, for worker ids 0, 1 and 2, the matching list poller.
       * Task workers are skipped entirely.
       *
       * @param mixed $serv      Server instance passed by Swoole.
       * @param int   $worker_id Id of the worker being started.
       */
      public function onWorkerStart($serv, $worker_id)
      {
          if ($serv->taskworker) {
              return;
          }

          $this->PingRedis($serv, $worker_id);
          $serv->tick(8000, function () use ($serv, $worker_id) {
              $this->PingRedis($serv, $worker_id);
          });

          if ($worker_id == 0) {
              $this->pollEvery($serv, 500, 'getBoardMsg', 'msg_board');
          }
          if ($worker_id == 1) {
              $this->pollEvery($serv, 500, 'getSingMsg', 'msg_to_one_user');
          }
          if ($worker_id == 2) {
              $this->pollEvery($serv, 5000, 'getDoTick', 'do_tick');
          }
      }

      /**
       * Worker stop callback: closes this worker's Redis connection.
       *
       * The parameters are intentionally untyped: the previous `swoole_server`
       * hint resolved inside this file's namespace and so could not match the
       * server object Swoole passes in.
       *
       * @param mixed $server    Server instance passed by Swoole (unused).
       * @param int   $worker_id Id of the worker being stopped.
       */
      public function onWorkerStop($server, $worker_id)
      {
          echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
          if ($this->workRedis) {
              try {
                  $this->workRedis->close();
              } catch (\Exception $e) {
                  // Best effort: the worker is going away and the link may already be dead.
              }
              $this->workRedis = null;
          }
      }

      /**
       * Message callback; incoming frames are ignored.
       */
      public function onMessage($serv, $frame)
      {
      }

      /**
       * Connection-open callback; nothing to do.
       */
      public function onOpen($serv, $request)
      {
      }

      /**
       * Connection-close callback; nothing to do.
       */
      public function onClose($serv, $fd, $from_id)
      {
      }

      /**
       * Schedules a repeating timer that calls one of the fetch methods and, when
       * it yields anything, forwards the batch through SendMsgComm().
       *
       * @param mixed  $serv       Server instance providing tick().
       * @param int    $intervalMs Timer interval in milliseconds.
       * @param string $fetcher    Name of the private fetch method to call.
       * @param string $act        Value sent as the 'act' field of the forwarded message.
       */
      private function pollEvery($serv, $intervalMs, $fetcher, $act)
      {
          $serv->tick($intervalMs, function () use ($fetcher, $act) {
              $messages = $this->{$fetcher}();
              if (!empty($messages)) {
                  $this->SendMsgComm($act, $messages);
              }
          });
      }

      /**
       * Redis health check: connects when there is no connection, otherwise
       * pings and reconnects when the ping fails.
       *
       * A ping counts as healthy when it returns boolean true or a "PONG"
       * string with or without a leading "+". An exception thrown by ping() is
       * treated as a failed ping instead of escaping into the timer callback.
       *
       * @param mixed $serv      Server instance (passed through to ConectToRedis).
       * @param int   $worker_id Worker id, used for logging.
       *
       * @return mixed true when healthy or a first connect was attempted; otherwise
       *               the result of ConectToRedis().
       */
      private function PingRedis($serv, $worker_id)
      {
          if (empty($this->workRedis)) {
              $this->ConectToRedis($serv, $worker_id);
              return true;
          }

          try {
              $reply = $this->workRedis->ping();
          } catch (\Exception $e) {
              $reply = false;
          }

          $alive = $reply === true
              || (is_string($reply) && strtolower(ltrim($reply, '+')) === 'pong');
          if (!$alive) {
              return $this->ConectToRedis($serv, $worker_id);
          }
          return true;
      }

      /**
       * Opens a Redis connection for this worker using the 'redis' configuration
       * ('host', 'port', 'overtime') and stores it in $workRedis; on failure
       * $workRedis is reset to null. Both outcomes are logged.
       *
       * @param mixed $serv      Server instance (unused).
       * @param int   $worker_id Worker id, used for logging.
       *
       * @return \Redis|false The new connection, or false when connecting failed.
       */
      private function ConectToRedis($serv, $worker_id)
      {
          $conf = GlobConfigs::getKey('redis');
          $redis = new \Redis();
          try {
              $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
          } catch (\Exception $e) {
              $ret = false;
          }

          if ($ret) {
              Wlog::getInstance()->WriteLog("success:成功建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
              $this->workRedis = $redis;
              return $redis;
          }

          Wlog::getInstance()->WriteLog("error:建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
          $this->workRedis = null;
          return false;
      }

      /**
       * Pops at most $limit raw entries from the tail of a Redis list.
       *
       * The limit is checked before each pop, so no entry is removed from Redis
       * and then thrown away. When Redis is not connected an empty batch is
       * returned. When a pop throws, the entries collected so far are returned
       * and the connection is dropped so the next PingRedis() reconnects.
       *
       * @param string $key   Redis list key.
       * @param int    $limit Maximum number of entries to pop in one call.
       *
       * @return array Raw entries in pop order.
       */
      private function popBatch($key, $limit)
      {
          $redis = $this->workRedis;
          if (!$redis) {
              return [];
          }

          $batch = [];
          try {
              while (count($batch) < $limit) {
                  $raw = $redis->rPop($key);
                  if ($raw === false || $raw === null) {
                      break;
                  }
                  $batch[] = $raw;
              }
          } catch (\Exception $e) {
              Wlog::getInstance()->WriteLog(['error:redis rPop failed[subserver]', $key, $e->getMessage()]);
              $this->workRedis = null;
          }
          return $batch;
      }

      /**
       * Takes up to 100 entries from MSG_BOARD and returns the decoded ones that
       * carry a non-empty 'stype' and a 'data' field; malformed entries are
       * logged and skipped.
       *
       * @return array Decoded messages (empty when there is nothing to send).
       */
      private function getBoardMsg()
      {
          $datas = [];
          foreach ($this->popBatch(MSG_BOARD, 100) as $raw) {
              $now = json_decode($raw, true);
              if (empty($now['stype'])) {
                  Wlog::getInstance()->WriteLog(['数据格式误,没有stype字段[subserver]', $now]);
                  continue;
              }
              if (!isset($now['data'])) {
                  Wlog::getInstance()->WriteLog(['数据格式误,没有data字段[subserver]', $now]);
                  continue;
              }
              $datas[] = $now;
          }
          return $datas;
      }

      /**
       * Takes up to 500 entries from MSG_TO_ONE_USER and returns the decoded ones
       * that carry non-empty 'mtype', 'stype' and 'to' fields plus 'data' and
       * 'from'; malformed entries are logged and skipped. The 'confirm' field is
       * normalised to 1 or 0.
       *
       * @return array Decoded messages (empty when there is nothing to send).
       */
      private function getSingMsg()
      {
          $datas = [];
          foreach ($this->popBatch(MSG_TO_ONE_USER, 500) as $raw) {
              $now = json_decode($raw, true);
              if (empty($now['mtype'])) {
                  Wlog::getInstance()->WriteLog(['数据格式误,没有mtype字段[subserver]', $now]);
                  continue;
              }
              if (empty($now['stype'])) {
                  Wlog::getInstance()->WriteLog(['数据格式误,没有stype字段[subserver]', $now]);
                  continue;
              }
              if (!isset($now['data'])) {
                  Wlog::getInstance()->WriteLog(['数据格式误,没有data字段[subserver]', $now]);
                  continue;
              }
              if (empty($now['to'])) {
                  Wlog::getInstance()->WriteLog(['数据格式误,没有to字段或为空[subserver]', $now]);
                  continue;
              }
              if (!isset($now['from'])) {
                  Wlog::getInstance()->WriteLog(['数据格式误,没有from字段[subserver]', $now]);
                  continue;
              }
              $now['confirm'] = (isset($now['confirm']) && $now['confirm'] == 1) ? 1 : 0;
              $datas[] = $now;
          }
          return $datas;
      }

      /**
       * Takes up to 100 entries from DO_TICK_USER. Each entry must decode to an
       * array of items; every item with a non-empty 'uid' and a 'msg' field is
       * returned. Malformed entries and items are logged and skipped.
       *
       * @return array Flattened list of valid items (empty when there is nothing to send).
       */
      private function getDoTick()
      {
          $datas = [];
          foreach ($this->popBatch(DO_TICK_USER, 100) as $raw) {
              $allArray = json_decode($raw, true);
              if (!is_array($allArray)) {
                  Wlog::getInstance()->WriteLog(['数据格式误,不是全法数组[subserver-getDoTick]', $raw]);
                  // Skip the entry: iterating a non-array would only raise a warning.
                  continue;
              }
              foreach ($allArray as $one) {
                  if (empty($one['uid'])) {
                      Wlog::getInstance()->WriteLog(['数据格式误,没有uid字段[subserver-getDoTick]', $raw]);
                      continue;
                  }
                  if (!isset($one['msg'])) {
                      Wlog::getInstance()->WriteLog(['数据格式误,没有msg字段[subserver-getDoTick]', $raw]);
                      continue;
                  }
                  $datas[] = $one;
              }
          }
          return $datas;
      }

      /**
       * Sends one batch to the admin WebSocket endpoint taken from the
       * 'admin_conf' configuration ('url', 'admin_token'): connect, send the JSON
       * envelope, wait for one reply, close.
       *
       * Encoding and transport failures are logged instead of being thrown into
       * the timer callback. NOTE(review): the batch has already been removed from
       * Redis at this point, so a failed send still loses it - consider
       * re-queueing if that matters.
       *
       * @param string $act      Value of the 'act' field.
       * @param array  $msgArray Messages placed in the 'data' field.
       */
      private function SendMsgComm($act, $msgArray)
      {
          $conf = GlobConfigs::getKey('admin_conf');
          $token = $conf['admin_token'];
          $url = $conf['url'];

          $arr = [
              'cmd' => 'subserv',
              'act' => $act,
              'data' => $msgArray,
              'time' => time(),
              'token' => '',
          ];
          // JSON_UNESCAPED_UNICODE is the named form of the former literal 256.
          $str = json_encode($arr, JSON_UNESCAPED_UNICODE);
          if ($str === false) {
              Wlog::getInstance()->WriteLog(['error:json_encode failed[subserver-SendMsgComm]', $act, json_last_error_msg()]);
              return;
          }

          $client = new Client("$url?token=" . $token);
          try {
              $client->send($str);
              $client->receive();
              $client->close();
          } catch (\Exception $e) {
              Wlog::getInstance()->WriteLog(['error:send to admin failed[subserver-SendMsgComm]', $act, $e->getMessage()]);
          }
          unset($client);
      }
  }