DB_pool.php 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Administrator
  5. * Date: 2019/5/27
  6. * Time: 8:56
  7. */
  8. namespace app\lib;
  9. /**
  10. * swoole 数据库连接池 BY 凌晨
  11. * 'worker_num' => 20, //worker进程数量
  12. * 'task_worker_num' => 10, //task进程数量 即为维持的MySQL连接的数量
  13. * 'daemonize'=> 1, //设置守护进程
  14. * 'max_request' => 10000, //最大请求数,超过了进程重启
  15. * 'dispatch_mode' => 2,/
  16. */
  17. class DB_pool
  18. { //swoole set params
  19. protected $task_worker_num;
  20. protected $work_num;
  21. protected $max_request;
  22. protected $dispatch_mode;
  23. protected $daemonize;
  24. protected $server_port;
  25. protected $log_file;
  26. //db params
  27. protected $db_type;
  28. protected $db_host;
  29. protected $db_user;
  30. protected $db_pwd;
  31. protected $db_name;
  32. protected $db_port;
  33. public function __construct($config = [])
  34. {
  35. $this->host = $config['poole_host'] ? $config['poole_host'] : "127.0.0.1"; // server监听的端口
  36. $this->server_port = $config['poole_port'] ? $config['poole_port'] : 9091; // server监听的端口
  37. $this->worker_num = 5;
  38. $this->task_worker_num = 5;
  39. $this->dispatch_mode = 2;
  40. $this->daemonize = 1;
  41. $this->max_request = 10000;
  42. $filename = date("Y-m-d", time());
  43. $this->log_file = "../logs/mysqlpoole_" . $filename . '.log';
  44. $this->serv = new \swoole\server($this->host, $this->server_port);
  45. $this->serv->set(array(
  46. 'worker_num' => $this->worker_num,
  47. 'task_worker_num' => $this->task_worker_num,
  48. 'max_request' => $this->max_request,
  49. 'daemonize' => $this->daemonize,
  50. 'log_file' => $this->log_file,
  51. 'dispatch_mode' => $this->dispatch_mode,
  52. 'package_max_length'=> 4 * 1024 *1024,
  53. 'buffer_output_size'=> 4 * 1024 *1024,
  54. 'socket_buffer_size' => 8 * 1024 *1024
  55. ));
  56. $this->db_type = $config['db_type'] ? $config['db_type'] : 'mysql';
  57. $this->db_host = $config['db_host'] ? $config['db_host'] : '127.0.0.1';
  58. $this->db_port = $config['db_port'] ? $config['db_port'] : 3306;
  59. $this->db_name = $config['db_name'] ? $config['db_name'] : 'test';
  60. $this->db_user = $config['db_user'] ? $config['db_user'] : 'test';
  61. $this->db_pwd = $config['db_pwd'] ? $config['db_pwd'] : 'test';
  62. }
  63. public function run()
  64. {
  65. $this->serv->on('Receive', array($this, 'onReceive'));
  66. // Task 回调的2个必须函数
  67. $this->serv->on('Task', array($this, 'onTask'));
  68. $this->serv->on('Connect', array($this, 'onConnect'));
  69. $this->serv->on('Finish', array($this, 'onFinish'));
  70. //$this->serv->on('Error', array($this, 'onError'));
  71. $this->serv->start();
  72. }
  73. public function onConnect(\swoole\server $server, int $fd, int $reactorId)
  74. {
  75. //$clientinfo = $server->getClientInfo($fd) ;
  76. //$cinfo = $clientinfo['remote_ip'].':'. $clientinfo['remote_port'] ;
  77. //echo "有客户接入:reactorId:$reactorId --> fd:$fd --> $cinfo \n";
  78. }
  79. public function onReceive($serv, $fd, $from_id, $data)
  80. {
  81. $resultStr = $this->serv->taskwait($data);
  82. if ($resultStr !== false) {
  83. $this->serv->send($fd, $resultStr );
  84. return ;
  85. $result = json_decode($resultStr, true);
  86. if (isset($result['status']) && $result['status'] == 'success') {
  87. $this->serv->send($fd, $resultStr );
  88. } else {
  89. $this->serv->send($fd, $resultStr);
  90. }
  91. return;
  92. } else {
  93. $this->serv->send($fd, json_encode(['status' => 'false', 'data' => "Error. Task timeout"]));
  94. }
  95. }
  96. public function onTask($serv, $task_id, $from_id, $sql)
  97. {
  98. static $link = null;
  99. HELL:
  100. if ($link == null) {
  101. try {
  102. $conStr = $this->db_type . ":host=$this->db_host;dbname=$this->db_name;port=$this->db_port";
  103. $link = new \PDO($conStr, $this->db_user, $this->db_pwd);
  104. $link->exec("set names 'utf8'");
  105. echo "新建连接成功:Taskid=$task_id Fromid=$from_id \n";
  106. } catch (\PDOException $e) {
  107. $link = null;
  108. $this->serv->finish(json_encode(['status' => 'false', 'data' => $e->getMessage()], 256));
  109. echo "新建连接失败:Taskid=$task_id Fromid=$from_id " . $e->getMessage() . "\n";
  110. return;
  111. }
  112. }
  113. $sql = trim($sql);
  114. if (preg_match("/^select/i", $sql)){
  115. $result = $link->query($sql);
  116. }else{
  117. $result = $link->exec($sql);
  118. }
  119. $data = ['status'=>false] ;
  120. if (!$result) { //如果查询失败了
  121. if (in_array($link->errorCode(), [2013, 2006])) {//错误码为2013,或者2006,则重连数据库,重新执行sql
  122. $link = null;
  123. goto HELL;
  124. } else {
  125. $data['status'] = 'false';
  126. $data['data'] = $link->errorInfo();
  127. $this->serv->finish(json_encode($data, 256));
  128. return;
  129. }
  130. }
  131. if (preg_match("/^select/i", $sql)) { //如果是select操作,就返回关联数组
  132. $dataRet = $result->fetchAll(\PDO::FETCH_CLASS);
  133. $data['data'] = $dataRet ;
  134. } else {//否则直接返回结果
  135. $data['data'] = $result;
  136. }
  137. $data['status'] = "success";
  138. $this->serv->finish(json_encode($data, 256));
  139. }
  140. public function onFinish($serv, $task_id, $data)
  141. {
  142. echo "任务完成";//taskwait 不触发这个函数。。
  143. return $data;
  144. }
  145. }
  146. /**服务启动
  147. * $serv=new DB_pool();
  148. * $serv->run();
  149. */
  150. /**客户端
  151. * $client = new \swoole_client(SWOOLE_SOCK_TCP);
  152. * $num=rand(111111,999999);
  153. * $rts=$client->connect('127.0.0.1', 9091, 10) or die("连接失败");// connect to the pool server (9091 is the default poole_port)
  154. * $sql =("select * from zdk_test");
  155. * $client->send($sql);
  156. * $resdata = $client->recv();
  157. *
  158. * $resda=json_decode($resdata,true);
  159. * $client->close();
  160. * return json_encode($resda);
  161. */