20, // number of worker processes
 * 'task_worker_num' => 10, // number of task processes, i.e. the number of MySQL connections kept open
 * 'daemonize' => 1, // run as a daemon
 * 'max_request' => 10000, // maximum number of requests; the process is restarted once it is exceeded
 * 'dispatch_mode' => 2,
 */

/**
 * TCP server that forwards every received payload, as a SQL string, to a task
 * worker, which runs it over a PDO connection and replies with JSON.
 *
 * Each task worker keeps its PDO handle in a static local of onTask(), so the
 * handle is reused by later tasks handled by the same process.
 *
 * Reply format (JSON):
 *   {"status":"success","data": <rows | affected-row count>}
 *   {"status":"false","data": <error message | PDO errorInfo array>}
 *
 * NOTE(review): the SQL is executed exactly as received from the socket, with
 * no authentication or filtering. The listener defaults to 127.0.0.1; do not
 * expose it to untrusted clients.
 */
class DB_pool
{
    /**
     * MySQL client error numbers treated as "connection lost"
     * (2006 server has gone away, 2013 lost connection during query).
     */
    const RECONNECT_ERROR_CODES = [2006, 2013];

    /** Maximum number of reconnect-and-retry rounds for a single task. */
    const MAX_RECONNECTS = 1;

    // Swoole settings
    protected $task_worker_num;
    protected $work_num; // never read or written by this class; kept so existing subclasses do not break
    protected $max_request;
    protected $dispatch_mode;
    protected $daemonize;
    protected $server_port;
    protected $log_file;

    // These three used to be created implicitly as dynamic properties, which
    // are public; they stay public so external readers keep working.
    public $host;
    public $worker_num;
    public $serv;

    // Database settings
    protected $db_type;
    protected $db_host;
    protected $db_user;
    protected $db_pwd;
    protected $db_name;
    protected $db_port;

    /**
     * Creates the Swoole server and stores the database settings.
     *
     * Recognised $config keys (each falls back to its default when missing or
     * falsy): poole_host, poole_port, db_type, db_host, db_port, db_name,
     * db_user, db_pwd.
     *
     * @param array $config Optional overrides for the listener and the database.
     */
    public function __construct($config = [])
    {
        // Address and port the server listens on.
        $this->host = self::configValue($config, 'poole_host', "127.0.0.1");
        $this->server_port = self::configValue($config, 'poole_port', 9091);

        $this->worker_num = 5;
        $this->task_worker_num = 5;
        $this->dispatch_mode = 2;
        $this->daemonize = true;
        $this->max_request = 10000;

        // One log file per start date, relative to the working directory.
        $this->log_file = "../logs/DB_pool_err_" . date("Ymd") . '.log';

        $this->serv = new \swoole\server($this->host, $this->server_port);
        $this->serv->set([
            'worker_num' => $this->worker_num,
            'task_worker_num' => $this->task_worker_num,
            'max_request' => $this->max_request,
            'daemonize' => $this->daemonize,
            'log_file' => $this->log_file,
            'dispatch_mode' => $this->dispatch_mode,
        ]);

        $this->db_type = self::configValue($config, 'db_type', 'mysql');
        $this->db_host = self::configValue($config, 'db_host', '127.0.0.1');
        $this->db_port = self::configValue($config, 'db_port', 3306);
        $this->db_name = self::configValue($config, 'db_name', 'test');
        $this->db_user = self::configValue($config, 'db_user', 'test');
        $this->db_pwd = self::configValue($config, 'db_pwd', 'test');
    }

    /**
     * Registers the event callbacks and starts the server.
     */
    public function run()
    {
        $this->serv->on('Receive', [$this, 'onReceive']);
        // Task and Finish are the two callbacks required for task workers.
        $this->serv->on('Task', [$this, 'onTask']);
        $this->serv->on('Connect', [$this, 'onConnect']);
        $this->serv->on('Finish', [$this, 'onFinish']);
        //$this->serv->on('Error', array($this, 'onError'));
        $this->serv->start();
    }

    /**
     * Connect callback; intentionally does nothing.
     *
     * The commented lines below are the original debug output that logs the
     * remote address of the new client.
     *
     * @param \swoole\server $server
     * @param int            $fd
     * @param int            $reactorId
     */
    public function onConnect(\swoole\server $server, int $fd, int $reactorId)
    {
        //$clientinfo = $server->getClientInfo($fd) ;
        //$cinfo = $clientinfo['remote_ip'].':'. $clientinfo['remote_port'] ;
        //echo "有客户接入:reactorId:$reactorId --> fd:$fd --> $cinfo \n";
    }

    /**
     * Receive callback: hands the payload to a task worker, waits for the
     * result and sends it back to the client unchanged.
     *
     * When taskwait() returns false a JSON error with the message
     * "Error. Task timeout" is sent instead.
     *
     * @param mixed  $serv    Unused; the server stored on the instance is used.
     * @param int    $fd      Connection the reply is sent to.
     * @param int    $from_id Unused.
     * @param string $data    Raw payload, forwarded as the SQL to execute.
     */
    public function onReceive($serv, $fd, $from_id, $data)
    {
        $resultStr = $this->serv->taskwait($data);

        if ($resultStr === false) {
            $this->serv->send($fd, json_encode(['status' => 'false', 'data' => "Error. Task timeout"]));
            return;
        }

        // Success and failure replies are both forwarded verbatim.
        $this->serv->send($fd, $resultStr);
    }

    /**
     * Task callback: executes one SQL string on this worker's PDO connection
     * and finishes the task with a JSON reply.
     *
     * Statements starting with "select" (case-insensitive, after trimming) are
     * run with query() and all rows are returned; anything else is run with
     * exec() and its return value is returned.
     *
     * If the driver reports one of RECONNECT_ERROR_CODES the connection is
     * dropped, re-opened and the SQL is executed again, at most MAX_RECONNECTS
     * times per task.
     *
     * @param mixed  $serv    Unused; the server stored on the instance is used.
     * @param int    $task_id Only used in the console messages.
     * @param int    $from_id Only used in the console messages.
     * @param string $sql     SQL text to execute.
     */
    public function onTask($serv, $task_id, $from_id, $sql)
    {
        // Persists between calls, so the handle is reused by later tasks.
        static $link = null;

        $sql = trim($sql);
        $isSelect = preg_match("/^select/i", $sql) === 1;
        $reconnects = 0;

        while (true) {
            if ($link === null) {
                try {
                    $link = $this->openConnection();
                    echo "新建连接成功:Taskid=$task_id Fromid=$from_id \n";
                } catch (\PDOException $e) {
                    $link = null;
                    $this->serv->finish(json_encode(['status' => 'false', 'data' => $e->getMessage()], JSON_UNESCAPED_UNICODE));
                    echo "新建连接失败:Taskid=$task_id Fromid=$from_id " . $e->getMessage() . "\n";
                    return;
                }
            }

            list($succeeded, $payload) = $this->runStatement($link, $sql, $isSelect);
            if ($succeeded) {
                $this->serv->finish(json_encode(['status' => "success", 'data' => $payload], JSON_UNESCAPED_UNICODE));
                return;
            }

            // errorInfo()[1] is the driver-specific code; errorCode() would only
            // give the SQLSTATE string and never match 2006/2013.
            $driverCode = isset($payload[1]) ? (int) $payload[1] : 0;
            if (in_array($driverCode, self::RECONNECT_ERROR_CODES, true)) {
                // Drop the dead handle so this task, or the next one, reconnects.
                $link = null;
                if ($reconnects < self::MAX_RECONNECTS) {
                    $reconnects++;
                    continue;
                }
            }

            $this->serv->finish(json_encode(['status' => 'false', 'data' => $payload], JSON_UNESCAPED_UNICODE));
            return;
        }
    }

    /**
     * Finish callback: prints a completion message and returns the task result.
     *
     * Per the original author's note, taskwait() does not trigger this callback.
     *
     * @param mixed $serv
     * @param int   $task_id
     * @param mixed $data
     * @return mixed The unchanged $data.
     */
    public function onFinish($serv, $task_id, $data)
    {
        echo "任务完成";
        return $data;
    }

    /**
     * Returns $config[$key] when it is set and truthy, otherwise $default.
     *
     * Keeps the truthiness rule of the original `?:` expressions without
     * raising an undefined-index warning for missing keys.
     */
    private static function configValue($config, $key, $default)
    {
        return empty($config[$key]) ? $default : $config[$key];
    }

    /**
     * Opens a new PDO connection from the stored settings and sets the
     * connection character set to utf8.
     *
     * @return \PDO
     * @throws \PDOException When the connection cannot be established.
     */
    private function openConnection()
    {
        $dsn = $this->db_type . ":host=$this->db_host;dbname=$this->db_name;port=$this->db_port";
        $pdo = new \PDO($dsn, $this->db_user, $this->db_pwd);
        $pdo->exec("set names 'utf8'");

        return $pdo;
    }

    /**
     * Runs one statement and normalises the outcome for both PDO error modes
     * (return-false and exception).
     *
     * @param \PDO   $link
     * @param string $sql
     * @param bool   $isSelect Whether to use query() + fetchAll() instead of exec().
     * @return array [true, rows or exec() result] on success,
     *               [false, errorInfo array] on failure.
     */
    private function runStatement(\PDO $link, $sql, $isSelect)
    {
        try {
            if ($isSelect) {
                $statement = $link->query($sql);
                if ($statement === false) {
                    return [false, $link->errorInfo()];
                }

                // Rows are fetched as objects, which JSON-encode as objects keyed by column name.
                $rows = $statement->fetchAll(\PDO::FETCH_CLASS);

                return $rows === false ? [false, $statement->errorInfo()] : [true, $rows];
            }

            // exec() may legitimately return 0, so only a strict false is a failure.
            $affected = $link->exec($sql);

            return $affected === false ? [false, $link->errorInfo()] : [true, $affected];
        } catch (\PDOException $e) {
            return [false, is_array($e->errorInfo) ? $e->errorInfo : $link->errorInfo()];
        }
    }
}

/**
 * Starting the server:
 * $serv = new DB_pool();
 * $serv->run();
 */

/**
 * Client example (use the port the pool listens on; the constructor defaults to 9091):
 * $client = new \swoole_client(SWOOLE_SOCK_TCP);
 * $num = rand(111111, 999999);
 * $rts = $client->connect('127.0.0.1', 9508, 10) or die("连接失败"); // connect to the pool server
 * $sql = ("select * from zdk_test");
 * $client->send($sql);
 * $resdata = $client->recv();
 *
 * $resda = json_decode($resdata, true);
 * $client->close();
 * return json_encode($resda);
 */