HttpServerRedisToSql.php 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Administrator
  5. * Date: 2019/6/26
  6. * Time: 12:03
  7. */
  8. namespace datainf\logic;
  9. use App\Lib\ModelBase;
  10. use App\Http\Response\Response;
  11. use Illuminate\Database\Capsule\Manager as DB;
  12. use datainf\lib\GlobConfigs;
  13. use swoole;
  14. class HttpServerRedisToSql
  15. {
  16. private $httpserver;
  17. private $config;
  18. private $redisonfig = [];
  19. const SQLKEY = 'ALLSQLKEY';
  20. public function __construct($config)
  21. {
  22. $this->httpserver = new \swoole\http\server($config['host'], $config['port']);
  23. $this->httpserver->set($config['sets']);
  24. $this->config = $config;
  25. $this->redisonfig = GlobConfigs::getKey('redis');
  26. $taskWorkingNum = new \swoole\Atomic();
  27. $this->httpserver->taskWorkingNum = $taskWorkingNum;
  28. $this->httpserver->on('request', array($this, 'OnRequest'));
  29. $this->httpserver->on('WorkerStart', array($this, 'onWorkerStart'));
  30. $this->httpserver->on('task', array($this, 'onTask'));
  31. $this->httpserver->on('finish', array($this, 'onFinish'));
  32. }
  33. public function onWorkerStart($serv, $worker_id)
  34. {
  35. if (!$serv->taskworker) {
  36. ModelBase::init();
  37. $obj = $this;
  38. Swoole\Timer::tick(500, function ($id) use ($obj) {
  39. $obj->BatchSqlRedisToDB();
  40. });
  41. }
  42. }
  43. //sql批量写入数据库
  44. private function BatchSqlRedisToDB()
  45. {
  46. $redisconfig = $this->redisonfig;
  47. go(function () use ($redisconfig) {
  48. $redis = new Swoole\Coroutine\Redis();
  49. $redis->connect($redisconfig['host'], $redisconfig['port']);
  50. if ($redis->llen(self::SQLKEY) <= 0) {
  51. return;
  52. }
  53. $batchsql = [];
  54. $max = 5000;
  55. for ($i = 0; $i < $max; $i++) {
  56. $now = $redis->rpop(self::SQLKEY);
  57. if (empty($now)) {
  58. break;
  59. } else {
  60. $batchsql[] = str_replace(";", ":", trim(trim($now), ";"));
  61. }
  62. }
  63. if (!empty($batchsql)) {
  64. $pdo = DB::getPdo();
  65. $sqlstr = implode(";", $batchsql);
  66. $pdo->exec($sqlstr);
  67. }
  68. return;
  69. });
  70. }
  71. public function OnRequest($request, $response)
  72. {
  73. $data = Response::generate('', 1, '', '任务正常运行.');
  74. $response->end($data);
  75. return;
  76. }
  77. public function onTask($serv, $task)
  78. {
  79. }
  80. public function onFinish($serv, int $task_id, $data)
  81. {
  82. }
  83. public function start()
  84. {
  85. $this->httpserver->start();
  86. }
  87. }