Coding & Life

求知若饥,虚心若愚

0%

逻辑卷(LVM)是在硬盘分区和文件系统之间添加的一个逻辑层,为文件系统屏蔽下层硬盘分区部局,并提供一个抽象的盘卷,在盘卷上建立文件系统。可以利用LVM在硬盘不用重新分区的情况下动态调整文件系统的大小,并且利用LVM管理的文件系统可以跨越物理硬盘。当服务器添加新的硬盘后,管理员不必将原有的文件移动到新的硬盘上,而是通过LVM直接扩展文件系统来跨越物理硬盘

在Debian系统中使用lvs命令,如果找不到,说明未安装LVM系统,使用sudo apt-get install lvm2命令进行安装,CentOS一般默认安装LVM

创建分区

使用fdisk -l命令查看现有硬盘信息

发现有一个vdb硬盘未创建分区,为其创建3个分区,大小分别是10G、13G、27G,使用fdisk /dev/vdb命令

再次使用fdisk -l命令查看硬盘及分区信息,发现vdb硬盘下已有3个分区

创建物理卷

CentOS7以上系统,在创建卷组时会自动创建物理卷,可以省略此步骤

1
pvcreate /dev/vdb1 /dev/vdb2 /dev/vdb3

创建卷组

将3个分区中的1和2分区纳入到卷组中,卷组中共23G空间

1
vgcreate vg1 /dev/vdb1 /dev/vdb2

创建卷组时并不是只能指定分区,可以直接使用硬盘,如:vgcreate vg1 /dev/vdb,并且CentOS7以上的系统可以在创建卷组时自动创建物理卷,可以跳过创建物理卷的过程。例如

创建逻辑卷

1
lvcreate -L 10G -n lv1 vg1

逻辑卷大小10G,逻辑卷名是lv1,从vg1卷组中创建

如果要把卷组中所有的容量加入到逻辑卷中,使用如下命令:

1
lvcreate -l 100%FREE -n lv1 vg1

为逻辑卷创建文件系统

1
mkfs.xfs /dev/vg1/lv1

如果提示命令不存在,那么请安装xfs文件系统

1
sudo apt-get install xfsprogs

挂载逻辑卷

1
mount /dev/vg1/lv1 /mnt/test

开机自动挂载

/etc/fstab用于存放文件系统信息,当系统启动时,系统会自动读取文件内容将指定的文件系统挂载到指定的目录

1
/dev/vg1/lv1 /mnt/test xfs defaults 0 0

第一个字段:要挂载的设备路径

第二个字段:挂载点目录

第三个字段:设备文件系统类型

第四、五、六个字段,不知道什么意思,直接填写即可…

填写完成后测试内容是否正确:

  1. 将逻辑卷卸载

    1
    umount /mnt/test
  2. 依照配置文件/etc/fstab的数据将所有未挂载的磁盘都挂载上来

    1
    mount -a
  3. 查看是否重新挂载

    1
    df -Th

扩展逻辑卷

逻辑卷的空间来源于卷组,当卷组有足够的空间时,才可以扩展逻辑卷。卷组中一共有23G空间,lv1占用了10G,可以扩展

1
lvextend -L +5G /dev/vg1/lv1

同样,如果将卷组中剩余的所有空间都扩展到逻辑卷中,使用如下命令:

1
lvextend -l +100%FREE /dev/vg1/lv1

使用lvs查看,发现逻辑卷已有15G空间

此时我们查看文件系统大小,仍然是10G

这种情况需要扩展文件系统

1
xfs_growfs /mnt/test

扩展卷组

卷组的空间来源于物理分区或者物理硬盘,当卷组没有足够空间提供给逻辑卷时,须扩容卷组。我们之前把vdb1和vdb2两个分区加入到卷组,现在我们把vdb3扩展到卷组中

1
vgextend vg1 /dev/vdb3

删除逻辑卷

逻辑卷的删除不允许联机操作,需要先卸载,再执行删除

删除顺序:先删除逻辑卷,再删除卷组,最后删除物理卷

卸载

1
umount /mnt/test

删除逻辑卷

1
lvremove /dev/vg1/lv1

删除卷组

1
vgremove vg1

删除物理卷

删除分区

pic

在涉及跨系统接口调用时,我们容易碰到以下安全问题:

  • 请求身份被伪造。
  • 请求参数被篡改。
  • 请求被抓包,然后重放攻击。

需求场景

假设我们有如下业务需求:

用户在 A 系统参与活动成功后,活动奖励以余额的形式下发到 B 系统。

初始方案:裸奔

在不考虑安全问题的情况下,我们很容易完成这个需求:

  1. 在B系统开放一个接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * 为指定用户添加指定余额
    *
    * @param userId 用户 id
    * @param money 要添加的余额,单位:分
    * @return /
    */
    @RequestMapping("addMoney")
    public SaResult addMoney(long userId, long money) {
    // 处理业务
    // ...

    // 返回
    return SaResult.ok();
    }
  2. 在 A 系统使用 http 工具类调用这个接口。

    1
    2
    3
    long userId = 10001;
    long money = 1000;
    String res = HttpUtil.request("http://b.com/api/addMoney?userId=" + userId + "&money=" + money);

上述代码简单的完成了需求,但是很明显它有安全问题

方案升级:增加secretKey校验

为防止 B 系统开放的接口被陌生人任意调用,我们增加一个 secretKey 参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 为指定用户添加指定余额
@RequestMapping("addMoney")
public SaResult addMoney(long userId, long money, String secretKey) {
// 1、先校验 secretKey 参数是否正确,如果不正确直接拒绝响应请求
if( ! check(secretKey) ) {
return SaResult.error("无效 secretKey,无法响应请求");
}

// 2、业务代码
// ...

// 3、返回
return SaResult.ok();
}

由于 A 系统是我们 “自己人”,所以它可以拿着 secretKey 进行合法请求:

1
2
3
4
5
long userId = 10001;
long money = 1000;
String secretKey = "xxxxxxxxxxxxxxxxxxxx";
String res = HttpUtil.request("http://b.com/api/addMoney?userId=" + userId + "&money=" + money + "&secretKey=" + secretKey);

现在,即使 B 系统的接口被暴露了,也不会被陌生人任意调用了,安全性得到了一定的保证,但是仍然存在一些问题:

  • 如果请求被抓包,secretKey 就会泄露,因为每次请求都在 url 中明文传输了 secretKey 参数。
  • 如果请求被抓包,请求的其它参数就可以被任意修改,例如可以将 money 参数修改为 9999999,B系统无法确定参数是否被修改过。

方案再升级:使用摘要算法生成参数签名

首先,在 A 系统不要直接发起请求,而是先计算一个 sign 参数:

1
2
3
4
5
6
7
8
9
10
11
// 声明变量
long userId = 10001;
long money = 1000;
String secretKey = "xxxxxxxxxxxxxxxxxxxx";

// 计算 sign 参数
String sign = md5("money=" + money + "&userId=" + userId + "&key=" + secretKey);

// 将 sign 拼接在请求地址后面
String res = HttpUtil.request("http://b.com/api/addMoney?userId=" + userId + "&money=" + money + "&sign=" + sign);

注意此处计算签名时,需要将所有参数按照字典顺序依次排列(key除外,挂在最后面)以下所有计算签名时同理,不再赘述。

然后在 B 系统接收请求时,使用同样的算法、同样的秘钥,生成 sign 字符串,与参数中 sign 值进行比较:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 为指定用户添加指定余额
@RequestMapping("addMoney")
public SaResult addMoney(long userId, long money, String sign) {

// 在 B 系统,使用同样的算法、同样的密钥,计算出 sign2,与传入的 sign 进行比对
String sign2 = md5("money=" + money + "&userId=" + userId + "&key=" + secretKey);
if(!sign2.equals(sign)) {
return SaResult.error("无效 sign,无法响应请求");
}

// 2、业务代码
// ...

// 3、返回
return SaResult.ok();
}

因为 sign 的值是由 userId、money、secretKey 三个参数共同决定的,所以只要有一个参数不一致,就会造成最终生成 sign 也是不一致的,所以,根据比对结果:

  • 如果sign一致,说明这是个合法请求
  • 如果sign不一致,说明发起请求的客户端密钥不正确,或者请求的参数被篡改过,是个不合法请求。

此方案有点:

  • 不在url中直接传递secretKey参数了,避免了泄露风险
  • 由于sign参数的限制,请求中的参数也不可被篡改,B系统可放心的使用这些参数

此方案扔存在以下缺陷:

  • 被抓包后,请求可以被无限重放,B系统无法判断请求是真正来自于A系统,还是被抓包后重放的

方案再再升级:追加nonce随机字符串

首先,在 A 系统发起调用前,追加一个 nonce 参数,一起参与到签名中:

1
2
3
4
5
6
7
8
9
10
11
12
// 声明变量
long userId = 10001;
long money = 1000;
String nonce = SaFoxUtil.getRandomString(32); // 随机32位字符串
String secretKey = "xxxxxxxxxxxxxxxxxxxx";

// 计算 sign 参数
String sign = md5("money=" + money + "&nonce=" + nonce + "&userId=" + userId + "&key=" + secretKey);

// 将 sign 拼接在请求地址后面
String res = HttpUtil.request("http://b.com/api/addMoney?userId=" + userId + "&money=" + money + "nonce=" + nonce + "&sign=" + sign);

然后在 B 系统接收请求时,也把 nonce 参数加进去生成 sign 字符串,进行比较:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 为指定用户添加指定余额
@RequestMapping("addMoney")
public SaResult addMoney(long userId, long money, String nonce, String sign) {

// 1、检查此 nonce 是否已被使用过了
if(CacheUtil.get("nonce_" + nonce) != null) {
return SaResult.error("此 nonce 已被使用过了,请求无效");
}

// 2、验证签名
String sign2 = md5("money=" + money + "&nonce=" + nonce + "&userId=" + userId + "&key=" + secretKey);
if(!sign2.equals(sign)) {
return SaResult.error("无效 sign,无法响应请求");
}

// 3、将 nonce 记入缓存,防止重复使用
CacheUtil.set("nonce_" + nonce, "1");

// 4、业务代码
// ...

// 5、返回
return SaResult.ok();
}

代码分析:

  • 为方便理解,我们先看第 3 步:此处在校验签名成功后,将 nonce 随机字符串记入缓存中。
  • 再看第 1 步:每次请求进来,先查看一下缓存中是否已经记录了这个随机字符串,如果是,则立即返回:无效请求。

这两步的组合,保证了一个 nonce 随机字符串只能被使用一次,如果请求被抓包后重放,是无法通过nonce校验的。

至此,问题似乎已经解决了……吗?

别急,我们还有一个问题没有考虑:这个 nonce 在字符串在缓存应该被保存多久呢?

  • 保存 15 分钟?那抓包的人只需要等待 15 分钟,你的 nonce 记录在缓存中消失,请求就可以被重放了。
  • 那保存 24 小时?保存一周?保存半个月?好像无论保存多久,都无法从根本上解决这个问题。

你可能会想到,那我永久保存吧。这样确实能解决问题,但显然服务器承载不了这么做,即使再微小的数据量,在时间的累加下,也总一天会超出服务器能够承载的上限。

方案再再再升级:追加timestamp时间戳

我们可以再追加一个 timestamp 时间戳参数,将请求的有效性限定在一个有限时间范围内,例如 15分钟。

首先,在 A 系统追加 timestamp 参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 声明变量
long userId = 10001;
long money = 1000;
String nonce = SaFoxUtil.getRandomString(32); // 随机32位字符串
long timestamp = System.currentTimeMillis(); // 随机32位字符串
String secretKey = "xxxxxxxxxxxxxxxxxxxx";

// 计算 sign 参数
String sign = md5("money=" + money + "&nonce=" + nonce + "&timestamp=" + timestamp + "&userId=" + userId + "&key=" + secretKey);

// 将 sign 拼接在请求地址后面
String res = HttpUtil.request("http://b.com/api/addMoney" +
"?userId=" + userId + "&money=" + money + "&nonce=" + nonce + "&timestamp=" + timestamp + "&sign=" + sign);

在 B 系统检测这个 timestamp 是否超出了允许的范围

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 为指定用户添加指定余额
@RequestMapping("addMoney")
public SaResult addMoney(long userId, long money, long timestamp, String nonce, String sign) {

// 1、检查 timestamp 是否超出允许的范围(此处假定最大允许15分钟差距)
long timestampDisparity = System.currentTimeMillis() - timestamp; // 实际的时间差
if(timestampDisparity > 1000 * 60 * 15) {
return SaResult.error("timestamp 时间差超出允许的范围,请求无效");
}

// 2、检查此 nonce 是否已被使用过了
// 代码同上,不再赘述

// 3、验证签名
// 代码同上,不再赘述

// 4、将 nonce 记入缓存,ttl 有效期和 allowDisparity 允许时间差一致
CacheUtil.set("nonce_" + nonce, "1", 1000 * 60 * 15);

// 5、业务代码 ...

// 6、返回
return SaResult.ok();
}

至此,抓包者:

  • 如果在 15 分钟内重放攻击,nonce 参数不答应:缓存中可以查出 nonce 值,直接拒绝响应请求。
  • 如果在 15 分钟后重放攻击,timestamp 参数不答应:超出了允许的 timestamp 时间差,直接拒绝响应请求。

服务器的时钟差异造成安全问题

以上的代码,均假设 A 系统服务器与 B 系统服务器的时钟一致,才可以正常完成安全校验,但在实际的开发场景中,有些服务器会存在时钟不准确的问题。

假设 A 服务器与 B 服务器的时钟差异为 10 分钟,即:在 A 服务器为 8:00 的时候,B 服务器为 7:50。

  1. A 系统发起请求,其生成的时间戳也是代表 8:00。
  2. B 系统接受到请求后,完成业务处理,此时 nonce 的 ttl 为 15分钟,到期时间为 7:50 + 15分 = 8:05。
  3. 8.05 后,nonce 缓存消失,抓包者重放请求攻击:
  • timestamp 校验通过:因为时间戳差距仅有 8.05 - 8.00 = 5分钟,小于 15 分钟,校验通过。
  • nonce 校验通过:因为此时 nonce 缓存已经消失,可以通过校验。
  • sign 校验通过:因为这本来就是由 A 系统构建的一个合法签名。
  • 攻击完成。

要解决上述问题,有两种方案:

  • 方案一:修改服务器时钟,使两个服务器时钟保持一致。
  • 方案二:在代码层面兼容时钟不一致的场景。

要采用方案一的同学可自行搜索一下同步时钟的方法,在此暂不赘述,此处详细阐述一下方案二。

我们只需简单修改一下,B 系统校验参数的代码即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 为指定用户添加指定余额
@RequestMapping("addMoney")
public SaResult addMoney(long userId, long money, long timestamp, String nonce, String sign) {

// 1、检查 timestamp 是否超出允许的范围 (重点一:此处需要取绝对值)
long timestampDisparity = Math.abs(System.currentTimeMillis() - timestamp);
if(timestampDisparity > 1000 * 60 * 15) {
return SaResult.error("timestamp 时间差超出允许的范围,请求无效");
}

// 2、检查此 nonce 是否已被使用过了
// 代码同上,不再赘述

// 3、验证签名
// 代码同上,不再赘述

// 4、将 nonce 记入缓存,防止重复使用(重点二:此处需要将 ttl 设定为允许 timestamp 时间差的值 x 2 )
CacheUtil.set("nonce_" + nonce, "1", (1000 * 60 * 15) * 2);

// 5、业务代码 ...

// 6、返回
return SaResult.ok();
}

以上代码中时间差的计算改为当前时间和请求时间的绝对值计算,同时将缓存时间设置为2倍时间差,这样就可以保证时间戳的计算和缓存nonce有一项能在异常请求时拦截成功

最终方案

此处再贴一下完整的代码。

A 系统(发起请求端):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 声明变量
long userId = 10001;
long money = 1000;
String nonce = SaFoxUtil.getRandomString(32); // 随机32位字符串
long timestamp = System.currentTimeMillis(); // 当前时间戳
String secretKey = "xxxxxxxxxxxxxxxxxxxx";

// 计算 sign 参数
String sign = md5("money=" + money + "&nonce=" + nonce + "&timestamp=" + timestamp + "&userId=" + userId + "&key=" + secretKey);

// 将 sign 拼接在请求地址后面
String res = HttpUtil.request("http://b.com/api/addMoney" +
"?userId=" + userId + "&money=" + money + "&nonce=" + nonce + "&timestamp=" + timestamp + "&sign=" + sign);

B 系统(接收请求端):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 为指定用户添加指定余额
@RequestMapping("addMoney")
public SaResult addMoney(long userId, long money, long timestamp, String nonce, String sign) {

// 1、检查 timestamp 是否超出允许的范围
long allowDisparity = 1000 * 60 * 15; // 允许的时间差:15分钟
long timestampDisparity = Math.abs(System.currentTimeMillis() - timestamp); // 实际的时间差
if(timestampDisparity > allowDisparity) {
return SaResult.error("timestamp 时间差超出允许的范围,请求无效");
}

// 2、检查此 nonce 是否已被使用过了
if(CacheUtil.get("nonce_" + nonce) != null) {
return SaResult.error("此 nonce 已被使用过了,请求无效");
}

// 3、验证签名
String sign2 = md5("money=" + money + "&nonce=" + nonce + "&timestamp=" + timestamp + "&userId=" + userId + "&key=" + secretKey);
if(!sign2.equals(sign)) {
return SaResult.error("无效 sign,无法响应请求");
}

// 4、将 nonce 记入缓存,防止重复使用,注意此处需要将 ttl 设定为允许 timestamp 时间差的值 x 2
CacheUtil.set("nonce_" + nonce, "1", allowDisparity * 2);

// 5、业务代码 ...

// 6、返回
return SaResult.ok();
}

可以使用Sa-Token提供的API接口参数签名插件快速完成以上需求

MongoDB是一个基于分布式文件存储的数据库。由C++语言编写。旨在为WEB应用提供可扩展高性能数据存储解决方案

MongoDB是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。他支持的数据结构非常松散,是类似json的bson格式,因此可以存储比较复杂的数据类型 。Mongo最大的特点是他支持的查询语言非常强大,其语法有点类似于面向对象的查询语言,几乎可以实现类似关系数据库单表查询的绝大部分功能,而且还支持对数据建立索引

总结: mongoDB 是一个非关系型文档数据库

副本集

说明

https://docs.mongodb.com/manual/replication/

MongoDB 副本集(Replica Set)是有自动故障恢复功能的主从集群,有一个Primary节点和一个或多个Secondary节点组成。副本集没有固定的主节点,当主节点发生故障时整个集群会选举一个主节点为系统提供服务以保证系统的高可用。

Automatic Failover

​自动故障转移机制: 当主节点未与集合的其他成员通信超过配置的选举超时时间(默认为 10 秒)时,合格的辅助节点将调用选举以将自己提名为新的主节点。集群尝试完成新主节点的选举并恢复正常操作。

搭建副本集

演示环境使用Debian 10.13MongoDB v5.0.14

创建数据目录

1
2
3
4
# 在安装目录中创建
- mkdir -p ../repl/data1
- mkdir -p ../repl/data2
- mkdir -p ../repl/data3

搭建副本集

1
2
3
4
5
./mongod --port 27017  --dbpath ../repl/data1 --bind_ip 0.0.0.0 --replSet  myreplace/124.239.149.46:27018,124.239.149.46:27019

./mongod --port 27018 --dbpath ../repl/data2 --bind_ip 0.0.0.0 --replSet myreplace/124.239.149.46:27019,124.239.149.46:27017

./mongod --port 27019 --dbpath ../repl/data3 --bind_ip 0.0.0.0 --replSet myreplace/124.239.149.46:27017,124.239.149.46:27018

配置副本集

连接任意节点,切换到adminuse admin

初始化副本集

1
2
3
4
5
6
7
8
9
10
var config = {
_id:"myreplace",
members:[
{_id:0,host:"124.239.149.46:27017"},
{_id:1,host:"124.239.149.46:27018"},
{_id:2,host:"124.239.149.46:27019"}
]
}

rs.initiate(config);//初始化配置

设置从节点客户端临时访问

1
rs.secondaryOk();

分片集群

说明

https://docs.mongodb.com/manual/sharding/

分片(sharding)是指将数据拆分,将其分散存在不同机器的过程,有时也用分区(partitioning)来表示这个概念,将数据分散在不同的机器上,不需要功能强大的大型计算机就能存储更多的数据,处理更大的负载。

​分片目的是通过分片能够增加更多机器来应对不断的增加负载和数据,还不影响应用运行。

​MongoDB支持自动分片,可以摆脱手动分片的管理困扰,集群自动切分数据做负载均衡。MongoDB分片的基本思想就是将集合拆分成多个块,这些块分散在若干个片里,每个片只负责总数据的一部分,应用程序不必知道哪些片对应哪些数据,甚至不需要知道数据拆分了,所以在分片之前会运行一个路由进程,mongos进程,这个路由器知道所有的数据存放位置,应用只需要直接与mongos交互即可。mongos自动将请求转到相应的片上获取数据,从应用角度看分不分片没有什么区别。

架构

  • Shard: 用于存储实际的数据块,实际生产环境中一个shard server角色可由几台机器组个一个replica set承担,防止主机单点故障

  • Config Server:mongod实例,存储了整个 ClusterMetadata。

  • Query Routers: 前端路由,客户端由此接入,且让整个集群看上去像单一数据库,前端应用可以透明使用。

  • Shard Key: 片键,设置分片时需要在集合中选一个键,用该键的值作为拆分数据的依据,这个片键称之为(shard key),片键的选取很重要,片键的选取决定了数据散列是否均匀。

搭建

集群规划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Shard Server 1:27017
Shard Repl 1:27018

Shard Server 2:27019
Shard Repl 2:27020

Shard Server 3:27021
Shard Repl 3:27022

Config Server :27023
Config Server :27024
Config Server :27025

Route Process :27026

进入bin目录创建数据目录

1
2
3
4
5
6
7
8
9
10
11
12
mkdir -p ../cluster/shard/s0
mkdir -p ../cluster/shard/s0-repl

mkdir -p ../cluster/shard/s1
mkdir -p ../cluster/shard/s1-repl

mkdir -p ../cluster/shard/s2
mkdir -p ../cluster/shard/s2-repl

mkdir -p ../cluster/shard/config1
mkdir -p ../cluster/shard/config2
mkdir -p ../cluster/shard/config3

启动分片服务

启动s0,r0
1
2
3
./mongod --port 27017 --dbpath ../cluster/shard/s0 --bind_ip 0.0.0.0 --shardsvr --replSet r0/124.239.149.46:27018

./mongod --port 27018 --dbpath ../cluster/shard/s0-repl --bind_ip 0.0.0.0 --shardsvr --replSet r0/124.239.149.46:27017
配置
1
2
3
4
5
6
7
8
9
use admin

config = {_id: "r0", members: [
{_id:0,host:"124.239.149.46:27017"},
{_id:1,host:"124.239.149.46:27018"},
]
}

rs.initiate(config);//初始化
启动s1,r1
1
2
3
./mongod --port 27019 --dbpath ../cluster/shard/s1 --bind_ip 0.0.0.0 --shardsvr --replSet r1/124.239.149.46:27020

./mongod --port 27020 --dbpath ../cluster/shard/s1-repl --bind_ip 0.0.0.0 --shardsvr --replSet r1/124.239.149.46:27019
配置
1
2
3
4
5
6
7
8
9
use admin

config = {_id: "r1", members: [
{_id:0,host:"124.239.149.46:27019"},
{_id:1,host:"124.239.149.46:27020"},
]
}

rs.initiate(config);//初始化
启动s2,r2
1
2
3
./mongod --port 27021 --dbpath ../cluster/shard/s2 --bind_ip 0.0.0.0 --shardsvr --replSet r2/124.239.149.46:27022

./mongod --port 27022 --dbpath ../cluster/shard/s2-repl --bind_ip 0.0.0.0 --shardsvr --replSet r2/124.239.149.46:27021
配置
1
2
3
4
5
6
7
8
9
use admin

config = {_id: "r2", members: [
{_id:0,host:"124.239.149.46:27021"},
{_id:1,host:"124.239.149.46:27022"},
]
}

rs.initiate(config);//初始化

启动3个config服务

1
2
3
4
5
./mongod --port 27023 --dbpath ../cluster/shard/config1 --bind_ip 0.0.0.0 --replSet  config/124.239.149.46:27024,124.239.149.46:27025 --configsvr

./mongod --port 27024 --dbpath ../cluster/shard/config2 --bind_ip 0.0.0.0 --replSet config/124.239.149.46:27023,124.239.149.46:27025 --configsvr

./mongod --port 27025 --dbpath ../cluster/shard/config3 --bind_ip 0.0.0.0 --replSet config/124.239.149.46:27023,124.239.149.46:27024 --configsvr

初始化 config server 副本集

1
2
3
4
5
6
7
8
9
10
11
12
13
use admin 

config = {
_id:"config",
configsvr: true,
members:[
{_id:0,host:"124.239.149.46:27023"},
{_id:1,host:"124.239.149.46:27024"},
{_id:2,host:"124.239.149.46:27025"}
]
}

rs.initiate(config); //初始化副本集配置

启动 mongos 路由服务

1
./mongos --port 27026 --configdb config/124.239.149.46:27023,124.239.149.46:27024,124.239.149.46:27025 --bind_ip 0.0.0.0

添加分片信息

1
2
3
4
5
6
7
8
9
10
11
# 登录mongos服务
./mongo --port 27026
use admin
# 添加分片信息
db.runCommand({ addshard:"r0/124.239.149.46:27017,124.239.149.46:27018", "allowLocal":true });
db.runCommand({ addshard:"r1/124.239.149.46:27019,124.239.149.46:27020", "allowLocal":true });
db.runCommand({ addshard:"r2/124.239.149.46:27021,124.239.149.46:27022", "allowLocal":true });
# 指定分片库
db.runCommand({ enablesharding: "baizhi" });
# 设置库的片键信息
db.runCommand({ shardcollection: "baizhi.emps", key: { _id: "hashed"}})

测试分片

1
2
3
4
5
6
7
8
9
# 连接mongos
./mongo --port 27026
# 切换库
use baizhi
# 插入1000条信息
for(let i = 0; i < 1000; i++) {
db.emps.insert({_id: i, name: "xxx_" + i});
}
# 登录3个分片分别查询插入的数量

其他资料

MongoDB权威指南
MongoDB基本语法
MongoDB笔记

MySQL是一种关系型数据库,适合存储结构化数据,而JSON是一种非结构化格式,通常使用mongoDB等存储。但是在5.7之后,MySQL也开始支持JSON操作。此文章使用的版本是8.0.30

使用JSON数据类型的优点

  • 不用预创建字段: 字段可以无限扩展,更加灵活;
  • 处理稀疏字段: 避免了某些很少使用字段的NULL值,避免了冗余存储;
  • 支持索引。支持JSON之前,可以使用VARCHAR存储,但是不支持索引等查询优化

实战

创建测试表

1
2
3
4
5
6
CREATE TABLE `user` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(20) NOT NULL DEFAULT '',
`details` json DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci

插入数据

1
2
3
4
5
insert into user(name, details) values ('jack', '{"phone": "13100000000", "age": 30, "address": {"country": "CN", "province": "浙江省", "city": "杭州市"}}');
insert into user(name, details) values ('Linda', '{"phone": "music", "age": 25, "address": {"country": "CN", "province": "河北省"}}');
insert into user(name, details) values ('Lily', '{"age": 30, "address": {"country": "CN", "province": "广东省", "city": "广州市"}}');
insert into user(name, details) values ('Lisa', '{"likes": ["music", "basketball"], "likes2": ["music"]}');
insert into user(name, details) values ('Kobe', '{"likes": ["art"]}');

完成数据准备之后的数据如图:

查询

  1. 使用JSON_EXTRACT查询JSON中的某个字段
1
select name, JSON_EXTRACT(details, '$.address.city') city from user;

使用符号,同样可以替代JSON_EXTRACT函数的作用。->查询出的带有双引号,->>查询出的没有双引号

  1. 使用JSON中的字段作为查询条件
    1
    select name, details from user where details -> '$.address.city' = '杭州市';

  1. JSON_CONTAINS函数,查询JSON文档是否在指定path包含指定的数据,包含则返回1,否则返回0;如果有参数为NULL或者path不存在,则返回NULL

使用方式:JSON_CONTAINS(json_doc, val [,path]), 第二个参数'val'表示val为整型,'"val"'表示val为字符串类型,第二个参数最外层必须使用''

例子2中的查询可以等同于:

1
select name, details from user where JSON_CONTAINS(details, '"杭州市"', '$.address.city');

查询爱好包含音乐的用户信息

1
select name, details from user where JSON_CONTAINS(details, '"music"', '$.likes');

1
select JSON_CONTAINS(details, '"杭州市"', '$.address.city') from user;

  1. JSON_SEARCH函数,返回字符串匹配的路径

使用方式: JSON_SEARCH(json_doc, one_or_all, search_str[, escape_char[, path] ...])

  • one: The search terminates after the first match and returns one path string. It is undefined which match is considered first. 搜索在第一次匹配后终止,并返回一个路径字符串
  • all: The search returns all matching path strings such that no duplicate paths are included. If there are multiple strings, they are autowrapped as an array. The order of the array elements is undefined. 搜索返回所有匹配的路径字符串
1
select json_search(details, 'all', 'music') from user;

1
select json_search(details, 'one', 'music') from user;

更新

JSON_SET更新如果key存在则覆盖,不存在则新增

1
2
select * from user where id = 1;
update user set details = JSON_SET(details, '$.phone', '13127366399', '$.email', 'wwyknight@163.com') where id = 1;

JSON_INSERT只是插入数据,不会替换已经存在的值

1
update user set details = JSON_INSERT(details, '$.sex', '男', '$.age', '20') where id = 1;

JSON_REPLACE只是替换已经存在的值,不存在的无影响

1
update user set details = JSON_REPLACE(details, '$.age', 20, '$.height', 180) where id = 1;

总结:

  • JSON_INSERT 只新增,不更新
  • JSON_REPLACE 只更新,不新增
  • JSON_SET 既新增,也更新

删除

1
update user set details = JSON_REMOVE(details, '$.email') where id = 1;

其他函数

JSON_KEYS() 获取JSON文档中所有的key

JSON_LENGTH() 给出JSON文档中的key的个数

JSON_MERGE() 合并

参考https://dev.mysql.com/doc/refman/8.0/en/json.html

数据安全重要性无需多言,本文讲解如何进行MySQL的备份与恢复

全量备份

逻辑备份

可以使用mysqldump直接备份所有库或者某一个库或者某一个库中的某个表

备份所有库

1
mysqldump -uroot -p --lock-all-tables --all-databases > all.sql

备份某一个库(test库)

1
mysqldump -uroot -p --lock-all-tables --databases test > test.sql

备份某一个库(test)中某一张(hy_express)表

1
mysqldump -uroot -p --lock-all-tables test hy_express > test.press.sql

--lock-all-tables保证在导出过程中整个mysql实例加全局读锁,操作线上数据库谨慎添加该参数

增量备份

增量备份是指在一次全备份或上一次增量备份后,以后每次的备份只需备份与前一次相比增加或者被修改的文件。这就意味着,第一次增量备份的对象是进行全备后所产生的增加和修改的文件

增量备份是使用bin-log日志进行备份的,所以需要开启bin-log日志

开启bin-log

1
2
3
4
5
[mysqld]
log_bin=/var/mysql/mysql-bin
server-id=1
max_binlog_size=100m
binlog_format=row
  • log_bin日志路径,注意此路径必须mysql用户拥有读写权限
  • server-id服务唯一标识,可以随机填写,但不要重复
  • max_binlog_size单文件大小,超过此设置,生成新的文件。同时当MySQL数据库重启时,也会生成新的日志文件,文件序号递增
  • binlog_format设置记录模式。

binlog的一些常用操作

1
mysql> show master logs; #查看数据库所有日志文件

1
mysql> show binlog events in 'mysql-bin.000015'; #查看某个binlog文件信息

1
mysql> flush logs; #将缓存中的日志写磁盘,保存到当前binlog文件中,并产生一个新的binlog文件
1
mysql> flush logs; reset master; #删除所有binlog,并重新开始记录

实例

准备全量数据

1
2
3
4
mysql> create database t1;
mysql> use t1;
mysql> create table full(c1 int(10), c2 varchar(20)) engine=innodb;
mysql> insert into full values(1, 'full1'), (2, 'full2');

全量数据备份

  1. 备份前,数据库加读锁,防止数据在备份时写入

    1
    mysql> flush tables with read lock;
  2. 将binlog刷盘,写入当前binlog(mysql-bin.000001),再生成一个新的binlog

    1
    mysql> flush logs;
  3. 全量备份

    1
    [root] mysqldump -uroot -p --lock-all-tables --databases t1 > t1.sql
  4. 解除读锁

    1
    mysql> unlock tables;

至此,全量备份结束。将全量数据文件t1.sql保存即可。数据库再有新的数据更新,会记录在mysql-bin.000002文件中

准备增量数据

1
2
3
4
mysql> create database t2;
mysql> use t2;
mysql> create table increment(c1 int(10), c2 varchar(20)) engine=innodb;
mysql> insert into increment values(3, 'increment1'), (4, 'increment2');

将第一份增量数据进行备份

  1. 备份前,数据库加读锁,防止数据在备份时写入

    1
    mysql> flush tables with read lock;
  2. 将binlog刷盘,写入当前binlog(mysql-bin.000002),再生成一个新的binlog

    1
    mysql> flush logs;
  3. 将增备文件(mysql-bin.000002)直接复制保存即可

  4. 解除读锁

    1
    mysql> unlock tables;

准备第二份增量数据

1
2
mysql> use t2;
mysql> insert into increment values(5, 'increment3'), (6, 'increment4');

将第二份增量数据备份

步骤同上一步,生成mysql-bin.000003文件,复制保存

模拟故障

删除t1t2

1
2
mysql> drop database t1;
mysql> drop database t2;

数据恢复

还原全量备份数据

1
mysql -uroot -p < t1.sql;

此时,查看数据,恢复成功

还原增量备份数据

1
mysqlbinlog mysql-bin.000002 | mysql -uroot -p

查看结果,恢复成功

还原第二份增量

方法同上,查看数据

至此数据全部还原成功!

mysqlbinlog按照时间点和位置点恢复

按位置点恢复

1
mysqlbinlog --start-position=500 --stop-position=716 mysql-bin.000002 | mysql -uroot -p

按时间点恢复

1
mysqlbinlog --start-datetime='2022-07-29 15:45:59'  --stop-datetime='2022-07-29 15:46:24' mysql-bin.000002 | mysql -uroot -p

生产中建议定期使用全量备份,然后搭配增量备份的方式来保证数据的安全性。附上全量备份脚本,在进行全量备份后对binlog进行刷盘,这样方便搭配恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/bin/bash
#获取当前时间
date_now=$(date "+%Y%m%d-%H%M%S")
backUpFolder=/root/backup/full
username="root"
password="xxx"
db_name="test"
#日志记录文件
logFile=/root/backup/full_bak.log
#定义备份文件名
fileName="${db_name}_${date_now}.sql"
#定义备份文件目录
backUpFileName="${backUpFolder}/${fileName}"
echo "starting backup mysql ${db_name} at ${date_now}." >> $logFile
/usr/bin/mysqldump -u${username} -p${password} --lock-all-tables --flush-logs --databases ${db_name} > ${backUpFileName}
#进入到备份文件目录
cd ${backUpFolder}
#压缩备份文件
tar zcvf ${fileName}.tar.gz ${fileName}

date_end=$(date "+%Y%m%d-%H%M%S")
echo "finish backup mysql database ${db_name} at ${date_end}. filename is ${backUpFileName}" >> $logFile
echo "" >> $logFile

在工作实践中发现,很多人在Spring项目开发中,如果@Transactional注解结合同步锁或者分布式锁时,经常会犯错,今天在此记录问题,分析原因同时给出解决方案

错误案例

数据库准备

新建一个sys_user表,同时把username字段设为唯一索引

业务逻辑

新建一个接口test,在接口上添加@Transactional注解,同时方法内添加一个synchronized同步锁。方法实现的是查询usernamea的用户是否存在,如果不存在插入一条usernamea的记录

测试问题

我们使用siege(一款压测工具)模拟并发请求

通过压测,发现其中有1个请求失败,此时观察java控制台输出


输出显示出现唯一索引冲突的问题导致插入失败。我们明明已经为方法整体增加了同步锁,为什么还会导致相同username执行插入的逻辑呢?

此处就是许多开发者经常犯的错误

原因分析

此时首先需要了解你当前使用数据库的事务隔离级别

我这里使用的是RR级别,如果有对事务隔离不清楚的可以查看我这篇文章

@Transactional是使用代理机制实现的数据库事务,有时间我会单独讲一下它的原理。它会在请求进入方法之前开启事务,同时方法结束之后提交事务(或者发生异常时回滚)

此案例中,当两个请求同时打来,会各自开启数据库事务,同时只有1个请求获得锁,执行锁内代码。由于锁内代码是包含在事务之内的,所以当执行完锁内代码,释放锁之后,另一个请求就可以获得锁,执行锁内代码。此时由于数据库事务的隔离性,第2个请求并不能查到第1个请求新增的数据(无论第1个请求是否进行了事务提交),所以它认为数据库并没有usernamea的数据,再次执行插入时,由于数据库的唯一索引,导致程序报错

解决方案

既然是锁包含在事务之内导致的问题,那我们就把事务包含在锁内就能解决这个问题,代码如下

我们抽象出一个addUser方法,该方法使用@Transactional注解。在调用该方法时,使用锁包围

通过多次测试,没有出现唯一索引冲突的问题

总结

@Transactional和同步锁一起使用时,一定要将事务的开启和关闭包含在锁内

现实企业级线上应用可能会发生内存泄露、线程死锁、Java进程消耗CPU过高等等情况,有的人遇到上述问题只是重启服务,而不会深究问题根源,导致此后问题会重复出现,所以理解并解决这些问题是Java高级开发必备要求,本文将介绍JVM性能监控工具,帮你解决这些问题

为什么java面试喜欢问垃圾回收以及内存模型这种问题?是属于面试八股文吗?

简历中你是否敢写熟悉GC常用算法,熟悉常见垃圾回收器,具有实际JVM调优实战经验?

常用命令

jps

jps(JVM Process Status Tool),它的功能和Linux中ps命令类似,可以列出正在运行的虚拟机进程

1
jps 列出运行中的JVM进程
1
jps -v 输出虚拟机进程启动时显式指定的JVM参数

jmap

jmap(Memory Map for Java)生成虚拟机的内存快照,当内存飙升时候可以使用jmap来查看

1
jmap -histo:live <pid> | more

打印堆的对象统计,包括对象数 、内存大小等。这个命令执行的话,JVM会先触发GC,然后再统计信息

参数 说明
num 序号
#instances 实例数量
#bytes 占用内存大小
class name 类名称
1
jmap -dump:live,format=b,file=xxx <pid>

在当前目录下导出一个存储当前堆内存信息的dump文件(dump文件是进程的内存镜像),文件中存储的是当时那一时刻的堆的信息

这个命令执行,JVM会把整个heap信息写入到一个文件,heap如果比较大的话,就会导致这个过程非常耗时,并且执行过程中为了保证dump信息是可靠的,所以会暂停应用,线上系统一定要慎用!

导出成功后,用分析工具(比如jhat MAT)导入文件即可以分析

因为jmap很耗时,建议在java项目启动时添加非标参数-XX:+HeapDumpOnOutOfMemoryError这样当溢出时会自动将内存快照打印

jstack

jstack(Stack Trace for Java)显示虚拟机的线程快照。线程快照就是当前虚拟机内每一条线程正在执行的方法堆栈的集合,生成线程快照的主要目的是定位线程出现长时间停顿的原因,如线程间死锁、死循环、请求外部资源导致的长时间等待等都是导致线程长时间停顿的常见原因。线程出现停顿的时候通过 jstack 来查看各个线程的调用堆栈,就可以知道没有响应的线程到底在后台做些什么事情,或者等待着什么资源

1
jstack <pid> 

参数 说明
http-nio-8088-Acceptor-0 线程名
prio java优先级
os_prio 操作系统优先级
tid java线程的id, threadId
nid 系统内核里面的线程唯一id

使用实例:查找线上Java进程导致CPU占用过高的问题

  1. 使用一段死循环模拟CPU高占用

  1. 通过top命令查看当前cpu占用率最高的进程

  2. 通过top -H -p 9522命令查看某个pid进程下各线程的cpu占用情况

可以看到线程id=9548cpu占用率最高,因为线程id在jstack中是以十六进制显示的,我们将9548转成十六进制是:0x254c

  1. 使用jstack 9522 | grep -10 '0x254c'找到进程的调用堆栈情况,并通过grep按照线程id搜索,返回匹配结果前后10行的数据

  1. 结果:图中可以看到JvmMonitorApplication类的第18行导致的问题,分析正确

通过Arthas可以更快速的定位,后面给出方法

jinfo

jinfo(Configuration Info for Java)的作用是实时查看和调整虚拟机各项参数。使用jps命令的-v参数可以查看虚拟机启动时显式指定的参数列表,但如果想知道未被显式指定的参数的系统默认值,就可以使用info进行查询

1
2
3
4
5
jinfo <pid> 查看所有参数

jinfo -flags <pid> 查看JVM参数

jinfo -sysprops <pid> 查看java系统参数

jstat

jstat(JVM Statistics Monitoring Tool)是用于监视虚拟机各种运行状态信息的命令行工具。它可以显示本地或者远程虚拟机进程中的类装载、内存、垃圾收集、JIT编译等运行数据,在没有GUI图形界面,只提供了纯文本控制台环境的服务器上,它将是运行期定位虚拟机性能问题的首选工具

  • class 用于查看类加载情况的统计
  • compiler 用于查看HotSpot中即时编译器编译情况的统计
  • gc 用于查看JVM中堆的垃圾收集情况的统计
  • gccapacity 用于查看新生代、老生代及持久代的存储容量情况
  • gcmetacapacity 显示metaspace的大小
  • gcnew 用于查看新生代垃圾收集的情况
  • gcnewcapacity 用于查看新生代存储容量的情况
  • gcold 用于查看老生代及持久代垃圾收集的情况
  • gcoldcapacity 用于查看老生代的容量
  • gcutil 显示垃圾收集信息
  • gccause 显示垃圾回收的相关信息(通-gcutil),同时显示最后一次仅当前正在发生的垃圾收集的原因
  • printcompilation 输出JIT编译的方法信息

例子: 查看垃圾回收统计

1
jstat -gc <pid> 1000 10
参数 说明
-gc 打印方式
进程id
1000 每1000ms打印一次
10 打印10次

参数 说明
S0C 第一个幸存区的大小
S1C 第二个幸存区的大小
S0U 第一个幸存区的使用大小
S1U 第二个幸存区的使用大小
EC 伊甸园区的大小
EU 伊甸园的使用大小
OC 老年代的大小
OU 老年代的使用大小
MC 方法区大小
MU 方法区的使用大小
CCSC 压缩类空间大小
CCSU 压缩类空间的使用大小
YGC 年轻代垃圾回收次数
YGCT 年轻代垃圾回收消耗的时间
FGC 老年代回收次数
FGCT 老年代垃圾回收消耗的时间
CGC 并发回收次数
CGCT 并发回收消耗的时间
GCT 垃圾回收消耗总时间

使用实例:写一个需要频繁GC的程序,用jstat观察GC情况

  1. 代码

  1. 观察状态信息

分析图中结果可以发现YGC(年轻代垃圾回收次数)增加的非常快,而FGC(老年代回收次数)不会执行

Arthas

简介

Arthas是Alibaba开源的Java诊断工具,深受开发者喜爱。

安装方式极为简单,按照首页引导即可,下面介绍简单使用

dashboard

当前系统的实时数据面板

thread

查看当前线程信息,查看线程的堆栈

实例1: 前文使用jstack查找线上Java进程导致CPU占用过高的问题,使用arthas会更加简单

  1. dashboard查询占用cpu较高线程
1
dashboard


发现线程id为20(注意这个ID不能跟jstack中的nativeID一一对应)的线程占用过高

  1. 查看线程堆栈
1
thread 20


图中分析正确

实例2: 查询java进程中的所有死锁信息

  1. 代码模拟死锁

  1. 启动Arthas,执行thread -b查看死锁位置


更多Arthas用法,去官网查询

同样使用jstack同样可以查找死锁

1
jstack <pid> | grep -10 'deadlock'

可以看到图中Thread-1想要的锁被Thread-2持有;Thread-2想要的锁被Thread-1持有

垃圾回收

什么是垃圾

垃圾就是在程序运行中没有任何指向的对象,这个对象就是需要被回收的垃圾

Java中的垃圾回收

Java不像C语言那样,需要自己手动垃圾回收,而是将运行时产生的垃圾交给JVM处理。在Java的内存模型中主要分为堆、方法区、虚拟机栈、本地方法栈、程序计数器。垃圾回收器主要负责对堆中产生的垃圾对象进行回收。

Java中堆内存一般分为新生代和老年代,根据各个年代的特点采用不同的收集算法。据调查统计新生代中的对象只会在一次垃圾回收后存活10%(少量存活),因此使用拷贝算法,老年代(对象存活率高)使用”标记清除”算法

垃圾搜索算法

引用计数算法(Reference Counting)

给对象添加一个引用计数器,每当一个地方引用时,计数器加1;当引用失效时,计数器减1。计数器为0时,即可被回收

根搜索算法(GC Root Tracing)

过一系列的名为GC Root的对象作为起始点,从这些节点开始向下搜索,搜索所有走过的路径称为引用链(Reference Chain),当一个对象到GC Root没有任何引用链相连时(用图论来说就是GC Root到这个对象不可达时),证明该对象是可以被回收的。

Java采用了根搜索算法

在Java中哪些对象可以成为GC Root?

  • 虚拟机栈(栈帧中的本地变量表)中的引用对象
  • 方法区中的类静态属性引用的对象
  • 方法区中的常量引用对象
  • 本地方法栈中JNI(即Native方法)的引用对象

垃圾清除算法

标记清除算法(Mark-Sweep)

最基础的垃圾处理器算法,分为“标记”和“清除”两个阶段。先标记需要回收的对象,标记完成后,统一回收掉所有被标记的对象。js通常采用此算法

缺点:

  1. 效率低,标记和清除的效率都不高
  2. 清除后会产生大量的不连续内存碎片,可能会导致在程序需要为较大对象分配内存时无法找到足够连续的内存,不得不提前触发垃圾回收动作。

拷贝算法(Copying)

将内存容量分成大小相等的两块,每次只使用其中一块,当一块用完时,将还存活的对象复制到另一块去,然后把之前使用满的那块空间一次性清理掉,如此反复

缺点: 内存空间浪费大,每次只能使用当前能够使用内存空间的一半;当对象存活率较高时,需要有大量的复制操作,效率低

标记整理算法(Mark-Compact)

标记整理是在标记-清除上改进得来,前面说到标记-清除内存碎片的问题,在标记-整理中有解决。同样有标记阶段,标记出所有需要回收的对象,但是不会直接清理,而是将存活的对象向一端移动,在移动过程中清理掉可回收对象

分代收集算法

  • 堆内存被划分为两块,一块是年轻代,另一块是老年代
  • 年轻代又分为EdenSurvivor他俩空间大小比例默认为8:2
  • 幸存区又分为s0s1这两个空间大小是一模一样的,就是一对双胞胎,他俩是1:1比例

回收过程

  1. 新生成的对象首先放到Eden区,当Eden区满了会触发Minor GC
  2. 第一步GC活下来的对象,会被移动到Survivor区中的S0区,S0区满了之后会触发Minor GC,S0区存活下来的对象会被移动到S1区,S0区空闲。S1满了之后再GC,存活下来的再次移动到S0区,S1区空闲,这样反反复复GC,每GC一次,对象的年龄就涨一岁,达到某个值(默认15)后,就会进入老年代
  3. 在发生一次Minor Gc后(前提条件),老年代可能会出现Major GC

Full GC触发条件

  • 手动调用System.gc,会不断的执行Full GC
  • 老年代空间不足/满了
  • 方法区空间不足/满了

pic

集群概念

Redis集群实现了对Redis的水平扩容,即启动N个Redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。每个节点负责一部分插槽(slot),注意在Redis Cluster中,只有mater才拥有插槽的所有权。

分片实现

Redis集群通过分片的方式来保存数据库中的键值对,集群的整个数据库被分为16384(0-16383)个槽,数据库中的每个键都属于这16384个槽的其中一个,集群中的每个节点可以处理0个或最多16384个。只有当数据库中的16384个槽都有节点在处理时,集群才处于上线状态。

集群搭建

我们在一台机器上使用6个端口,模拟集群搭建

安装Redis

下载redis安装包,进行解压,编译,安装。此处省略

集群配置

  1. 创建6个节点的配置文件目录conf,日志目录logs,数据存储目录data,如下命令:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
mkdir -p /usr/local/redis/redis_cluster/7001/conf/
mkdir -p /usr/local/redis/redis_cluster/7001/logs/
mkdir -p /usr/local/redis/redis_cluster/7001/data/

mkdir -p /usr/local/redis/redis_cluster/7002/conf/
mkdir -p /usr/local/redis/redis_cluster/7002/logs/
mkdir -p /usr/local/redis/redis_cluster/7002/data/

mkdir -p /usr/local/redis/redis_cluster/7003/conf/
mkdir -p /usr/local/redis/redis_cluster/7003/logs/
mkdir -p /usr/local/redis/redis_cluster/7003/data/

mkdir -p /usr/local/redis/redis_cluster/7004/conf/
mkdir -p /usr/local/redis/redis_cluster/7004/logs/
mkdir -p /usr/local/redis/redis_cluster/7004/data/

mkdir -p /usr/local/redis/redis_cluster/7005/conf/
mkdir -p /usr/local/redis/redis_cluster/7005/logs/
mkdir -p /usr/local/redis/redis_cluster/7005/data/

mkdir -p /usr/local/redis/redis_cluster/7006/conf/
mkdir -p /usr/local/redis/redis_cluster/7006/logs/
mkdir -p /usr/local/redis/redis_cluster/7006/data/
  1. 创建7001的配置文件,并添加如下内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 绑定服务器域名或IP地址
bind 127.0.0.1
# 设置端口,区分集群中Redis的实例
port 7001
# 后台运行
daemonize yes
# pid进程文件名,以端口号命名
pidfile /var/run/redis-7001.pid
# 日志文件名称,以端口号为目录来区分
logfile /usr/local/redis/redis_cluster/7001/logs/redis.log
# 数据文件存放地址,以端口号为目录名来区分
dir /usr/local/redis/redis_cluster/7001/data
# 启用集群
cluster-enabled yes
# 配置每个节点的配置文件,同样以端口号为名称
cluster-config-file nodes_7001.conf
# 配置集群节点的超时时间
cluster-node-timeout 15000
# 启动AOF增量持久化策略
appendonly yes
# 发生改变,则记录日志
appendfsync always

其他节点配置文件仿照7001分别创建

启动集群

  1. 启动每一个Redis节点
1
2
3
4
5
6
redis-server /usr/local/redis/redis_cluster/7001/conf/redis.conf
redis-server /usr/local/redis/redis_cluster/7002/conf/redis.conf
redis-server /usr/local/redis/redis_cluster/7003/conf/redis.conf
redis-server /usr/local/redis/redis_cluster/7004/conf/redis.conf
redis-server /usr/local/redis/redis_cluster/7005/conf/redis.conf
redis-server /usr/local/redis/redis_cluster/7006/conf/redis.conf
  1. 使用redis-cli创建Redis集群
1
redis-cli --cluster create 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 127.0.0.1:7006 --cluster-replicas 1
  • redis-cli --cluster代表集群操作命令
  • create 代表创建集群
  • --cluster-replicas 1 指定集群中每个mater的副本数为1,此时节点总数 ÷ (replicas + 1)得到的就是master的数量n。因此节点列表中的前n个就是master节点,其他节点都是slave节点,随机分配到不同master
  1. 查看刚创建的集群状态,如下命令:(在任一台机器中查看任一节点信息,会带出所有节点信息)
1
redis-cli --cluster check 127.0.0.1:7001

节点信息

  1. 测试集群是否正常

连接集群中任一节点,注意:集群操作时,需要给redis-cli加上-c参数才可以

1
redis-cli -c -p 7001

添加一个key进入集群

1
2
3
4
127.0.0.1:7001> set name wangweiye
-> Redirected to slot [5798] located at 127.0.0.1:7002
OK
127.0.0.1:7002>

可以看到,set之后,Redis会自动重定向到7002节点的5798插槽,接着我们进入7003节点,观察是否能查到此key

1
2
3
4
127.0.0.1:7003> get name
-> Redirected to slot [5798] located at 127.0.0.1:7002
"wangweiye"
127.0.0.1:7002>

出现以上结果,说明我们搭建的集群运作正常。当集群中某个master节点故障时,相应的slave节点会自动升级为master,保证集群的可靠性。当故障节点恢复正常,则变为slave节点提供副本职能,请自行测试

注意

Redis集群中每个实例会使用两个TCP端口,一个用于客户端(redis-cli或其他应用)通信,另一个用于集群中实例相互通信的总线端口,且第二个端口比第一个端口一定大1000。如果外网配置时,请注意网络的连通性

pic

在开发场景中,如果需要对接第三方系统的数据库,与自己的应用结合,这时就需要动态切换数据库进行对接

什么是多数据源

最常见的单一应用中最多涉及到一个数据库,既是一个数据源(DataSource)。那么顾名思义,多数据源就是在一个单一应用中涉及到了两个及以上的数据库。

其实在配置数据源的时候就已经很明确这个定义了,如以下代码:

1
2
3
4
5
6
7
8
9
@Bean(name = "dataSource")
public DataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(url);
druidDataSource.setUsername(username);
druidDataSource.setDriverClassName(driverClassName);
druidDataSource.setPassword(password);
return druidDataSource;
}

urlusernamepassword这三个属性已经唯一确定了一个数据库了,DataSource则是依赖这三个创建出来的。则多数据源即是配置多个DataSource

应用场景

相信大多数做过医疗系统的都会和HIS打交道,为了简化护士以及医生的操作流程,必须要将必要的信息从HIS系统对接过来,对接方式可以通过HIS提供视图,比如医护视图,患者视图等,而此时其他系统只需要定时从HIS视图中读取数据同步到自己数据库中即可。这是就涉及到了至少两个数据库了,一个是HIS数据库,一个自己系统的数据库,在单一应用中必然需要用到多数据源切换才能达到目的。

整合单一的数据源

本文使用阿里的数据库连接池druid,添加依赖如下:

1
2
3
4
5
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.2.9</version>
</dependency>

Druid连接池的starter的自动配置类是DruidDataSourceAutoConfigure,类上标注如下一行注解:

1
@EnableConfigurationProperties({DruidStatProperties.class, DataSourceProperties.class})

@EnableConfigurationProperties这个注解使得配置文件中的配置生效并且映射到指定类的属性

DruidStatProperties中指定的前缀是spring.datasource.druid,这个配置主要是用来设置连接池的一些参数。

DataSourceProperties中指定的前缀是spring.datasource,这个主要是用来设置数据库的urlusernamepassword等信息。

因此我们只需要在全局配置文件中指定数据库的一些配置以及连接池的一些配置信息即可,前缀分别是spring.datasource.druidspring.datasource

在全局配置文件application.yml中配置数据库信息即可注入一个数据源到Spring Boot中。其实这仅仅是一种方式,下面介绍另外一种方式。

在自动配置类中DruidDataSourceAutoConfigure中有如下一段代码:

1
2
3
4
5
6
@Bean(initMethod = "init")
@ConditionalOnMissingBean
public DataSource dataSource() {
LOGGER.info("Init DruidDataSource");
return new DruidDataSourceWrapper();
}

@ConditionalOnMissingBean@Bean这两个注解的结合,意味着我们可以覆盖,只需要提前在IOC中注入一个DataSource类型的Bean即可。

因此我们在自定义的配置类中定义如下配置即可:

1
2
3
4
5
6
7
8
9
10
11
/**
* @Bean:向IOC容器中注入一个Bean
* @ConfigurationProperties:使得配置文件中以spring.datasource为前缀的属性映射到Bean的属性中
* @return
*/
@ConfigurationProperties(prefix = "spring.datasource")
@Bean
public DataSource dataSource(){
//做一些其他的自定义配置,比如密码加密等......
return new DruidDataSource();
}

以上介绍了两种数据源的配置方式,第一种比较简单,第二种适合扩展,按需选择

整合Mybatis

Spring Boot整合Mybatis其实很简单,简单的几步就搞定,首先添加依赖:

1
2
3
4
5
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>

第二步找到自动配置类MybatisAutoConfiguration,有如下一行代码:

1
@EnableConfigurationProperties({MybatisProperties.class})

老套路了,全局配置文件中配置前缀为mybatis的配置将会映射到该类中的属性

直接在全局配置文件配置各种属性是一种比较简单的方式,其实的任何组件的整合都有不少于两种的配置方式,下面来介绍下配置类如何配置。

MybatisAutoConfiguration自动配置类中有如下一段代码:

1
2
3
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {}

@ConditionalOnMissingBean@Bean真是老搭档了,意外着我们又可以覆盖,只需要在IOC容器中注入SqlSessionFactory(Mybatis六剑客之一生产者)

在自定义配置类中注入即可,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 注入SqlSessionFactory
*/
@Bean("sqlSessionFactory1")
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:/mapper/**/*.xml"));
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
// 自动将数据库中的下划线转换为驼峰格式
configuration.setMapUnderscoreToCamelCase(true);
configuration.setDefaultFetchSize(100);
configuration.setDefaultStatementTimeout(30);
sqlSessionFactoryBean.setConfiguration(configuration);
return sqlSessionFactoryBean.getObject();
}

以上介绍了配置Mybatis的两种方式,其实在大多数场景中使用第一种已经够用了,至于为什么介绍第二种呢?当然是为了多数据源的整合而做准备了。

MybatisAutoConfiguration中有一行很重要的代码,如下:

1
@ConditionalOnSingleCandidate(DataSource.class)

@ConditionalOnSingleCandidate这个注解的意思是当IOC容器中只有一个候选Bean的实例才会生效。言外之意就是当IOC容器中只有一个数据源DataSource,这个自动配置类才会生效

哦?照这样搞,多数据源是不是不能用Mybatis?

可能大家会有一个误解,认为多数据源就是多个的DataSource并存的,当然这样说也不是不正确。

多数据源的情况下并不是多个数据源并存的,Spring提供了AbstractRoutingDataSource这样一个抽象类,使得能够在多数据源的情况下任意切换,相当于一个动态路由的作用,作者称之为动态数据源。因此Mybatis只需要配置这个动态数据源即可。

什么是动态数据源

动态数据源简单的说就是能够自动切换的数据源,类似于一个动态路由的感觉,Spring提供了一个抽象类AbstractRoutingDataSource,这个抽象类中有一个属性,如下:

1
private Map<Object, Object> targetDataSources;

targetDataSources是一个Map结构,所有需要切换的数据源都存放在其中,根据指定的KEY进行切换。当然还有一个默认的数据源。

AbstractRoutingDataSource这个抽象类中有一个抽象方法需要子类实现,如下:

1
protected abstract Object determineCurrentLookupKey();

determineCurrentLookupKey()这个方法的返回值决定了需要切换的数据源的KEY,就是根据这个KEYtargetDataSources取值(数据源)

数据源切换如何保证线程隔离?

数据源属于一个公共的资源,在多线程的情况下如何保证线程隔离呢?不能我这边切换了影响其他线程的执行。

“说到线程隔离,自然会想到ThreadLocal了,将切换数据源的KEY(用于从targetDataSources中取值)存储在ThreadLocal中,执行结束之后清除即可”

单独封装了一个DataSourceHolder,内部使用ThreadLocal隔离线程,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 使用ThreadLocal存储切换数据源后的KEY
*/
public class DataSourceHolder {

//线程 本地环境
private static final ThreadLocal<String> dataSources = new InheritableThreadLocal();

//设置数据源
public static void setDataSource(String datasource) {
dataSources.set(datasource);
}

//获取数据源
public static String getDataSource() {
return dataSources.get();
}

//清除数据源
public static void clearDataSource() {
dataSources.remove();
}
}

如何构造一个动态数据源?

上文说过只需继承一个抽象类AbstractRoutingDataSource,重写其中的一个方法determineCurrentLookupKey()即可。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 动态数据源,继承AbstractRoutingDataSource
*/
public class DynamicDataSource extends AbstractRoutingDataSource {

/**
* 返回需要使用的数据源的key,将会按照这个KEY从Map获取对应的数据源(切换)
* @return
*/
@Override
protected Object determineCurrentLookupKey() {
//从ThreadLocal中取出KEY
return DataSourceHolder.getDataSource();
}

/**
* 构造方法填充Map,构建多数据源
*/
public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
// 默认的数据源,可以作为主数据源
super.setDefaultTargetDataSource(defaultTargetDataSource);
// 目标数据源
super.setTargetDataSources(targetDataSources);
// 执行afterPropertiesSet方法,完成属性的设置
super.afterPropertiesSet();
}

}

上述代码很简单,分析如下:

  1. 一个多参的构造方法,指定了默认的数据源和目标数据源。
  2. 重写determineCurrentLookupKey()方法,返回数据源对应的KEY,这里是直接从ThreadLocal中取值,就是上文封装的DataSourceHolder

定义一个注解

为了操作方便且低耦合,不能每次需要切换数据源的时候都要手动调一下接口吧,可以定义一个切换数据源的注解,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 切换数据源的注解
*/
@Target(value = ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
@Documented
public @interface SwitchSource {

/**
* 默认切换的数据源KEY
*/
String DEFAULT_NAME = "hisDataSource";

/**
* 需要切换到数据的KEY
*/
String value() default DEFAULT_NAME;
}

注解中只有一个value属性,指定了需要切换数据源的KEY

有注解还不行,当然还要有切面,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Aspect
// 优先级要设置在事务切面执行之前
@Order(1)
@Component
@Slf4j
public class DataSourceAspect {


@Pointcut("@annotation(SwitchSource)")
public void pointcut() {
}

/**
* 在方法执行之前切换到指定的数据源
* @param joinPoint
*/
@Before(value = "pointcut()")
public void beforeOpt(JoinPoint joinPoint) {
/*因为是对注解进行切面,所以这边无需做过多判定,直接获取注解的值,进行环绕,将数据源设置成远方,然后结束后,清楚当前线程数据源*/
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
SwitchSource switchSource = method.getAnnotation(SwitchSource.class);
log.info("[Switch DataSource]:" + switchSource.value());
DataSourceHolder.setDataSource(switchSource.value());
}

/**
* 方法执行之后清除掉ThreadLocal中存储的KEY,这样动态数据源会使用默认的数据源
*/
@After(value = "pointcut()")
public void afterOpt() {
DataSourceHolder.clearDataSource();
log.info("[Switch Default DataSource]");
}

}

这个ASPECT很容易理解,beforeOpt()在方法之前执行,取值@SwitchSource中value属性设置到ThreadLocal中;afterOpt()方法在方法执行之后执行,清除掉ThreadLocal中的KEY,保证了如果不切换数据源,则用默认的数据源。

如何与Mybatis整合?

单一数据源与Mybatis整合上文已经详细讲解了,数据源DataSource作为参数构建了SqlSessionFactory,同样的思想,只需要把这个数据源换成动态数据源即可。注入的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 创建动态数据源的SqlSessionFactory,传入的是动态数据源
*
* @Primary这个注解很重要,如果项目中存在多个SqlSessionFactory,这个注解一定要加上
*/
@Primary
@Bean("sqlSessionFactory")
public SqlSessionFactory sqlSessionFactoryBean(DynamicDataSource dynamicDataSource) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dynamicDataSource);
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
// 自动将数据库中的下划线转换为驼峰格式
configuration.setMapUnderscoreToCamelCase(true);
configuration.setDefaultFetchSize(100);
configuration.setDefaultStatementTimeout(30);
sqlSessionFactoryBean.setConfiguration(configuration);
return sqlSessionFactoryBean.getObject();
}

与Mybatis整合很简单,只需要把数据源替换成自定义的动态数据源DynamicDataSource

那么动态数据源如何注入到IOC容器中呢?看上文自定义的DynamicDataSource构造方法,肯定需要两个数据源了,因此必须先注入两个或者多个数据源到IOC容器中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @Bean:向IOC容器中注入一个Bean
* @ConfigurationProperties:使得配置文件中以spring.datasource为前缀的属性映射到Bean的属性中
*/
@ConfigurationProperties(prefix = "spring.datasource")
@Bean("dataSource")
public DataSource dataSource() {
return new DruidDataSource();
}

/**
* 向IOC容器中注入另外一个数据源
* 全局配置文件中前缀是spring.datasource.his
*/
@Bean(name = SwitchSource.DEFAULT_NAME)
@ConfigurationProperties(prefix = "spring.datasource.his")
public DataSource hisDataSource() {
return new DruidDataSource();
}

以上构建的两个数据源,一个是默认的数据源,一个是需要切换到的数据源(targetDataSources),这样就组成了动态数据源了。数据源的一些信息,比如urlusername需要自己在全局配置文件中根据指定的前缀配置即可,代码不再贴出

动态数据源的注入代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 创建动态数据源的SqlSessionFactory,传入的是动态数据源
*
* @Primary这个注解很重要,如果项目中存在多个SqlSessionFactory,这个注解一定要加上
*/
@Primary
@Bean("sqlSessionFactory")
public SqlSessionFactory sqlSessionFactoryBean(DynamicDataSource dynamicDataSource) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dynamicDataSource);
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
// 自动将数据库中的下划线转换为驼峰格式
configuration.setMapUnderscoreToCamelCase(true);
configuration.setDefaultFetchSize(100);
configuration.setDefaultStatementTimeout(30);
sqlSessionFactoryBean.setConfiguration(configuration);
return sqlSessionFactoryBean.getObject();
}

这里还有一个问题:IOC中存在多个数据源了,那么事务管理器怎么办呢?它也懵逼了,到底选择哪个数据源呢?因此事务管理器肯定还是要重新配置的

事务管理器此时管理的数据源将是动态数据源DynamicDataSource,配置如下:

1
2
3
4
5
6
7
8
/**
* 重写事务管理器,管理动态数据源
*/
@Primary
@Bean(value = "transactionManager2")
public PlatformTransactionManager annotationDrivenTransactionManager(DynamicDataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

至此,Mybatis与多数据源的整合就完成了

源码地址

GitHub

参考@dd