20, // number of worker processes
 *     'task_worker_num' => 10,    // number of task processes, i.e. the number of MySQL connections kept open
 *     'daemonize'       => 1,     // run as a daemon
 *     'max_request'     => 10000, // maximum number of requests; the process is restarted once it is exceeded
 *     'dispatch_mode'   => 2,
 */

/**
 * TCP server that executes SQL on behalf of its clients.
 *
 * Every payload received on the socket is handed to a task worker via
 * taskwait(). Each task worker lazily opens one PDO connection, keeps it in a
 * function-level static, and reuses it for later tasks. The reply sent back to
 * the client is a JSON document of the form
 * {"status": "success"|"false", "data": ...}.
 *
 * NOTE(review): the received payload is executed verbatim as SQL. Nothing in
 * this class authenticates or restricts clients, so the listening address
 * should only be reachable by trusted callers.
 */
class DB_pool
{
    /** Driver error numbers treated as "connection lost" (2006 and 2013 in the MySQL client). */
    const LOST_CONNECTION_CODES = [2006, 2013];

    // Database parameters: driver, host, database, port, username, password.
    protected $dbconfig = [];

    // The underlying Swoole server instance.
    protected $serv;

    /**
     * Create the server and store the database settings.
     *
     * @param array $config Recognised keys:
     *                      - 'poole_host' listen address (default "127.0.0.1")
     *                      - 'poole_port' listen port (default 9091)
     *                      - 'sets'       array passed to the server's set()
     *                      - 'dbconfig'   database parameters used by onTask()
     */
    public function __construct($config = [])
    {
        // Missing or empty values fall back to the defaults without raising
        // undefined-index notices (the default argument is an empty array).
        $host = !empty($config['poole_host']) ? $config['poole_host'] : "127.0.0.1";
        $port = !empty($config['poole_port']) ? $config['poole_port'] : 9091;

        $this->serv = new \swoole\server($host, $port);
        $this->serv->set($config['sets'] ?? []);
        $this->dbconfig = $config['dbconfig'] ?? [];
    }

    /**
     * Register the event callbacks and start the server.
     */
    public function run()
    {
        $this->serv->on('Receive', [$this, 'onReceive']);
        // Task and Finish are the two callbacks required for task processing.
        $this->serv->on('Task', [$this, 'onTask']);
        $this->serv->on('Connect', [$this, 'onConnect']);
        $this->serv->on('Finish', [$this, 'onFinish']);
        $this->serv->start();
    }

    /**
     * Connect callback. Intentionally a no-op.
     *
     * @param \swoole\server $server
     * @param int            $fd
     * @param int            $reactorId
     */
    public function onConnect(\swoole\server $server, int $fd, int $reactorId)
    {
        // Debug aid kept from the original author; enable to log each new client:
        // $clientinfo = $server->getClientInfo($fd);
        // $cinfo = $clientinfo['remote_ip'] . ':' . $clientinfo['remote_port'];
        // echo "client connected: reactorId:$reactorId --> fd:$fd --> $cinfo \n";
    }

    /**
     * Receive callback: run the payload in a task worker and send the reply
     * back to the client that sent it.
     *
     * @param mixed  $serv    server instance passed by the framework (unused; $this->serv is used)
     * @param int    $fd      client connection the reply is sent to
     * @param int    $from_id unused
     * @param string $data    payload received from the client, forwarded to the task as-is
     */
    public function onReceive($serv, $fd, $from_id, $data)
    {
        $resultStr = $this->serv->taskwait($data);

        if ($resultStr === false) {
            // taskwait() returned false: report it to the client as a timeout.
            $this->serv->send($fd, json_encode(['status' => 'false', 'data' => "Error. Task timeout"]));
            return;
        }

        // Success and failure replies are both already JSON strings built by
        // onTask(), so they are forwarded unchanged.
        $this->serv->send($fd, $resultStr);
    }

    /**
     * Task callback: execute one SQL statement on this worker's connection
     * and deliver the JSON-encoded outcome through finish().
     *
     * Statements starting with "select" (case-insensitive) go through
     * PDO::query() and return all rows; everything else goes through
     * PDO::exec() and returns its result (the affected-row count).
     *
     * If the statement fails with a "connection lost" driver code, the
     * connection is reopened and the statement is retried once.
     *
     * @param mixed  $serv    server instance passed by the framework (unused; $this->serv is used)
     * @param int    $task_id task id, used in log output only
     * @param int    $from_id id of the dispatching worker, used in log output only
     * @param string $sql     SQL text to execute
     */
    public function onTask($serv, $task_id, $from_id, $sql)
    {
        // One connection per task worker process, reused across tasks.
        static $link = null;

        $sql = trim($sql);
        $isSelect = preg_match("/^select/i", $sql) === 1;
        $retried = false;

        while (true) {
            if ($link === null) {
                try {
                    $link = $this->openConnection();
                    echo "新建连接成功:Taskid=$task_id Fromid=$from_id \n";
                } catch (\PDOException $e) {
                    $link = null;
                    $this->serv->finish(json_encode(['status' => 'false', 'data' => $e->getMessage()], JSON_UNESCAPED_UNICODE));
                    echo "新建连接失败:Taskid=$task_id Fromid=$from_id " . $e->getMessage() . "\n";
                    return;
                }
            }

            $errorInfo = null;
            try {
                $result = $isSelect ? $link->query($sql) : $link->exec($sql);
                if ($result === false) {
                    $errorInfo = $link->errorInfo();
                }
            } catch (\PDOException $e) {
                // PDO may be in exception mode (the default since PHP 8);
                // fold that into the same failure path as a false return.
                $result = false;
                $errorInfo = is_array($e->errorInfo)
                    ? $e->errorInfo
                    : [(string) $e->getCode(), null, $e->getMessage()];
            }

            // Only a strict false is a failure: exec() legitimately returns 0
            // when a statement affects no rows.
            if ($result !== false) {
                break;
            }

            // errorCode() yields the SQLSTATE string; the driver-specific
            // number (2006 / 2013) lives in errorInfo()[1].
            $driverCode = isset($errorInfo[1]) ? (int) $errorInfo[1] : 0;
            if (in_array($driverCode, self::LOST_CONNECTION_CODES, true)) {
                // Never keep a dead handle around for the next task.
                $link = null;
                if (!$retried) {
                    // Reconnect and re-run the statement, but only once, so a
                    // persistently failing server cannot loop forever.
                    $retried = true;
                    continue;
                }
            }

            $this->serv->finish(json_encode(['status' => 'false', 'data' => $errorInfo], JSON_UNESCAPED_UNICODE));
            return;
        }

        $data = ['status' => "success"];
        if ($isSelect) {
            // Rows are fetched as objects, which encode to JSON objects keyed
            // by column name.
            $data['data'] = $result->fetchAll(\PDO::FETCH_CLASS);
        } else {
            // Otherwise return the exec() result directly.
            $data['data'] = $result;
        }

        $this->serv->finish(json_encode($data, JSON_UNESCAPED_UNICODE));
    }

    /**
     * Finish callback: log completion and pass the task result through.
     *
     * According to the original author's note, taskwait() does not trigger
     * this callback.
     *
     * @param mixed $serv    unused
     * @param int   $task_id unused
     * @param mixed $data    task result
     *
     * @return mixed $data unchanged
     */
    public function onFinish($serv, $task_id, $data)
    {
        echo "任务完成";
        return $data;
    }

    /**
     * Open a new PDO connection from $this->dbconfig and set the connection
     * character set to utf8.
     *
     * @return \PDO
     *
     * @throws \PDOException when the connection cannot be established
     */
    private function openConnection()
    {
        $conStr = $this->dbconfig['driver']
            . ":host=" . $this->dbconfig['host']
            . ";dbname=" . $this->dbconfig['database']
            . ";port=" . $this->dbconfig['port'];

        $link = new \PDO($conStr, $this->dbconfig['username'], $this->dbconfig['password']);
        $link->exec("set names 'utf8'");

        return $link;
    }
}

/**
 * Starting the server:
 * $serv=new DB_pool();
 * $serv->run();
 */

/**
 * Client example:
 * $client = new \swoole_client(SWOOLE_SOCK_TCP);
 * $num=rand(111111,999999);
 * $rts=$client->connect('127.0.0.1', 9508, 10) or die("连接失败"); // connect to the pool server
 * $sql =("select * from zdk_test");
 * $client->send($sql);
 * $resdata = $client->recv();
 *
 * $resda=json_decode($resdata,true);
 * $client->close();
 * return json_encode($resda);
 */