httpserver = new \swoole\http\server($config['host'], $config['port']);
        $this->httpserver->set($config['sets']);
        $this->config = $config;
        // Atomic counters attached to the server object: total requests
        // received and the number of tasks currently counted as running.
        $this->httpserver->account = new \swoole\Atomic();
        $this->httpserver->taskWorkingNum = new \swoole\Atomic();
        $this->httpserver->on('request', array($this, 'OnRequest'));
        $this->httpserver->on('WorkerStart', array($this, 'onWorkerStart'));
        $this->httpserver->on('task', array($this, 'onTask'));
        $this->httpserver->on('finish', array($this, 'onFinish'));
    }

    /**
     * WorkerStart callback: resets the per-process connection bookkeeping,
     * opens the database connection and installs the periodic timers.
     *
     * @param mixed $serv      Server instance handed over by Swoole.
     * @param mixed $worker_id Id of the worker that has just started.
     */
    public function onWorkerStart($serv, $worker_id)
    {
        /*
         * Disabled process naming:
         * $name = $serv->taskworker ? 'Task_' : 'Worker_';
         * $name = 'Data_' . $name . ($serv->worker_id < 10 ? '0' . $serv->worker_id : $serv->worker_id);
         * swoole_set_process_name($name);
         */
        $GLOBALS['model'] = '';
        $GLOBALS['modeltime'] = 0;
        $this->InitDb();

        // Worker 0 prints a status line once a minute.
        if ($serv->worker_id == 0) {
            \Swoole\Timer::tick(60000, function () {
                $this->logRunStatus();
            });
        }

        // Worker 1 lowers the running-task counter every 3 seconds while it is above 1.
        // NOTE(review): presumably compensates for increments that were never
        // matched by a decrement in onTask - confirm the intent.
        if ($serv->worker_id == 1) {
            \Swoole\Timer::tick(3000, function () {
                if ($this->httpserver->taskWorkingNum->get() > 1) {
                    $this->httpserver->taskWorkingNum->sub();
                }
            });
        }
    }

    /**
     * Prints one status line: request total, running-task counter,
     * worker id and current / peak memory usage.
     */
    private function logRunStatus()
    {
        echo date('Y-m-d H:i:s') . " 总请求数:" . $this->httpserver->account->get() . ' 运行任务数:' . $this->httpserver->taskWorkingNum->get();
        echo ' work_id:' . $this->httpserver->worker_id . " 内存使用量:" . (memory_get_usage() / 1000) . 'k 峰值:' . (memory_get_peak_usage() / 1000) . "k\n";
    }

    /**
     * HTTP request callback: validates the endpoint, the `data` payload,
     * the task backlog and the token, then queues the work as a task and
     * answers immediately.
     *
     * @param mixed $request  Swoole HTTP request.
     * @param mixed $response Swoole HTTP response.
     */
    public function OnRequest($request, $response)
    {
        $response->header('Content-Type', 'text/html; charset=utf-8');
        $response->header('Server', 'DataInfaceServer');

        // microtime() yields "0.xxxxxxxx seconds"; keep H:i:s plus four fractional digits.
        $now = explode(" ", microtime());
        $paras = array_merge(
            ['request_time' => date("H:i:s", (int) $now[1]) . substr($now[0], 1, 5)],
            !empty($request->get) ? $request->get : [],
            !empty($request->post) ? $request->post : []
        );
        $this->httpserver->account->add();

        // Endpoint name is the request path without its leading slash.
        $request_uri = substr($request->server['request_uri'], 1);
        $urls = ['setLeague', 'setMatch', 'setMatchResult', 'setOdds', 'setOddsCH', 'setBroadCast', 'upMatch', 'setResultExpress', 'setMatchWarn', 'HandleOrder'];
        if (!in_array($request_uri, $urls, true)) {
            $data = Response::generate('', 0, '', '无效的url');
            $response->end($data);
            return;
        }

        // A missing or non-string `data` parameter is treated like an empty
        // payload instead of triggering a warning / TypeError in json_decode().
        $rawData = (isset($paras['data']) && is_string($paras['data'])) ? $paras['data'] : '';
        $dataObj = json_decode($rawData, true);
        if (empty($dataObj)) {
            $data = Response::generate('', 5, '', '无效的data参数');
            $response->end($data);
            return;
        }

        // Refuse new work while the running-task counter exceeds the configured task worker count.
        if ($this->httpserver->taskWorkingNum->get() > intval($this->config['sets']['task_worker_num'])) {
            $str = $this->httpserver->taskWorkingNum->get() . '/' . $this->config['sets']['task_worker_num'];
            $data = Response::generate('', 9, '', '任务线程已满,请稍等...' . $str);
            $response->end($data);
            return;
        }

        $check_token = true;
        if ($check_token) {
            // Only a string token is looked up; anything else counts as missing.
            $token = (isset($paras['token']) && is_string($paras['token'])) ? $paras['token'] : '';
            if (empty($token) || empty($this->Tokencheck($token))) {
                $data = Response::generate('', 6, '', '无效的token');
                $response->end($data);
                return;
            }
        }

        // Hand the work to a task worker and acknowledge right away.
        $this->httpserver->task(['url' => $request_uri, 'paras' => $paras]);
        $data = Response::generate('', 1, '');
        $response->end($data);
        return;
    }

    /**
     * Task callback: dispatches the queued request to the matching
     * DataLogic handler.
     *
     * @param mixed $serv Server instance handed over by Swoole.
     * @param mixed $task Task object; its `data` holds the `url` and `paras`
     *                    entries queued by OnRequest.
     */
    public function onTask($serv, $task)
    {
        $this->httpserver->taskWorkingNum->add();
        // NOTE(review): dumps the whole task (including the token) to stdout - confirm this is still wanted.
        var_dump($task);

        // request_time is stored inside `paras` by OnRequest, not at the top level.
        $payload = $task->data;
        $requestTime = $payload['paras']['request_time'] ?? '';
        echo "收到任务:" . $payload['url'] . ' -- ' . $requestTime . "\n";

        try {
            $url = $payload['url'];
            $data = $payload['paras'];
            // Every case ends with `break` so exactly one handler runs per task.
            switch ($url) {
                case 'setLeague':
                    $ret = DataLogic::getInstance()->setLeague($data);
                    break;
                case 'setMatch':
                    $ret = DataLogic::getInstance()->setMatch($data);
                    break;
                case 'setMatchResult':
                    $ret = DataLogic::getInstance()->setMatchResult($data);
                    break;
                case 'setOdds':
                    $ret = DataLogic::getInstance()->setOdds($data);
                    break;
                case 'setOddsCH':
                    $ret = DataLogic::getInstance()->setOddsCH($data);
                    break;
                case 'setBroadCast':
                    $ret = DataLogic::getInstance()->setBroadCast($data);
                    break;
                case 'upMatch':
                    $ret = DataLogic::getInstance()->upMatch($data);
                    break;
                case 'setResultExpress':
                    $ret = DataLogic::getInstance()->setResultExpress($data);
                    break;
                case 'setMatchWarn':
                    $ret = DataLogic::getInstance()->setMatchWarn($data);
                    break;
                case 'HandleOrder':
                    $ret = DataLogic::getInstance()->HandleOrder($data);
                    break;
                default:
                    $ret = 'false';
                    break;
            }
        } catch (\Exception $e) {
            echo date('Y-m-d H:i:s') . ' ' . $e->getMessage() . ' - ' . print_r($task, true) . "\n";
        } finally {
            // Runs even when a non-Exception throwable escapes, so the counter
            // is not left incremented. The counter is never lowered below 1.
            if ($this->httpserver->taskWorkingNum->get() > 1) {
                $this->httpserver->taskWorkingNum->sub();
            }
        }
    }

    /**
     * Finish callback; intentionally does nothing.
     *
     * @param mixed $serv    Server instance handed over by Swoole.
     * @param int   $task_id Id of the finished task.
     * @param mixed $data    Result passed back by the task.
     */
    public function onFinish($serv, int $task_id, $data)
    {
        // No post-processing of task results.
    }

    /**
     * Looks up the system_user row that owns the given token.
     *
     * @param mixed $token Token taken from the request parameters.
     * @return mixed First matching row as returned by the query builder.
     */
    private function Tokencheck($token)
    {
        $tokenvel = DB::table('system_user')->where(['token' => $token])->first();
        return $tokenvel;
    }

    /**
     * Opens the database connection on first use and re-opens it once the
     * recorded connection time is more than five minutes old.
     */
    private function InitDb()
    {
        $over_time = 60 * 5;
        $now = microtime(true);

        // First call in this process: no connection time recorded yet.
        if (!$GLOBALS['modeltime']) {
            $GLOBALS['modeltime'] = $now;
            $GLOBALS['model'] = $this->httpserver->worker_id;
            ModelBase::init();
            return;
        }

        // Recorded connection is older than the limit: close and reconnect.
        if (($now - $GLOBALS['modeltime']) > $over_time) {
            $GLOBALS['modeltime'] = $now;
            $GLOBALS['model'] = $this->httpserver->worker_id;
            ModelBase::close();
            ModelBase::init();
            return;
        }

        // Otherwise the existing connection is reused.
    }

    /**
     * Starts the HTTP server.
     */
    public function start()
    {
        $this->httpserver->start();
    }
}