| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- <?php
- namespace app\logic;
- /**
- * Created by PhpStorm.
- * User: Administrator
- * Date: 2019/5/20
- * Time: 11:22
- */
- use app\lib\GlobConfigs;
- use app\lib\Wlog;
- use WebSocket\Client as Client;
/**
 * WebSocket server whose workers poll Redis lists and forward what they find
 * to the admin endpoint over a short-lived WebSocket client connection.
 *
 * Worker 0 drains MSG_BOARD, worker 1 drains MSG_TO_ONE_USER and worker 2
 * drains DO_TICK_USER. Every non-task worker additionally keeps its own Redis
 * connection alive with a periodic ping.
 */
class SubServer
{
    /** Maximum number of MSG_BOARD entries popped per tick. */
    const BOARD_BATCH_LIMIT = 100;

    /** Maximum number of MSG_TO_ONE_USER entries popped per tick. */
    const SINGLE_BATCH_LIMIT = 500;

    /** Maximum number of DO_TICK_USER entries popped per tick. */
    const TICK_BATCH_LIMIT = 100;

    /** @var \swoole_websocket_server The underlying server instance. */
    public $serv;

    /** @var \Redis|null Redis connection instance used by the current worker. */
    private $workRedis = null;

    /** @var array|null Settings read from the 'swoole_subserv' config key. */
    private $configs = null;

    /**
     * Builds the server from configuration, wires the event handlers and
     * starts it.
     *
     * The value of start() is intentionally not returned: PHP ignores a
     * constructor's return value when the object is created with `new`.
     */
    public function __construct()
    {
        $this->configs = GlobConfigs::getKey('swoole_subserv');
        $this->serv = new \swoole_websocket_server($this->configs['host'], $this->configs['port']);
        $this->serv->set($this->configs['sets']);

        $this->serv->on('Start', [$this, 'onStart']);
        $this->serv->on('WorkerStart', [$this, 'onWorkerStart']);
        // Without this registration onWorkerStop() is never invoked and the
        // worker's Redis connection is never closed explicitly.
        $this->serv->on('WorkerStop', [$this, 'onWorkerStop']);
        $this->serv->on('message', [$this, 'onMessage']);
        $this->serv->on('close', [$this, 'onClose']);
        $this->serv->on('open', [$this, 'onOpen']);

        $this->serv->start();
    }

    /**
     * 'Start' event handler: prints a start-up banner.
     *
     * @param mixed $serv Server instance passed by the event.
     */
    public function onStart($serv)
    {
        echo "Sub on Start " . "\n";
    }

    /**
     * 'WorkerStart' event handler.
     *
     * Task workers do nothing. Every other worker connects to Redis, re-checks
     * that connection every 8 seconds, and workers 0/1/2 each start one
     * polling timer for their queue.
     *
     * @param mixed $serv      Server instance passed by the event.
     * @param mixed $worker_id Id of the worker that is starting.
     */
    public function onWorkerStart($serv, $worker_id)
    {
        if ($serv->taskworker) {
            return;
        }

        $this->PingRedis($serv, $worker_id);
        $serv->tick(8000, function () use ($serv, $worker_id) {
            $this->PingRedis($serv, $worker_id);
        });

        switch ($worker_id) {
            case 0:
                $this->forwardEvery($serv, 500, 'msg_board', function () {
                    return $this->getBoardMsg();
                });
                break;
            case 1:
                $this->forwardEvery($serv, 500, 'msg_to_one_user', function () {
                    return $this->getSingMsg();
                });
                break;
            case 2:
                $this->forwardEvery($serv, 5000, 'do_tick', function () {
                    return $this->getDoTick();
                });
                break;
        }
    }

    /**
     * 'WorkerStop' event handler: closes this worker's Redis connection.
     *
     * The parameters are deliberately untyped, like the other handlers: an
     * unqualified `swoole_server` hint inside this namespace would resolve to
     * app\logic\swoole_server instead of the global class.
     *
     * @param mixed $server    Server instance passed by the event.
     * @param mixed $worker_id Id of the worker that is stopping.
     */
    public function onWorkerStop($server, $worker_id)
    {
        echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
        if ($this->workRedis) {
            $this->workRedis->close();
            $this->workRedis = null;
        }
    }

    /**
     * 'message' event handler; incoming frames are ignored.
     *
     * @param mixed $serv  Server instance passed by the event.
     * @param mixed $frame Received frame.
     */
    public function onMessage($serv, $frame)
    {
    }

    /**
     * 'open' event handler; nothing is done when a client connects.
     *
     * @param mixed $serv    Server instance passed by the event.
     * @param mixed $request Handshake request.
     */
    public function onOpen($serv, $request)
    {
    }

    /**
     * 'close' event handler; nothing is done when a client disconnects.
     *
     * @param mixed $serv    Server instance passed by the event.
     * @param mixed $fd      Connection descriptor.
     * @param mixed $from_id Third argument supplied by the event.
     */
    public function onClose($serv, $fd, $from_id)
    {
    }

    /**
     * Redis ping check: makes sure this worker has a working Redis connection,
     * (re)connecting when there is none or when the ping fails.
     *
     * @param mixed $serv      Server instance (passed through to the connector).
     * @param mixed $worker_id Worker id, used only for logging.
     *
     * @return bool True when a usable connection exists after the call.
     */
    private function PingRedis($serv, $worker_id)
    {
        if (empty($this->workRedis)) {
            return (bool) $this->ConectToRedis($serv, $worker_id);
        }

        try {
            $reply = $this->workRedis->ping();
        } catch (\Exception $e) {
            // A broken connection surfaces as an exception; treat it as a failed ping.
            $reply = false;
        }

        // phpredis >= 5 answers TRUE, older releases answer the string "+PONG".
        $alive = $reply === true
            || (is_string($reply) && strtolower($reply) === '+pong');
        if ($alive) {
            return true;
        }

        // Release the dead connection before replacing it so reconnects do not
        // pile up open sockets.
        try {
            $this->workRedis->close();
        } catch (\Exception $e) {
            // Best effort: the connection is being discarded either way.
        }
        $this->workRedis = null;

        return (bool) $this->ConectToRedis($serv, $worker_id);
    }

    /**
     * Opens the worker's synchronous, blocking Redis client and stores it in
     * $this->workRedis (or stores null when the connection attempt fails).
     *
     * @param mixed $serv      Server instance (unused here).
     * @param mixed $worker_id Worker id, used only for logging.
     *
     * @return \Redis|false The connected client, or false on failure.
     */
    private function ConectToRedis($serv, $worker_id)
    {
        $conf = GlobConfigs::getKey('redis');
        $redis = new \Redis();
        try {
            $connected = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
        } catch (\Exception $e) {
            $connected = false;
        }

        if (!$connected) {
            Wlog::getInstance()->WriteLog("error:建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
            $this->workRedis = null;
            return false;
        }

        Wlog::getInstance()->WriteLog("success:成功建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
        $this->workRedis = $redis;
        return $redis;
    }

    /**
     * Pops pending broadcast messages from MSG_BOARD and keeps the well-formed
     * ones (non-empty 'stype', 'data' present).
     *
     * @return array List of decoded messages; empty when nothing is pending.
     */
    private function getBoardMsg()
    {
        $rules = [
            'stype' => [true, '数据格式误,没有stype字段[subserver]'],
            'data' => [false, '数据格式误,没有data字段[subserver]'],
        ];

        $datas = [];
        foreach ($this->popBatch(MSG_BOARD, self::BOARD_BATCH_LIMIT) as $raw) {
            $now = json_decode($raw, true);
            $error = $this->firstViolation($now, $rules);
            if ($error !== null) {
                Wlog::getInstance()->WriteLog([$error, $now]);
                continue;
            }
            $datas[] = $now;
        }
        return $datas;
    }

    /**
     * Pops pending single-user messages from MSG_TO_ONE_USER and keeps the
     * well-formed ones. 'confirm' is normalised to 1 or 0 on every kept
     * message.
     *
     * @return array List of decoded messages; empty when nothing is pending.
     */
    private function getSingMsg()
    {
        $rules = [
            'mtype' => [true, '数据格式误,没有mtype字段[subserver]'],
            'stype' => [true, '数据格式误,没有stype字段[subserver]'],
            'data' => [false, '数据格式误,没有data字段[subserver]'],
            'to' => [true, '数据格式误,没有to字段或为空[subserver]'],
            'from' => [false, '数据格式误,没有from字段[subserver]'],
        ];

        $datas = [];
        foreach ($this->popBatch(MSG_TO_ONE_USER, self::SINGLE_BATCH_LIMIT) as $raw) {
            $now = json_decode($raw, true);
            $error = $this->firstViolation($now, $rules);
            if ($error !== null) {
                Wlog::getInstance()->WriteLog([$error, $now]);
                continue;
            }
            $now['confirm'] = (isset($now['confirm']) && $now['confirm'] == 1) ? 1 : 0;
            $datas[] = $now;
        }
        return $datas;
    }

    /**
     * Pops pending entries from DO_TICK_USER. Each entry is expected to decode
     * to an array of items; items with a non-empty 'uid' and a 'msg' are kept.
     *
     * @return array Flat list of items; empty when nothing is pending.
     */
    private function getDoTick()
    {
        $rules = [
            'uid' => [true, '数据格式误,没有uid字段[subserver-getDoTick]'],
            'msg' => [false, '数据格式误,没有msg字段[subserver-getDoTick]'],
        ];

        $datas = [];
        foreach ($this->popBatch(DO_TICK_USER, self::TICK_BATCH_LIMIT) as $raw) {
            $allArray = json_decode($raw, true);
            if (!is_array($allArray)) {
                Wlog::getInstance()->WriteLog(['数据格式误,不是全法数组[subserver-getDoTick]', $raw]);
                // Skip the entry: iterating a non-array would raise a warning.
                continue;
            }
            foreach ($allArray as $one) {
                $error = $this->firstViolation($one, $rules);
                if ($error !== null) {
                    Wlog::getInstance()->WriteLog([$error, $raw]);
                    continue;
                }
                $datas[] = $one;
            }
        }
        return $datas;
    }

    /**
     * Sends one batch to the admin endpoint over a fresh WebSocket client
     * connection and waits for a single reply frame.
     *
     * The batch has already been removed from Redis when this runs, so a
     * failed delivery is logged rather than re-queued; the exception is not
     * allowed to escape into the timer callback.
     *
     * @param string $act      Action name placed in the 'act' field.
     * @param array  $msgArray Batch placed in the 'data' field.
     */
    private function SendMsgComm($act, $msgArray)
    {
        $conf = GlobConfigs::getKey('admin_conf');

        $payload = json_encode([
            'cmd' => 'subserv',
            'act' => $act,
            'data' => $msgArray,
            'time' => time(),
            'token' => '',
        ], JSON_UNESCAPED_UNICODE);
        if ($payload === false) {
            Wlog::getInstance()->WriteLog(['error:json_encode failed[subserver-SendMsgComm]', $act, json_last_error_msg()]);
            return;
        }

        try {
            // The token travels in the query string; the payload's own 'token' stays empty.
            $client = new Client($conf['url'] . '?token=' . $conf['admin_token']);
            $client->send($payload);
            $client->receive();
            $client->close();
        } catch (\Exception $e) {
            Wlog::getInstance()->WriteLog(['error:forwarding batch failed[subserver-SendMsgComm]', $act, $e->getMessage()]);
        }
    }

    /**
     * Registers a timer that fetches a batch and forwards it when non-empty.
     *
     * @param mixed    $serv       Server instance providing tick().
     * @param int      $intervalMs Timer interval passed to tick().
     * @param string   $act        Action name handed to SendMsgComm().
     * @param callable $fetch      Returns the batch to forward.
     */
    private function forwardEvery($serv, $intervalMs, $act, callable $fetch)
    {
        $serv->tick($intervalMs, function () use ($act, $fetch) {
            $batch = $fetch();
            if (!empty($batch)) {
                $this->SendMsgComm($act, $batch);
            }
        });
    }

    /**
     * Pops at most $limit raw entries from the tail of a Redis list.
     *
     * The limit is tested before each pop, so an entry is never removed from
     * the list and then thrown away because the batch is already full.
     *
     * @param string $queueKey Redis list key.
     * @param int    $limit    Maximum number of entries to pop.
     *
     * @return array Raw entries in pop order; empty when there is no Redis
     *               connection or the list is empty.
     */
    private function popBatch($queueKey, $limit)
    {
        $popped = [];
        $redis = $this->workRedis;
        if (empty($redis)) {
            return $popped;
        }

        try {
            while (count($popped) < $limit) {
                $raw = $redis->rpop($queueKey);
                if ($raw === false || $raw === null) {
                    break;
                }
                $popped[] = $raw;
            }
        } catch (\Exception $e) {
            // Keep whatever was popped before the failure so it is still forwarded.
            Wlog::getInstance()->WriteLog(['error:redis rpop failed[subserver]', $queueKey, $e->getMessage()]);
        }
        return $popped;
    }

    /**
     * Checks a decoded item against field rules and reports the first failure.
     *
     * @param mixed $item  Decoded item; anything that is not an array fails
     *                     the first rule.
     * @param array $rules Map of field name => [must be non-empty, log message].
     *
     * @return string|null Log message of the first failed rule, or null when
     *                     every rule passes.
     */
    private function firstViolation($item, array $rules)
    {
        foreach ($rules as $field => $rule) {
            list($mustBeNonEmpty, $message) = $rule;
            if (!is_array($item) || !isset($item[$field])) {
                return $message;
            }
            if ($mustBeNonEmpty && empty($item[$field])) {
                return $message;
            }
        }
        return null;
    }
}
|