| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- <?php
- /**
- * Mongodb操作类
- * Created by PhpStorm.
- * User: scstf
- * Date: 2018/9/13
- * Time: 15:35
- */
- namespace Biz\Db\Mongo;
- use MongoDB\Driver\Manager;
- use MongoDB\Driver\BulkWrite;
- use MongoDB\Driver\Exception\BulkWriteException;
- use MongoDB\Driver\Exception\Exception;
- use MongoDB\Driver\Query;
- use MongoDB\Driver\WriteConcern;
- class MongoOP
- {
- private $manager;
- private $bulk;
- private $dbName;
- private $instance;
- public function __construct($host, $port = 27017, $options = [])
- {
- $this->connect ($host, $port, $options);// or die('connection error:'.$this->connectString);
- }
- /**
- * 连接数据库
- * @param $host
- * @param int $port
- * @param array $options
- * @return Manager
- * @throws \Exception
- */
- private function connect($host, $port = 27017, $options = [])
- {
- if (!extension_loaded ('mongodb')) {
- return false;
- //throw new \Exception('没有安装mongodb扩展!');
- }
- $connectStr = "mongodb://{$host}:{$port}";
- if (isset($options['user']) && isset($options['password'])) {
- $user = $options['user'];
- $password = $options['password'];
- $connectStr = "mongodb://{$user}:{$password}@{$host}:{$port}";
- }
- if (isset($options['dbName'])) {
- $this->dbName = $options['dbName'];
- $connectStr .= '/' . $options['dbName'];
- }
- $this->bulk = new BulkWrite();
- $this->manager = new Manager($connectStr);
- return $this->instance = $this;
- }
- /**
- * 配置选中数据库
- * @param $dbName
- */
- public function setDbName($dbName)
- {
- $this->dbName = $dbName;
- }
- /**
- * 返回当前数据库名称
- * @return mixed
- */
- public function getDbName()
- {
- return $this->dbName;
- }
- /**
- * 插入数据
- * @param $key
- * @param $data
- * @param $sync
- * @return mixed
- */
- public function insert($key, array $data, $sync = true)
- {
- if (!$this->manager)
- return null;
- $manager = $this->manager;
- $writeConcern = new WriteConcern(1);
- $bulk = new BulkWrite(['ordered' => $sync]);//true为同步执行
- $bulk->insert ($data);
- try {
- return $result = $manager->executeBulkWrite ($this->dbName . ".{$key}", $bulk, $writeConcern) ? 1 : -1;//写入
- } catch (BulkWriteException $e) {
- $result = $e->getWriteResult ();
- // Check if the write concern could not be fulfilled
- if ($writeConcernError = $result->getWriteConcernError ()) {
- printf ("%s (%d): %s\n",
- $writeConcernError->getMessage (),
- $writeConcernError->getCode (),
- var_export ($writeConcernError->getInfo (), true)
- );
- }
- // Check if any write operations did not complete at all
- foreach ($result->getWriteErrors () as $writeError) {
- printf ("Operation#%d: %s (%d)\n",
- $writeError->getIndex (),
- $writeError->getMessage (),
- $writeError->getCode ()
- );
- }
- }
- return -1;
- }
- public function update($key, $id, $value, $data, $sync = true, $multi = true, $upsert = false)
- {
- $manager = $this->manager;
- if (!$manager)
- return null;
- $bulk = new BulkWrite(['ordered' => $sync]);
- $fill['$set'] = [];
- foreach ($data as $k => $v) {
- $item['$set'] = [$k => $v];
- $fill['$set'] = array_merge ($fill['$set'], $item['$set']);
- }
- $bulk->update (
- [$id => $value],
- $fill,
- ['multi' => $multi, 'upsert' => $upsert]
- );
- $ret = $manager->executeBulkWrite ($this->dbName . '.' . $key, $bulk);
- return $ret ? 1 : -1;
- }
- /**
- * 删除指定记录
- * @param $key
- * @param $id
- * @param $value
- * @param bool $sync
- * @param int $limit
- * @return int
- */
- public function remove($key, $id, $value, $sync = true, $limit = 0)
- {
- $manager = $this->manager;
- if (!$manager)
- return null;
- echo ',',$key,$id,$value;
- $bulk = new BulkWrite(['ordered' => $sync]);//true为同步执行
- $bulk->delete ([$id => $value], ['limit' => $limit]);//默认删除所有
- $ret = $manager->executeBulkWrite ($this->dbName . '.' . $key, $bulk);
- return $ret ? 1 : -1;
- }
- /**
- * 查询数据
- * @param $key
- * @param $condition
- * @param string $sort
- * @param int $asc
- * @param int $showId
- * @return array|bool
- * @throws Exception
- */
- public function search($key, $condition, $sort = '', $asc = 1, $showId = 0)
- {
- $manager = $this->manager;
- if (!$manager)
- return null;
- $filter = [];
- if (is_array ($condition))
- foreach ($condition as $k => $v) {
- $op = $this->convertOP ($v[0]);//转换操作符
- if (!$op) return false;
- $filter[$k] = [$op => $v[1]];
- }
- $options = [
- 'projection' => ['_id' => $showId], //不输出_id字段
- 'sort' => [$sort => $asc] //根据指定字段排序 1是升序,-1是降序
- ];
- if ($showId) {
- unset($options['projection']);
- }
- if (!$sort) {
- unset($options['sort']);
- }
- $query = new Query($filter, $options); //查询请求
- $list = $manager->executeQuery ($this->dbName . '.' . $key, $query); // 执行查询当前数据库下的$key集合
- return $list->toArray ();
- }
- public function get()
- {
- return $this;
- }
- /**
- * 转换操作符
- * @param $op
- * @return string
- */
- private function convertOP($op)
- {
- if (!is_string ($op)) return false;
- switch ($op) {
- case '>':
- case 'gt':
- return '$gt';
- case '<':
- case 'lt':
- return '$lt';
- case '>=':
- case 'gte':
- return '$gte';
- case '<=':
- case 'lte':
- return '$lte';
- case '!=':
- case '<>':
- case 'ne':
- case 'neq':
- case 'lgt':
- return '$ne';
- case 'in':
- return '$in';
- case 'nin':
- return '$nin';
- case 'eq':
- case '=':
- default:
- return '$eq';
- }
- }
- }
|