HttpServerRedisToSql.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Administrator
  5. * Date: 2019/6/26
  6. * Time: 12:03
  7. */
  8. namespace datainf\logic;
  9. use App\Lib\ModelBase;
  10. use App\Http\Response\Response;
  11. use Illuminate\Database\Capsule\Manager as DB;
  12. use datainf\lib\GlobConfigs;
  13. use swoole;
  14. class HttpServerRedisToSql
  15. {
  16. private $httpserver;
  17. private $config;
  18. private $redisonfig = [];
  19. const SQLKEY = 'ALLSQLKEY';
  20. public function __construct($config)
  21. {
  22. $this->httpserver = new \swoole\http\server($config['host'], $config['port']);
  23. $this->httpserver->set($config['sets']);
  24. $this->config = $config;
  25. $this->redisonfig = GlobConfigs::getKey('redis');
  26. $this->httpserver->account = new \swoole\Atomic();
  27. $this->httpserver->taskWorkingNum = new \swoole\Atomic();
  28. $this->httpserver->on('request', array($this, 'OnRequest'));
  29. $this->httpserver->on('WorkerStart', array($this, 'onWorkerStart'));
  30. $this->httpserver->on('task', array($this, 'onTask'));
  31. $this->httpserver->on('finish', array($this, 'onFinish'));
  32. }
  33. public function onWorkerStart($serv, $worker_id)
  34. {
  35. if (!$serv->taskworker) {
  36. ModelBase::init();
  37. Swoole\Timer::tick(500, function ($id) {
  38. $this->BatchSqlRedisToDB();
  39. });
  40. }
  41. if ($serv->worker_id == 0) {
  42. Swoole\Timer::tick(60000, function () {
  43. $this->logRunStatus();
  44. });
  45. }
  46. }
  47. private function logRunStatus()
  48. {
  49. echo date('Y-m-d H:i:s') . " 总请求数:" . $this->httpserver->account->get() . ' 运行任务数:' . $this->httpserver->taskWorkingNum->get();
  50. echo ' work_id:' . $this->httpserver->worker_id . " 内存使用量:" . (memory_get_usage() / 1000) . 'k 峰值:' . (memory_get_peak_usage() / 1000) . "k\n";
  51. }
  52. //sql批量写入数据库
  53. private function BatchSqlRedisToDB()
  54. {
  55. $redisconfig = $this->redisonfig;
  56. go(function () use ($redisconfig) {
  57. $begint = microtime(true);
  58. $redis = new Swoole\Coroutine\Redis();
  59. // $redis = new \Redis();
  60. $ret = $redis->connect($redisconfig['host'], $redisconfig['port']);
  61. if (!$ret) {
  62. echo "redis 连接失败!";
  63. return;
  64. }
  65. if (!empty($redisconfig['passwd'])) {
  66. $ret = $redis->auth($redisconfig['passwd']);
  67. if (!$ret) {
  68. echo "redis auth 失败!";
  69. return;
  70. }
  71. }
  72. $redis->select($redisconfig['db']);
  73. if ($redis->llen(self::SQLKEY) <= 0) {
  74. return;
  75. }
  76. $this->httpserver->account->add();
  77. $batchsql = [];
  78. $max = 1000;
  79. for ($i = 0; $i < $max; $i++) {
  80. $now = $redis->rpop(self::SQLKEY);
  81. if (empty($now)) {
  82. break;
  83. } else {
  84. $batchsql[] = str_replace(";", ":", trim(trim($now), ";"));
  85. }
  86. }
  87. if (!empty($batchsql)) {
  88. $pdo = DB::getPdo();
  89. DB::beginTransaction();
  90. $sqlstr = implode(";", $batchsql);
  91. $erowcount = 0;
  92. try {
  93. echo "\n SQL===> " . date('Y-m-d H:i:s') . "\n" . implode("\n", $batchsql) . "\n\n";
  94. $erowcount = $pdo->exec($sqlstr);
  95. DB::commit();
  96. } catch (\Exception $e) {
  97. DB::rollBack();
  98. echo "发生异常:" . $e->getCode() . ' ---- ' . print_r($e->getMessage(), true) . "\n";
  99. }
  100. if (!$erowcount) {
  101. echo "\n发生错误:" . $pdo->errorCode() . ' ---- ' . print_r($pdo->errorInfo(), true) . "\n";
  102. echo "错误sql: " . $sqlstr . "\n\n";
  103. } else {
  104. echo "成功运行:" . count($batchsql) . " 条!\n";
  105. }
  106. }
  107. $redis->close();
  108. echo "总请求数" . $this->httpserver->account->get() . " 单个线程消耗时间: " . (microtime(true) - $begint) . " s \n";
  109. return;
  110. });
  111. }
  112. public function OnRequest($request, $response)
  113. {
  114. $data = Response::generate('', 1, '', '任务正常运行.');
  115. $response->end($data);
  116. return;
  117. }
  118. public function onTask($serv, $task)
  119. {
  120. }
  121. public function onFinish($serv, int $task_id, $data)
  122. {
  123. }
  124. public function start()
  125. {
  126. $this->httpserver->start();
  127. }
  128. }