DB_pool.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Administrator
  5. * Date: 2019/5/27
  6. * Time: 8:56
  7. */
  8. namespace datainf\lib;
  9. /**
  10. * swoole 数据库连接池 BY 凌晨
  11. * 'worker_num' => 20, //worker进程数量
  12. * 'task_worker_num' => 10, //task进程数量 即为维持的MySQL连接的数量
  13. * 'daemonize'=> 1, //设置守护进程
  14. * 'max_request' => 10000, //最大请求数,超过了进程重启
  15. * 'dispatch_mode' => 2,/
  16. */
  17. class DB_pool
  18. {
  19. //db params
  20. protected $dbconfig = [];
  21. protected $serv;
  22. public function __construct($config = [])
  23. {
  24. $host = $config['poole_host'] ? $config['poole_host'] : "127.0.0.1"; // server监听的端口
  25. $port = $config['poole_port'] ? $config['poole_port'] : 9091; // server监听的端口
  26. $this->serv = new \swoole\server($host, $port);
  27. $this->serv->set($config['sets']);
  28. $this->dbconfig = $config['dbconfig'];
  29. }
  30. public function run()
  31. {
  32. $this->serv->on('Receive', array($this, 'onReceive'));
  33. // Task 回调的2个必须函数
  34. $this->serv->on('Task', array($this, 'onTask'));
  35. $this->serv->on('Connect', array($this, 'onConnect'));
  36. $this->serv->on('Finish', array($this, 'onFinish'));
  37. //$this->serv->on('Error', array($this, 'onError'));
  38. $this->serv->start();
  39. }
  40. public function onConnect(\swoole\server $server, int $fd, int $reactorId)
  41. {
  42. //$clientinfo = $server->getClientInfo($fd) ;
  43. //$cinfo = $clientinfo['remote_ip'].':'. $clientinfo['remote_port'] ;
  44. //echo "有客户接入:reactorId:$reactorId --> fd:$fd --> $cinfo \n";
  45. }
  46. public function onReceive($serv, $fd, $from_id, $data)
  47. {
  48. $resultStr = $this->serv->taskwait($data);
  49. if ($resultStr !== false) {
  50. $this->serv->send($fd, $resultStr);
  51. return;
  52. $result = json_decode($resultStr, true);
  53. if (isset($result['status']) && $result['status'] == 'success') {
  54. $this->serv->send($fd, $resultStr);
  55. } else {
  56. $this->serv->send($fd, $resultStr);
  57. }
  58. return;
  59. } else {
  60. $this->serv->send($fd, json_encode(['status' => 'false', 'data' => "Error. Task timeout"]));
  61. }
  62. }
  63. public function onTask($serv, $task_id, $from_id, $sql)
  64. {
  65. static $link = null;
  66. HELL:
  67. if ($link == null) {
  68. try {
  69. $conStr = $this->dbconfig['driver'] . ":host=" . $this->dbconfig['host'] . ";dbname=" . $this->dbconfig['database'] . ";port=" . $this->dbconfig['port'];
  70. $link = new \PDO($conStr, $this->dbconfig['username'], $this->dbconfig['password']);
  71. $link->exec("set names 'utf8'");
  72. echo "新建连接成功:Taskid=$task_id Fromid=$from_id \n";
  73. } catch (\PDOException $e) {
  74. $link = null;
  75. $this->serv->finish(json_encode(['status' => 'false', 'data' => $e->getMessage()], 256));
  76. echo "新建连接失败:Taskid=$task_id Fromid=$from_id " . $e->getMessage() . "\n";
  77. return;
  78. }
  79. }
  80. $sql = trim($sql);
  81. if (preg_match("/^select/i", $sql)) {
  82. $result = $link->query($sql);
  83. } else {
  84. $result = $link->exec($sql);
  85. }
  86. $data = ['status' => false];
  87. if (!$result) { //如果查询失败了
  88. if (in_array($link->errorCode(), [2013, 2006])) {//错误码为2013,或者2006,则重连数据库,重新执行sql
  89. $link = null;
  90. goto HELL;
  91. } else {
  92. $data['status'] = 'false';
  93. $data['data'] = $link->errorInfo();
  94. $this->serv->finish(json_encode($data, 256));
  95. return;
  96. }
  97. }
  98. if (preg_match("/^select/i", $sql)) { //如果是select操作,就返回关联数组
  99. $dataRet = $result->fetchAll(\PDO::FETCH_CLASS);
  100. $data['data'] = $dataRet;
  101. } else {//否则直接返回结果
  102. $data['data'] = $result;
  103. }
  104. $data['status'] = "success";
  105. $this->serv->finish(json_encode($data, 256));
  106. }
  107. public function onFinish($serv, $task_id, $data)
  108. {
  109. echo "任务完成";//taskwait 不触发这个函数。。
  110. return $data;
  111. }
  112. }
  113. /**服务启动
  114. * $serv=new DB_pool();
  115. * $serv->run();
  116. */
  117. /**客户端
  118. * $client = new \swoole_client(SWOOLE_SOCK_TCP);
  119. * $num=rand(111111,999999);
  120. * $rts=$client->connect('127.0.0.1', 9508, 10) or die("连接失败");//链接mysql客户端
  121. * $sql =("select * from zdk_test");
  122. * $client->send($sql);
  123. * $resdata = $client->recv();
  124. *
  125. * $resda=json_decode($resdata,true);
  126. * $client->close();
  127. * return json_encode($resda);
  128. */