<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2019/5/27
 * Time: 8:56
 */
namespace app\lib;
/**
 * Swoole-based database connection pool (original author: 凌晨).
 *
 * A Swoole TCP server receives raw SQL text from clients, forwards each
 * request to a task worker with taskwait(), and sends the JSON-encoded result
 * back to the client. Each task worker keeps one PDO connection open, so
 * `task_worker_num` is the number of database connections that are maintained.
 *
 * Server settings applied by the constructor:
 *   'worker_num'      => 5      number of worker processes
 *   'task_worker_num' => 5      number of task processes (= pooled connections)
 *   'daemonize'       => 1      run as a daemon
 *   'max_request'     => 10000  max requests per process before it is restarted
 *   'dispatch_mode'   => 2
 *
 * NOTE(review): whatever a client sends is executed verbatim as SQL. There is
 * no authentication or statement filtering in this class, so the listening
 * address must only be reachable by trusted callers.
 */
class DB_pool
{
    /** MySQL driver error codes meaning the connection was dropped (2006 "gone away", 2013 "lost connection"). */
    const CONNECTION_LOST_CODES = [2006, 2013];

    /** How many times one task may reconnect and re-run its statement. */
    const MAX_RECONNECTS = 1;

    // Swoole server settings.
    // $host, $worker_num and $serv used to be created implicitly (dynamic, hence
    // public) properties; they are declared public so outside code that reads
    // them keeps working.
    public $host;
    public $worker_num;
    public $serv;
    protected $task_worker_num;
    /** Never assigned; kept only for subclasses that reference the old misspelt name. */
    protected $work_num;
    protected $max_request;
    protected $dispatch_mode;
    protected $daemonize;
    protected $server_port;
    protected $log_file;

    // Database connection parameters.
    protected $db_type;
    protected $db_host;
    protected $db_user;
    protected $db_pwd;
    protected $db_name;
    protected $db_port;

    /** Connection held by the current task-worker process; null until first use or after a drop. */
    private $link;

    /**
     * Creates and configures the Swoole server and stores the database settings.
     *
     * Recognised $config keys (each falls back to the default when missing or
     * empty): poole_host ("127.0.0.1"), poole_port (9091), db_type ("mysql"),
     * db_host ("127.0.0.1"), db_port (3306), db_name ("test"),
     * db_user ("test"), db_pwd ("test").
     *
     * @param array $config
     */
    public function __construct($config = [])
    {
        $this->host = $this->configValue($config, 'poole_host', '127.0.0.1'); // address the server listens on
        $this->server_port = $this->configValue($config, 'poole_port', 9091); // port the server listens on

        $this->worker_num = 5;
        $this->task_worker_num = 5;
        $this->dispatch_mode = 2;
        $this->daemonize = 1;
        $this->max_request = 10000;
        $this->log_file = '../logs/mysqlpoole_' . date('Y-m-d') . '.log';

        $this->serv = new \Swoole\Server($this->host, $this->server_port);
        $this->serv->set([
            'worker_num' => $this->worker_num,
            'task_worker_num' => $this->task_worker_num,
            'max_request' => $this->max_request,
            'daemonize' => $this->daemonize,
            'log_file' => $this->log_file,
            'dispatch_mode' => $this->dispatch_mode,
            'package_max_length' => 4 * 1024 * 1024,
            'buffer_output_size' => 4 * 1024 * 1024,
            'socket_buffer_size' => 8 * 1024 * 1024,
        ]);

        $this->db_type = $this->configValue($config, 'db_type', 'mysql');
        $this->db_host = $this->configValue($config, 'db_host', '127.0.0.1');
        $this->db_port = $this->configValue($config, 'db_port', 3306);
        $this->db_name = $this->configValue($config, 'db_name', 'test');
        $this->db_user = $this->configValue($config, 'db_user', 'test');
        $this->db_pwd = $this->configValue($config, 'db_pwd', 'test');
    }

    /**
     * Registers the event callbacks and starts the server.
     */
    public function run()
    {
        $this->serv->on('Receive', [$this, 'onReceive']);
        // Task and Finish are the two callbacks required when task workers are used.
        $this->serv->on('Task', [$this, 'onTask']);
        $this->serv->on('Connect', [$this, 'onConnect']);
        $this->serv->on('Finish', [$this, 'onFinish']);
        $this->serv->start();
    }

    /**
     * Connect callback; intentionally does nothing.
     *
     * @param \Swoole\Server $server
     * @param int            $fd
     * @param int            $reactorId
     */
    public function onConnect(\Swoole\Server $server, int $fd, int $reactorId)
    {
        // Placeholder: a good spot to log the peer address via $server->getClientInfo($fd).
    }

    /**
     * Receive callback: runs the received payload in a task worker and relays
     * the task's reply to the client unchanged.
     *
     * When taskwait() returns false the client gets
     * {"status":"false","data":"Error. Task timeout"} instead.
     *
     * @param mixed  $serv    server instance passed by Swoole (unused; $this->serv is used)
     * @param int    $fd      client connection descriptor
     * @param int    $from_id unused
     * @param string $data    SQL text sent by the client
     */
    public function onReceive($serv, $fd, $from_id, $data)
    {
        $resultStr = $this->serv->taskwait($data);

        if ($resultStr === false) {
            $this->serv->send($fd, json_encode(['status' => 'false', 'data' => "Error. Task timeout"]));
            return;
        }

        // The task already produced the final JSON for both success and failure.
        $this->serv->send($fd, $resultStr);
    }

    /**
     * Task callback: executes one SQL statement on this worker's connection and
     * finishes the task with a JSON document.
     *
     * Statements starting with "select" go through PDO::query() and return all
     * rows; everything else goes through PDO::exec() and returns its result
     * (the affected-row count). Reply shapes:
     *   {"status":"success","data":...}
     *   {"status":"false","data":<connect error message | PDO errorInfo array>}
     *
     * If the driver reports a dropped connection (2006/2013) the connection is
     * re-opened and the statement retried, at most MAX_RECONNECTS times.
     *
     * @param mixed  $serv    unused; $this->serv is used
     * @param int    $task_id
     * @param int    $from_id
     * @param string $sql     statement to execute
     */
    public function onTask($serv, $task_id, $from_id, $sql)
    {
        $sql = trim($sql);
        $isSelect = preg_match("/^select/i", $sql) === 1;

        for ($attempt = 0; ; $attempt++) {
            if ($this->link === null) {
                try {
                    $this->link = $this->connect();
                    echo "新建连接成功:Taskid=$task_id Fromid=$from_id \n";
                } catch (\PDOException $e) {
                    $this->link = null;
                    $this->serv->finish(json_encode(['status' => 'false', 'data' => $e->getMessage()], JSON_UNESCAPED_UNICODE));
                    echo "新建连接失败:Taskid=$task_id Fromid=$from_id " . $e->getMessage() . "\n";
                    return;
                }
            }

            $result = $isSelect ? $this->link->query($sql) : $this->link->exec($sql);

            // Only a strict false is a failure: exec() legitimately returns 0
            // when a statement affects no rows.
            if ($result !== false) {
                break;
            }

            $errorInfo = $this->link->errorInfo();
            // errorCode() is the SQLSTATE; the driver-specific code is element 1 of errorInfo().
            $driverCode = isset($errorInfo[1]) ? $errorInfo[1] : null;

            if ($this->isConnectionLost($driverCode)) {
                $this->link = null; // force a fresh connection for the retry or for the next task
                if ($attempt < self::MAX_RECONNECTS) {
                    continue;
                }
            }

            $this->serv->finish(json_encode(['status' => 'false', 'data' => $errorInfo], JSON_UNESCAPED_UNICODE));
            return;
        }

        $payload = [
            'status' => "success",
            'data' => $isSelect ? $result->fetchAll(\PDO::FETCH_CLASS) : $result,
        ];
        $this->serv->finish(json_encode($payload, JSON_UNESCAPED_UNICODE));
    }

    /**
     * Finish callback. Not triggered for tasks dispatched with taskwait().
     *
     * @param mixed $serv
     * @param int   $task_id
     * @param mixed $data
     * @return mixed $data unchanged
     */
    public function onFinish($serv, $task_id, $data)
    {
        echo "任务完成";
        return $data;
    }

    /**
     * Returns $config[$key] when it is present and non-empty, otherwise $default.
     *
     * Same truthiness rule as the former `$config[$key] ? … : …` lookups, but
     * without an undefined-key warning for absent keys.
     */
    private function configValue($config, $key, $default)
    {
        return !empty($config[$key]) ? $config[$key] : $default;
    }

    /**
     * Opens a new PDO connection from the stored database settings.
     *
     * The error mode is pinned to ERRMODE_SILENT because onTask() inspects
     * return values and errorInfo(); PHP 8 would otherwise throw on query errors.
     *
     * @return \PDO
     * @throws \PDOException when the connection cannot be established
     */
    private function connect()
    {
        $dsn = $this->db_type . ":host=$this->db_host;dbname=$this->db_name;port=$this->db_port";
        $pdo = new \PDO($dsn, $this->db_user, $this->db_pwd, [
            \PDO::ATTR_ERRMODE => \PDO::ERRMODE_SILENT,
        ]);
        $pdo->exec("set names 'utf8'");

        return $pdo;
    }

    /**
     * Tells whether a driver-specific error code means the connection was dropped.
     *
     * @param int|string|null $driverCode element 1 of PDO::errorInfo()
     * @return bool
     */
    private function isConnectionLost($driverCode)
    {
        return in_array((int) $driverCode, self::CONNECTION_LOST_CODES, true);
    }
}
/**
 * Starting the server:
 * $serv=new DB_pool();
 * $serv->run();
 */
/**
 * Client example:
 * $client = new \swoole_client(SWOOLE_SOCK_TCP);
 * $num=rand(111111,999999);
 * $rts=$client->connect('127.0.0.1', 9508, 10) or die("连接失败"); // connect to the pool server; the port must match the server's poole_port (9091 when not configured)
 * $sql =("select * from zdk_test");
 * $client->send($sql);
 * $resdata = $client->recv();
 *
 * $resda=json_decode($resdata,true);
 * $client->close();
 * return json_encode($resda);
 */
|