DB_pool.php 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Administrator
  5. * Date: 2019/5/27
  6. * Time: 8:56
  7. */
  8. namespace app\lib;
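  9. /**
  10. * Swoole database connection pool, by Vali.
  11. * 'worker_num' => 20, // number of worker processes
  12. * 'task_worker_num' => 10, // number of task processes, i.e. the number of MySQL connections kept open
  13. * 'daemonize'=> 1, // run as a daemon
  14. * 'max_request' => 10000, // maximum number of requests; the process is restarted once it is exceeded
  15. * 'dispatch_mode' => 2, // packet dispatch mode
  16. */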
  class DB_pool
  {
      /** MySQL client error code: "MySQL server has gone away". */
      const MYSQL_SERVER_GONE = 2006;

      /** MySQL client error code: "Lost connection to MySQL server during query". */
      const MYSQL_LOST_CONNECTION = 2013;

      // Swoole server settings (handed to Server::set() in the constructor).
      protected $task_worker_num;
      /** @deprecated Misspelling of $worker_num; never assigned. Kept only so subclasses that mention it keep working. */
      protected $work_num;
      protected $max_request;
      protected $dispatch_mode;
      protected $daemonize;
      protected $server_port;
      protected $log_file;

      // These three were previously created on the fly in the constructor (and
      // were therefore public); they are declared explicitly, with the same
      // visibility, so the class does not rely on dynamic properties.
      public $host;
      public $worker_num;
      public $serv;

      // Database settings.
      protected $db_type;
      protected $db_host;
      protected $db_user;
      protected $db_pwd;
      protected $db_name;
      protected $db_port;

      /** @var \PDO|null Connection owned by the current (task worker) process; opened lazily by onTask(). */
      protected $link = null;

      /**
       * Builds the Swoole server and stores the database settings.
       *
       * Recognised $config keys (a missing or empty value selects the default):
       * poole_host (127.0.0.1), poole_port (9091), db_type (mysql),
       * db_host (127.0.0.1), db_port (3306), db_name, db_user, db_pwd (all "test").
       *
       * @param array $config
       */
      public function __construct($config = [])
      {
          $this->host = self::option($config, 'poole_host', "127.0.0.1"); // address the server listens on
          $this->server_port = self::option($config, 'poole_port', 9091); // port the server listens on
          $this->worker_num = 5;
          $this->task_worker_num = 5; // one database connection is kept per task worker
          $this->dispatch_mode = 2;
          $this->daemonize = true;
          $this->max_request = 10000;

          // The server daemonizes, and Swoole documents that the working
          // directory changes when it does, so the relative log path is anchored
          // to the directory the script was started from.
          $cwd = getcwd();
          $this->log_file = ($cwd === false ? '' : $cwd . '/') . "../logs/DB_pool_err_" . date("Ymd") . '.log';

          $this->serv = new \Swoole\Server($this->host, $this->server_port);
          $this->serv->set([
              'worker_num' => $this->worker_num,
              'task_worker_num' => $this->task_worker_num,
              'max_request' => $this->max_request,
              'daemonize' => $this->daemonize,
              'log_file' => $this->log_file,
              'dispatch_mode' => $this->dispatch_mode,
          ]);

          $this->db_type = self::option($config, 'db_type', 'mysql');
          $this->db_host = self::option($config, 'db_host', '127.0.0.1');
          $this->db_port = self::option($config, 'db_port', 3306);
          $this->db_name = self::option($config, 'db_name', 'test');
          $this->db_user = self::option($config, 'db_user', 'test');
          $this->db_pwd = self::option($config, 'db_pwd', 'test');
      }

      /**
       * Registers the event callbacks and starts the server.
       */
      public function run()
      {
          $this->serv->on('Receive', [$this, 'onReceive']);
          // Task and Finish are the two callbacks required when task workers are enabled.
          $this->serv->on('Task', [$this, 'onTask']);
          $this->serv->on('Connect', [$this, 'onConnect']);
          $this->serv->on('Finish', [$this, 'onFinish']);
          $this->serv->start();
      }

      /**
       * Connect callback; intentionally does nothing.
       */
      public function onConnect(\Swoole\Server $server, int $fd, int $reactorId)
      {
      }

      /**
       * Receive callback: hands the received payload (an SQL string) to a task
       * worker, waits for the answer and sends it back to the client unchanged.
       *
       * SECURITY NOTE(review): whatever a client sends is executed as SQL by
       * onTask(). Only expose this server to trusted callers.
       *
       * @param mixed  $serv    unused; the server stored in $this->serv is used
       * @param int    $fd      client connection the reply is sent to
       * @param int    $from_id unused
       * @param string $data    SQL text received from the client
       */
      public function onReceive($serv, $fd, $from_id, $data)
      {
          $reply = $this->serv->taskwait($data);
          if ($reply === false) {
              // taskwait() produced no result.
              $reply = json_encode(['status' => 'false', 'data' => "Error. Task timeout"]);
          }
          $this->serv->send($fd, $reply);
      }

      /**
       * Task callback: runs one SQL statement on this process's connection and
       * reports the outcome through finish() as JSON.
       *
       * Reply shapes:
       *   {"status":"success","data":<rows for SELECT | exec() result otherwise>}
       *   {"status":"false","data":<connect error message | PDO errorInfo array>}
       *
       * When the statement fails because the connection was lost (driver error
       * 2006 or 2013) the connection is re-opened and the statement retried once.
       *
       * @param mixed  $serv    unused; the server stored in $this->serv is used
       * @param int    $task_id only used in the log line
       * @param int    $from_id only used in the log line
       * @param string $sql     statement to execute
       */
      public function onTask($serv, $task_id, $from_id, $sql)
      {
          $sql = trim($sql);
          $isSelect = preg_match("/^select/i", $sql) === 1;
          $reconnected = false;

          while (true) {
              if ($this->link === null) {
                  try {
                      $this->link = $this->connect();
                      echo "新建连接成功:Taskid=$task_id Fromid=$from_id \n";
                  } catch (\PDOException $e) {
                      $this->link = null;
                      $this->serv->finish($this->encode(['status' => 'false', 'data' => $e->getMessage()]));
                      echo "新建连接失败:Taskid=$task_id Fromid=$from_id " . $e->getMessage() . "\n";
                      return;
                  }
              }

              list($result, $errorInfo) = $this->execute($sql, $isSelect);
              if ($result !== false) {
                  break;
              }

              if ($this->isConnectionLost($errorInfo)) {
                  // Drop the dead handle so that this task, or failing that the
                  // next one, opens a fresh connection.
                  $this->link = null;
                  if (!$reconnected) {
                      $reconnected = true;
                      continue;
                  }
              }

              $this->serv->finish($this->encode(['status' => 'false', 'data' => $errorInfo]));
              return;
          }

          // SELECT returns every row (one object per row, encoded as a JSON
          // object); any other statement returns what exec() returned.
          $payload = $isSelect ? $result->fetchAll(\PDO::FETCH_CLASS) : $result;
          $this->serv->finish($this->encode(['status' => "success", 'data' => $payload]));
      }

      /**
       * Finish callback. The original author notes that taskwait() does not
       * trigger it.
       */
      public function onFinish($serv, $task_id, $data)
      {
          echo "任务完成";
          return $data;
      }

      /**
       * Opens a new database connection from the configured settings.
       *
       * @return \PDO
       * @throws \PDOException when the connection cannot be established
       */
      protected function connect()
      {
          $dsn = $this->db_type . ":host=$this->db_host;dbname=$this->db_name;port=$this->db_port";
          $link = new \PDO($dsn, $this->db_user, $this->db_pwd);
          $link->exec("set names 'utf8'");
          return $link;
      }

      /**
       * Runs the statement and normalises the outcome so that both PDO error
       * modes (return false / throw) look the same to the caller.
       *
       * @return array [result, errorInfo]; result is false on failure, in which
       *               case errorInfo is the [SQLSTATE, driver code, message] array
       */
      private function execute($sql, $isSelect)
      {
          try {
              $result = $isSelect ? $this->link->query($sql) : $this->link->exec($sql);
              return [$result, $result === false ? $this->link->errorInfo() : null];
          } catch (\PDOException $e) {
              $info = is_array($e->errorInfo)
                  ? $e->errorInfo
                  : ['HY000', $e->getCode(), $e->getMessage()];
              return [false, $info];
          }
      }

      /**
       * Tells whether an errorInfo array reports a lost connection. The driver
       * code lives in element 1; PDO::errorCode() only yields the SQLSTATE and
       * can never equal 2006/2013.
       */
      private function isConnectionLost($errorInfo)
      {
          $driverCode = isset($errorInfo[1]) ? (int) $errorInfo[1] : 0;
          return in_array($driverCode, [self::MYSQL_SERVER_GONE, self::MYSQL_LOST_CONNECTION], true);
      }

      /**
       * JSON-encodes a reply, keeping non-ASCII text readable. Values that
       * cannot be encoded (e.g. invalid UTF-8) are replaced instead of making
       * the whole encode fail, so the client always gets a JSON answer.
       */
      private function encode($payload)
      {
          return json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_PARTIAL_OUTPUT_ON_ERROR);
      }

      /**
       * Returns $config[$key] when it is present and non-empty, else $default.
       */
      private static function option($config, $key, $default)
      {
          return !empty($config[$key]) ? $config[$key] : $default;
      }
  }
  145. /**服务启动
  146. * $serv=new DB_pool();
  147. * $serv->run();
  148. */
  149. /** Client usage
  150. * $client = new \swoole_client(SWOOLE_SOCK_TCP);
  151. * $num=rand(111111,999999);
  152. * $rts=$client->connect('127.0.0.1', 9091, 10) or die("连接失败"); // connect to the DB_pool server (9091 is its default port), not to MySQL directly
  153. * $sql =("select * from zdk_test");
  154. * $client->send($sql);
  155. * $resdata = $client->recv();
  156. *
  157. * $resda=json_decode($resdata,true);
  158. * $client->close();
  159. * return json_encode($resda);
  160. */