| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- <?php
- namespace app\logic;
- /**
- * Created by PhpStorm.
- * User: Administrator
- * Date: 2019/5/20
- * Time: 11:22
- */
- use app\lib\GlobConfigs;
- use app\lib\Wlog;
- use WebSocket\Client as Client;
/**
 * WebSocket "sub server".
 *
 * Starts a Swoole WebSocket server configured from the `swoolev_subserv`
 * config key. Every non-task worker keeps a blocking Redis connection alive
 * (checked every 8 seconds). Worker 0 additionally polls the Redis list named
 * by the MSG_INDEX constant once per second and forwards whatever it pops to
 * the admin WebSocket endpoint configured under `admin_conf`.
 */
class SubServer
{
    /** @var \swoole_websocket_server Swoole WebSocket server instance. */
    public $serv;

    /** @var \Redis|null Blocking Redis client of the current worker; null while disconnected. */
    private $workRedis = null;

    /** @var array|null Settings read from the `swoolev_subserv` config key (host/port removed after construction). */
    private $configs = null;

    /**
     * Builds the server, registers the event handlers and starts it.
     *
     * Note: start() is called from the constructor, so `new SubServer()`
     * does not return until the server stops.
     */
    public function __construct()
    {
        $this->configs = GlobConfigs::getKey('swoolev_subserv');
        $this->serv = new \swoole_websocket_server($this->configs['host'], $this->configs['port']);

        // host/port are constructor arguments; everything else is passed to set().
        unset($this->configs['host'], $this->configs['port']);
        $this->serv->set($this->configs);

        $this->serv->on('Start', [$this, 'onStart']);
        $this->serv->on('WorkerStart', [$this, 'onWorkerStart']);
        // Registered so the per-worker Redis connection is actually released.
        $this->serv->on('WorkerStop', [$this, 'onWorkerStop']);
        $this->serv->on('message', [$this, 'onMessage']);
        $this->serv->on('close', [$this, 'onClose']);
        $this->serv->on('open', [$this, 'onOpen']);
        $this->serv->on('task', [$this, 'onTask']);
        $this->serv->on('Finish', [$this, 'onFinish']);

        $this->serv->start();
    }

    /**
     * Server start callback: prints a start-up banner.
     *
     * @param mixed $serv Server instance passed by Swoole.
     */
    public function onStart($serv)
    {
        echo "Sub on Start " . "\n";
    }

    /**
     * Worker start callback.
     *
     * Task workers do nothing. Other workers connect to Redis immediately and
     * re-check the connection every 8 seconds. Worker 0 also installs the
     * 1-second timer that drains MSG_INDEX and forwards the messages.
     *
     * @param mixed $serv      Server instance passed by Swoole.
     * @param int   $worker_id Id of the worker being started.
     */
    public function onWorkerStart($serv, $worker_id)
    {
        if ($serv->taskworker) {
            return;
        }

        $this->PingRedis($serv, $worker_id);
        $serv->tick(8000, function () use ($serv, $worker_id) {
            $this->PingRedis($serv, $worker_id);
        });

        if ($worker_id == 0) {
            $serv->tick(1000, function () {
                $pending = $this->getIndexMsg();
                if (!empty($pending)) {
                    $this->sendIndexMsg($pending);
                }
            });
        }
    }

    /**
     * Worker stop callback: closes this worker's Redis connection, if any.
     *
     * The first parameter is intentionally untyped: an unqualified
     * `swoole_server` hint would resolve inside this file's namespace to a
     * class that does not exist.
     *
     * @param mixed $server    Server instance passed by Swoole.
     * @param int   $worker_id Id of the worker being stopped.
     */
    public function onWorkerStop($server, int $worker_id)
    {
        echo "onWorkerStop.. " . $worker_id . ' ' . "\n";
        if ($this->workRedis) {
            try {
                $this->workRedis->close();
            } catch (\Exception $e) {
                // Best effort: the worker is going away regardless.
            }
            $this->workRedis = null;
        }
    }

    /** Incoming WebSocket frame callback (intentionally empty). */
    public function onMessage($serv, $frame)
    {
    }

    /** WebSocket handshake-complete callback (intentionally empty). */
    public function onOpen($serv, $request)
    {
    }

    /** Task callback (intentionally empty). */
    public function onTask($serv, \Swoole\Server\Task $task)
    {
    }

    /** Task-finished callback (intentionally empty). */
    public function onFinish($serv, $task_id, $data)
    {
    }

    /** Connection-closed callback (intentionally empty). */
    public function onClose($serv, $fd, $from_id)
    {
    }

    /**
     * Redis ping check: makes sure this worker has a live Redis connection.
     *
     * Connects when there is no client yet. Otherwise pings, and on a failed
     * ping (or an exception from ping) closes the stale client and reconnects.
     * Both `true` (newer phpredis) and the "+PONG"/"PONG" strings (older
     * phpredis) count as a successful ping.
     *
     * @param mixed $serv      Server instance (forwarded to ConectToRedis).
     * @param int   $worker_id Worker id, used for logging.
     *
     * @return bool True when a usable connection is available after the call.
     */
    private function PingRedis($serv, $worker_id): bool
    {
        $redis = $this->workRedis;
        if (empty($redis)) {
            return (bool) $this->ConectToRedis($serv, $worker_id);
        }

        try {
            $reply = $redis->ping();
            $alive = true === $reply
                || (is_string($reply) && in_array(strtolower($reply), ['+pong', 'pong'], true));
        } catch (\Exception $e) {
            // phpredis throws when the connection has been lost.
            $alive = false;
        }

        if ($alive) {
            return true;
        }

        // Release the stale client before replacing it, so sockets are not leaked.
        try {
            $redis->close();
        } catch (\Exception $e) {
            // Already broken; nothing more to do.
        }

        return (bool) $this->ConectToRedis($serv, $worker_id);
    }

    /**
     * Opens the worker's synchronous (blocking) Redis client.
     *
     * Reads host, port and timeout (`overtime`) from the `redis` config key,
     * logs the outcome and stores the client in $this->workRedis (null on
     * failure).
     *
     * @param mixed $serv      Server instance (unused).
     * @param int   $worker_id Worker id, used for logging.
     *
     * @return \Redis|false The connected client, or false when connecting failed.
     */
    private function ConectToRedis($serv, $worker_id)
    {
        $conf = GlobConfigs::getKey('redis');
        $redis = new \Redis();

        try {
            $connected = $redis->connect($conf['host'], $conf['port'], $conf['overtime']);
        } catch (\Exception $e) {
            $connected = false;
        }

        if ($connected) {
            Wlog::getInstance()->WriteLog("success:成功建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
            $this->workRedis = $redis;
            return $redis;
        }

        Wlog::getInstance()->WriteLog("error:建立redis连接[In Subserver] " . date("Y-m-d H:i:s") . ' ' . $worker_id);
        $this->workRedis = null;

        return false;
    }

    /**
     * Drains the MSG_INDEX Redis list.
     *
     * Pops from the right until the list is empty. Only `false`/`null`
     * (list exhausted) ends the loop, so falsy payloads such as "0" are kept
     * instead of being popped and silently discarded.
     *
     * @return array Popped entries in pop order; empty when there is no Redis
     *               connection, the list is empty, or Redis fails before
     *               anything was popped.
     */
    private function getIndexMsg(): array
    {
        $datas = [];
        $redis = $this->workRedis;
        if (empty($redis)) {
            // Not connected (yet); the ping timer will retry the connection.
            return $datas;
        }

        try {
            if ($redis->lLen(MSG_INDEX) <= 0) {
                return $datas;
            }
            while (true) {
                $item = $redis->rPop(MSG_INDEX);
                if (false === $item || null === $item) {
                    break;
                }
                $datas[] = $item;
            }
        } catch (\Exception $e) {
            // Keep whatever was already popped so it can still be forwarded.
            Wlog::getInstance()->WriteLog("error: reading MSG_INDEX from redis failed [In Subserver] " . date("Y-m-d H:i:s") . ' ' . $e->getMessage());
        }

        return $datas;
    }

    /**
     * Notifies the admin WebSocket endpoint that index messages are pending.
     *
     * Connects to `admin_conf.url` with `admin_conf.admin_token` as the
     * `token` query parameter, sends one JSON frame and waits for one reply.
     * The `token` field inside the payload is sent empty. Failures are logged
     * instead of being thrown out of the timer callback, and the client is
     * always closed.
     *
     * @param array $msg_index_aray Entries popped from MSG_INDEX.
     */
    private function sendIndexMsg($msg_index_aray)
    {
        $conf = GlobConfigs::getKey('admin_conf');
        $token = $conf['admin_token'];
        $url = $conf['url'];

        $payload = json_encode([
            'cmd' => 'subserv',
            'act' => 'index_msg',
            'data' => $msg_index_aray,
            'time' => time(),
            'token' => '',
        ], JSON_UNESCAPED_UNICODE);

        if (false === $payload) {
            Wlog::getInstance()->WriteLog("error: encoding index_msg failed [In Subserver] " . date("Y-m-d H:i:s") . ' ' . json_last_error_msg());
            return;
        }

        $client = null;
        try {
            $client = new Client("$url?token=" . $token);
            $client->send($payload);
            // Wait for the server's reply before closing; the content is not used.
            $client->receive();
        } catch (\Exception $e) {
            Wlog::getInstance()->WriteLog("error: sending index_msg failed [In Subserver] " . date("Y-m-d H:i:s") . ' ' . $e->getMessage());
        } finally {
            if ($client) {
                try {
                    $client->close();
                } catch (\Exception $e) {
                    // Connection already unusable.
                }
            }
        }
    }
}
|