| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- <?php
- /**
- * Created by PhpStorm.
- * User: Administrator
- * Date: 2019/5/27
- * Time: 8:56
- */
- namespace datainf\lib;
- /**
- * swoole 数据库连接池 BY 凌晨
- * 'worker_num' => 20, //worker进程数量
- * 'task_worker_num' => 10, //task进程数量 即为维持的MySQL连接的数量
- * 'daemonize'=> 1, //设置守护进程
- * 'max_request' => 10000, //最大请求数,超过了进程重启
- * 'dispatch_mode' => 2,/
- */
- class DB_pool
- {
- //db params
- protected $dbconfig = [];
- protected $serv;
- public function __construct($config = [])
- {
- $host = $config['poole_host'] ? $config['poole_host'] : "127.0.0.1"; // server监听的端口
- $port = $config['poole_port'] ? $config['poole_port'] : 9091; // server监听的端口
- $this->serv = new \swoole\server($host, $port);
- $this->serv->set($config['sets']);
- $this->dbconfig = $config['dbconfig'];
- }
- public function run()
- {
- $this->serv->on('Receive', array($this, 'onReceive'));
- // Task 回调的2个必须函数
- $this->serv->on('Task', array($this, 'onTask'));
- $this->serv->on('Connect', array($this, 'onConnect'));
- $this->serv->on('Finish', array($this, 'onFinish'));
- //$this->serv->on('Error', array($this, 'onError'));
- $this->serv->start();
- }
- public function onConnect(\swoole\server $server, int $fd, int $reactorId)
- {
- //$clientinfo = $server->getClientInfo($fd) ;
- //$cinfo = $clientinfo['remote_ip'].':'. $clientinfo['remote_port'] ;
- //echo "有客户接入:reactorId:$reactorId --> fd:$fd --> $cinfo \n";
- }
- public function onReceive($serv, $fd, $from_id, $data)
- {
- $resultStr = $this->serv->taskwait($data);
- if ($resultStr !== false) {
- $this->serv->send($fd, $resultStr);
- return;
- $result = json_decode($resultStr, true);
- if (isset($result['status']) && $result['status'] == 'success') {
- $this->serv->send($fd, $resultStr);
- } else {
- $this->serv->send($fd, $resultStr);
- }
- return;
- } else {
- $this->serv->send($fd, json_encode(['status' => 'false', 'data' => "Error. Task timeout"]));
- }
- }
- public function onTask($serv, $task_id, $from_id, $sql)
- {
- static $link = null;
- HELL:
- if ($link == null) {
- try {
- $conStr = $this->dbconfig['driver'] . ":host=" . $this->dbconfig['host'] . ";dbname=" . $this->dbconfig['database'] . ";port=" . $this->dbconfig['port'];
- $link = new \PDO($conStr, $this->dbconfig['username'], $this->dbconfig['password']);
- $link->exec("set names 'utf8'");
- echo "新建连接成功:Taskid=$task_id Fromid=$from_id \n";
- } catch (\PDOException $e) {
- $link = null;
- $this->serv->finish(json_encode(['status' => 'false', 'data' => $e->getMessage()], 256));
- echo "新建连接失败:Taskid=$task_id Fromid=$from_id " . $e->getMessage() . "\n";
- return;
- }
- }
- $sql = trim($sql);
- if (preg_match("/^select/i", $sql)) {
- $result = $link->query($sql);
- } else {
- $result = $link->exec($sql);
- }
- $data = ['status' => false];
- if (!$result) { //如果查询失败了
- if (in_array($link->errorCode(), [2013, 2006])) {//错误码为2013,或者2006,则重连数据库,重新执行sql
- $link = null;
- goto HELL;
- } else {
- $data['status'] = 'false';
- $data['data'] = $link->errorInfo();
- $this->serv->finish(json_encode($data, 256));
- return;
- }
- }
- if (preg_match("/^select/i", $sql)) { //如果是select操作,就返回关联数组
- $dataRet = $result->fetchAll(\PDO::FETCH_CLASS);
- $data['data'] = $dataRet;
- } else {//否则直接返回结果
- $data['data'] = $result;
- }
- $data['status'] = "success";
- $this->serv->finish(json_encode($data, 256));
- }
- public function onFinish($serv, $task_id, $data)
- {
- echo "任务完成";//taskwait 不触发这个函数。。
- return $data;
- }
- }
- /**服务启动
- * $serv=new DB_pool();
- * $serv->run();
- */
- /**客户端
- * $client = new \swoole_client(SWOOLE_SOCK_TCP);
- * $num=rand(111111,999999);
- * $rts=$client->connect('127.0.0.1', 9508, 10) or die("连接失败");//链接mysql客户端
- * $sql =("select * from zdk_test");
- * $client->send($sql);
- * $resdata = $client->recv();
- *
- * $resda=json_decode($resdata,true);
- * $client->close();
- * return json_encode($resda);
- */
|