httpserver = new \swoole\http\server($config['host'], $config['port']);
        $this->httpserver->set($config['sets']);
        $this->config = $config;
        $this->redisonfig = GlobConfigs::getKey('redis');
        $this->httpserver->account = new \swoole\Atomic();
        $this->httpserver->taskWorkingNum = new \swoole\Atomic();
        $this->httpserver->on('request', array($this, 'OnRequest'));
        $this->httpserver->on('WorkerStart', array($this, 'onWorkerStart'));
        $this->httpserver->on('task', array($this, 'onTask'));
        $this->httpserver->on('finish', array($this, 'onFinish'));
    }

    /**
     * WorkerStart callback: installs the periodic timers for this worker.
     *
     * Every non-task worker initialises ModelBase and flushes the Redis SQL
     * queue every 500 ms. The worker whose id is 0 additionally prints a
     * status line every 60 seconds.
     *
     * @param mixed $serv      Server instance handed over by Swoole.
     * @param mixed $worker_id Id of the worker being started (unused; the id
     *                         is read from $serv instead).
     */
    public function onWorkerStart($serv, $worker_id)
    {
        if (!$serv->taskworker) {
            ModelBase::init();
            // Fully qualified so the class resolves the same way whether or
            // not this file declares a namespace, matching the other
            // \swoole\... references in this class.
            \Swoole\Timer::tick(500, function ($id) {
                $this->BatchSqlRedisToDB();
            });
        }

        if ($serv->worker_id == 0) {
            \Swoole\Timer::tick(60000, function () {
                $this->logRunStatus();
            });
        }
    }

    /**
     * Prints one status line: timestamp, the shared counters, the worker id
     * and current/peak memory usage (bytes divided by 1000).
     */
    private function logRunStatus()
    {
        echo date('Y-m-d H:i:s')
            . " 总请求数:" . $this->httpserver->account->get()
            . ' 运行任务数:' . $this->httpserver->taskWorkingNum->get();
        echo ' work_id:' . $this->httpserver->worker_id
            . " 内存使用量:" . (memory_get_usage() / 1000)
            . 'k 峰值:' . (memory_get_peak_usage() / 1000) . "k\n";
    }

    /**
     * Batch-writes queued SQL from Redis into the database.
     *
     * Pops up to 1000 statements from the self::SQLKEY list, joins them with
     * ";" and executes them in a single PDO::exec() call. Returns silently
     * when the list is empty. The `account` counter is incremented once per
     * non-empty flush.
     *
     * Statements are removed from Redis before execution and are not pushed
     * back on failure; a failed batch is only echoed.
     */
    private function BatchSqlRedisToDB()
    {
        $redisconfig = $this->redisonfig;
        $begint = microtime(true);

        $redis = new \Redis();
        if (!$redis->connect($redisconfig['host'], $redisconfig['port'])) {
            echo "redis 连接失败!";
            return;
        }

        // From here on the connection is open; the finally block guarantees
        // it is closed on every exit path, including the early returns.
        try {
            if (!empty($redisconfig['passwd']) && !$redis->auth($redisconfig['passwd'])) {
                echo "redis auth 失败!";
                return;
            }

            $redis->select($redisconfig['db']);
            if ($redis->llen(self::SQLKEY) <= 0) {
                return;
            }

            $this->httpserver->account->add();

            $batchsql = [];
            $max = 1000;
            for ($i = 0; $i < $max; $i++) {
                $now = $redis->rpop(self::SQLKEY);
                if (empty($now)) {
                    break;
                }
                // Strip surrounding whitespace and semicolons, then turn any
                // remaining ";" into ":" so that one queue entry can never
                // expand into several statements once the batch is joined.
                $batchsql[] = str_replace(";", ":", trim(trim($now), ";"));
            }

            if (!empty($batchsql)) {
                $pdo = DB::getPdo();
                $sqlstr = implode(";", $batchsql);

                // PDO::exec() returns false on failure and 0 when the
                // statement succeeded without touching any row, so the two
                // must be told apart with a strict comparison.
                $affected = false;
                try {
                    $affected = $pdo->exec($sqlstr);
                } catch (\Exception $e) {
                    echo "发生异常:" . $e->getCode() . ' ---- ' . $e->getMessage() . "\n";
                }

                if ($affected === false) {
                    echo "\n发生错误:" . $pdo->errorCode() . ' ---- ' . print_r($pdo->errorInfo(), true) . "\n";
                    echo "错误sql: " . $sqlstr . "\n\n";
                } else {
                    echo "成功运行:" . count($batchsql) . " 条!\n";
                }
            }
        } finally {
            $redis->close();
        }

        echo "总请求数" . $this->httpserver->account->get()
            . " 单个线程消耗时间: " . (microtime(true) - $begint) . " s \n";
    }

    /**
     * HTTP request callback: always answers with a fixed payload built by
     * Response::generate().
     *
     * @param mixed $request  Incoming request (unused).
     * @param mixed $response Response object the payload is written to.
     */
    public function OnRequest($request, $response)
    {
        $data = Response::generate('', 1, '', '任务正常运行.');
        $response->end($data);
    }

    /**
     * Task callback; intentionally empty.
     */
    public function onTask($serv, $task)
    {
    }

    /**
     * Task-finish callback; intentionally empty.
     */
    public function onFinish($serv, int $task_id, $data)
    {
    }

    /**
     * Starts the underlying Swoole HTTP server.
     */
    public function start()
    {
        $this->httpserver->start();
    }
}