Coding & Life

求知若饥,虚心若愚

最近在做微信的扫码支付,遇到一个问题:如何在用户扫码支付完成之后,客户端立即得到通知,进行下一步的跳转?

首先想到的策略可能是客户端轮询查询订单状态,根据返回结果进行跳转

这个方式有明显的缺点,轮询时间设置短,频繁发送请求,对服务器以及数据库都会产生压力;轮询时间过长,用户等待时间长,体验很差;

针对这个问题想到了微信网页版的扫码登录(扫码完成后,立即登录),现在研究一下它的原理并实现相同的功能

微信扫码登录原理

pengding

根据图片中,前端二维码页面发送一个网络请求,但是这个请求并没有立即返回

408

一段时间没有扫描后,后端返回408,前端重新发起一个相同的网络请求,并继续pending

据此猜测大概实现原理如下:

  1. 进入网站-生成一个唯一标识(比如UUID)
  2. 跳转到二维码页面(二维码中的链接包含次UUID)
  3. 二维码页面向服务端发起请求,查询二维码是被扫登录
  4. 服务器收到请求,查询。如果未扫登录,进入等待(wait),不立即返回
  5. 一旦被扫,立即返回(notify)
  6. 页面收到结果,做后续处理

步骤大概就是如此,但是有个问题,步骤3如果请求超时,如何处理?处理方式是,一段固定时间后,返回408(timeout)

UUID缓存

1
public static Map<String, ScanPool> cacheMap = new ConcurrentHashMap<String, ScanPool>();

一定要使用ConcurrentHashMap否则多线程操作集合会报错ConcurrentModificationException

单线程中出现该异常的原因是,对一个集合遍历的同时,又对该集合进行了增删的操作

多线程中更易出现该异常,当你在一个线程中对一数据集合进行遍历,正赶上另外一个线程对该数据集合进行增删操作时便会出现该异常

缓存还要设置自动清理功能,防止增长过大

生成二维码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@RequestMapping("/qrcode/{uuid}")
@ResponseBody
String createQRCode(@PathVariable String uuid, HttpServletResponse response) {
System.out.println("生成二维码");

String text = "http://2b082e46.ngrok.io/login/" + uuid;
int width = 300;
int height = 300;
String format = "png";
//将UUID放入缓存
ScanPool pool = new ScanPool();
PoolCache.cacheMap.put(uuid, pool);
try {
Map<EncodeHintType, Object> hints = new HashMap<EncodeHintType, Object>();
hints.put(EncodeHintType.CHARACTER_SET, "utf-8");
hints.put(EncodeHintType.ERROR_CORRECTION, ErrorCorrectionLevel.H); //容错率
BitMatrix bitMatrix = new MultiFormatWriter().encode(text, BarcodeFormat.QR_CODE, width, height, hints);
MatrixToImageWriter.writeToStream(bitMatrix, format, response.getOutputStream());
} catch (WriterException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

生成二维码,并将UUID放入缓存中

此处需要注意,二维码url必须是外网可以访问地址,此处可以使用内网穿透工具

验证是否登录

前端发起请求,验证该二维码是否已经被扫登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RequestMapping("/pool")
@ResponseBody
String pool(String uuid) {
System.out.println("检测[" + uuid + "]是否登录");

ScanPool pool = PoolCache.cacheMap.get(uuid);

if (pool == null) {
return "timeout";
}

//使用计时器,固定时间后不再等待扫描结果--防止页面访问超时
new Thread(new ScanCounter(pool)).start();

boolean scanFlag = pool.getScanStatus();

if (scanFlag) {
return "success";
} else {
return "fail";
}
}

获得状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public synchronized boolean getScanStatus() {
try {
if (!isScan()) { //如果还未扫描,则等待
this.wait();
}
if (isScan()) {
return true;
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}

public synchronized void notifyPool() {
try {
this.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}

新开线程防止页面访问超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class ScanCounter implements Runnable {

public Long timeout = 27000L;

//传入的对象
private ScanPool scanPool;

public ScanCounter(ScanPool scanPool) {
this.scanPool = scanPool;
}

@Override
public void run() {
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
notifyPool(scanPool);
}

public synchronized void notifyPool(ScanPool scanPool) {
scanPool.notifyPool();
}
}

verify

定时清理uuid

为防止cacheMap不断增加的问题,需要在静态代码块中开启线程定时清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class PoolCache {
//缓存超时时间 80秒
private static Long timeOutSecond = 80L;

//每1分钟清理一次缓存
private static Long cleanIntervalSecond = 60L;

public static Map<String, ScanPool> cacheMap = new ConcurrentHashMap<String, ScanPool>();

static {
new Thread(new Runnable() {

@Override
public void run() {
while (true) {
try {
Thread.sleep(cleanIntervalSecond * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
clean();
}
}

public void clean() {
System.out.println("缓存清理...");

if (cacheMap.keySet().size() > 0) {
Iterator<String> iterator = cacheMap.keySet().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
ScanPool pool = cacheMap.get(key);
if (System.currentTimeMillis() - pool.getCreateTime() > timeOutSecond * 1000) {
cacheMap.remove(key);
// 这一行很关键!用于当清理完成,前端请求还在pending时,立即返回结果
pool.notifyPool();
}
}
}
}
}).start();
}

}

扫码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RequestMapping("/login/{uuid}")
@ResponseBody
String login(@PathVariable String uuid) {

ScanPool pool = PoolCache.cacheMap.get(uuid);

if (pool == null) {
return "timeout,scan fail";
}

// 设置被扫状态,唤起线程
pool.scanSuccess();

return "扫码完成,登录成功";
}

扫码成功,设置扫码状态,唤起线程

1
2
3
4
5
6
7
8
public synchronized void scanSuccess() {
try {
setScan(true);
this.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}

手机扫码后

对比完整代码很容易看实现原理

pic

初始化项目

在GitHub或者码云上新建仓库

登录GitHub或者码云,新建仓库,不默认产生README文档

本地新建仓库

1
git init

本地仓库与远程相连

1
git remote add origin git@git.oschina.net:wangweiye/SpringBoot-Learning.git

提交本地到远程

1
2
3
4
5
6
7
git add .

git commit -m ""

git pull origin master

git push -u origin master

标签

列显已有的标签

1
git tag

新建轻量级标签

1
git tag v1.0.0

分享标签

1
git push origin [tagname]

删除远程tag

1
git push origin --delete tag [tagname]

.gitignore无效解决方法

在工程中很容易出现.gitignore并没有忽略掉我们已经添加的文件,那是因为.gitignore对已经追踪(track)的文件是无效的,需要清除缓存,清除缓存后文件将以未追踪的形式出现,这时重新添加(add)并提交(commit)就可以了。

1
2
3
git rm -r --cached .
git add .
git commit -m "comment"

什么是DelayQueue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

应用场景

订单超时关闭

订单业务中总是出现订单未支付过期关闭的情形。最简单的解决方式是定时任务轮询订单,这种方式浪费资源并不优雅。延时队列能够轻松应对这种情形。

创建订单类

放入DelayQueue的对象需要实现Delayed接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Order implements Delayed {
public static AtomicInteger genId = new AtomicInteger(1);

private final long delay; // 延迟时间
private final long expire; // 到期时间
private final long now; // 创建时间

private Integer id; // 订单ID
private Integer state; // 订单状态

public Integer getId() {
return id;
}

public void setId(Integer id) {
this.id = id;
}

public Integer getState() {
return state;
}

public void setState(Integer state) {
this.state = state;
}

public Order(long delay, String msg) {
this.delay = delay;
expire = System.currentTimeMillis() + delay; // 到期时间 = 当前时间+延迟时间
now = System.currentTimeMillis();

this.state = 0;
}

/**
* 需要实现的接口,获得延迟时间 用过期时间-当前时间
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}

生成订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static void producer(final DelayQueue<Order> delayQueue) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}

Order element = new Order(new Random().nextInt(1000) * 10, "test");

element.setId(Order.genId.getAndIncrement());

delayQueue.offer(element);
}
}
}).start();

/**
* 每秒打印延迟队列中的对象个数
*/
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("delayQueue size:" + delayQueue.size());
}
}
}).start();
}

处理超时订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void consumer(final DelayQueue<Order> delayQueue) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Order element = null;
try {
element = delayQueue.take();

if (element.getState().intValue() == 0) {
// 如果现在订单状态还未支付,关闭订单
element.setState(1);

System.out.println("订单" + element.getId() + "超时关闭");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}

DelayQueue还是一个阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,此时调用 poll() 将直接返回 null,调用 take() 将会发生阻塞,直到有元素发生到期,take() 才会返回。

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
DelayQueue<Order> delayQueue = new DelayQueue<Order>();

// 生产者
producer(delayQueue);

// 消费者
consumer(delayQueue);

while (true) {
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

多考生考试

  1. 考试总时间为10秒,至少2秒后才可进行交卷。
  2. 考生可在2-10秒这段时间内的任意时间交卷。
  3. 考试时间一到,所有未交卷的学生必须交卷。

使用enum定义时间常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
enum Times {
SUMMIT_TIME(10), //考试总时间
SUMBMIT_LIMIT(2), // 交卷限制时间
MAX_RAND_TIME(15); // 模拟考生所需最大时间

private final int value;

private Times(int value) {
this.value = value;
}

public int getValue() {
return value;
}
}

学生类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Student implements Delayed {
private String name;
private long delay; // 考试花费时间,单位为毫秒
private long expire; // 交卷时间,单位为毫秒

// 此构造可随机生成考试花费时间
public Student(String name) {
this.name = name;
this.delay = TimeUnit.MILLISECONDS.convert(getRandomSeconds(), TimeUnit.SECONDS);
this.expire = System.currentTimeMillis() + this.delay;
}

//此构造可指定考试花费时间
public Student(String name, long delay, TimeUnit unit) {
this.name = name;
this.delay = TimeUnit.MILLISECONDS.convert(delay, unit);
this.expire = System.currentTimeMillis() + this.delay;
}

// ...

public int getRandomSeconds() {
// 获取随机花费时间,范围:2-10秒
return new Random().nextInt(Times.MAX_RAND_TIME.getValue() - Times.SUMBMIT_LIMIT.getValue())
+ Times.SUMBMIT_LIMIT.getValue();
}

@Override
public int compareTo(Delayed o) {
// 此方法的实现用于定义优先级
long td = this.getDelay(TimeUnit.MILLISECONDS);
long od = o.getDelay(TimeUnit.MILLISECONDS);
return td > od ? 1 : td == od ? 0 : -1;
}

@Override
public long getDelay(TimeUnit unit) {
// 这里返回的是剩余延时,当延时为0时,此元素延时期满,可从take()取出
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}

主方法实现

  1. 初始化对象
1
DelayQueue<Student> queue = new DelayQueue<>();
  1. 添加测试数据
1
2
3
4
5
6
7
8
9
10
11
queue.add(new Student("范冰冰"));
queue.add(new Student("成 龙"));
queue.add(new Student("李一桐"));
queue.add(new Student("宋小宝"));
queue.add(new Student("吴 京"));
queue.add(new Student("绿巨人"));
queue.add(new Student("洪金宝"));
queue.add(new Student("李云龙"));
queue.add(new Student("钢铁侠"));
queue.add(new Student("刘德华"));
queue.add(new Student("戴安娜"));
  1. 添加一条用于考试结束时强制交卷的属性
1
queue.add(new Student("submit", Times.SUBMIT_TIME.getValue(),TimeUnit.SECONDS));
  1. 开始考试
1
2
3
4
5
6
7
8
9
10
11
12
13
while (true) {
Student s = queue.take(); // 必要时进行阻塞等待
if (s.getName().equals("submit")) {
System.out.println("时间已到,全部交卷!");
// 利用Java8 Stream特性使尚未交卷学生交卷
queue.parallelStream()
.filter(v -> v.getExpire() >= s.getExpire())
.map(Student::submit)
.forEach(System.out::println);
System.exit(0);
}
System.out.println(s);
}

测试结果

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package cn.gss.juc;

import java.text.DateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

enum Times {
SUBMIT_TIME(10), SUMBMIT_LIMIT(2), MAX_RAND_TIME(15);
private final int value;

private Times(int value) {
this.value = value;
}

public int getValue() {
return value;
}
}
/**
* DelayQueue实现多考生考试
* @author Gss
*/
public class TestDelayedQueue {

public static void main(String[] args) throws InterruptedException {
DelayQueue<Student> queue = new DelayQueue<>();
queue.add(new Student("范冰冰"));
queue.add(new Student("成 龙"));
queue.add(new Student("李一桐"));
queue.add(new Student("宋小宝"));
queue.add(new Student("吴 京"));
queue.add(new Student("绿巨人"));
queue.add(new Student("洪金宝"));
queue.add(new Student("李云龙"));
queue.add(new Student("钢铁侠"));
queue.add(new Student("刘德华"));
queue.add(new Student("戴安娜"));
queue.add(new Student("submit", Times.SUBMIT_TIME.getValue(), TimeUnit.SECONDS));
while (true) {
Student s = queue.take(); // 必要时进行阻塞等待
if (s.getName().equals("submit")) {
System.out.println("时间已到,全部交卷!");
// 利用Java8 Stream使尚未交卷学生交卷
queue.parallelStream()
.filter(v -> v.getExpire() >= s.getExpire())
.map(Student::submit)
.forEach(System.out::println);
System.exit(0);
}
System.out.println(s);
}
}

}

class Student implements Delayed {
private String name;
private long delay; // 考试花费时间,单位为毫秒
private long expire; // 交卷时间,单位为毫秒

// 此构造可随机生成考试花费时间
public Student(String name) {
this.name = name;
this.delay = TimeUnit.MILLISECONDS.convert(getRandomSeconds(), TimeUnit.SECONDS); // 随机生成考试花费时间
this.expire = System.currentTimeMillis() + this.delay;
}

// 此构造可指定考试花费时间
public Student(String name, long delay, TimeUnit unit) {
this.name = name;
this.delay = TimeUnit.MILLISECONDS.convert(delay, unit);
this.expire = System.currentTimeMillis() + this.delay;
}

public int getRandomSeconds() { // 获取随机花费时间
return new Random().nextInt(Times.MAX_RAND_TIME.getValue() - Times.SUMBMIT_LIMIT.getValue())
+ Times.SUMBMIT_LIMIT.getValue();
}

public Student submit() { // 设置花费时间和交卷时间,考试时间结束强制交卷时调用此方法
setDelay(Times.SUBMIT_TIME.getValue(), TimeUnit.SECONDS);
setExpire(System.currentTimeMillis());
return this;
}

public String getName() {
return name;
}

public long getExpire() {
return expire;
}

public void setDelay(long delay, TimeUnit unit) {
this.delay = TimeUnit.MILLISECONDS.convert(delay, TimeUnit.SECONDS);
}

public void setExpire(long expire) {
this.expire = expire;
}

@Override
public int compareTo(Delayed o) { // 此方法的实现用于定义优先级
long td = this.getDelay(TimeUnit.MILLISECONDS);
long od = o.getDelay(TimeUnit.MILLISECONDS);
return td > od ? 1 : td == od ? 0 : -1;
}

@Override
public long getDelay(TimeUnit unit) { // 这里返回的是剩余延时,当延时为0时,此元素延时期满,可从take()取出
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public String toString() {
return "学生姓名:" + this.name + ",考试用时:" + TimeUnit.SECONDS.convert(delay, TimeUnit.MILLISECONDS) + ",交卷时间:"
+ DateFormat.getDateTimeInstance().format(new Date(this.expire));
}
}

源码地址

GitHub

概述

SnowFlake算法是Twitter设计的一个可以在分布式系统中生成唯一的ID的算法,它可以满足Twitter每秒上万条消息ID分配的请求,这些消息ID是唯一的且有大致的递增顺序。

SnowFlake算法生成的ID大致上是按照时间递增的,用在分布式系统中时,需要注意数据中心标识和机器标识必须唯一,这样就能保证每个节点生成的ID都是唯一的。

原理

SnowFlake算法产生的ID是一个64位的整型,结构如下(每一部分用“-”符号分隔):

1位标识部分

在java中由于long的最高位是符号位,正数是0,负数是1,一般生成的ID为正数,所以为0;

41位时间戳部分

这个是毫秒级的时间,一般实现上不会存储当前的时间戳,而是时间戳的差值(当前时间-固定的开始时间),这样可以使产生的ID从更小值开始;41位的时间戳可以使用69年,(1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69年;

10位节点部分

Twitter实现中使用前5位作为数据中心标识,后5位作为机器标识,可以部署1024个节点;

12位序列号部分

支持同一毫秒内同一个节点可以生成4096个ID;

SnowFlake算法生成的ID大致上是按照时间递增的,用在分布式系统中时,需要注意数据中心标识和机器标识必须唯一,这样就能保证每个节点生成的ID都是唯一的。或许我们不一定都需要像上面那样使用5位作为数据中心标识,5位作为机器标识,可以根据我们业务的需要,灵活分配节点部分,如:若不需要数据中心,完全可以使用全部10位作为机器标识;若数据中心不多,也可以只使用3位作为数据中心,7位作为机器标识。

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.wang.snow;

public class SnowFlake {
/**
* 起始的时间戳
*/
private final static long START_STMP = 1480166465631L;

/**
* 每一部分占用的位数
*/
private final static long SEQUENCE_BIT = 12; //序列号占用的位数
private final static long MACHINE_BIT = 5; //机器标识占用的位数
private final static long DATACENTER_BIT = 5;//数据中心占用的位数

/**
* 每一部分的最大值
*/
private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);

/**
* 每一部分向左的位移
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;

private long datacenterId; //数据中心
private long machineId; //机器标识
private long sequence = 0L; //序列号
private long lastStmp = -1L;//上一次时间戳

public SnowFlake(long datacenterId, long machineId) {
if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
}
this.datacenterId = datacenterId;
this.machineId = machineId;
}

/**
* 产生下一个ID
*
* @return
*/
public synchronized long nextId() {
long currStmp = getNewstmp();
if (currStmp < lastStmp) {
throw new RuntimeException("Clock moved backwards. Refusing to generate id");
}

if (currStmp == lastStmp) {
//相同毫秒内,序列号自增
sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
currStmp = getNextMill();
}
} else {
//不同毫秒内,序列号置为0
sequence = 0L;
}

lastStmp = currStmp;

return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
| datacenterId << DATACENTER_LEFT //数据中心部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //序列号部分
}

private long getNextMill() {
long mill = getNewstmp();
while (mill <= lastStmp) {
mill = getNewstmp();
}
return mill;
}

private long getNewstmp() {
return System.currentTimeMillis();
}
}

测试代码

1
2
3
4
5
6
7
@Test
public void snow() {
SnowFlake snowFlake = new SnowFlake(2, 3);
for (int i = 0; i < (1 << 12); i++) {
System.out.println(snowFlake.nextId());
}
}

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
155632577921953792
155632577921953793
155632577921953794
155632577921953795
155632577921953796
155632577921953797
155632577921953798
155632577921953799
155632577921953800
155632577921953801
155632577921953802
155632577921953803
155632577921953804
...

pic

mysql版本使用的是5.7.20
主库的IP为:47.104.71.88,从库的IP为:45.77.13.74

读写分离(主从库)

原理:让主库(master)处理事务性增改删,而从库(slave)处理查询操作

主库创建用户

1
GRANT REPLICATION SLAVE,RELOAD,SUPER ON *.* TO synchrouser@45.77.13.74 IDENTIFIED BY 'w123456W!';

在主库创建一个synchrouser用户密码为w123456W!,并允许从库以synchrouser用户来登录

配置主库

在[mysqld]下增加配置

1
2
3
server-id=88
log_bin=mysql-bin
binlog_format=mixed

server-id在数据库配置中必须唯一,一般为IP最后一个节点(例如:47.104.71.88,则设置为88)设置完成后,重启mysql

配置从库

在[mysqld]增加配置

1
server-id=74

设置完成后,重启mysql

在主库执行:show master status;

根据以上主库的信息设置从库

1
change master to master_host='47.104.71.88',master_user='synchrouser',master_password='w123456W!',master_log_file='mysql-bin.000005',master_log_pos=840;

master_log_file字段对应了主库的File,master_log_pos字段对应了主库的Position

启动主从同步

从库执行

1
start slave;

检查是否配置成功

如果Slave_IO_Running和Slave_SQL_Running都为Yes,代表配置成功

测试主从同步

主库创建一个库,一个表,观察从库是否同样创建

设置主从库之后MySq会持续产生日志,需要定期清除或则设置自动清除

附录:

到这里,全部库的主从配置就完成了,实际应用中可能会用到单个表的同步,或者部分表的同步,只需要在从库的/etc/my.cnf里加上

只复制某个表replicate-do-table=dbname.tablename
只复制某些表(可用匹配符)replicate-wild-do-table=dbname.tablename%
只复制某个库replicate-do-db=dbname
只复制某些库replicte-wild-do-db=dbname%
不复制某个表replicate-ignore-table=dbname.tablename

pic

相关概念

线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源,但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。

线程状态

创建

当用new操作符创建一个线程时, 例如new Thread(r),线程还没有开始运行,此时线程处在新建状态。 当一个线程处于新建状态时,程序还没有开始运行线程中的代码

就绪

一个新创建的线程并不自动开始运行,要执行线程,必须调用线程的start()方法。当线程对象调用start()方法即启动了线程,start()方法创建线程运行的系统资源,并调度线程运行run()方法。当start()方法返回后,线程就处于就绪状态。此时线程中代码仍未运行

运行

当线程获得CPU后,它才进入运行状态,真正开始执行run()方法中的代码

阻塞

该线程放弃CPU的使用,暂停运行

常见线程阻塞的原因:

  1. 线程执行了Thread.sleep(int millsecond)方法,当前线程放弃CPU,睡眠一段时间,然后再恢复执行

  2. 线程执行一段同步代码,但是尚且无法获得相关的同步锁,只能进入阻塞状态,等到获取了同步锁,才能回复执行

  3. 线程执行了一个对象的wait()方法,直接进入阻塞状态,等待其他线程执行notify()或者notifyAll()方法

  4. 线程执行某些IO操作,因为等待相关的资源而进入了阻塞状态。比如说监听system.in,但是尚且没有收到键盘的输入,则进入阻塞状态

终止

线程执行完毕

线程的创建方法

继承Thread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Thread1 extends Thread {
public void run() {
// 线程执行代码
}

public class Main {

public static void main(String[] args) {
Thread1 mTh1=new Thread1();
Thread1 mTh2=new Thread1();
mTh1.start();
mTh2.start();
}
}

程序启动运行main时候,java虚拟机启动一个进程,主线程main在main()调用时候被创建。随着调用main的两个对象的start方法,另外两个线程也启动了,这样,整个应用就在多线程下运行

注意:
start()方法的调用后并不是立即执行多线程代码,而是使得该线程变为可运行态(Runnable),什么时候运行是由操作系统决定的。

从程序运行的结果可以发现,多线程程序是乱序执行。

Thread.sleep()方法调用目的是不让当前线程独自霸占该进程所获取的CPU资源,以留出一定时间给其他线程执行的机会。

实际上所有的多线程代码执行顺序都是不确定的,每次执行的结果都是随机的。

实现Runable接口(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Thread2 implements Runnable{
private String name;

public Thread2(String name) {
this.name=name;
}

@Override
public void run() {
// 线程逻辑代码
}
}
public class Main {

public static void main(String[] args) {
new Thread(new Thread2("C")).start();
new Thread(new Thread2("D")).start();
}
}

两种方法的区别

如果一个类继承Thread,则不适合资源共享。但是如果实现了Runable接口的话,则很容易的实现资源共享。

总结:
实现Runnable接口比继承Thread类所具有的优势:

  1. 适合多个相同的程序代码的线程去处理同一个资源
  2. 可以避免java中的单继承的限制
  3. 增加程序的健壮性,代码可以被多个线程共享,代码和数据独立
  4. 线程池只能放入实现Runable或callable类线程,不能直接放入继承Thread的类

提醒一下大家:main方法其实也是一个线程。在java中所以的线程都是同时启动的,至于什么时候,哪个先执行,完全看谁先得到CPU的资源。

在Java中,每次程序运行至少启动2个线程。一个是main线程,一个是垃圾收集线程。因为每当使用java命令执行一个类的时候,实际上都会启动一个JVM,每一个JVM实际就是在操作系统中启动了一个进程。

线程状态

线程调度

Java线程有优先级,优先级高的线程会优先获得运行机会(但不一定优先级高的一定先执行)
Java线程的优先级用整数表示,取值范围是1~10,Thread类有以下三个静态常量:

1
2
3
static int MAX_PRIORITY  = 10;  //线程可以具有的最高优先级
static int MIN_PRIORITY = 1; //线程可以具有的最低优先级
static int NORM_PRIORITY = 5; //分配给线程的默认优先级

Thread类的setPriority()和getPriority()方法分别用来设置和获取线程的优先级。

线程睡眠

Thread.sleep(long millis)方法,使线程转到阻塞状态。millis参数设定睡眠的时间,以毫秒为单位。当睡眠结束后,就转为就绪(Runnable)状态。sleep()平台移植性好。

线程等待

Object类中的wait()方法,导致当前的线程等待,直到其他线程调用此对象的 notify() 方法或 notifyAll() 唤醒方法。这个两个唤醒方法也是Object类中的方法,行为等价于调用 wait(0) 一样。

线程让步

Thread.yield() 方法,暂停当前正在执行的线程对象,把执行机会让给相同或者更高优先级的线程。

线程加入

join()方法,等待其他线程终止。在当前线程中调用另一个线程的join()方法,则当前线程转入阻塞状态,直到另一个进程运行结束,当前线程再由阻塞转为就绪状态。

线程唤醒

Object类中的notify()方法,唤醒在此对象监视器上等待的单个线程。如果所有线程都在此对象上等待,则会选择唤醒其中一个线程。选择是任意性的,并在对实现做出决定时发生。线程通过调用其中一个 wait 方法,在对象的监视器上等待。 直到当前的线程放弃此对象上的锁定,才能继续执行被唤醒的线程。被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争;例如,唤醒的线程在作为锁定此对象的下一个线程方面没有可靠的特权或劣势。类似的方法还有一个notifyAll(),唤醒在此对象监视器上等待的所有线程。

注意:Thread中suspend()和resume()两个方法在JDK1.5中已经废除,不再介绍。因为有死锁倾向。

常用函数说明

sleep(long millis)

在指定的毫秒数内让当前正在执行的线程休眠(暂停执行)

join

join是Thread类的一个方法,启动线程后直接调用,即join()的作用是:“等待该线程终止”,这里需要理解的就是该线程是指的主线程等待子线程的终止。也就是在子线程调用了join()方法后面的代码,只有等到子线程结束了才能执行。

使用场景

在很多情况下,主线程生成并起动了子线程,如果子线程里要进行大量的耗时的运算,主线程往往将于子线程之前结束,但是如果主线程处理完其他的事务后,需要用到子线程的处理结果,也就是主线程需要等待子线程执行完成之后再结束,这个时候就要用到join()方法了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Main {

public static void main(String[] args) {
System.out.println("主线程运行开始!");
Thread1 mTh1=new Thread1("A");
Thread1 mTh2=new Thread1("B");
mTh1.start();
mTh2.start();
try {
mTh1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
mTh2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程运行结束!");
}
}

yield():暂停当前正在执行的线程对象,并执行其他线程。

Thread.yield()方法作用是:暂停当前正在执行的线程对象,并执行其他线程。

yield()应该做的是让当前运行线程回到可运行状态,以允许具有相同优先级的其他线程获得运行机会。因此,使用yield()的目的是让相同优先级的线程之间能适当的轮转执行。但是,实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。

结论:yield()从未导致线程转到等待/睡眠/阻塞状态。在大多数情况下,yield()将导致线程从运行状态转到可运行状态,但有可能没有效果。可看上面的图。

sleep()和yield()的区别

sleep()使当前线程进入停滞状态,所以执行sleep()的线程在指定的时间内肯定不会被执行;yield()只是使当前线程重新回到可执行状态,所以执行yield()的线程有可能在进入到可执行状态后马上又被执行。

sleep 方法使当前运行中的线程睡眼一段时间,进入不可运行状态,这段时间的长短是由程序设定的,yield 方法使当前线程让出 CPU 占有权,但让出的时间是不可设定的。实际上,yield()方法对应了如下操作:先检测当前是否有相同优先级的线程处于同可运行状态,如有,则把 CPU 的占有权交给此线程,否则,继续运行原来的线程。所以yield()方法称为“退让”,它把运行机会让给了同等优先级的其他线程

另外,sleep 方法允许较低优先级的线程获得运行机会,但 yield() 方法执行时,当前线程仍处在可运行状态,所以,不可能让出较低优先级的线程些时获得 CPU 占有权。在一个运行系统中,如果较高优先级的线程没有调用 sleep 方法,又没有受到 I\O 阻塞,那么,较低优先级线程只能等待所有较高优先级的线程运行结束,才有机会运行。

wait

我们可以利用wait()来让一个线程在某些条件下暂停运行。例如,在生产者消费者模型中,生产者线程在缓冲区为满的时候,消费者在缓冲区为空的时候,都应该暂停运行。如果某些线程在等待某些条件触发,那当那些条件为真时,你可以用 notify 和 notifyAll 来通知那些等待中的线程重新开始运行。不同之处在于,notify 仅仅通知一个线程,并且我们不知道哪个线程会收到通知,然而 notifyAll 会通知所有等待中的线程。换言之,如果只有一个线程在等待一个信号灯,notify和notifyAll都会通知到这个线程。但如果多个线程在等待这个信号灯,那么notify只会通知到其中一个,而其它线程并不会收到任何通知,而notifyAll会唤醒所有等待中的线程。

我们怎么在代码里使用wait()呢?因为wait()并不是Thread类下的函数,我们并不能使用Thread.wait()。事实上很多Java程序员都喜欢这么写,因为它们习惯了使用Thread.sleep(),所以他们会试图使用wait() 来达成相同的目的,但很快他们就会发现这并不能顺利解决问题。正确的方法是对在多线程间共享的那个Object来使用wait。在生产者消费者问题中,这个共享的Object就是那个缓冲区队列。

既然我们应该在synchronized的函数或是对象里调用wait,那哪个对象应该被synchronized呢?答案是,那个你希望上锁的对象就应该被synchronized,即那个在多个线程间被共享的对象。在生产者消费者问题中,应该被synchronized的就是那个缓冲区队列。

永远在循环(loop)里调用 wait 和 notify,不是在 If 语句

现在你知道wait应该永远在被synchronized的背景下和那个被多线程共享的对象上调用,下一个一定要记住的问题就是,你应该永远在while循环,而不是if语句中调用wait。因为线程是在某些条件下等待的——在我们的例子里,即“如果缓冲区队列是满的话,那么生产者线程应该等待”,你可能直觉就会写一个if语句。但if语句存在一些微妙的小问题即使条件没被满足,你的线程你也有可能被错误地唤醒。所以如果你不在线程被唤醒后再次使用while循环检查唤醒条件是否被满足,你的程序就有可能会出错——例如在缓冲区为满的时候生产者继续生成数据,或者缓冲区为空的时候消费者开始消耗数据。所以记住,永远在while循环而不是if语句中使用wait!

在while循环里使用wait的目的,是在线程被唤醒的前后都持续检查条件是否被满足。如果条件并未改变,wait被调用之前notify的唤醒通知就来了,那么这个线程并不能保证被唤醒,有可能会导致死锁问题。

下面我们提供一个使用wait和notify的范例程序。在这个程序里,我们使用了上文所述的一些代码规范。我们有两个线程,分别名为PRODUCER(生产者)和CONSUMER(消费者),他们分别继承了了Producer和Consumer类,而Producer和Consumer都继承了Thread类。Producer和Consumer想要实现的代码逻辑都在run()函数内。Main线程开始了生产者和消费者线程,并声明了一个LinkedList作为缓冲区队列(在Java中,LinkedList实现了队列的接口)。生产者在无限循环中持续往LinkedList里插入随机整数直到LinkedList满。我们在while(queue.size == maxSize)循环语句中检查这个条件。请注意到我们在做这个检查条件之前已经在队列对象上使用了synchronized关键词,因而其它线程不能在我们检查条件时改变这个队列。如果队列满了,那么PRODUCER线程会在CONSUMER线程消耗掉队列里的任意一个整数,并用notify来通知PRODUCER线程之前持续等待。在我们的例子中,wait和notify都是使用在同一个共享对象上的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
/**
* Simple Java program to demonstrate How to use wait, notify and notifyAll()
* method in Java by solving producer consumer problem.
*/
public class ProducerConsumerInJava {
public static void main(String args[]) {
System.out.println("How to use wait and notify method in Java");
System.out.println("Solving Producer Consumper Problem");
Queue<Integer> buffer = new LinkedList<>();
int maxSize = 10;
Thread producer = new Producer(buffer, maxSize, "PRODUCER");
Thread consumer = new Consumer(buffer, maxSize, "CONSUMER");
producer.start(); consumer.start(); }
}
/**
* Producer Thread will keep producing values for Consumer
* to consumer. It will use wait() method when Queue is full
* and use notify() method to send notification to Consumer
* Thread.
*/
class Producer extends Thread
{
private Queue<Integer> queue;
private int maxSize;
public Producer(Queue<Integer> queue, int maxSize, String name){
super(name); this.queue = queue; this.maxSize = maxSize;
}
@Override public void run()
{
while (true)
{
synchronized (queue) {
while (queue.size() == maxSize) {
try {
System.out .println("Queue is full, " + "Producer thread waiting for " + "consumer to take something from queue");
queue.wait();
} catch (Exception ex) {
ex.printStackTrace(); }
}
Random random = new Random();
int i = random.nextInt();
System.out.println("Producing value : " + i); queue.add(i); queue.notifyAll();
}
}
}
}
/**
* Consumer Thread will consumer values form shared queue.
* It will also use wait() method to wait if queue is
* empty. It will also use notify method to send
* notification to producer thread after consuming values
* from queue.
*/
class Consumer extends Thread {
private Queue<Integer> queue;
private int maxSize;
public Consumer(Queue<Integer> queue, int maxSize, String name){
super(name);
this.queue = queue;
this.maxSize = maxSize;
}
@Override public void run() {
while (true) {
synchronized (queue) {
while (queue.isEmpty()) {
System.out.println("Queue is empty," + "Consumer thread is waiting" + " for producer thread to put something in queue");
try {
queue.wait();
} catch (Exception ex) {
ex.printStackTrace();
}
}
System.out.println("Consuming value : " + queue.remove()); queue.notifyAll();
}
}
}
}

为了更好地理解这个程序,我建议你在debug模式里跑这个程序。一旦你在debug模式下启动程序,它会停止在PRODUCER或者CONSUMER线程上,取决于哪个线程占据了CPU。因为两个线程都有wait()的条件,它们一定会停止,然后你就可以跑这个程序然后看发生什么了(很有可能它就会输出我们以上展示的内容)。你也可以使用Eclipse里的Step into和Step over按钮来更好地理解多线程间发生的事情。

阻塞队列实现生产者消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ProducerConsumerWithQueue {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

public static void main(String[] args) {
ProducerConsumerWithQueue test = new ProducerConsumerWithQueue();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();

producer.start();
consumer.start();
}

class Consumer extends Thread {

@Override
public void run() {
while (true) {
try {
queue.take();
System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Producer extends Thread {

@Override
public void run() {
while (true) {
try {
queue.put(1);
System.out.println("向队列取中插入一个元素,队列剩余空间:" + (queueSize - queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

附:阻塞队列的四种处理方法

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

另一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 计算输出其他线程锁计算的数据
*
*/
public class ThreadA {
public static void main(String[] args) throws InterruptedException{
ThreadB b = new ThreadB();
//启动计算线程
b.start();
//线程A拥有b对象上的锁。线程为了调用wait()或notify()方法,该线程必须是那个对象锁的拥有者
synchronized (b) {
System.out.println("等待对象b完成计算。。。");
//当前线程A等待
where(b.total == 0) {
b.wait();
}
System.out.println("b对象计算的总和是:" + b.total);
}
}
}



/**
* 计算1+2+3 ... +100的和
*
*/
class ThreadB extends Thread {
int total = 0;

public void run() {
synchronized (this) {
for (int i = 0; i < 101; i++) {
total += i;
}
//(完成计算了)唤醒在此对象监视器上等待的单个线程,在本例中线程A被唤醒
notify();
System.out.println("计算完成");
}
}
}

执行结果:

等待对象b完成计算。。。
计算完成
b对象计算的总和是:5050

如果我们将b.wait()去掉呢?结果如下:

等待对象b完成计算。。。
b对象计算的总和是:0
计算完成

上述的结果表明,当去掉b.wait()时,新启动的线程ThreadB与主线程ThreadA是各自执行的,没有线程等待的现象。

我们想要的效果是,当线程ThreadB完成计算之后,再去取计算后的结果。所以使用了b.wait()来让主线程等待。

那为什么是使用b.wait(),而不是Thread.currentThread.wait(),或者其他的呢?

如果我们将b.wait()替换成Thread.currentThread.wait(),将会得到如下的结果:

Exception in thread “main” java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:485)
at pa.com.thread.ThreadA.main(ThreadA.java:18)
等待对象b完成计算。。。
计算完成

替换的代码Thread.currentThread.wait()好像理所当然应该如我们预期的正确啊,让当前线程处于等待状态,让其他线程先执行。

我们忽略了一个很重要的问题:线程与锁是分不开的,线程的同步、等待、唤醒都与对象锁是密不可分的。

线程ThreadA持有对象b的锁,我们要使用这把锁去让线程释放锁,从而让其他的线程能抢到这把锁。

从我们的程序来分析就是:线程ThreadA首先持有锁对象b,然后调用b.wait()将对象锁释放,线程ThreadB争抢到对象锁b,从而执行run()方法中的计算,计算完了之后使用notify()唤醒主线程ThreadA,ThreadA得以继续执行,从而得到了我们预期的效果。

(之所以ThreadB的对象锁也是b,是因为synchronized(this)中的this指向的就是ThreadB的实例b)

Thread.currentThread.wait()调用的是当前线程对象(即主线程ThreadA)的wait()方法,当前线程对象ThreadA是没有被加锁的,它只是获取了对象锁b。我基本没有看到过这样的调用,一般使用的是锁对象的wait(),本例中为b.wait()

顺带讲一下wait()与sleep()的区别。

如果我们将b.wait()换成Thread.sleep(1000),则会出现如下的结果:

等待对象b完成计算。。。
b对象计算的总和是:0
计算完成

从执行结果可以看出,Thread.sleep(1000)只是让主线程ThreadA睡眠了1秒钟,而并没有释放对象锁,所以在主线程ThreadA睡眠的过程中,ThreadB拿不到对象锁,从而不能执行。

所以我们也就得出了如下的结论:

wait()方法是让线程释放对象锁,让其他线程拿到锁之后去优先执行,当其他全程唤醒wait()中的线程 或者 拿到对象锁的线程都执行完释放了对象锁之后,wait()中的线程才会再次拿到对象锁从而执行。

sleep()方法是让线程睡眠,此时并没有释放对象锁,其他想要拿到睡眠线程的对象锁的线程也就拿不到相应的对象锁,从而不能抢在它前面执行。

补:

wait、notify和notifyAll方法是Object类的final native方法。所以这些方法不能被子类重写,Object类是所有类的超类,因此在程序中有以下三种形式调用wait等方法。

1
2
3
wait();//方式1:
this.wait();//方式2:
super.wait();//方式3

void wait()

导致线程进入等待状态,直到它被其他线程通过notify()或者notifyAll唤醒。该方法只能在同步方法中调用。如果当前线程不是锁的持有者,该方法抛出一个IllegalMonitorStateException异常。

synchronized

卖火车票问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Seller implements Runnable {
private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public void run() {
synchronized (this) {
if (Synchronized.ticket > 0) {
System.out.println("正在卖票,剩余" + Synchronized.ticket);
Synchronized.ticket--;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Synchronized {
static Integer ticket = 100;

public static void main(String[] args) {
Seller s = new Seller();
Thread t1 = new Thread(s);
Thread t2 = new Thread(s);
Thread t3 = new Thread(s);
Thread t4 = new Thread(s);
Thread t5 = new Thread(s);
Thread t6 = new Thread(s);

t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
t6.start();
}
}

源码地址

Predicate操作集合

Java 8为Collection集合新增一个removeIf(Predicate filter)方法,该方法将会批量删除符合filter条件的所有元素。该方法需要一个Predicate对象作为参数,Predicate也是函数式接口,因此可使用Lamada表达式作为参数。

1
2
3
4
5
6
7
8
Collection books = new HashSet();
books.add(new String("123"));
books.add(new String("1234"));
books.add(new String("12345"));
books.add(new String("123456"));
// 使用Lamada表达式过滤集合
books.removeIf(ele -> ((String)ele).length() < 3);
System.out.println(books);

上面程序中调用了Collection集合的removeIf()方法批量删除集合中符合条件的元素,程序中传入了一个Lamada表达式作为过滤条件。

Predicate就是一个函数式接口,可以把它当做C语言中函数指针来使用;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Test {

public static void operate(Collection c, Predicate p) { // 满足谓词条件p的元素都打印出来
for (Object ele: c) {
if (p.test(ele)) {
System.out.println(ele);
}
}
}

public static void main(String[] args) {
Collection c = new ArrayList();
for (int i = 0; i < 10; i++) { // 加入0 ~ 9的字符串
c.add(String.valueOf(i));
}

operate(c, ele -> Integer.valueOf((String)ele) > 3); // 大于3的打印出来
}
}

pic

最近在各个媒体中出现了区块链的概念,抽时间了解了一下

区块链的本质

区块链是什么?它就是一种特殊的分布式数据库。

首先,区块链的主要作用是储存信息。任何需要保存的信息,都可以写入区块链,也可以从里面读取,所以它是数据库。

其次,任何人都可以架设服务器,加入区块链网络,成为一个节点。区块链的世界里面,没有中心节点,每个节点都是平等的,都保存着整个数据库。你可以向任何一个节点,写入/读取数据,因为所有节点最后都会同步,保证区块链一致。

区块链的最大特点

分布式数据库并非新发明,市场上早有此类产品。但是,区块链有一个革命性特点。

区块链没有管理员,它是彻底无中心的。其他的数据库都有管理员,但是区块链没有。如果有人想对区块链添加审核,也实现不了,因为它的设计目标就是防止出现居于中心地位的管理当局。

正是因为无法管理,区块链才能做到无法被控制。否则一旦大公司大集团控制了管理权,他们就会控制整个平台,其他使用者都必须听命于他们了。

但是,没有了管理员,人人都可以往里面写入数据,怎么才能保证数据的可信呢?被坏人改了怎么办?请接着往下读,着就是区块链的奇妙之处。

区块

区块链由一个个区块(block)组成。区块很像数据库的记录,每次写入数据,就是创建一个区块。类似于数据结构当中的链表。

blockchain

每个区块包含两部分。

  • Head:记录当前区块的元信息
  • Body:实际数据

区块头包含了当前区块的多项元信息。

  • 生成时间
  • 实际数据的Hash
  • 上一区块的Hash

这里,你需要理解什么叫 Hash,这是理解区块链必需的。

hash

所谓 Hash 就是计算机可以对任意内容,计算出一个长度相同的特征值。区块链的 Hash 长度是256位,这就是说,不管原始内容是什么,最后都会计算出一个256位的二进制数字。而且可以保证,只要原始内容不同,对应的 Hash 一定是不同的。

举例来说,字符串123的 Hash 是a8fdc205a9f19cc1c7507a60c4f01b13d11d7fd0(十六进制),转成二进制就是256位,而且只有123能得到这个 Hash。

因此,就有两个重要的推论。

  • 每个区块的 Hash 都是不一样的,可以通过 Hash 标识区块。
  • 如果区块的内容变了,它的 Hash 一定会改变。

Hash 的不可修改性

区块与 Hash 是一一对应的,每个区块的 Hash 都是针对”区块头”(Head)计算的。

Hash = SHA256(区块头)

上面就是区块 Hash 的计算公式,Hash 由区块头唯一决定,SHA256是区块链的 Hash 算法。

前面说过,区块头包含很多内容,其中有当前区块体的 Hash(注意是”区块体”的 Hash,而不是整个区块),还有上一个区块的 Hash。这意味着,如果当前区块的内容变了,或者上一个区块的 Hash 变了,一定会引起当前区块的 Hash 改变。

这一点对区块链有重大意义。如果有人修改了一个区块,该区块的 Hash 就变了。为了让后面的区块还能连到它,该人必须同时修改后面所有的区块,否则被改掉的区块就脱离区块链了。由于后面要提到的原因,Hash 的计算很耗时,同时修改多个区块几乎不可能发生,除非有人掌握了全网51%以上的计算能力。

正是通过这种联动机制,区块链保证了自身的可靠性,数据一旦写入,就无法被篡改。这就像历史一样,发生了就是发生了,从此再无法改变。


每个区块都连着上一个区块,这也是”区块链”这个名字的由来。

什么是比特币

比特币就是一种依托区块链技术的数字货币。

区块链在比特币中的作用

区块链就是一个数据库,记载了所有的交易,用作中央记账系统,分布在无数个节点之上。

每笔交易的核心,就是一句话,比如”张三向李四转移了1个比特币”。为了证明这句话可信,张三为它加上了数字签名。任何人都可以用张三的公钥,证明这确实是张三本人的行为。另一方面,其他人无法伪造张三的数字签名,所以不可能伪造这笔交易。

矿工们收到这句话,首先验证数字签名的可信性,然后验证张三确实拥有这些比特币(每一笔交易都有上一笔交易的编号,用来查询比特币的来源)。验证通过以后,就着手把这句话写入区块链了。一旦写入区块链,所有人就都可以查询到,因此这笔比特币就被认为,从张三转移到了李四。

比特币交易流程

对于比特币来说,钱不是支付给个人的,而是支付给某一把私钥。这就是交易匿名性的根本原因,因为没有人知道,那些私钥背后的主人是谁。

所以,比特币交易的第一件事,就是你必须拥有自己的公钥和私钥。

你去网上那些比特币交易所开户,它们会让你首先生成一个比特币钱包(wallet)。这个钱包不是用来存放比特币,而是存放你的公钥和私钥。软件会帮你生成这两把钥匙,然后放在钱包里面。

你向别人收钱时,只要告诉对方你的钱包地址即可,对方向这个地址付款。由于你是这个地址的拥有者,所以你会收到这笔钱。

一笔交易就是一个地址的比特币,转移到另一个地址。由于比特币的交易记录全部都是公开的,哪个地址拥有多少比特币,都是可以查到的。因此,支付方是否拥有足够的比特币,完成这笔交易,这是可以轻易验证的。确认交易的真实性以后,交易还不算完成。交易数据必须写入数据库,才算成立,对方才能真正收到钱。

首先,所有的交易数据都会传送到矿工那里。矿工负责把这些交易写入区块链。

采矿

交易的确认离不开矿工。为什么有人愿意做矿工呢?

比特币协议规定,挖到新区块的矿工将获得奖励,一开始(2008年)是50个比特币,然后每4年减半,这也是比特币的供给增加机制,流通中新增的比特币都是这样诞生的。

由于必须保证节点之间的同步,所以新区块的添加速度不能太快。试想一下,你刚刚同步了一个区块,准备基于它生成下一个区块,但这时别的节点又有新区块生成,你不得不放弃做了一半的计算,再次去同步。因为每个区块的后面,只能跟着一个区块,你永远只能在最新区块的后面,生成下一个区块。所以,你别无选择,一听到信号,就必须立刻同步。

所以,区块链的发明者中本聪(这是假名,真实身份至今未知)故意让添加新区块,变得很困难。他的设计是,平均每10分钟,全网才能生成一个新区块,一小时也就六个。

这种产出速度不是通过命令达成的,而是故意设置了海量的计算。也就是说,只有通过极其大量的计算,才能得到当前区块的有效 Hash,从而把新区块添加到区块链。由于计算量太大,所以快不起来。

这个过程就叫做采矿(mining),因为计算有效 Hash 的难度,好比在全世界的沙子里面,找到一粒符合条件的沙子。计算 Hash 的机器就叫做矿机,操作矿机的人就叫做矿工。

难度系数

读到这里,你可能会有一个疑问,人们都说采矿很难,可是采矿不就是用计算机算出一个 Hash 吗,这正是计算机的强项啊,怎么会变得很难,迟迟算不出来呢?

原来不是任意一个 Hash 都可以,只有满足条件的 Hash 才会被区块链接受。这个条件特别苛刻,使得绝大部分 Hash 都不满足要求,必须重算。

原来,区块头包含一个难度系数(difficulty),这个值决定了计算 Hash 的难度。举例来说,第100000个区块的难度系数是 14484.16236122。


区块链协议规定,使用一个常量除以难度系数,可以得到目标值(target)。显然,难度系数越大,目标值就越小。

Hash 的有效性跟目标值密切相关,只有小于目标值的 Hash 才是有效的,否则 Hash 无效,必须重算。由于目标值非常小,Hash 小于该值的机会极其渺茫,可能计算10亿次,才算中一次。这就是采矿如此之慢的根本原因。

区块头里面还有一个 Nonce 值,记录了 Hash 重算的次数。第 100000 个区块的 Nonce 值是274148111,即计算了 2.74 亿次,才得到了一个有效的 Hash,该区块才能加入区块链。

难度系数的动态调节

就算采矿很难,但也没法保证,正好十分钟产出一个区块,有时一分钟就算出来了,有时几个小时可能也没结果。总体来看,随着硬件设备的提升,以及矿机的数量增长,计算速度一定会越来越快。

为了将产出速率恒定在十分钟,中本聪还设计了难度系数的动态调节机制。他规定,难度系数每两周(2016个区块)调整一次。如果这两周里面,区块的平均生成速度是9分钟,就意味着比法定速度快了10%,因此难度系数就要调高10%;如果平均生成速度是11分钟,就意味着比法定速度慢了10%,因此难度系数就要调低10%。

难度系数越调越高(目标值越来越小),导致了采矿越来越难。

区块链的分叉

即使区块链是可靠的,现在还有一个问题没有解决:如果两个人同时向区块链写入数据,也就是说,同时有两个区块加入,因为它们都连着前一个区块,就形成了分叉。这时应该采纳哪一个区块呢?

现在的规则是,新节点总是采用最长的那条区块链。如果区块链有分叉,将看哪个分支在分叉点后面,先达到6个新区块(称为”六次确认”)。按照10分钟一个区块计算,一小时就可以确认。

由于新区块的生成速度由计算能力决定,所以这条规则就是说,拥有大多数计算能力的那条分支,就是正宗的比特链。

总结

区块链作为无人管理的分布式数据库,从2009年开始已经运行了8年,没有出现大的问题。这证明它是可行的。

但是,为了保证数据的可靠性,区块链也有自己的代价。一是效率,数据写入区块链,最少要等待十分钟,所有节点都同步数据,则需要更多的时间;二是能耗,区块的生成需要矿工进行无数无意义的计算,这是非常耗费能源的。

因此,区块链的适用场景,其实非常有限。

  • 不存在所有成员都信任的管理当局
  • 写入的数据不要求实时使用
  • 挖矿的收益能够弥补本身的成本

目前,区块链最大的应用场景,就是以比特币为代表的加密货币

pic
mysql5.7设置允许外网登录数据库

创建host

进入mysql数据库,修改user表

1
use mysql
1
update user set host='%' where user='root'

刷新权限

1
flush privileges

授权用户

任意主机以用户root和密码mypwd连接到mysql数据库

1
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'  IDENTIFIED BY 'mypwd'  WITH GRANT OPTION

IP为192.168.1.11的主机以用户myuser和密码mypwd连接到mysql数据库

1
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.11' IDENTIFIED BY 'mypwd' WITH GRANT OPTION

刷新权限

1
flush privileges

可能遇到的问题

  • 设置后仍不能访问:防火墙放开mysql的tcp端口
  • 如果是阿里ECS用户:配置安全组规则放开mysql端口

CentOS默认允许任何用户通过ssh登入,但是在企业应用中为了保证服务器的安全性一般不允许用户直接通过root用户登录,而是通过普通用户登录然后切换到root或者使用sudo权限来执行root用户命令

创建用户

创建普通用户

1
useradd wangweiye

为普通用户设置密码

1
passwd wangweiye

将普通用户移入wheel用户组

1
usermod -G wheel wangweiye

在Linux中wheel组类似于管理员组

这样就可以使用普通用户wangweiye来进行登录了,而且该用户拥有sudo权限

在一般情况下,一般用户通过执行su命令、输入正确的root密码,可以登录为root用户来对系统进行管理员级别的配置。但是,为了更进一步加强系统的安全性,有必要建立一个管理员的组,只允许这个组的用户来执行su命令登录为root用户,而让其他组的用户即使执行su输入了正确的root密码,也无法登录为root用户。在UNIX下,这个组的名称通常为wheel

如何配置只有wheel组的用户才能登录root用户呢?

  1. /etc/pam.d/su中的#auth required /lib/security/$ISA/pam_wheel.so use_uid注释去掉

  2. SU_WHEEL_ONLY yes添加到/etc/login.defs文件行末

禁止root用户ssh登录

修改配置文件

修改/etc/ssh/sshd_configPermitRootLogin设置为no即可

重启ssh服务

systemctl restart sshd.service重启ssh服务

给用户分配sudo权限

如果只想给用户sudo权限而不能切换root用户登录(不加入wheel组)

1
vim /etc/sudoers

0%