SubServer.php 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. <?php
  2. namespace app\logic;
  3. /**
  4. * Created by PhpStorm.
  5. * User: Administrator
  6. * Date: 2019/5/20
  7. * Time: 11:22
  8. */
  9. use app\lib\GlobConfigs;
  10. use app\lib\Wlog;
  11. use WebSocket\Client as Client;
  12. class SubServer
  13. {
  14. public $serv;
  15. //线程连接实例
  16. private $workRedis = null;
  17. private $configs = null;
  18. public function __construct()
  19. {
  20. $this->configs = GlobConfigs::getKey('swoolev_subserv');
  21. $this->serv = new \swoole_websocket_server($this->configs['host'], $this->configs['port']);
  22. unset($this->configs['host']);
  23. unset($this->configs['port']);
  24. $this->serv->set($this->configs);
  25. $this->serv->on('Start', array($this, 'onStart'));
  26. $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
  27. $this->serv->on('message', array($this, 'onMessage'));
  28. $this->serv->on('close', array($this, 'onClose'));
  29. $this->serv->on('open', array($this, 'onOpen'));
  30. $this->serv->on('task', array($this, 'onTask'));
  31. $this->serv->on('Finish', array($this, 'onFinish'));
  32. return $this->serv->start();
  33. }
  34. public function onStart($serv)
  35. {
  36. echo "Sub on Start " . "\n";
  37. }
  38. public function onWorkerStart($serv, $worker_id)
  39. {
  40. if ($serv->taskworker) {
  41. return;
  42. }
  43. $this->PingRedis($serv, $worker_id);
  44. $serv->tick(8000, function () use ($serv, $worker_id) {
  45. $this->PingRedis($serv, $worker_id);
  46. });
  47. if ($worker_id == 0) {
  48. $serv->tick(1000, function () use ($serv, $worker_id) {
  49. $msg_index_aray = $this->getIndexMsg();
  50. if (!empty($msg_index_aray)) {
  51. $this->sendIndexMsg($msg_index_aray);
  52. }
  53. });
  54. }
  55. }
  56. function onWorkerStop(swoole_server $server, int $worker_id)
  57. {
  58. echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
  59. if ($this->workRedis) {
  60. $this->workRedis->close();
  61. }
  62. }
  63. public function onMessage($serv, $frame)
  64. {
  65. }
  66. public function onOpen($serv, $request)
  67. {
  68. }
  69. public function onTask($serv, \Swoole\Server\Task $task)
  70. {
  71. }
  72. public function onFinish($serv, $task_id, $data)
  73. {
  74. }
  75. public function onClose($serv, $fd, $from_id)
  76. {
  77. }
  78. //redis Ping检测
  79. private function PingRedis($serv, $worker_id)
  80. {
  81. if (empty($this->workRedis)) {
  82. $this->ConectToRedis($serv, $worker_id);
  83. return true;
  84. }
  85. $redis = $this->workRedis;
  86. if (!$redis) {
  87. $ping_ret = false;
  88. } else {
  89. $ping_ret_s = $redis->ping();
  90. $ping_ret = "+pong" === strtolower($ping_ret_s) ? true : false;
  91. }
  92. if (!$ping_ret) {
  93. return $this->ConectToRedis($serv, $worker_id);
  94. }
  95. return true;
  96. }
  97. //工作线程 同步阻塞 redis客户端
  98. private function ConectToRedis($serv, $worker_id)
  99. {
  100. $conf = GlobConfigs::getKey('redis');
  101. $redis = new \Redis();
  102. try {
  103. $ret = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
  104. } catch (\Exception $e) {
  105. $ret = false;
  106. }
  107. if ($ret) {
  108. Wlog::getInstance()->WriteLog("success:成功建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  109. $this->workRedis = $redis;
  110. return $redis;
  111. } else {
  112. Wlog::getInstance()->WriteLog("error:建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
  113. $this->workRedis = null;
  114. }
  115. return false;
  116. }
  117. ////////////////////////
  118. //去redis里找 msg_index 是否有数据,有取出来,
  119. private function getIndexMsg()
  120. {
  121. $redis = $this->workRedis;
  122. $len = $redis->LLEN(MSG_INDEX);
  123. if ($len <=0 ) {
  124. return;
  125. }
  126. $datas = [];
  127. while ($now = $redis->rpop(MSG_INDEX)) {
  128. if (empty($now)) {
  129. break;
  130. }
  131. $datas[] = $now;
  132. }
  133. return $datas;
  134. }
  135. ///如果有msg_index数据,就通知服务端下发
  136. private function sendIndexMsg($msg_index_aray)
  137. {
  138. $conf = GlobConfigs::getKey('admin_conf');
  139. $token = $conf['admin_token'];
  140. $url = $conf['url'];
  141. $client = new Client("$url?token=" . $token);
  142. $arr = [
  143. 'cmd' => 'subserv',
  144. 'act' => 'index_msg',
  145. 'data' => $msg_index_aray,
  146. 'time' => time(),
  147. 'token' => '',
  148. ];
  149. $str = json_encode($arr, 256);
  150. $r = $client->send($str);
  151. $reciv = $client->receive();
  152. $client->close();
  153. unset($client);
  154. }
  155. /////////////////////////
  156. }