Go To My HomePage

MongoDB使用详解

一、MongoDB简介&安装

简介

​ MongoDB是NoSQL数据库中的文档型数据库,文档型数据库与关系型最为接近。它支持的数据结构非常松散,是类似json的bson格式,因此可以存储比较复杂的数据类型。MongoDB具有便捷和丰富的查询手段,高可用(主备)的特性,横向扩展(sharding)的能力,并支持多个存储引擎(wiredtiger、mmap)等优秀特性。

单节点安装

第一步:安装依赖

yum install -y make
yum install -y gcc-c++
yum install -y openssl

第二步:tar包解压缩

tar -zxvf mongodb-linux-x86_64-rhel[version].tgz -C /opt/module/
mv /opt/module/mongodb-linux-x86_64-rhel[version] /opt/module/mongodb

第三步:目录

# 创建数据存储目录
mkdir -p /opt/module/mongodb/data
# 创建日志存储目录
mkdir -p /opt/module/mongodb/logs
touch /opt/module/mongodb/logs/mongodb.log

第四步:单节点启动

# 方式一:后台启动方式 --fork,否则是个前台进程
./mongod --dbpath=/opt/module/mongodb/data --logpath=/opt/module/mongodb/logs/mongodb.log --bind_ip_all --fork [--logappend]

# 方式二:配置文件启动
mkdir -p /opt/module/mongodb/conf
vim /opt/module/mongodb/conf/mongodb.conf
# logpath=/opt/module/mongodb/logs/mongodb.log
# dbpath=/opt/module/mongodb/data
# bind_ip_all=true
# fork=true
# logappend=true
./mongod -f /opt/module/mongodb/conf/mongodb.conf

第四步:关闭

# 方式一:kill进程
ps -aux | grep mongod
kill -9 [processId]

# 方式二:命令关闭,关闭时必须指定数据存放位置
./mongod --shutdown -dbpath=/opt/module/mongodb/data

# 方式三:进入mongo客户端,使用函数关闭
./mongo
use admin
db.shutdownServer()

二、基本概念

无模式数据库

​ 很多NoSQL数据库有无模式的共同点。若要在关系型数据库中存储数据,首先必须定义模式,也就是用一种预定义结构向数据库说明要有哪些表格,表中有哪些列,每一列都存放何种类型的数据。必须先定义好模式,然后才能存放数据。

​ 相比之下,NoSQL数据库的数据存储就比较随意。键值数据库可以把任何数据存放在一个键的名下。文档数据库实际上也如此,因为它对所存储的文档结构没有限制。在列族数据库中,任意列里面都可以随意存放数据。你可以在图数据库中新增边,也可以随意向节点和边中添加属性。

MongoDB三要素

MongoDB三要素与传统关系型数据库概念类比

传统数据库 MongDB 解释说明
database database 数据库
table collection 数据库表/集合
row document 数据记录行/文档
column field 数据字段/域
index index 索引
table joins   表连接,MongoDB不支持
primary key primary key 主键,MongoDB自动将_id字段设置为主键

传统数据库的row与mongoDB的document对比

row:每一行都是一样的字段,不可添加不可减少,也就说fields的个数在定义table的时候就已经声明完成的

document: 它的每一个document都是独立的,同时也不是在创建collection的时候经声明完成的

MongoDB的数据类型

数据类型 描述
String 字符串,仅UTF-8编码合法
Integer 整型数值,根据服务器不同,可分为32位或64位
Boolean 布尔值
Double 双精度浮点数
Array 用于将数组或列表或多个值存储为一个键
Timestamp 时间戳
Object 用于内嵌文档
Null 用于创建空文档
Symbol 符号。该数据类型基本上等同于字符串类型,但不同的是,它一般用于使用了特殊符号类型的语言
Date 日期时间。用UNIX时间格式来存储日期

三、用户权限

​ Mongodb作为时下最为热门的数据库,那么其安全验证也是必不可少的,否则一个没有验证的数据库暴露出去,任何人可随意操作,这将是非常危险的。我们可以通过使用为MongoDB创建用户的方式来降低风险。

权限名 描述
read 允许用户读取指定数据库
readWrite 允许用户读写指定数据库
dbAdmin 允许用户在指定数据库中执行管理函数,如索引创建、删除,查看统计或访问system.profile
userAdmin 允许用户向system.users集合写入,可以在指定数据库里创建、删除和管理用户
clusterAdmin 只在admin数据库中可用,赋予用户所有分片和复制集相关函数的管理权限
readAnyDatabase 只在admin数据库中可用,赋予用户所有数据库的读权限,除系统库的集合
readWriteAnyDatabase 只在admin数据库中可用,赋予用户所有数据库的读写权限,除系统库的集合
userAdminAnyDatabase 只在admin数据库中可用,赋予用户所有数据库的userAdmin权限,除系统库的集合
dbAdminAnyDatabase 只在admin数据库中可用,赋予用户所有数据库的dbAdmin权限,除系统库的集合
root 只在admin数据库中可用。超级账号,超级权限

创建用户

前置条件:在admin库下操作。如果配置auth=true启动参数则需要有权限的用户操作,否则先不配置此参数,创建完用户之后开启此参数,并重启服务。

语法:
db.createUser({ 
    user: "<name>",
    pwd: "<cleartext password>",
    customData: { <any Object Data> },
    roles: [
        { role: "<role>", db: "<database>" },
        ...
    ]
});
  • user:新建用户名

  • pwd:新建用户密码

  • customData:存放一些用户相关的自定义数据,该属性也可忽略

  • roles:数组类型,配置用户的权限

示例:
db.createUser({user:'root',pwd:'root',roles:[{role:'root',db:'admin'}]})

创建用户完成之后,配置auth=true启动参数

登陆用户

前置条件:在admin库下操作

语法:
db.auth('<username>', '<password>');

示例:
db.auth('root','root');

查看用户信息

前置条件:在admin库下操作

show users
db.system.users.find()

更新用户

前置条件:在admin库下操作

语法:
db.updateUser('<username>', {<新的用户数据对象>});

示例:
db.updateUser('root', {'pwd':'123456', 'roles':[{'role':'root', 'db':'admin'}]});

修改用户密码函数

虽然更新用户函数也可以修改用户密码,MongoDB也提供了独立修改密码的函数

前置条件:在admin库下操作

语法:
db.changeUserPassword("<newUserName>","<newPassword>")

示例:
db.changeUserPassword('root', '123456');

删除用户

前置条件:在admin库下操作

语法:
db.dropUser('<userName>')

示例:
db.dropUser('root');

四、Database操作

创建数据库

在MongoDB中创建数据库的命令使用的是use命令。该命令有两层含义:

  1. 切换到指定数据库
  2. 如果切换的数据库不存在,则创建该数据库

如果只创建数据库未在数据库中创建集合,则此创建为逻辑创建,在内存中,但并未在生成对应目录。使用查看数据库命令是扫描磁盘目录,所以无法看到

查看数据库

show dbs
show databases

删除数据库

在MongoDB中使用db.dropDatabase()函数来删除数据库。在删除数据库之前,需要切换到需要删除的数据库,执行即可

示例:
use test;
db.dropDatabase();

五、Collection操作

MongoDB中的集合是一组文档的集,相当于关系型数据库中的表

创建集合

在MongoDB中,我们也可以不用创建集合,当我们插入一些数据时,会自动创建集合

语法格式:db.createCollection(<collectionName>, <options>)

options可以是如下参数

字段 类型 描述
capped 布尔 (可选),如果为 true,则创建固定集合,且必须指定 size 参数
固定集合是指当达到最大值时,它会自动覆盖最早的文档
size 数值 (可选)为固定集合指定一个最大值(以字节计)。 如果 capped 为 true,也需要指定该字段。
max 数值 (可选)指定固定集合中包含文档的最大数量。
示例:
db.createCollection('testCollection');
db.createCollection('testCollection', {'capped':true, 'size':2000000, 'max':1000});

查看集合

如果要查看已有集合,可以使用show collectionsshow tables命令

show collections;
show tables;

查看集合详情

如果要查看已有集合的详情,可以使用db.<collectionName>.stats()命令

示例:
db.testCollection.stats();

删除集合

需要先切换到需要删除集合所在的数据库,使用db.<collectionName>.drop()函数删除集合即可

示例:
db.testCollection.drop();

六、Document 操作

在MongoDB中文档是指多个键及其关联的值有序地放置在一起就是文档,其实指的就是数据。MongoDB中的文档的数据结构和JSON基本一样。所有存储在集合中的数据都是BSON格式。BSON是一种类似JSON的二进制形式的存储格式,是Binary JSON的简称。

新增文档

新增单一文档

  • insert函数

    语法:db.<collectionName>.insert(document)

      示例:
      db.user.insert({name: '张三', nickName: 'Tom', 'age': 18, course: ['java', 'spring']});
    
  • save函数

    语法:db.<collectionName>.save(document)

      示例:
      db.user.save({name: '李四', nickName: 'Jack', 'age': 18, course: ['html', 'js']});
    
  • insertOne函数

    语法:db.<collectionName>.insertOne(document)

      示例:
      db.user.insertOne({name: '王五', nickName: 'King', 'age': 20, course: ['java', 'js']});
    

批量新增文档

  • insert函数

    语法:db.<collectionName>.insert(documents)

      示例:
      db.user.insert([
      {name: '张三', nickName: 'Tom', 'age': 18, course: ['java', 'spring']},
      {name: '李四', nickName: 'Jack', 'age': 18, course: ['html', 'js']},
      {name: '王五', nickName: 'King', 'age': 20, course: ['java', 'js']}
      ]);
    
  • save函数

    语法:db.<collectionName>.save(documents)

      示例:
      db.user.save([
      {name: '张三', nickName: 'Tom', 'age': 18, course: ['java', 'spring']},
      {name: '李四', nickName: 'Jack', 'age': 18, course: ['html', 'js']},
      {name: '王五', nickName: 'King', 'age': 20, course: ['java', 'js']}
      ]);
    
  • insertMany函数

    语法:db.<collectionName>.save(documents)

      示例:
      db.user.insertMany([
      {name: '张三', nickName: 'Tom', 'age': 18, course: ['java', 'spring']},
      {name: '李四', nickName: 'Jack', 'age': 18, course: ['html', 'js']},
      {name: '王五', nickName: 'King', 'age': 20, course: ['java', 'js']}
      ]);
    

查询文档

基础应用

  • findOne函数用于查询集合中的一个文档

    语法:db.<collectionName>.findOne({<query>}, {<projection>});

    1. query:可选,代表查询条件。
    2. projection:可选,代表查询结果的投影字段名。即查询结果需要返回哪些字段或不需要返回哪些字段。
      示例:
      db.user.findOne();
      db.user.findOne({name: '张三'});
      db.user.findOne({name: '张三'}, {name: 1, age: 1, _id: 0});
    

    投影时

    _id为1的时候,其他字段必须是1

    _id是0的时候,其他字段可以是0

    如果没有_id字段约束,多个其他字段必须同为0或同为1

  • find函数用于查询集合中的若干文档

    语法:db.<collectionName>.find({<query>}, {<projection>});

      示例:
      db.user.find();
      db.user.find({name: '张三'});
      db.user.find({name: '张三'}, {name: 1, age: 1, _id: 0});
    

单条件逻辑运算符

操作 格式 范例 类似DBM语句
等于 {<key>:<value>}或
{<key>:{$eq:<value>}}
db.col.find({name: “张三”}) where name = ‘张三’
小于 {<key>:{$lt:<value>}} db.col.find({age: {$lt: 20}}) where age < 20
小于等于 {<key>:{$lte:<value>}} db.col.find({age: {$lte: 20}}) where age <= 20
大于 {<key>:{$gt:<value>}} db.col.find({age: {$gt: 10}}) where age > 10
大于等于 {<key>:{$gte:<value>}} db.col.find({age: {$gte: 10}}) where age >= 10
不等于 {<key>:{$ne:<value>}} db.col.find({age: {$ne: 20}}) where age != 20

多条件逻辑运算符

And条件

MongoDB的find()findOne()函数可以传入多个键(key),每个键(key)以逗号隔开,即常规SQL的AND条件

示例:
db.user.find({name: '张三', age :18});

Or条件

MongoDB的OR条件语句使用了关键字$or,语法格式如下

示例:
db.user.find({$or: [{name: '张三'}, {age :18}]});

$type查询

在MongoDB中根据字段的数量类型来查询数据使用$type操作符来实现

语法:db.<collectionName>.find({<attr>:{$type:<typeNum/typeAlias>}})

Type Number Alias
Double 1 “double”
String 2 “string”
Object 3 “object”
Array 4 “array”
Binary data 5 “binData”
ObjectId 7 “objectId”
Boolean 8 “bool”
Date 9 “date”
Null 10 “null”
Regular Expression 11 “regex”
JavaScript 13 “javascript”
JavaScript (with scope) 15 “javascriptWithScope”
32-bit integer 16 “int”
Timestamp 17 “timestamp”
64-bit integer 18 “long”
示例:
db.user.find({name: {$type: 'string'}});
db.user.find({name: {$type: 2}});

正则查询

MongoDB中查询条件也可以使用正则表达式作为匹配约束

语法:db.<collectionName>.find({<filedName>:<reg>/<option>})

option的选项有

  • i :不区分大小写以匹配大小写的情况
  • m:对于包含锚点的模式,将\n视作每行,每行都进行匹配
  • x:设置x选项后,正则表达式中的非转义的空白字符将被忽略
  • s:允许点字符(.)匹配包括换行符在内的所有字符
示例:
db.user.find({name: /^张/});
db.user.find({name: /三$/});
db.user.find({nickName: /t/i});

分页查询

在MongoDB中读取指定数量的数据记录,可以使用的limit方法,limit(<num>)方法接受一个数字参数,该参数指定读取的记录条数

在MongoDB中使用skip方法来跳过指定数量的文档,skip(<num>)方法同样接受一个数字参数作为跳过的文档条数

示例:
db.user.find().skip(5).limit(5);

排序

在 MongoDB中使用sort方法对数据进行排序,sort(<param>) 可以通过参数指定排序的字段,并使用1-1来指定排序的方式,其中1为升序排列,而 -1是用于降序排列。

示例:
db.user.find().sort({age:'asc'})

更新文档

save更新文档

save()函数的作用是保存文档,如果文档存在则覆盖,如果文档不存在则新增。save()函数对文档是否存在的唯一判断标准是_id系统唯一字段是否匹配。所以使用save()函数实现更新操作,则必须提供_id字段数据

示例:
db.user.save({name:"赵六"}); -- 新增
db.user.save({"_id" : ObjectId("6010c798fc86950278e5caac"),name:"赵六", age: 22}); --更新

update更新文档

update()函数用于更新已存在的文档

语法:
db.<collectionName>.update(
    <query>,
    <update>,
    <upsert:boolean>,
    <multi:boolean>
)
  • query:update的查询条件,类似sql的update更新语法内where后面的内容
  • update:update的对象和一些更新的操作符等,也可以理解为sql update查询内set后面的
  • upsert:可选,这个参数的意思是,如果不存在update的记录,是否插入这个document,true为插入,默认是false,不插入
  • multi:可选,mongodb 默认是false,只更新找到的第一条记录,如果这个参数为true,就把按条件查出来多条记录全部更新。只有在表达式更新语法中才可使用。

在MongoDB中的update是有两种更新方式,一种是覆盖更新,一种是表达式更新。

  • 覆盖更新:顾名思义,就是通过某条件,将新文档覆盖原有文档
  • 表达式更新:这种更新方式是通过表达式来实现复杂更新操作,如:字段更新、数值计算、数组操作、字段名修改等
覆盖更新
示例:
db.user.update({name: '张三'},{name: '张'});

将会给第一条符合添加的数据覆盖,因为multi选项默认为false

表达式更新

$inc

作用:对一个数字字段的某个field增加value

示例:
db.user.update({name: '张三'},{$inc: {age: 1}});

$set

作用:把文档中某个字段field的值设为value,如果field不存在,则增加新的字段并赋值为value

示例:
db.user.update({name: '张三'},{$set: {age: 20}});

$unset

作用:删除某个字段field

示例:
db.user.update({name: '张三'},{$unset: {age: null}});

unset指定的字段后可以跟任何值,只是起占位作用

$push

作用:把value追加到field里。注:field只能是数组类型,如果field不存在,会自动插入一个数组类型

示例:
db.user.update({name: '张三'},{$push: {course: 'vue'}});

$addToSet

作用:加一个值到数组内,而且只有当这个值在数组中不存在时才增加

示例:
db.user.update({name: '张三'},{$addToSet: {course: 'vue'}});

$pop

删除数组内第一个值{$pop:{<field>:-1}}、删除数组内最后一个值{$pop:{<field>:1}}

示例:
db.user.update({name: '张三'},{$pop: {course: -1}});

$pull

从数组field内删除所有等于指定值的值

示例:
db.user.update({name: '张三'},{$pull: {course: 'vue'}});

$pullAll

用法同$pull一样,可以一次性删除数组内的多个值

示例:
db.user.update({name: '张三'},{$pullAll: {course: ['vue', 'java', 'spring']}});

$rename

作用:对字段进行重命名。底层实现是先删除old_field字段,再创建new_field字段

示例:
db.user.update({name: '张三'},{$rename: {nickName: 'nick'}});

删除文档

deleteOne函数

作用:删除一个满足添加的数据

语法:db.<collectionName>.deleteOne({<query>})

db.user.deleteOne({'name':'张三'});

deleteMany函数

作用:删除所有满足添加的数据

语法:db.<collectionName>.deleteMany({<query>})

db.user.deleteMany({'name':'张三'});

删除文档还有一个remove函数,但已过时,官方推荐用deleteOne()和deleteMany()函数来实现删除操作。且在4.0版本中,remove函数并不会真正的释放存储空间,需要使用db.repairDatabase()函数来释放存储空间。

七、内置函数

aggregate函数

MongoDB中聚合的方法使用aggregate

语法:db.<collectionName>.aggregate(<agg_options>)

agg_options:数组类型参数,传入具体的聚合表达式,此参数代表聚合规则,如计算总和、平均值、最大最小值等。

求和$sum

语法:db.collectionName.aggregate([{"$group":{"_id":<field/null>, "<aggeName>":{"$sum":"$<field>"}}}])

  • $group:分组,代表聚合的分组条件
  • _id:分组的字段。相当于SQL分组语法group by column中的column部分。如果根据某字段的值分组,则定义为_id:'$field'。如果不需要分组则为_id:null
  • $sum:求和表达式。相当于SQL中的sum函数
  • $<filed>:代表文档中的需要求和字段
示例:
db.user.aggregate([{$group:{_id: null, sum_age: {$sum: '$age'}}}]);     --不分组求和
db.user.aggregate([{$group:{_id: '$name', sum_age: {$sum: '$age'}}}]);  --以name分组之后求和

统计$sum

示例:
db.user.aggregate([{$group:{_id: null, count: {'$sum': 1}}}]);     --不分组统计总数
db.user.aggregate([{$group:{_id: '$name', count: {'$sum': 1}}}]);  --以name分组之后统计总数

最大值$max

示例:
db.user.aggregate([{$group:{_id: null, count: {'$max': '$age'}}}]);     --不分组统计最大值
db.user.aggregate([{$group:{_id: '$name', count: {'$max': '$age'}}}]);  --以name分组之后统计最大值

最小值$min

示例:
db.user.aggregate([{$group:{_id: null, count: {'$min': '$age'}}}]);     --不分组统计最小值
db.user.aggregate([{$group:{_id: '$name', count: {'$min': '$age'}}}]);  --以name分组之后统计最小值

平均值$avg

示例:
db.user.aggregate([{$group:{_id: null, age_avg: {'$avg': '$age'}}}]);     --不分组统计平均值
db.user.aggregate([{$group:{_id: '$name', age_avg: {'$min': '$age'}}}]);  --以name分组之后统计平均值

字符串拼接

语法:db.collection.aggregate([{"$project":{"<result_name>":{"$concat":["$<field>",...]}}}])

连接字段必须是字符串,否则报错

$project:管道,进行字符串拼接处理,日期处理等操作的函数

示例:
db.user.aggregate([{$project: {'name-age':{$concat:['$name', '-', '$nickName']}}}]);

字符串转大写

示例:
db.user.aggregate([{$project: {nameUpper:{$toUpper:'$nickName'}}}]);

字符串转小写

示例:
db.user.aggregate([{$project: {nameUpper:{$toLower:'$nickName'}}}]);

截取字符串

示例:
db.user.aggregate([{$project: {nameSub:{$substr:['$nickName', 0, 3]}}}]);

日期格式化

示例:
db.user.insert({"birthDate":ISODate('2020-01-01T10:10:10.000Z')})
db.user.aggregate([{$project: {birth: {$dateToString: {format: '%Y年%m月%d日 %H:%M:%S', date: '$birthDate'}}}}]);

条件过滤

$match:匹配条件,放在前面,相当于SQL中的where子句,代表聚合之前进行条件筛选。放在后面,相当于SQL中的having子句。代表聚合之后进行条件筛选,只能筛选聚合结果,不能筛选聚合条件。

示例:
db.user.aggregate([{$match: {age: {$lt: 20}}},{$group:{_id: null, count: {$sum: 1}}}]);

八、运算符

在MongoDB中,数学类型(int/long/double)和日期类型(date)可以做数学运行。日期只能做加减。

加法

db.user.aggregate([{$project:{name: 1, new_age: {$add: ['$age', 1]}}}]);

减法

db.user.aggregate([{$project:{name: 1, new_age: {$subtract: ['$age', 1]}}}]);

乘法

db.user.aggregate([{$project:{name: 1, new_age: {$multiply: ['$age', 1]}}}]);

除法

db.user.aggregate([{$project:{name: 1, new_age: {$divide: ['$age', 1]}}}]);

取模

db.user.aggregate([{$project:{name: 1, new_age: {$mod: ['$age', 2]}}}]);

九、索引

在MongoDB3版本后,创建集合时默认为系统主键字段_id创建索引。且在关闭_id索引创建时会有警告提示。因为_id字段不创建索引,会导致Secondary在同步数据时负载变高。

创建索引

语法:db.<collectionName>.ensureIndex(<keys>, <options>)

  • keys:用于创建索引的列及索引数据的排序规则。如:并升序索引db.<collectionName>.ensureIndex({<keyName>:1})、降序索引db.<collectionName>.ensureIndex({<keyName>:-1})

  • options:创建索引时可定义的索引参数。可选参数如下

    参数 类型 描述
    background Boolean 默认false。建索引过程会阻塞其它数据库操作,background可指定以后台方式创建索引
    unique Boolean 默认false。建立的索引是否唯一,指定为true创建唯一索引
    name string 索引的名称。如果未指定默认生成<key>_<1/-1>的名称
    sparse Boolean 默认false,对文档中不存在的字段数据不启用索引
    expireAfterSeconds integer 指定一个以秒为单位的数值,完成 TTL设定,设定索引的生存时间

    如:db.<collectionName>.ensureIndex({<key>:1}, {'background':true})

查看索引

查看集合的索引信息

语法:db.<collectionName>.getIndexes()

查看索引键

语法:db.<collectionName>.getIndexKeys()

查看索引详情

语法:db.<collectionName>.getIndexSpecs();

查看索引占用空间

语法:db.<collectionName>.totalIndexSize([is_detail])

is_detail为false则只显示索引的总大小,为true显示该集合中每个索引的大小及总大小

删除指定索引

语法:db.<collectionName>.dropIndex('<indexName>')

删除集合的所有自建索引

语法:db.<collectionName>.dropIndexes()

此函数只删除自建索引,不会删除MongoDB创建的_id索引

重建索引

在MongoDB中使用reIndex函数重建索引。重建索引可以减少索引存储空间,减少索引碎片,优化索引查询效率。一般在数据大量变化后,会使用重建索引来提升索引性能。

语法:db.<collectionName>.reIndex()

索引类型

MongoDB支持多种类型的索引,包括单字段索引、复合索引、多key索引、文本索引等,每种类型的索引有不同的使用场合

单字段索引

db.<collectionName>.ensureIndex({<field>:1});

上述语句针对field创建了单字段索引,其能加速对此字段的各种查询请求,是最常见的索引形式。MongoDB默认创建的id索引也是这种类型。

交叉索引

为一个集合的多个字段分别建立索引,在查询的时候通过多个字段作为查询条件,这种情况称为交叉索引。

db.<collectionName>.ensureIndex({<field_1>:1});

db.<collectionName>.ensureIndex({<field_2>:1});

交叉索引的查询效率较低,例如db.<collectionName>.find({<field_1>:<value_1>, <field_2>: <value_2>})。在使用时,当查询使用到多个字段的时候,尽量使用复合索引,而不是交叉索引

复合|组合|聚合索引

针对多个字段联合创建索引,先按第一个字段排序,第一个字段相同的文档按第二个字段排序

db.<collectionName>.ensureIndex({<field_1>:1, <field_2>:1})

使用复合索引是需要注意最左前缀原则

多key索引

当索引的字段为数组时,创建出的索引称为多key索引,多key索引会为数组的每个元素建立一条索引

语法和建立一般索引一致db.<collectionName>.ensureIndex( {<arrayFiled>: 1} )

唯一索引

保证索引对应的字段不会出现相同的值,比如_id索引就是唯一索引。如果唯一索引所在字段有重复数据写入时,抛出异常

db.<collectionName>.ensureIndex({<field_1>: 1}, {unique: true})

部分索引

只针对符合某个特定条件的文档建立索引。部分索引就是带有过滤条件的索引,即索引只存在与某些文档之上

如:db.<collectionName>.ensureIndex({<field_1>: 1},{partialFilterExpression: {field_1: {$gt: <filterValue>}}})

只有当字段field_1大于指定的filterValue才建立索引,且field_1查询值比filterValue大时才生效

部分索引只为集合中那些满足指定的筛选条件的文档创建索引。如果你指定的partialFilterExpression和唯一约束、那么唯一性约束只适用于满足筛选条件的文档。具有唯一约束的部分索引不会阻止不符合唯一约束且不符合过滤条件的文档的插入。

查询计划

语法:db.<collectionName>.find(<findContent>).explain()

winningPlain.stageCOLLSCAN则为全表扫描,IXSCAN时使用了索引

十、集群

复制集(Replication Set)

MongoDB的复制至少需要两个节点。其中一个是主节点,负责处理客户端请求,其余的都是从节点,负责复制主节点上的数据。建议提供仲裁节点,此节点不存储数据,作用是当主节点出现故障时,选举出某个备用节点成为主节点,保证MongoDB的正常服务。客户端只需要访问主节点或从节点,不需要访问仲裁节点。

主节点记录在其上的所有操作oplog(操作日志),从节点定期轮询主节点获取这些操作,然后对自己的数据副本执行这些操作,从而保证从节点的数据与主节点一致。

MongoDB各个节点常见的搭配方式为:一主一从一仲裁、一主多从一仲裁。

环境准备

在一台主机上模拟使用单主机多端口的方式搭建复制集,1主2备1仲裁

①、创建数据目录

mkdir -p /opt/module/mongodb-replication-set/data/db-primary  主节点
mkdir -p /opt/module/mongodb-replication-set/data/db-s0       从节点
mkdir -p /opt/module/mongodb-replication-set/data/db-s1       从节点
mkdir -p /opt/module/mongodb-replication-set/data/db-arbiter  仲裁节点

②、创建配置目录

mkdir /opt/module/mongodb-replication-set/etc    创建配置目录
mkdir /opt/module/mongodb-replication-set/log    创建日志目录
mkdir /opt/module/mongodb-replication-set/pids   创建进程文件目录

③、配置文件

# Primary配置
vim /opt/module/mongodb-replication-set/etc/mongo-primary.conf
# 配置如下:
dbpath=/opt/module/mongodb-replication-set/data/db-primary          数据库目录
logpath=/opt/module/mongodb-replication-set/log/primary.log         日志文件
pidfilepath=/opt/module/mongodb-replication-set/pids/primary.pid    进程描述文件
bind_ip_all=true
directoryperdb=true                     为数据库自动提供重定向
logappend=true                          日志追加写入
replSet=rs                              复制集名称,一个复制集中的多个节点命名一致
port=37010                              端口
oplogSize=10000                         操作日志容量
fork=true                               后台启动

# Secondary-0配置
vim /opt/module/mongodb-replication-set/etc/mongo-s0.conf
# 配置如下:
dbpath=/opt/module/mongodb-replication-set/data/db-s0
logpath=/opt/module/mongodb-replication-set/log/secondary-0.log
pidfilepath=/opt/module/mongodb-replication-set/pids/secondary-0.pid
bind_ip_all=true
directoryperdb=true
logappend=true
replSet=rs
port=37011
oplogSize=10000
fork=true

# Secondary-1配置
vim /opt/module/mongodb-replication-set/etc/mongo-s1.conf
# 配置如下:
dbpath=/opt/module/mongodb-replication-set/data/db-s1
logpath=/opt/module/mongodb-replication-set/log/secondary-1.log
pidfilepath=/opt/module/mongodb-replication-set/pids/secondary-1.pid
bind_ip_all=true
directoryperdb=true
logappend=true
replSet=rs
port=37012
oplogSize=10000
fork=true

# Arbiter配置
vim /opt/module/mongodb-replication-set/etc/mongo-arbiter.conf
# 配置如下:
dbpath=/opt/module/mongodb-replication-set/data/db-arbiter
logpath=/opt/module/mongodb-replication-set/log/db-arbiter.log
pidfilepath=/opt/module/mongodb-replication-set/pids/db-arbiter.pid
bind_ip_all=true
directoryperdb=true
logappend=true
replSet=rs
port=37013
oplogSize=10000
fork=true

启动各节点

①、启动

bin/mongod --config /opt/module/mongodb-replication-set/etc/mongo-primary.conf
bin/mongod --config /opt/module/mongodb-replication-set/etc/mongo-s0.conf
bin/mongod --config /opt/module/mongodb-replication-set/etc/mongo-s1.conf
bin/mongod --config /opt/module/mongodb-replication-set/etc/mongo-arbiter.conf

②、访问主节点

 bin/mongo --port 37010

③、初始化复制集

rs.initiate({
    # 复制集命名,与配置文件对应
    _id:"rs",
    members:[
        # _id:唯一标记,host:主机地址,priority:权重(数字越大优先级越高),arbiterOnly:是否是仲裁节点
        {_id:0,host:"127.0.0.1:37010",priority:3},
        {_id:1,host:"127.0.0.1:37011",priority:1},
        {_id:2,host:"127.0.0.1:37012",priority:1},
        {_id:3,host:"127.0.0.1:37013",arbiterOnly:true}
    ]
});

④、查看状态

rs.status();

④、查看当前连接节点是否是Primary节点

rs.isMaster();

总结

当主节点宕机时,仲裁节点会根据配置信息中的权重值优先选举权重高的节点作为主节点继续提供服务。当宕机的主节点恢复后,复制集会恢复原主节点状态,临时主节点重新成为从节点。默认情况下直接连接从节点是无法查询数据的。因为从节点是不可读的。如果需要在从节点上读取数据,则需要在从节点控制台输入命令rs.slaveOk([true|false])来设置。rs.slaveOk()rs.slaveOk(true)代表可以在从节点上做读操作;rs.slaveOk(false)代表不可在从节点上做读操作。

分片集群(shard cluster)

在Mongodb里面存在另一种集群,就是分片技术,可以满足MongoDB数据量大量增长的需求。当MongoDB存储海量的数据时,一台机器可能不足以存储数据,也可能不足以提供可接受的读写吞吐量。这时,我们就可以通过在多台机器上分割数据,使得数据库系统能存储和处理更多的数据。

sharding方案将整个数据集拆分成多个更小的chunk,并分布在集群中多个mongod节点上,最终达到存储和负载能力扩容、压力分流的作用。在sharding架构中,每个负责存储一部分数据的mongod节点称为shard(分片),shard上分布的数据块称为chunk,collections可以根据shard key(称为分片键)将数据集拆分为多个chunks,并相对均衡的分布在多个shards上。

各术语解释

Shard

用于存储实际的数据块,实际生产环境中一个shard server角色可由几台机器组个一个replica set承担,防止主机单点故障

Config Server

mongod实例,存储了整个ClusterMetadata,其中包括chunk信息。

Routers

前端路由,客户端由此接入,且让整个集群看上去像单一数据库,前端应用可以透明使用。

Shard Key

数据的分区根据shard key,对于每个需要sharding的collection,都需要指定shard key(分片键);分片键必须是索引字段或者为组合索引的左前缀;mongodb根据分片键将数据分成多个chunks,并将它们均匀分布在多个shards节点上。目前,mongodb支持两种分区算法:区间分区(Range)和哈希(Hash)分区。

Range分区

首先shard key必须是数字类型或字符串类型(字符串类型根据索引排序作为分裂依据),整个区间的上下边界分别为正无穷大、负无穷大,每个chunk覆盖一段子区间,即整体而言,任何shard key均会被某个特定的chunk所覆盖。区间均为左闭右开。每个区间均不会有重叠覆盖,且互相临近。当然chunk并不是预先创建的,而是随着chunk数据的增大而不断split。

Hash分区

计算shard key的hash值(64位数字),并以此作为Range来分区。Hash值具有很强的散列能力,通常不同的shard key具有不同的hash值(冲突是有限的),这种分区方式可以将document更加随机的分散在不同的chunks上。

搭建分片集群

在一台主机上模拟使用单主机多端口的方式搭建集群,两个复制集(1主1备1仲裁),三个配置服务器(1主2备),一个路由节点

环境准备
# 分片0的3个节点的数据目录,RS复制集,1主1备1仲裁
mkdir -p /opt/module/mongodb-cluster/data/rs0/primary
mkdir -p /opt/module/mongodb-cluster/data/rs0/slave
mkdir -p /opt/module/mongodb-cluster/data/rs0/arbiter

# 分片1的3个节点的数据目录,RS复制集,1主1备1仲裁
mkdir -p /opt/module/mongodb-cluster/data/rs1/primary
mkdir -p /opt/module/mongodb-cluster/data/rs1/slave
mkdir -p /opt/module/mongodb-cluster/data/rs1/arbiter

# 配置服务器的3个节点的数据目录,RS复制集,1主2备
mkdir -p /opt/module/mongodb-cluster/data/cf/primary
mkdir -p /opt/module/mongodb-cluster/data/cf/s0
mkdir -p /opt/module/mongodb-cluster/data/cf/s1

# 创建配置目录
mkdir -p /opt/module/mongodb-cluster/etc
mkdir -p /opt/module/mongodb-cluster/log
mkdir -p /opt/module/mongodb-cluster/pids
搭建Shard
# RS0的Primary配置
vim /opt/module/mongodb-cluster/etc/rs0-primary.conf
# 配置如下:
shardsvr=true                                        代表当前节点是一个shard节点。
dbpath=/opt/module/mongodb-cluster/data/rs0/primary
logpath=/opt/module/mongodb-cluster/log/rs0-primary.log
pidfilepath=/opt/module/mongodb-cluster/pids/rs0-primary.pid
bind_ip_all=true
logappend=true
replSet=rs0
port=27010
oplogSize=10000
fork=true

# RS0的Secondary配置
vim /opt/module/mongodb-cluster/etc/rs0-slave.conf
# 配置如下:
dbpath=/opt/module/mongodb-cluster/data/rs0/slave
logpath=/opt/module/mongodb-cluster/log/rs0-slave.log
pidfilepath=/opt/module/mongodb-cluster/pids/rs0-slave.pid
bind_ip_all=true
shardsvr=true
logappend=true
replSet=rs0
port=27011
oplogSize=10000
fork=true

# RS0的arbiter配置
vim /opt/module/mongodb-cluster/etc/rs0-arbiter.conf
# 配置如下:
dbpath=/opt/module/mongodb-cluster/data/rs0/arbiter
logpath=/opt/module/mongodb-cluster/log/rs0-arbiter.log
pidfilepath=/opt/module/mongodb-cluster/pids/rs0-arbiter.pid
bind_ip_all=true
shardsvr=true
logappend=true
replSet=rs0
port=27012
oplogSize=10000
fork=true

# RS1的Primary配置
vim /opt/module/mongodb-cluster/etc/rs1-primary.conf
# 配置如下:
shardsvr=true
dbpath=/opt/module/mongodb-cluster/data/rs1/primary
logpath=/opt/module/mongodb-cluster/log/rs1-primary.log
pidfilepath=/opt/module/mongodb-cluster/pids/rs1-primary.pid
bind_ip_all=true
logappend=true
replSet=rs1
port=27020
oplogSize=10000
fork=true

# RS1的Secondary配置
vim /opt/module/mongodb-cluster/etc/rs1-slave.conf
# 配置如下:
dbpath=/opt/module/mongodb-cluster/data/rs1/slave
logpath=/opt/module/mongodb-cluster/log/rs1-slave.log
pidfilepath=/opt/module/mongodb-cluster/pids/rs1-slave.pid
bind_ip_all=true
shardsvr=true
logappend=true
replSet=rs1
port=27021
oplogSize=10000
fork=true

# RS1的arbiter配置
vim /opt/module/mongodb-cluster/etc/rs1-arbiter.conf
# 配置如下:
dbpath=/opt/module/mongodb-cluster/data/rs1/arbiter
logpath=/opt/module/mongodb-cluster/log/rs1-arbiter.log
pidfilepath=/opt/module/mongodb-cluster/pids/rs1-arbiter.pid
bind_ip_all=true
shardsvr=true
logappend=true
replSet=rs1
port=27022
oplogSize=10000
fork=true
启动Shard
bin/mongod --config etc/rs0-primary.conf
bin/mongod --config etc/rs0-slave.conf
bin/mongod --config etc/rs0-arbiter.conf
bin/mongod --config etc/rs1-primary.conf
bin/mongod --config etc/rs1-slave.conf
bin/mongod --config etc/rs1-arbiter.conf
配置Shard
bin/mongo --port 27010
rs.initiate({
    _id:"rs0",
    members:[
        {_id:0,host:"127.0.0.1:27010",priority:2},
        {_id:1,host:"127.0.0.1:27011",priority:1},
        {_id:3,host:"127.0.0.1:27012",arbiterOnly:true}
    ]
});

bin/mongo --port 27020
rs.initiate({
    _id:"rs1",
    members:[
        {_id:0,host:"127.0.0.1:27020",priority:2},
        {_id:1,host:"127.0.0.1:27021",priority:1},
        {_id:3,host:"127.0.0.1:27022",arbiterOnly:true}
    ]
});
搭建Config Server

config server的复制集中不允许有单仲裁节点。复制集初始化命令中,不允许设置arbiterOnly:true参数

# Config Server的Primary配置
vim /opt/module/mongodb-cluster/etc/cf-primary.conf
# 配置如下:
dbpath=/opt/module/mongodb-cluster/data/cf/primary
logpath=/opt/module/mongodb-cluster/log/cf-primary.log
pidfilepath=/opt/module/mongodb-cluster/pids/cf-primary.pid
bind_ip_all=true
logappend=true
replSet=cf
port=27030
oplogSize=10000
fork=true
configsvr=true                     代表当前节点是一个配置服务节点

# Config Server的s0配置
vim /opt/module/mongodb-cluster/etc/cf-s0.conf
# 配置如下:
dbpath=/opt/module/mongodb-cluster/data/cf/s0
logpath=/opt/module/mongodb-cluster/log/cf-s0.log
pidfilepath=/opt/module/mongodb-cluster/pids/cf-s0.pid
bind_ip_all=true
logappend=true
replSet=cf
port=27031
oplogSize=10000
fork=true
configsvr=true

# Config Server的s1配置
vim /opt/module/mongodb-cluster/etc/cf-s1.conf
# 配置如下:
dbpath=/opt/module/mongodb-cluster/data/cf/s1
logpath=/opt/module/mongodb-cluster/log/cf-s1.log
pidfilepath=/opt/module/mongodb-cluster/pids/cf-s1.pid
bind_ip_all=true
logappend=true
replSet=cf
port=27032
oplogSize=10000
fork=true
configsvr=true
启动Config Server
bin/mongod --config etc/cf-primary.conf
bin/mongod --config etc/cf-s0.conf
bin/mongod --config etc/cf-s1.conf
配置Config Server
bin/mongo --port 27030
rs.initiate({
    _id:"cf",
    members:[
        {_id:0,host:"127.0.0.1:27030",priority:2},
        {_id:1,host:"127.0.0.1:27031",priority:1},
        {_id:3,host:"127.0.0.1:27032",priority:1}
    ]
});
搭建Router

生成环境中通常提供多个,使用keepalive和haproxy实现高可用。router不需要数据库目录,不需要配置dbpath。

vim /opt/module/mongodb-cluster/etc/rt.conf
# 配置如下:
configdb=cf/127.0.0.1:27030,127.0.0.1:27031,127.0.0.1:27032
logpath=/opt/module/mongodb-cluster/log/rt.log
pidfilepath=/opt/module/mongodb-cluster/pids/rt.pid
port=27040
fork=true
bind_ip_all=true
启动Router

注意使用的是mongos命令

bin/mongos --config etc/rt.conf
配置Router
# 连接
bin/mongo --port 27040

# 进入admin库
use admin

# 加入分片信息
db.runCommand({'addShard':'rs0/127.0.0.1:27010,127.0.0.1:27011,127.0.0.1:27012'});
db.runCommand({'addShard':'rs1/127.0.0.1:27020,127.0.0.1:27021,127.0.0.1:27022'});
开启Shard

首先需要将Database开启sharding,否则数据仍然无法在集群中分布。即数据库、collection默认为non-sharding。对于non-sharding的database或者collection均会保存在primary shard上,直到开启sharding才会在集群中分布。

# 创建测试库
use test
# 开启Shard,开启分片命令必须在admin库下运行。
use admin
db.runCommand({ enablesharding: 'test'})

# collection开启sharding,在此之前需要先指定shard key和建立"shard key索引"
use test
db.users.ensureIndex({'_id':'hashed'});
db.runCommand({ shardcollection: 'test.users', key: {'_id': 'hashed'}})

# users集合将使用"_id"作为第一维shard key,采用hashed分区模式,可以通过sh.status()查看每个chunk的分裂区间
sh.status()

在GridFS开启Shard

db.runCommand( { shardCollection : "test.fs.chunks" , key : {  files_id : 1 } } )
# 在GridFS中,对chunks集合进行分片时,只有两个片键可以选择,{ files_id : 1 , n : 1 } 与 {  files_id : 1 }

十一、Spring Data Mongodb

第一步:引入依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

第二步:配置YAML

spring:
  data:
    mongodb:
      database: test
      host: 192.168.22.161
      port: 27017
      # 如果需要认证需要下面三行
      authentication-database: admin
      password: root
      username: root

第三步:建立实体类

@Document
@Data
// 如果是集群模式需要指定Shard 
@Sharded(shardKey = {"_id"}, shardingStrategy = ShardingStrategy.HASH)
public class Student {

  // 指定ID,如果是String或者ObjectId可自动生成ID
  // 注意String使用自动生成时更新操作无法进行
  @MongoId
  private ObjectId id;

  // 如果字段名和数据库名称不一致需要指定
  @Field(name = "name")
  private String name;

  private Integer age;

  private Boolean isMan;

  private List<String> course;
}

创建文档

// 添加文档
public void addDocument(){
  Student student = new Student();
  student.setName("kun");
  student.setAge(18);
  student.setIsMan(true);

  Student res = mongoTemplate.insert(student);
  // ID不存在时Save可用于新增文档
  // Student res = mongoTemplate.save(student);

  System.out.println(res);
}

// 批量添加文档
public void addDocuments(){
  List<Student> students = new ArrayList<>();

  Student student = new Student();
  student.setName("kun");
  student.setAge(18);
  student.setIsMan(true);
  student.setCourse(Arrays.asList("spring", "java", "mysql"));
  students.add(student);

  student = new Student();
  student.setName("jack");
  student.setAge(18);
  student.setIsMan(true);
  student.setCourse(Arrays.asList("php", "mysql"));
  students.add(student);

  student = new Student();
  student.setName("lily");
  student.setAge(16);
  student.setIsMan(false);
  student.setCourse(Arrays.asList("vue", "php"));
  students.add(student);

  student = new Student();
  student.setName("jane");
  student.setAge(18);
  student.setIsMan(false);
  student.setCourse(Arrays.asList("html", "js"));
  students.add(student);

  mongoTemplate.insertAll(students);
}

更新文档

  • 使用save修改(覆盖更新)

      public void saveUpdateDocument(){
        Student student = new Student();
        student.setId(new ObjectId("601253d33f108b62de913d54"));
        student.setName("kun");
        student.setAge(20);
        // ID存在时Save可用于更新文档
        Student res = mongoTemplate.save(student);
        System.out.println(res);
      }
    
  • 使用运算符实现(表达式更新)

      public void expressUpdateDocument(){
        // 查询对象
        Query query = new Query();
        Criteria criteria = Criteria.where("name").is("kun");
        query.addCriteria(criteria);
        
        // 更新字段
        Update update = new Update();
        update.inc("age", 1);
        
        // updateFirst为更新第一个匹配文档
        UpdateResult updateResult = mongoTemplate.updateMulti(query, update, Student.class);
        System.out.println(updateResult.getModifiedCount());
      }
    

删除文档

  • 根据主键删除

      public void deleteDocument(){
        Student student = new Student();
        student.setId(new ObjectId("601253d33f108b62de913d54"));
        DeleteResult deleteResult = mongoTemplate.remove(student);
        System.out.println(deleteResult.getDeletedCount());
      }
    
  • 根据条件进行删除

      public void deleteDocumentByCondition(){
        Criteria criteria = Criteria.where("name").is("kun");
        DeleteResult deleteResult = mongoTemplate.remove(Query.query(criteria), Student.class);
        System.out.println(deleteResult.getDeletedCount());
      }
    

查询文档

// 查询全部文档
public void findAll(){
  List<Student> students = mongoTemplate.findAll(Student.class);
}

// 查询单个对象,如果有多个结果只返回第一个
public void findOne(){
  Criteria criteria = Criteria.where("name").is("kun");
  Student student = mongoTemplate.findOne(Query.query(criteria),Student.class);
}

// 带有条件的查询多个
public void findByCondition(){
  Query query = new Query(Criteria.where("age").gte(3));
  List<Student> students = mongoTemplate.find(query, Student.class);
}

// 根据主键进行查询
public void findById(){
  Student student = mongoTemplate.findById(new ObjectId("601266467bb99058c4fc58d6"), Student.class);
}

// 根据字段是否为空进行查询
public void findByIsNull(){
  Query query = new Query(Criteria.where("course").exists(true));
  List<Student> students = mongoTemplate.find(query, Student.class);
}

// 根据正则查询
public void findByReg(){
  Criteria criteria = Criteria.where("name").regex(".*n$");
  List<Student> students = mongoTemplate.find(Query.query(criteria), Student.class);
}

// 多条件查询
public void findByMultiAndCondition(){
  Criteria criteria = new Criteria();
  Criteria ageCriteria = Criteria.where("age").lt(20);
  Criteria isManCriteria = Criteria.where("isMan").is(false);
  criteria.andOperator(ageCriteria, isManCriteria);
  List<Student> students = mongoTemplate.find(Query.query(criteria), Student.class);
}
public void findByMultiOrCondition(){
  Criteria criteria = new Criteria();
  Criteria ageCriteria = Criteria.where("age").lt(20);
  Criteria isManCriteria = Criteria.where("isMan").is(true);
  criteria.orOperator(ageCriteria, isManCriteria);
  List<Student> students = mongoTemplate.find(Query.query(criteria), Student.class);
  students.stream().forEach(System.out::println);
}

// 查询去重复结果
public void findDistinct(){
  // 第一个参数: 查询条件query
  // 第二个参数: 根据哪个属性去重复
  // 第三个参数: 属性所在实体类
  // 第四个参数: 属性的类型,此类型作为结果中List集合的泛型
  List<Integer> ages = mongoTemplate.findDistinct(new Query(), "age", Student.class, Integer.class);
}

// 结果排序
public void findSort(){
  Query query = new Query();
  query.with(Sort.by(Sort.Direction.DESC, "age"));
  List<Student> students = mongoTemplate.find(query, Student.class);
}

// 结果分页
public void findPage(){
  Query query = new Query();
  // 第一个参数Page,从0开始
  // 第二个参数Size
  query.with(PageRequest.of(1, 2));
  List<Student> students = mongoTemplate.find(query, Student.class);
}

聚合操作

// 统计总数
public void aggCount(){
  TypedAggregation<Student> aggregation = TypedAggregation.newAggregation(
    Student.class,                                                                     
    Aggregation.group().count().as("count"));
  AggregationResults<Map> results = mongoTemplate.aggregate(aggregation, Map.class);
  System.out.println(results.getUniqueMappedResult());
  System.out.println(results.getUniqueMappedResult().get("count"));
}

// 分组统计总数
public void aggCountByGroup() {
  TypedAggregation<Student> aggregation = TypedAggregation.newAggregation(
    Student.class,
    Aggregation.group("name").count().as("count"));
  AggregationResults<Map> results = mongoTemplate.aggregate(aggregation, Map.class);
  // 返回的_id 代表分组字段,如果字段有多个则_id为Map(key为字段名,value为值)
  for (Map mappedResult : results.getMappedResults()) {
    System.out.println(mappedResult.get("_id") + " - " + mappedResult.get("count"));
  }
}

// 具有条件的分组
public void aggCount() {
  TypedAggregation<Student> aggregation = TypedAggregation.newAggregation(
    Student.class,
    Aggregation.match(Criteria.where("age").gt(16)),
    Aggregation.group().count().as("count"));
  AggregationResults<Map> results = mongoTemplate.aggregate(aggregation, Map.class);
  System.out.println(results.getUniqueMappedResult().get("count"));
}

GridFS操作

GirdFS是MongoDB提供的用于持久化存储文件的模块。在GridFS存储文件是将文件分块存储,文件会按照256KB的大小分割成多个块进行存储,GridFS使用两个集合(collection)存储文件,从GridFS中读取文件要对文件的各各块进行组装、合并。

  • 一个集合是chunks,用于存储文件的二进制数据
  • 一个集合是files,用于存储文件的元数 据信息(文件名称、块大小、上传时间等信息)
// 上传文件
public void gridFsUpload() throws FileNotFoundException {
  //选择要存储的文件
  File file = new File("C:\\Users\\kun\\Desktop\\TIM截图.png");
  InputStream inputStream = new FileInputStream(file);
  //存储文件并起名称
  ObjectId objectId = gridFsTemplate.store(inputStream, "TIM截图.png");
  String id = objectId.toString();
  //获取到文件的id,可以从数据库中查找
  System.out.println(id);
}

// 查询下载文件
public void queryFile() throws IOException {
  // 根据id查找文件
  // GridFSFile gridFSFile = gridFsTemplate.findOne(
  //    new Query(Criteria.where("_id").is("60136de452d6203936d1b035")));
  GridFSFile gridFSFile = gridFsTemplate.findOne(
    new Query(Criteria.where("filename").is("TIM截图.png")));
  //打开下载流对象
  GridFSDownloadStream gridFS = gridFSBucket.openDownloadStream(gridFSFile.getObjectId());
  //创建gridFsSource,用于获取流对象
  GridFsResource gridFsResource = new GridFsResource(gridFSFile,gridFS);
  File file = new File("D:\\" + gridFsResource.getFilename());
  IoUtil.copy( gridFsResource.getInputStream(), new FileOutputStream(file));
}

//删除文件
@Test
public void delFile() throws IOException {
  //根据文件id删除fs.files和fs.chunks中的记录
  gridFsTemplate.delete(Query.query(Criteria.where("_id").is("6013678f7d212159fe3a0a64")));
}