Java线程池实现原理及实践

AQS 介绍

1 AQS 简单介绍

AQS 的全称为(AbstractQueuedSynchronizer),这个类在 java.util.concurrent.locks 包下面。

类图

AQS 是一个用来构建锁和同步器的框架,使用 AQS 能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask(jdk1.7) 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。

2 AQS 原理

在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。

下面大部分内容其实在 AQS 类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码。

2.1 AQS 原理概览

AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。

看个 AQS(AbstractQueuedSynchronizer)原理图:

453525

AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS 对该同步状态进行原子操作实现对其值的修改。

1
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过 protected 类型的getStatesetStatecompareAndSetState进行操作

1
2
3
4
5
6
7
8
9
10
11
12
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2.2 AQS 对资源的共享方式

AQS 定义两种资源共享方式

1)Exclusive(独占)

只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁,下面以 ReentrantLock 对这两种锁的定义做介绍:

  • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁:当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当前线程再加入到队列中等待唤醒。

说明:下面这部分关于 ReentrantLock 源代码内容节选自:https://www.javadoop.com/post/AbstractQueuedSynchronizer-2 ,这是一篇很不错文章,推荐阅读。

下面来看 ReentrantLock 中相关的源代码:

ReentrantLock 默认采用非公平锁,因为考虑获得更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。

1
2
3
4
5
6
7
8
9
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
// 默认非公平锁
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock 中公平锁的 lock 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

非公平锁的 lock 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static final class NonfairSync extends Sync {
final void lock() {
// 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这里没有对阻塞队列进行判断
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

总结:公平锁和非公平锁只有两处不同:

  1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。
  2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。

相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

2)Share(共享)

多个线程可同时执行,如 Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。

ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在上层已经帮我们实现好了。

2.3 AQS 底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
  2. 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用,下面简单的给大家介绍一下模板方法模式,模板方法模式是一个很容易理解的设计模式之一。

模板方法模式是基于”继承“的,主要是为了在不改变模板结构的前提下在子类中重新定义模板中的内容以实现复用代码。举个很简单的例子假如我们要去一个地方的步骤是:购票buyTicket()->安检securityCheck()->乘坐某某工具回家ride()->到达目的地arrive()。我们可能乘坐不同的交通工具回家比如飞机或者火车,所以除了ride()方法,其他方法的实现几乎相同。我们可以定义一个包含了这些方法的抽象类,然后用户根据自己的需要继承该抽象类然后修改 ride()方法。

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:

1
2
3
4
5
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。

再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown()一次,state 会 CAS(Compare and Swap)减 1。等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,然后主调用线程就会从 await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

推荐两篇 AQS 原理和相关源码分析的文章:

3 Semaphore(信号量)-允许多个线程同时访问

synchronized 和 ReentrantLock 都是一次只允许一个线程访问某个资源,Semaphore(信号量)可以指定多个线程同时访问某个资源。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
*
* @author Snailclimb
* @date 2018年9月30日
* @Description: 需要一次性拿一个许可的情况
*/
public class SemaphoreExample1 {
// 请求的数量
private static final int threadCount = 550;

public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
// 一次只能允许执行的线程数量。
final Semaphore semaphore = new Semaphore(20);

for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
semaphore.acquire();// 获取一个许可,所以可运行线程数量为20/1=20
test(threadnum);
semaphore.release();// 释放一个许可
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

});
}
threadPool.shutdown();
System.out.println("finish");
}

public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}

执行 acquire 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个 release 方法增加一个许可证,这可能会释放一个阻塞的 acquire 方法。然而,其实并没有实际的许可证这个对象,Semaphore 只是维持了一个可获得许可证的数量。 Semaphore 经常用于限制获取某种资源的线程数量。

当然一次也可以一次拿取和释放多个许可,不过一般没有必要这样做:

1
2
3
semaphore.acquire(5);// 获取5个许可,所以可运行线程数量为20/5=4
test(threadnum);
semaphore.release(5);// 获取5个许可,所以可运行线程数量为20/5=4

除了 acquire方法之外,另一个比较常用的与之对应的方法是tryAcquire方法,该方法如果获取不到许可就立即返回 false。

Semaphore 有两种模式,公平模式和非公平模式。

  • 公平模式: 调用 acquire 的顺序就是获取许可证的顺序,遵循 FIFO;
  • 非公平模式: 抢占式的。

Semaphore 对应的两个构造方法如下:

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。

issue645补充内容 :Semaphore与CountDownLatch一样,也是共享锁的一种实现。它默认构造AQS的state为permits。当执行任务的线程数量超出permits,那么多余的线程将会被放入阻塞队列Park,并自旋判断state是否大于0。只有当state大于0的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行release方法,release方法使得state的变量会加1,那么自旋的线程便会判断成功。 如此,每次只有最多不超过permits数量的线程能自旋成功,便限制了执行任务线程的数量。

由于篇幅问题,如果对 Semaphore 源码感兴趣的朋友可以看下这篇文章:https://juejin.im/post/5ae755366fb9a07ab508adc6

4 CountDownLatch (倒计时器)

CountDownLatch允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。

CountDownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state == 0,直至最后一个线程调用了countDown,使得state == 0,于是阻塞的线程便判断成功,全部往下执行。

4.1 CountDownLatch 的两种典型用法

  1. 某一线程在开始运行前等待 n 个线程执行完毕。将 CountDownLatch 的计数器初始化为 n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减 1 countdownlatch.countDown(),当计数器的值变为 0 时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  2. 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。

4.2 CountDownLatch 的使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: CountDownLatch 使用方法示例
*/
public class CountDownLatchExample1 {
// 请求的数量
private static final int threadCount = 550;

public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
test(threadnum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
countDownLatch.countDown();// 表示一个请求已经被完成
}

});
}
countDownLatch.await();
threadPool.shutdown();
System.out.println("finish");
}

public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}

上面的代码中,我们定义了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println("finish");

与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。所以当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()方法,恢复执行自己的任务。

再插一嘴:CountDownLatchawait() 方法使用不当很容易产生死锁,比如我们上面代码中的 for 循环改为:

1
2
3
for (int i = 0; i < threadCount-1; i++) {
.......
}

这样就导致 count 的值没办法等于 0,然后就会导致一直等待。

如果对CountDownLatch源码感兴趣的朋友,可以查看: 【JUC】JDK1.8源码分析之CountDownLatch(五)

4.3 CountDownLatch 的不足

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

4.4 CountDownLatch 相常见面试题

解释一下 CountDownLatch 概念?

CountDownLatch 和 CyclicBarrier 的不同之处?

给出一些 CountDownLatch 使用的例子?

CountDownLatch 类中主要的方法?

5 CyclicBarrier(循环栅栏)

CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。

CountDownLatch的实现是基于AQS的,而CycliBarrier是基于 ReentrantLock(ReentrantLock也属于AQS同步器)和 Condition 的.

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。

再来看一下它的构造函数:

1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

其中,parties 就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。

5.1 CyclicBarrier 的应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

5.2 CyclicBarrier 的使用示例

示例 1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
*
* @author Snailclimb
* @date 2018年10月1日
* @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
*/
public class CyclicBarrierExample2 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}

public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
try {
/**等待60秒,保证子线程完全执行结束*/
cyclicBarrier.await(60, TimeUnit.SECONDS);
} catch (Exception e) {
System.out.println("-----CyclicBarrierException------");
}
System.out.println("threadnum:" + threadnum + "is finish");
}

}

运行结果,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......

可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await方法之后的方法才被执行。

另外,CyclicBarrier 还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
*
* @author SnailClimb
* @date 2018年10月1日
* @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
*/
public class CyclicBarrierExample3 {
// 请求的数量
private static final int threadCount = 550;
// 需要同步的线程数量
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("------当线程数达到之后,优先执行------");
});

public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(10);

for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
threadPool.execute(() -> {
try {
test(threadNum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
}
threadPool.shutdown();
}

public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
System.out.println("threadnum:" + threadnum + "is ready");
cyclicBarrier.await();
System.out.println("threadnum:" + threadnum + "is finish");
}

}

运行结果,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------当线程数达到之后,优先执行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------当线程数达到之后,优先执行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......

5.3 CyclicBarrier源码分析

当调用 CyclicBarrier 对象调用 await() 方法时,实际上调用的是dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到 parties 的值时,栅栏才会打开,线程才得以通过执行。

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

dowait(false, 0L)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
private int count;
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 锁住
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

// 如果线程中断了,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// cout减1
int index = --count;
// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 将 count 重置为 parties 属性的初始化值
// 唤醒之前等待的线程
// 下一波执行开始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

总结:CyclicBarrier 内部通过一个 count 变量作为计数器,cout 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

5.4 CyclicBarrier 和 CountDownLatch 的区别

下面这个是国外一个大佬的回答:

CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看,javadoc 是这么描述它们的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

6 ReentrantLock 和 ReentrantReadWriteLock

ReentrantLock 和 synchronized 的区别在上面已经讲过了这里就不多做讲解。另外,需要注意的是:读写锁 ReentrantReadWriteLock 可以保证多个线程可以同时读,所以在读操作远大于写操作的时候,读写锁就非常有用了。

参考

Java IO/NIO 对比

Java NIO Buffer, Channel 及 Selector

Java IO VS NIO

  • JDK 1.4 之前,java.io 包,

    面向流的I/O系统

    (字节流或者字符流)

    • 系统一次处理一个字节
    • 速度慢
  • JDK 1.4 提供,java.nio 包,

    面向块的I/O系统

    • 系统一次处理一个块
    • 速度快

    NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区),Selector。

    ​ 传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。

    NIO和传统IO(一下简称IO)之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。

Buffer 缓冲区

缓冲区实际上是一个容器对象,更直接的说,其实就是一个数组。
在 NIO 库中,所有数据都是用缓冲区处理的:

  • 在读取数据时,它是直接读到缓冲区中的;
  • 在写入数据时,它也是写入到缓冲区中的;

在 NIO 中,所有的缓冲区类型都继承于抽象类 Buffer。常见的缓冲区 Buffer 包括:

  • ByteBuffer 存储了字节数组 final byte[] hb;

  • CharBuffer

    1
    final char[] hb;
    • ByteBuffer 与 CharBuffer 之间的转换需要使用字符集 Charset
    • Charset 具体使用,参见 Java Charset 字符集
  • ShortBuffer final short[] hb;

  • IntBuffer final int[] hb;

  • LongBuffer final long[] hb;

  • FloatBuffer final float[] hb;

  • DoubleBuffer final double[] hb;

Buffer 类的属性:

  • private int mark = -1; 记录一个标记位置
  • private int position = 0;

A buffer’s position is the index of the next element to be read or written. A buffer’s position is never negative and is never greater than its limit.
当前操作的位置

  • private int limit;

A buffer’s limit is the index of the first element that should not be read or written. A buffer’s limit is never negative and is never greater than its capacity.
可以存放的元素的个数

  • private int capacity;

A buffer’s capacity is the number of elements it contains. The capacity of a buffer is never negative and never changes.
数组容量

  • 大小关系:mark <= position <= limit <= capacity

Buffer 类的方法:

  • allocate(int capacity) 分配一个缓冲区,默认 limit = capacity
  • put() 在当前位置添加元素
  • get() 得到当前位置的元素
  • clear() 将 Buffer 从 读模式 切换到 写模式 (该方法实际不会清空原 Buffer 的内容)
1
2
3
4
5
6
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
  • flip() 将 Buffer 从 写模式 切换到 读模式
1
2
3
4
5
6
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

** clear() VS flip()**:

  • 在写模式下,Buffer 的 limit 表示你最多能往 Buffer 里写多少数据。
    • 因此写之前,调用 clear(),使得 limit = capacity;
  • 在读模式时,Buffer 的 limit 表示你最多能从 Buffer 里读多少数据。
    • 因此读之前,调用 flip(),使得 limit = position;

IntBuffer 的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws Exception {
// 创建 int 缓冲区 capacity 为 4
// 默认 limit = capacity
IntBuffer buffer = IntBuffer.allocate(4);
System.out.println("Capacity & Limit: " + buffer.capacity() + " " + buffer.limit());

// 往 Buffer 中写数据
buffer.put(11);
buffer.put(22);
buffer.put(33);
buffer.put(44);

System.out.println("Position: " + buffer.position());

// 在从 Buffer 中读数据之前,调用 flip()
buffer.flip();

while (buffer.hasRemaining()) {
System.out.print(buffer.get() + " ");
}
}

输出:

Capacity & Limit: 4 4
Position: 4
11 22 33 44

Channel 通道

  • Java NIO 的核心概念,表示的是对支持 I/O 操作的实体的一个连接
  • 通过它可以读取和写入数据(并不是直接操作,而是通过 Buffer 来处理)
  • 双向的

常用的 Channel 包括:

  • FileChannel 从文件中读写数据
  • DatagramChannel 从 UDP 中读写数据
  • SocketChannel 从 TCP 中读写数据
  • ServerSocketChannel 监听新进来的 TCP 连接,每一个新进来的连接都会创建一个 SocketChannel。

FileChannel 连接到文件的通道

FileChannel 无法设置为非阻塞模式,只能运行在阻塞模式下
常用方法:

  • int read(ByteBuffer dst) 从 Channel 中读取数据,写入 Buffer
  • int write(ByteBuffer src) 从 Buffer 中读取数据,写入 Channel
  • long size() 得到 Channel 中文件的大小
  • long position() 得到 Channel 中文件的当前操作位置
  • FileChannel position(long newPosition) 设置 Channel 中文件的当前操作位置

使用 FileChannel 来复制文件的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws Exception {
// 通过 InputStream 或者 OutputStream 来构造 FileChannel
FileChannel in = new FileInputStream("a.txt").getChannel();
FileChannel out = new FileOutputStream("b.txt").getChannel();

ByteBuffer buffer = ByteBuffer.allocate(1024);

// 调用 channel 的 read 方法往 Buffer 中写数据
while(in.read(buffer) != -1) {
// 在从 Buffer 中读数据之前,调用 flip()
buffer.flip();
// 从 Buffer 中读数据,写入到 channel
out.write(buffer);
// 在往 Buffer 中写数据之前,调用 clear()
buffer.clear();
}

// 或者使用如下代码
// out.transferFrom(in, 0, in.size());
}

SocketChannel 连接到 TCP 套接字的通道

SocketChannel 可以设置为阻塞模式或非阻塞模式
使用 SocketChannel 来建立 TCP 连接,发送并接收数据,默认使用 阻塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void main(String[] args) throws Exception {
// 打开 SocketChannel
SocketChannel channel = SocketChannel.open();
// connect 方法会阻塞,直至连接建立成功
channel.connect(new InetSocketAddress("127.0.0.1", 8080));

ByteBuffer buffer = ByteBuffer.allocate(1024);

// 发送数据
String msg = "This is client.";
// 在往 Buffer 中写数据之前,调用 clear()
buffer.clear();
buffer.put(msg.getBytes());

// 在从 Buffer 中读数据之前,调用 flip()
buffer.flip();
channel.write(buffer);

// 接收数据
// 在往 Buffer 中写数据之前,调用 clear()
buffer.clear();

// 调用 channel 的 read 方法往 Buffer 中写数据
channel.read(buffer);

// 在从 Buffer 中读数据之前,调用 flip()
buffer.flip();

// 从 Buffer 中读数据
while (buffer.hasRemaining()) {
System.out.print(buffer.get());
}
}

使用 SocketChannel 的 非阻塞模式 来建立 TCP 连接,发送并接收数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) throws Exception {
// 打开 SocketChannel
SocketChannel channel = SocketChannel.open();

channel.configureBlocking(false);
channel.connect(new InetSocketAddress("127.0.0.1", 8080));

ByteBuffer buffer = ByteBuffer.allocate(1024);

while (!channel.finishConnect()) {
// 发送数据
String msg = "This is client.";
// 在往 Buffer 中写数据之前,调用 clear()
buffer.clear();
buffer.put(msg.getBytes());

// 在从 Buffer 中读数据之前,调用 flip()
buffer.flip();
channel.write(buffer);

// 接收数据
// 在往 Buffer 中写数据之前,调用 clear()
buffer.clear();

// 调用 channel 的 read 方法往 Buffer 中写数据
channel.read(buffer);

// 在从 Buffer 中读数据之前,调用 flip()
buffer.flip();

// 从 Buffer 中读数据
while (buffer.hasRemaining()) {
System.out.print(buffer.get());
}
}
}

ServerSocketChannel 监听 TCP 连接的通道

ServerSocketChannel 可以设置为阻塞模式或非阻塞模式
使用 ServerSocketChannel 来监听 TCP 连接,默认使用 阻塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static void main(String[] args) throws Exception {
// 打开 SocketChannel
ServerSocketChannel channel = ServerSocketChannel.open();
// 绑定端口
channel.socket().bind(new InetSocketAddress(8080));

ByteBuffer buffer = ByteBuffer.allocate(1024);

while (true) {
// accept 方法会阻塞,直至监听到 TCP 连接
SocketChannel socketChannel = channel.accept();
System.out.println("A new connection...");

// 接收数据
// 在往 Buffer 中写数据之前,调用 clear()
buffer.clear();

// 调用 channel 的 read 方法往 Buffer 中写数据
socketChannel.read(buffer);

// 在从 Buffer 中读数据之前,调用 flip()
buffer.flip();

// 从 Buffer 中读数据
while (buffer.hasRemaining()) {
System.out.print(buffer.get());
}

// 在往 Buffer 中写数据之前,调用 clear()
// 发送数据
String msg = "This is server.";
// 在往 Buffer 中写数据之前,调用 clear()
buffer.clear();
buffer.put(msg.getBytes());

// 在从 Buffer 中读数据之前,调用 flip()
buffer.flip();
socketChannel.write(buffer);
}
}

Selector 选择器

Selector 允许单个进程可以同时处理多个网络连接的 IO,即监听多个端口的 Channel

关于 IO 模式,参见 Linux IO 模型 中对多路复用 IO Multiplexing IO 的说明。

引用:


多路复用 IO Multiplexing IO

  • 单个进程可以同时处理多个网络连接的 IO,即监听多个端口的 IO
  • 适用于连接数很高的情况
  • 实现方式:select,poll,epoll 系统调用
    • 注册多个端口的监听 Socket,比如 8080,8081
    • 当用户进程调用 select 方法后,整个用户进程被阻塞,OS 内核会监听所有注册的 Socket
    • 当任何一个端口的 Socket 中的数据准备好了( 8080 或者 8081),select 方法就会返回
    • 随后用户进程再调用 read 操作,将数据从 OS 内核缓存区拷贝到应用程序的地址空间。
  • 多路复用 IO 类似于 多线程结合阻塞 IO
    • 要实现监听多个端口的 IO,还可以通过多线程的方式,每一个线程负责监听一个端口的 IO
    • 如果处理的连接数不是很高的话,使用 多路复用 IO 不一定比使用 多线程结合阻塞 IO 的服务器性能更好,可能延迟还更大
    • 多路复用 IO 的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接

Selector 使用步骤:

  • 创建 Selector

  • 创建 Channel,可以创建多个 Channel,即监听多个端口,比如 8080,8081

  • 将 Channel 注册到 Selector 中

    • 如果一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的, 即 channel.configureBlocking(false);

    • 因此 FileChannel 是不能够使用 Selector 的, 因为 FileChannel 都是阻塞的

    • 注册时,需要指定了对 Channel 的什么事件感兴趣,包括:

      • SelectionKey.OP_CONNECT:TCP 连接 static final int OP_CONNECT = 1 << 3;
      • SelectionKey.OP_ACCEPT:确认 static final int OP_ACCEPT = 1 << 4;
      • SelectionKey.OP_READ:读 static final int OP_READ = 1 << 0;
      • SelectionKey.OP_WRITE:写 static final int OP_WRITE = 1 << 2;
      • 可以使用或运算 | 来组合,例如 SelectionKey.OP_READ | SelectionKey.OP_WRITE
    • register 方法返回一个 SelectionKey 对象,包括:

      • int interestOps():调用 register 注册 channel 时所设置的 interest set.

      • ```java
        int readyOps()

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        92

        :Channel 所准备好了的操作

        - `selectionKey.isAcceptable();`
        - `selectionKey.isConnectable();`
        - `selectionKey.isReadable();`
        - `selectionKey.isWritable();`

        - `public abstract SelectableChannel channel();`: 得到 Channel

        - `public abstract Selector selector();`:得到 Selector

        - `public final Object attachment`:得到附加对象

        - 不断重复:

        - 调用 Selector 对象的 select() 方法,**该方法会阻塞,直至注册的事件发生**
        - **事件发生**,调用 Selector 对象的 selectedKeys() 方法获取 selected keys
        - 遍历每个 selected key:
        - 从 selected key 中获取对应的 Channel 并处理
        - 在 OP_ACCEPT 事件中, 从 key.channel() 返回的是 ServerSocketChannel
        - 在 OP_WRITE 和 OP_READ 事件中, 从 key.channel() 返回的是 SocketChannel

        - **关闭 Selector**

        示例:

        ```java
        public static void main(String args[]) throws Exception {
        // 创建 Selector
        Selector selector = Selector.open();

        // 创建 Server Socket,监听端口 8080
        ServerSocketChannel serverChannel1 = ServerSocketChannel.open();
        serverChannel1.socket().bind(new InetSocketAddress(8080));
        // 如果一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的
        serverChannel1.configureBlocking(false);

        // 创建 Server Socket,监听端口 8081
        ServerSocketChannel serverChannel2 = ServerSocketChannel.open();
        serverChannel2.socket().bind(new InetSocketAddress(8081));
        // 如果一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的
        serverChannel2.configureBlocking(false);

        // 将 Channel 注册到 Selector 中
        serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
        serverChannel2.register(selector, SelectionKey.OP_ACCEPT);

        // 不断重复
        while (true) {
        // 调用 Selector 对象的 select() 方法,该方法会阻塞,直至注册的事件发生
        selector.select();

        // 事件发生,调用 Selector 对象的 selectedKeys() 方法获取 selected keys
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();

        // 遍历每个 selected key:
        while (it.hasNext()) {
        SelectionKey key = it.next();

        if (key.isAcceptable()) {
        // 在 OP_ACCEPT 事件中, 从 key.channel() 返回的是 ServerSocketChannel
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();

        // 调用 accept 方法获取 TCP 连接 SocketChanne
        SocketChannel clientChannel = serverChannel.accept();
        clientChannel.configureBlocking(false);

        // 注册 SocketChannel
        clientChannel.register(key.selector(), SelectionKey.OP_READ | SelectionKey.OP_WRITE);

        System.out.println("Accept event");
        }

        if (key.isReadable()) {
        // 在 OP_WRITE 和 OP_READ 事件中, 从 key.channel() 返回的是 SocketChannel
        SocketChannel clientChannel = (SocketChannel) key.channel();
        System.out.println("Read event");
        // 可以从 clientChannel 中读数据,通过 ByteBuffer
        // TO DO
        }

        if (key.isWritable()) {
        // 在 OP_WRITE 和 OP_READ 事件中, 从 key.channel() 返回的是 SocketChannel
        SocketChannel clientChannel = (SocketChannel) key.channel();
        System.out.println("Write event");
        // 可以向 clientChannel 中写数据,通过 ByteBuffer
        // TO DO
        }
        }
        }
        }

scanner、buffer reader、jdk8 stream、apache common io 效率对比

文件行读效率测试

scanner、buffer reader、jdk8 stream、apache common io

一、先看下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public static void main(String[] args) throws IOException {
File file = new File(FILE_PATH);

// scanner
System.out.println("------------************************-------------");
System.out.println(">>>>>>scanner 开始 >>>>>>>");
Long cur = System.currentTimeMillis();
scanner(file);
System.out.println("scanner 耗时[" + (System.currentTimeMillis() - cur) + "]ms");
System.out.println("<<<<<<<scanner 结束 <<<<<<<");
System.out.println("------------************************-------------");
System.out.println();

// buffer reader
System.out.println("------------************************-------------");
System.out.println(">>>>>>buffer reader 开始 >>>>>>>");
cur = System.currentTimeMillis();
bufferReader(file);
System.out.println("buffer reader 耗时[" + (System.currentTimeMillis() - cur) + "]ms");
System.out.println("<<<<<<<buffer reader 结束 <<<<<<<");
System.out.println("------------************************-------------");
System.out.println();

// JDK8
System.out.println("------------************************-------------");
System.out.println(">>>>>>JDK8 stream开始 >>>>>>>");
cur = System.currentTimeMillis();
jdk8Reader(FILE_PATH);
System.out.println("JDK8 stream 耗时[" + (System.currentTimeMillis() - cur) + "]ms");
System.out.println("<<<<<<<JDK8 stream 结束 <<<<<<<");
System.out.println("------------************************-------------");
System.out.println();

// apache common io
System.out.println("------------************************-------------");
System.out.println(">>>>>>apache common io >>>>>>>");
cur = System.currentTimeMillis();
commonIo(file);
System.out.println("apache common io 耗时[" + (System.currentTimeMillis() - cur) + "]ms");
System.out.println("<<<<<<<apache common io 结束 <<<<<<<");
System.out.println("------------************************-------------");

}

private static void commonIo(File file) throws IOException {
int total = 0;
try (LineIterator lineIterator = FileUtils.lineIterator(file, "UTF-8")) {
while (lineIterator.hasNext()) {
lineIterator.next();
total += 1;
}
} finally {
System.out.println("[总行数]:" + total);
}
}

/**
* JDK8
*
* @param filePath
* @throws IOException
*/
private static void jdk8Reader(String filePath) throws IOException {
// Files.readAllLine 内部使用的是buffer reader
// Files.readAllLines(file.toPath());
int total = 0;
try (Stream<String> stream = Files.lines(Paths.get(filePath), StandardCharsets.UTF_8)) {
total = stream.reduce(0, (cur, op) -> cur + 1, (a, b) -> a + b);
} finally {
System.out.println("[总行数]:" + total);
}
}

/**
* buffer reader
*
* @param file
* @throws IOException
*/
private static void bufferReader(File file) throws IOException {
int total = 0;
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
for (String line; (line = br.readLine()) != null; total++);
// line is not visible here.
} finally {
System.out.println("[总行数]:" + total);
}
}

private static void scanner(File file) throws FileNotFoundException {
int total = 0;
try(Scanner scanner = new Scanner(file)){
while (scanner.hasNextLine()) {
scanner.nextLine();
total+=1;
}
} finally {
System.out.println("[总行数]:" + total);
}
}

100W条真实数据测试结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
------------************************-------------
>>>>>>scanner 开始 >>>>>>>
[总行数]:1056322
scanner 耗时[8379]ms
<<<<<<<scanner 结束 <<<<<<<
------------************************-------------

------------************************-------------
>>>>>>buffer reader 开始 >>>>>>>
[总行数]:1056322
buffer reader 耗时[901]ms
<<<<<<<buffer reader 结束 <<<<<<<
------------************************-------------

------------************************-------------
>>>>>>JDK8 stream开始 >>>>>>>
[总行数]:1056322
JDK8 stream 耗时[916]ms
<<<<<<<JDK8 stream 结束 <<<<<<<
------------************************-------------

------------************************-------------
>>>>>>apache common io >>>>>>>
[总行数]:1056322
apache common io 耗时[929]ms
<<<<<<<apache common io 结束 <<<<<<<
------------************************-------------

1000W条真实数据测试结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
------------************************-------------
>>>>>>scanner 开始 >>>>>>>
[总行数]:10563211
scanner 耗时[79109]ms
<<<<<<<scanner 结束 <<<<<<<
------------************************-------------

------------************************-------------
>>>>>>buffer reader 开始 >>>>>>>
[总行数]:10563211
buffer reader 耗时[8477]ms
<<<<<<<buffer reader 结束 <<<<<<<
------------************************-------------

------------************************-------------
>>>>>>JDK8 stream开始 >>>>>>>
[总行数]:10563211
JDK8 stream 耗时[8623]ms
<<<<<<<JDK8 stream 结束 <<<<<<<
------------************************-------------

------------************************-------------
>>>>>>apache common io >>>>>>>
[总行数]:10563211
apache common io 耗时[8573]ms
<<<<<<<apache common io 结束 <<<<<<<
------------************************-------------

总结:

scanner:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Tries to read more input. May block.
private void readInput() {
if (buf.limit() == buf.capacity())
makeSpace();

// Prepare to receive data
int p = buf.position();
buf.position(buf.limit());
buf.limit(buf.capacity());

int n = 0;
try {
n = source.read(buf);
} catch (IOException ioe) {
lastException = ioe;
n = -1;
}

if (n == -1) {
sourceClosed = true;
needInput = false;
}

if (n > 0)
needInput = false;

// Restore current position and limit for reading
buf.limit(buf.position());
buf.position(p);
}

buffer reader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 初始大小为8192字节
private static int defaultCharBufferSize = 8192;
private static int defaultExpectedLineLength = 80;

String readLine(boolean ignoreLF) throws IOException {
StringBuffer s = null;
int startChar;

synchronized (lock) {
ensureOpen();
boolean omitLF = ignoreLF || skipLF;

bufferLoop:
for (;;) {

if (nextChar >= nChars)
fill();
if (nextChar >= nChars) { /* EOF */
if (s != null && s.length() > 0)
return s.toString();
else
return null;
}
boolean eol = false;
char c = 0;
int i;

/* Skip a leftover '\n', if necessary */
if (omitLF && (cb[nextChar] == '\n'))
nextChar++;
skipLF = false;
omitLF = false;

charLoop:
for (i = nextChar; i < nChars; i++) {
c = cb[i];
if ((c == '\n') || (c == '\r')) {
eol = true;
break charLoop;
}
}

startChar = nextChar;
nextChar = i;

if (eol) {
String str;
if (s == null) {
str = new String(cb, startChar, i - startChar);
} else {
s.append(cb, startChar, i - startChar);
str = s.toString();
}
nextChar++;
if (c == '\r') {
skipLF = true;
}
return str;
}

if (s == null)
s = new StringBuffer(defaultExpectedLineLength);
s.append(cb, startChar, i - startChar);
}
}
}

JDK stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static Stream<String> lines(Path path, Charset cs) throws IOException {
// 实际也用的buffer reader
BufferedReader br = Files.newBufferedReader(path, cs);
try {
return br.lines().onClose(asUncheckedRunnable(br));
} catch (Error|RuntimeException e) {
try {
br.close();
} catch (IOException ex) {
try {
e.addSuppressed(ex);
} catch (Throwable ignore) {}
}
throw e;
}
}

common io

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean hasNext() {
if (this.cachedLine != null) {
return true;
} else if (this.finished) {
return false;
} else {
try {
String line;
do {
line = this.bufferedReader.readLine();
if (line == null) {
this.finished = true;
return false;
}
} while(!this.isValidLine(line));

this.cachedLine = line;
return true;
} catch (IOException var4) {
try {
this.close();
} catch (IOException var3) {
var4.addSuppressed(var3);
}

throw new IllegalStateException(var4);
}
}
}

所以出了Scanner用的自己的缓存机制,其他的都用的buffer reader的缓存机制,所以后面的三种方法效果差不多,不过笔者还是喜欢jdk8的stream机制,所以选择了jdk8的

JDK8 parallelStream 与 fork/join 框架

JDK1.8 parallelStream 与 fork/join 框架

一、JDK8开启并行串行流

tream是java8中新增加的一个特性,被java猿统称为流.

  Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

  Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

  而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:

1
2
3
4
5
1.0-1.4 中的 java.lang.Thread  
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda

Stream 的另外一大特点是,数据源本身可以是无限的。

1.1 什么是parallelStream

parallelStream其实就是一个并行执行的流.它通过默认的ForkJoinPool,可能提高你的多线程任务的速度.实际是多线程,注意线程安全问题

  在从stream和parallelStream方法中进行选择时,我们可以考虑以下几个问题:

1
2
3
  1. 是否需要并行?  
  2. 任务之间是否是独立的?是否会引起任何竞态条件?
  3. 结果是否取决于任务的调用顺序?

  对于问题1,需要弄清楚要解决的问题是什么,数据量有多大,计算的特点是什么?并不是所有的问题都适合使用并发程序来求解,比如当数据量不大时,顺序执行往往比并行执行更快。毕竟,准备线程池和其它相关资源也是需要时间的。但是,当任务涉及到I/O操作并且任务之间不互相依赖时,那么并行化就是一个不错的选择。通常而言,将这类程序并行化之后,执行速度会提升好几个等级。

  对于问题2,如果任务之间是独立的,并且代码中不涉及到对同一个对象的某个状态或者某个变量的更新操作,那么就表明代码是可以被并行化的。

  对于问题3,由于在并行环境中任务的执行顺序是不确定的,因此对于依赖于顺序的任务而言,并行化也许不能给出正确的结果。  

  场景:默认值适用的场景是CPU密集型的,而一般的Web项目是IO密集型的(一般的Web项目都是需要跟数据库打交道的,针对数据库的操作主要就都是IO,而对CPU的消耗并不高)。

  当不能使用默认值的时候,需要开发人员额外去了解parallelStream的用法,如下:

1.2 parallelStream作用

Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作,因此像以下的程式片段:

1
2
3
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(out::println);

  得到的展示顺序不一定会是1、2、3、4、5、6、7、8、9,而可能是任意的顺序,就forEach()这个操作來讲,如果平行处理时,希望最后顺序是按照原来Stream的数据顺序,那可以调用forEachOrdered()。例如:

1
2
3
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(out::println);

  注意:如果forEachOrdered()中间有其他如filter()的中介操作,会试着平行化处理,然后最终forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()这类的有序处理,可能会(或完全失去)失去平行化的一些优势,实际上中介操作亦有可能如此,例如sorted()方法。

1.3 开启串行流和并行流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void testStream() {
List<Integer> list = getList();

// 串行流
list.stream();
list.stream().sequential();

// 并行流
list.stream().parallel();
list.parallelStream().reduce(null);

// 串行流执行
list.stream().reduce((first, second) -> first+second);/

// 并行流执行
list.parallelStream().reduce((first, second) -> first+second);
}

1.2 流处理

因为比较关心并行流的实现,所以看并行流的代码:

java.util.stream.ReduceOps.ReduceOp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
implements TerminalOp<T, R> {
private final StreamShape inputShape;

/**
* Create a {@code ReduceOp} of the specified stream shape which uses
* the specified {@code Supplier} to create accumulating sinks.
*
* @param shape The shape of the stream pipeline
*/
ReduceOp(StreamShape shape) {
inputShape = shape;
}

public abstract S makeSink();

@Override
public StreamShape inputShape() {
return inputShape;
}

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
// 这里new出了一个 ReduceTask
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
}

名称上看,这是个task任务(java.util.stream.ReduceOps.ReduceTask),再看下类图:

58929987340

看到类图结构加上熟悉fork/join框架,大概明白了stream的并行流实现了,借助于fork/join

二、Fork/Join 框架

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

img

2.1分治法

ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。

  典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

  所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

  首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

​ 尝试改变JDK8的工作线程数量:

1
2
3
public static final String DEFAULT_FORK_JOIN_PARALLELISM = "java.util.concurrent.ForkJoinPool.common.parallelism";
// 设置线程数
System.setProperty(DEFAULT_FORK_JOIN_PARALLELISM, (Runtime.getRuntime().availableProcessors() * 2) + "");

2.2 工作窃取法

  forkjoin最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个forkjion框架的核心理念,工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

  那么为什么需要使用工作窃取算法呢?

  假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

  工作窃取的运行流程图如下:

img

  工作窃取算法的优点:充分利用线程进行并行计算,并减少了线程间的竞争;

  工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

2.3 Fork/Join 涉及到的关键名称

  ForkJoinPool: 用来执行Task,或生成新的ForkJoinWorkerThread,执行 ForkJoinWorkerThread 间的 work-stealing 逻辑。ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。

  ForkJoinTask: 执行具体的分支逻辑,声明以同步/异步方式进行执行

  ForkJoinWorkerThread: 是 ForkJoinPool 内的 worker thread,执行

  ForkJoinTask, 内部有 ForkJoinPool.WorkQueue来保存要执行的ForkJoinTask。

  ForkJoinPool.WorkQueue:保存要执行的ForkJoinTask。

2.4 Fork/Join框架的实现原理

在Java的Fork/Join框架中,它提供了两个类来帮助我们完成任务分割以及执行任务并合并结果:

  1、ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

    RecursiveAction:用于没有返回结果的任务。
    RecursiveTask :用于有返回结果的任务。

  2、ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

  ForkJoinPoolForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool,而ForkJoinWorkerThread负责执行这些任务。

基本思想

  ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。

  每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO(后进先出) 方式,也就是说每次从队尾取出任务来执行。

  每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。

  在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。

  在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

2.4.1 ForkJoinPool属性说明、工作队列说明、控制中心说明
1
2
3
4
5
6
7
8
9
10
11
// Instance fields
volatile long ctl; // 控制中心:非常重要,看下图解析
volatile int runState; // 负数是shutdown,其余都是2的次方
final int config; // 配置:二进制的低16位代表 并行度(parallelism),
//高16位:mode可选FIFO_QUEUE(1 << 16)和LIFO_QUEUE(1 << 31),默认是LIFO_QUEUE
int indexSeed; // 生成worker的queue索引
volatile WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
volatile AtomicLong stealCounter; // also used as sync monitor
2.4.1.1 工作队列workQueues

用于保存向ForkJoinPool提交的任务,而具体的执行由ForkJoinWorkerThread执行,而ForkJoinWorkerThreadFactory可以用于生产出ForkJoinWorkerThread:

1
2
3
4
5
6
7
8
9
10
public static interface ForkJoinWorkerThreadFactory {
/**
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
* @return the new worker thread
* @throws NullPointerException if the pool is null
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}

img

1
2
3
4
5
6
7
8
9
10
11
// Instance fields
volatile int scanState; // 负数:inactive, 非负数:active, 其中奇数代表scanning
int stackPred; // sp = (int)ctl, 前一个队列栈的标示信息,包含版本号、是否激活、以及队列索引
int nsteals; // 窃取的任务数
int hint; // 一个随机数,用来帮助任务窃取,在 helpXXXX()的方法中会用到
int config; // 配置:二进制的低16位代表 在 queue[] 中的索引,
// 高16位:mode可选FIFO_QUEUE(1 << 16)和LIFO_QUEUE(1 << 31),默认是LIFO_QUEUE
volatile int qlock; // 锁定标示位:1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // 任务列表
2.4.1.2 控制中心ctl

img

2.4.1.3 方法说明

队列与关键任务调用说明

img

2.4.1.4 externalPush || externalSubmit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
//我们以前常用的Random,在并发下,多个线程同时计算种子需要用到同一个原子变量。
//由于更新操作使用CAS,同时执行只有一个线程成功,其他线程的大量自旋造成性能损失,ThreadLocalRandom继承Random,对此进行了改进。
//ThreadLocalRandom运用了ThreadLocal,每个线程内部维护一个种子变量,多线程下计算新种子时使用线程自己的种子变量进行更新,避免了竞争。
int r = ThreadLocalRandom.getProbe();
int rs = runState;
// 外部提交的task,肯定会到偶数位下标的队列上
// SQMASK = 0x007e = 1111110,任何数和 SQMASK 进行 & 运算 都会是偶数
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
//队列上锁
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
//把 task 放到队列的 top端
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
//队列解锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
2.4.1.5 registerWorker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
//......
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
// worker的queue肯定放在pool中的queue[]中的奇数下标
// m = ws.lenght - 1, ws.lenght 肯定是偶数,则m 肯定是奇数
// 1的二进制位:00000001, 所以任何数 "|" 1 都是奇数
// 所以 奇数 & 奇数 , 1&1 = 1,所以i肯定是奇数
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
// 如果下标已经有队列,则重新生成奇数下标
// step肯定为偶数:EVENMASK:0xfffe:1111111111111110
// 所以 奇数+偶数,奇偶性不变
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
//...
}
//......
}
2.4.1.6 scan
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
// k = r & m 。 r是一个随机数,m 是 队列数组长度 - 1;用于定位去哪个 队列 窃取 task
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
if ((q = ws[k]) != null) {
// 如果有还没执行的task,尝试窃取队列q 中的base下标的 task。 即FIFO
// i: 在内存中,b下标对应的对象的偏移值。 a.length - 1 的二进制位 永远是 0[1...]s,所以 (a.length - 1) & b = b,主要是保证了b不会越界
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
// ss 是小偷的 scanState,大于0代表当前的worker是激活的
if (ss >= 0) {
// 把 task 从 队列中取出来,然后队列的base+1,如果被窃取的队列中有多于1个的task,则尝试唤醒其他的worker
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
// ss小于0代表当前的worker是未激活的,并且当前是第一次扫描,这时候尝试激活worker
// oldSum: 上一次遍历周期的 base 值的和。
// (int) c : 可以拿到当前栈顶的空闲worker。sp = (int) c
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
if (ss < 0) // refresh
ss = w.scanState;
// 更新随机值,重新初始化所有控制变量,重新定位队列
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
// 每次没有窃取到task的时候,都会k+1(k值不会超过m),当k遍历了一圈还没有steal到任务,则当前小偷worker是过剩的,则inactive这个小偷worker
if ((k = (k + 1) & m) == origin) { // continue until stable
// oldSum == (oldSum = checkSum) 实际上就是 oldSum == checkSum , oldSum = checkSum
// oldSum == checkSum 是判断 这个周期和上个周期 的base和是否一直,如果一直, 说明base可能没有变过
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
// 维护 队列的 stack,可以指向前一个栈顶的队列
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
2.4.1.7 signalWork
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
// AC是负数,所以 active worker不足
while ((c = ctl) < 0L) { // too few active
// sp:第一位是0,没有版本号,没有inactive的worker
if ((sp = (int)c) == 0) { // no idle workers
//tc: tc不为0,就是代表 total worker - parallelism < 0, 所以需要添加worker
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
// 取栈顶的worker,如果下标已经越界或queue为null,线程池都是终止了
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
// 新的scanState,版本+1,设置状态为激活,INACTIVE = 1 << 31,~INACTIVE = 01111111....
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
// 确认 worker的 sp没有变化
int d = sp - v.scanState; // screen CAS
// 生成新的 ctl,(UC_MASK & (c + AC_UNIT))设置 高32位, (SP_MASK & v.stackPred)设置低32位
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
//激活worker
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
//当前queue没有task 需要执行了,则停止signal
if (q != null && q.base == q.top) // no more work
break;
}
}
2.4.1.8 ForkJoinTask的fork方法实现原理

当我们调用ForkJoinTask的fork方法时,程序会把任务放在ForkJoinWorkerThread的pushTask的workQueue中,异步地执行这个任务,然后立即返回结果,代码如下:

1
2
3
4
5
6
7
8
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}

若当前线程是ForkJoinWorkerThread线程,则强制类型转换(向下转换)成ForkJoinWorkerThread,然后将任务push到这个线程负责的队列里面去,在ForkJoinWorkerThread类中有一个pool和一个workQueue字段:

1
2
3
4
// 线程工作的ForkJoinPool
final ForkJoinPool pool; // the pool this thread works in
// 工作窃取队列
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

pushTask方法把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}

该方法的主要功能就是将当前任务存放在ForkJoinTask数组array里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。

2.4.1.9 ForkJoinTask的join方法实现原理
1
2
3
4
5
6
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}

  首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL):  

  若状态不是NORMAL,则通过reportException(int)方法来处理状态:

1
2
3
4
5
6
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
  • 如果任务状态是已完成,则直接返回任务结果。
  • 如果任务状态是被取消,则直接抛出CancellationException。
  • 如果任务状态是抛出异常,则直接抛出对应的异常。

  doJoin()方法的实现代码:

1
2
3
4
5
6
7
8
9
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}

  在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完了,如果执行完了,则直接返回任务状态,如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成了,则设置任务状态为NORMAL,如果出现异常,则纪录异常,并将任务状态设置为EXCEPTIONAL。

  执行任务是通过doExec()方法来完成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}

真正的执行过程是由exec()方法来完成的:

1
protected abstract boolean exec();

这就是我们需要重写的方法,若是我们的任务继承自RecursiveAction,则我们需要重写RecursiveAction的compute()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;

/**
* The main computation performed by this task.
*/
protected abstract void compute();

/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() { return null; }

/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) { }

/**
* Implements execution conventions for RecursiveActions.
*/
protected final boolean exec() {
compute();
return true;
}
}

若是我们的任务继承自RecursiveTask,则我们同样需要重写RecursiveTask的compute()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;

/**
* The result of the computation.
*/
V result;

/**
* The main computation performed by this task.
* @return the result of the computation
*/
protected abstract V compute();

public final V getRawResult() {
return result;
}

protected final void setRawResult(V value) {
result = value;
}

/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}
}

通过上面的分析可知,执行我们的业务代码是在调用了join()之后的,也就是说,fork仅仅是分割任务,只有当我们执行join的时候,我们的任务才会被执行。

2.4.2 异常处理

ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:

1
2
3
if(task.isCompletedAbnormally()){
System.out.println(task.getException());
}

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

1
2
3
4
5
6
public final Throwable getException() {
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
}
2.4.2forkjoin 的使用

 ForkJoinPool 使用submit 或 invoke 提交的区别:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行,只有在Future调用get的时候会阻塞。

  这里继承的是RecursiveTask 适用于有返回值的场景;还可以继承RecursiveAction,适合于没有返回值的场景

  执行子任务调用fork方法并不是最佳的选择,最佳的选择是invokeAll方法。

2.4.3 示例代码

这个示例是在做某支付任务异步解耦后的业务逻辑校验,这个是一个接收返回的任务 RecursiveTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class CheckTask extends RecursiveTask<Integer> {
/**
* 这个是阈值,具体任务列表拆分到什么程度再开始执行
**/
private int threshold = 10;

/**
* 这个是当前处理的集合列表
**/
private List<CbVaPaymentFileContent> list;

/**
* 初始化任务对象
**/
private CheckTask(int threshold, List<CbVaPaymentFileContent> list) {
if (null == list) throw new IllegalArgumentException("[list] is null.");
this.threshold = threshold <= 0 ? this.threshold : threshold;
// this.threshold = list.size() / Runtime.getRuntime().availableProcessors();
this.list = list;
}

@Override
protected Integer compute() {
if (list.size() <= threshold) {
// 处理
int count = 0;
for (CbVaPaymentFileContent content : list) {
count += process(content);
}
return count;
} else {
// 分解
int middle = list.size() / 2;
List<CbVaPaymentFileContent> leftList = list.subList(0, middle);
List<CbVaPaymentFileContent> rightList = list.subList(middle, list.size());
CheckTask left = new CheckTask(threshold, leftList);
CheckTask right = new CheckTask(threshold, rightList);
// left.fork();
// right.fork();

// 这两个方法,使用invokeAll方法的主要原因是为了充分利用线程池,在invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。
invokeAll(left, right);

// 结合
return left.join() + right.join();
}
}

}

  该代码就是通过Fork/Join框架来计算数组的和,计算耗时4031毫秒。通过该代码作为应用示例主要是为了告诉大家,使用Fork/Join模型的正确方式,在源代码中可以看到,SumTask继承自RecursiveTask,重写的compute方法为:

  compute()方法使用了invokeAll方法来分解任务,而不是它下面的subtask1.fork();

  这两个方法,使用invokeAll方法的主要原因是为了充分利用线程池,在invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。

  若是采用另外一种方式来运行,程序的运行时间为6028毫秒,可以看到,明显比invokeAll方式慢了很多。

2.4.3 JDK8中的最佳实践
2.4.3.1 通过forkjoin来看parallelStream

在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,即使用了ForkJoinPool的ParallelStream。  

  Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。

  一般ForkJoinPool中的通用线程池处理,也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。

2.4.3.2 线程池数量

1、系统CPU数量:[如机器8核,即8]

1
Runtime.getRuntime().availableProcessors()

  2、parallelStream默认的并发线程数:【parallelStream核心使用ForkJoinPool实现,故如下】【输出是7个】

1
ForkJoinPool.getCommonPoolParallelism()

  3、为什么parallelStream默认的并发线程数要比CPU处理器的数量少1个?

    因为最优的策略是每个CPU处理器分配一个线程,然而主线程也算一个线程,所以要占一个名额。如果只有1个CPU,默认的并发线程数就是1

  4、修改默认并发数

    默认的并发线程数不可以反复修改。因为java.util.concurrent.ForkJoinPool.common.parallelismfinal类型的,整个JVM中只允许设置一次。多次修改以第一次为主

    1、系统property

1
2
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
System.out.println(ForkJoinPool.getCommonPoolParallelism());

    2、当然上述参数也可以通过jvm设置系统属性:-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量)  

  5、既然默认的并发线程数不能反复修改,进行不同线程数量的并发测试,可以引入ForkJoinPool。用法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testSetParallelMutli() throws ExecutionException, InterruptedException {
int[] threadCountArr = {2, 4, 6};
List<Integer> para = new ArrayList<>();
for (int i = 0; i < 7; i++) {
para.add(i);

}
for (int threadCount : threadCountArr) {
new ForkJoinPool(threadCount).submit(() -> {//多线程任务
System.out.println(Thread.currentThread().getName());
}).get();
}
}

    使用get 是为了阻塞 得到结果;如果主线程没有关闭的情况下可以不用get

2.4.3.2 测试示例

实际应用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
logger.info("[消息补偿任务-并行执行]开始,本次预处理总数为[{}]", list.size());

// 手动扩容下,当然这个综合考虑使用
System.setProperty(DEFAULT_FORK_JOIN_PARALLELISM, (Runtime.getRuntime().availableProcessors() * appConfig.getMutiple()) + "");

/**
* 拆分任务到JobSender
* {@link Stream#reduce(Object, BiFunction, BinaryOperator)}
* <p>
* reduce.Object --> 初始值,只是为了来初始化参数类型
* reduce.BiFunction.apply(T t, U u) --> t表示当前值, u表示当前操作对象
* reduce.BinaryOperator(T t, U u) --> t=u=初始值类型,用来合并结果的
* </p>
*/
int total = list.parallelStream().filter(op -> lockAdaptor.lock(op.getId(), LockAdaptor.DEFAULT_TIMEOUT)).reduce(0, (cur, channel) -> {
// 构建发送器 + 并处理
int ava;
try {
ava = (jobSenderFactory.getSender(channel).process().dealSuccess() ? 1 : 0);
} finally {
// 解锁
lockAdaptor.unlock(channel.getId());
}
return cur + ava;
}, (a, b) -> a + b);
logger.info("[消息补偿任务-并行执行]结束,本次预处理总数为[{}], 成功总数[{}], 未成功总数[{}]", list.size(), total, list.size() - total);

1、测试一、8核机器,每个任务均耗时2秒,一共16个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void testSetParallelMutli2() throws ExecutionException, InterruptedException {
List<Integer> para = new ArrayList<>();
for (int i = 0; i < 16; i++) {
para.add(i);

}
para.parallelStream().forEach(i -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(LocalDateTime.now() + "||" + Thread.currentThread().getName() + ":" + i);
});
}

  输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2019-09-13T10:51:04.344||ForkJoinPool.commonPool-worker-1:5
2019-09-13T10:51:04.344||ForkJoinPool.commonPool-worker-6:1
2019-09-13T10:51:04.344||ForkJoinPool.commonPool-worker-2:14
2019-09-13T10:51:04.344||main:10
2019-09-13T10:51:04.344||ForkJoinPool.commonPool-worker-4:13
2019-09-13T10:51:04.344||ForkJoinPool.commonPool-worker-3:2
2019-09-13T10:51:04.344||ForkJoinPool.commonPool-worker-7:4
2019-09-13T10:51:04.344||ForkJoinPool.commonPool-worker-5:7
2019-09-13T10:51:06.350||ForkJoinPool.commonPool-worker-4:3
2019-09-13T10:51:06.350||ForkJoinPool.commonPool-worker-6:0
2019-09-13T10:51:06.350||ForkJoinPool.commonPool-worker-1:12
2019-09-13T10:51:06.350||ForkJoinPool.commonPool-worker-2:15
2019-09-13T10:51:06.350||main:11
2019-09-13T10:51:06.350||ForkJoinPool.commonPool-worker-5:8
2019-09-13T10:51:06.350||ForkJoinPool.commonPool-worker-3:6
2019-09-13T10:51:06.350||ForkJoinPool.commonPool-worker-7:9

  结论:会有7个 ForkJoinPool.commonPool-worker 线程和1个主线程main一起执行任务。并且8个一组一组执行,每个线程执行了两个任务。

2、测试二、8核机器,每个任务耗时2秒内随机,一共16个任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    @Test
public void testSetParallelMutli2() throws ExecutionException, InterruptedException {
List<Integer> para = new ArrayList<>();
for (int i = 0; i < 16; i++) {
para.add(i);

}
para.parallelStream().forEach(i -> {
try {
// Thread.sleep(2000);
Thread.sleep(new Random().nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(LocalDateTime.now() + "||" + Thread.currentThread().getName() + ":" + i);
});
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2019-09-13T10:54:01.486||ForkJoinPool.commonPool-worker-5:7
2019-09-13T10:54:01.751||main:10
2019-09-13T10:54:01.774||main:11
2019-09-13T10:54:01.862||ForkJoinPool.commonPool-worker-5:6
2019-09-13T10:54:02.203||ForkJoinPool.commonPool-worker-5:15
2019-09-13T10:54:02.285||ForkJoinPool.commonPool-worker-6:1
2019-09-13T10:54:02.407||ForkJoinPool.commonPool-worker-6:0
2019-09-13T10:54:02.479||ForkJoinPool.commonPool-worker-1:5
2019-09-13T10:54:02.496||ForkJoinPool.commonPool-worker-2:14
2019-09-13T10:54:02.518||ForkJoinPool.commonPool-worker-4:13
2019-09-13T10:54:02.732||main:9
2019-09-13T10:54:02.740||ForkJoinPool.commonPool-worker-7:4
2019-09-13T10:54:02.791||ForkJoinPool.commonPool-worker-3:2
2019-09-13T10:54:03.178||ForkJoinPool.commonPool-worker-5:12
2019-09-13T10:54:03.743||ForkJoinPool.commonPool-worker-1:8
2019-09-13T10:54:04.003||ForkJoinPool.commonPool-worker-6:3

  结论:会有7个 ForkJoinPool.commonPool-worker 线程和1个主线程main一起执行任务。并且是强占式【工作窃取法】的执行任务:如上线程5、主线程执行了各3个任务,其他有2个或一个的。

3、示例三、接收消息队列消息,每次消息个数n个,每个消息是一个100个的list,接收时候使用parallelStream消费并发处理

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testMq() throws Exception {
for (int j = 0; j < 1; j++) {
List<String> list = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
list.add(j+"___________"+i);
}
producerMessageService.sendMessage("test_parallel", UUID.randomUUID().toString(), JSON.toJSONString(list));
Thread.sleep(1000);
}
logger.error("=======================================================生产 ok");

Thread.sleep(2000000000);
}

接收消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public void onMessage(List<Message> messages) throws Exception {
if (messages == null || messages.isEmpty()) {
return;
}

for (int i = 0; i < messages.size(); i++) {
Message message = messages.get(i);
logger.info(String.format("收到一条消息,消息主题(队列名):%s,内容是:%s", message.getTopic(), message.getText()));


List<String> strings = JSONArray.parseArray(message.getText(), String.class);
strings.parallelStream().forEach(p -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.error(LocalDateTime.now() + "_______________" + Thread.currentThread().getName() + ":" + p);
});
}
}

  说明:

    发送消息,1s后会发送完毕,此时如果有订阅就会出现一条消息积压。

    订阅消息者,订阅后会收到词条消息,此时如果正常执行完毕(不论使用不使用多线程)消息积压就没有了,因为一般消息监听会在方法正常执行完毕后,使用消息Id将此条消息从订阅队列中移除。

      接收到1条消息,里面会有一个jsonstring,反序列化为List,大小是100,交给parallelStream处理,此时会有8个线程处理【如果是8核机器】,处理速度大约是2秒8个。其余的92进入workQueue中等待处理。

      此时如果程序中断,订阅的消息不会被消费使用,下次重连时,需要做已处理消息的去重。

      此时如果有新消息发送过来,也会在积压中,不会被消息消费。

4、示例四、从a中100个数找出整除5的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testExec() throws ExecutionException, InterruptedException {
List<Integer> a = Lists.newArrayList();
for (int i = 0; i < 100; i++) {
a.add(i);
}
List<Integer> b = Lists.newArrayList();
a.parallelStream().forEach(p -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (p % 5 == 0) {
b.add(p);
}
});
System.out.println("==========="+b.size());
b.forEach(p -> System.out.print(p+" "));
}

输出:正确应该是20

1
2
===========18
15 90 45 30 25 35 85 75 0 40 5 80 95 20 60 70 50 55

对此运行结果不一致,以及会有多线程问题

1
2
3
4
5
6
7
8
9
10
11
java.lang.ArrayIndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  ……

Caused by: java.lang.ArrayIndexOutOfBoundsException: 15
at java.util.ArrayList.add(ArrayList.java:463)
at com.github.bjlhx15.common.thread.juc.collection.jdk8stream.TStreamTest.lambda$testExec$6(TStreamTest.java:118)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)

原因:parallelStream 并行执行,多线程异步,可能没有b添加完毕就遍历,ArrayList不是线程安全的

修正:

  方案一、在遍历前,需全部执行完毕【串行】

    将 parallelStream 改为 stream串行处理【不可取,处理速度慢】

  方案二、在遍历前,需全部执行完毕

    继承 RecursiveTask或者RecursiveAction写任务    

  方案三、将ArrayList替换安全集合CopyOnWriteArrayLIst

1
List<Integer> b = Lists.newCopyOnWriteArrayList();

    此时运行就会出现正确结果。按理说应该会有结果不准确问题吧。但是没有,个人理解,因为是每次8个同时执行,所以即使最后一次主线程提前结束,也有其他线程在锁着b,所以最后执行b的操作会有等待

disruptor

背景

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。在美团技术团队它也有不少应用,有的项目架构借鉴了它的设计机制。本文从实战角度剖析了Disruptor的实现原理。

需要特别指出的是,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。另外,本文所描述的Disruptor特性限于3.3.4。

Java内置队列

介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。

队列 有界性 数据结构
ArrayBlockingQueue bounded 加锁 arraylist
LinkedBlockingQueue optionally-bounded 加锁 linkedlist
ConcurrentLinkedQueue unbounded 无锁 linkedlist
LinkedTransferQueue unbounded 无锁 linkedlist
PriorityBlockingQueue unbounded 加锁 heap
DelayQueue unbounded 加锁 heap

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

我们就从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

通过不加锁的方式实现的队列都是无界的(无法保证队列的长度在确定的范围内);而加锁的方式,可以实现有界队列。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少Java的垃圾回收对系统性能的影响,会尽量选择array/heap格式的数据结构。这样筛选下来,符合条件的队列就只有ArrayBlockingQueue。

ArrayBlockingQueue的问题

ArrayBlockingQueue在实际使用过程中,会因为加锁和伪共享等出现严重的性能问题,我们下面来分析一下。

加锁

现实编程过程中,加锁通常会严重地影响性能。线程会因为竞争不到锁而被挂起,等锁被释放的时候,线程又会被恢复,这个过程中存在着很大的开销,并且通常会有较长时间的中断,因为当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行,例如发生了缺页错误、调度延迟或者其它类似情况,那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,就会发生优先级反转。

Disruptor论文中讲述了一个实验:

  • 这个测试程序调用了一个函数,该函数会对一个64位的计数器循环自增5亿次。
  • 机器环境:2.4G 6核
  • 运算: 64位的计数器累加5亿次

|Method | Time (ms) | |— | —| |Single thread | 300| |Single thread with CAS | 5,700| |Single thread with lock | 10,000| |Single thread with volatile write | 4,700| |Two threads with CAS | 30,000| |Two threads with lock | 224,000|

CAS操作比单线程无锁慢了1个数量级;有锁且多线程并发的情况下,速度比单线程无锁慢3个数量级。可见无锁速度最快。

单线程情况下,不加锁的性能 > CAS操作的性能 > 加锁的性能。

在多线程情况下,为了保证线程安全,必须使用CAS或锁,这种情况下,CAS的性能超过锁的性能,前者大约是后者的8倍。

综上可知,加锁的性能是最差的。

关于锁和CAS

保证线程安全一般分成两种方式:锁和原子变量。

图1 通过加锁的方式实现线程安全

图1 通过加锁的方式实现线程安全

采取加锁的方式,默认线程会冲突,访问数据时,先加上锁再访问,访问之后再解锁。通过锁界定一个临界区,同时只有一个线程进入。如上图所示,Thread2访问Entry的时候,加了锁,Thread1就不能再执行访问Entry的代码,从而保证线程安全。

下面是ArrayBlockingQueue通过加锁的方式实现的offer方法,保证线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}

原子变量

原子变量能够保证原子性的操作,意思是某个任务在执行过程中,要么全部成功,要么全部失败回滚,恢复到执行之前的初态,不存在初态和成功之间的中间状态。例如CAS操作,要么比较并交换成功,要么比较并交换失败。由CPU保证原子性。

通过原子变量可以实现线程安全。执行某个任务的时候,先假定不会有冲突,若不发生冲突,则直接执行成功;当发生冲突的时候,则执行失败,回滚再重新操作,直到不发生冲突。

图2 通过原子变量CAS实现线程安全

图2 通过原子变量CAS实现线程安全

如图所示,Thread1和Thread2都要把Entry加1。若不加锁,也不使用CAS,有可能Thread1取到了myValue=1,Thread2也取到了myValue=1,然后相加,Entry中的value值为2。这与预期不相符,我们预期的是Entry的值经过两次相加后等于3。

CAS会先把Entry现在的value跟线程当初读出的值相比较,若相同,则赋值;若不相同,则赋值执行失败。一般会通过while/for循环来重新执行,直到赋值成功。

代码示例是AtomicInteger的getAndAdd方法。CAS是CPU的一个指令,由CPU保证原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int delta) {
for (;;) {
int current = get();
int next = current + delta;
if (compareAndSet(current, next))
return current;
}
}

/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

在高度竞争的情况下,锁的性能将超过原子变量的性能,但是更真实的竞争情况下,原子变量的性能将超过锁的性能。同时原子变量不会有死锁等活跃性问题。

伪共享

什么是共享

下图是计算的基本结构。L1、L2、L3分别表示一级缓存、二级缓存、三级缓存,越靠近CPU的缓存,速度越快,容量也越小。所以L1缓存很小但很快,并且紧靠着在使用它的CPU内核;L2大一些,也慢一些,并且仍然只能被一个单独的CPU核使用;L3更大、更慢,并且被单个插槽上的所有CPU核共享;最后是主存,由全部插槽上的所有CPU核共享。

图3 计算机CPU与缓存示意图

图3 计算机CPU与缓存示意图

当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。所以如果你在做一些很频繁的事,你要尽量确保数据在L1缓存中。

另外,线程之间共享一份数据的时候,需要一个线程把数据写回主存,而另一个线程访问主存中相应的数据。

下面是从CPU访问不同层级数据的时间概念:

从CPU到 大约需要的CPU周期 大约需要的时间
主存 - 约60-80ns
QPI 总线传输(between sockets, not drawn) - 约20ns
L3 cache 约40-45 cycles 约15ns
L2 cache 约10 cycles 约3ns
L1 cache 约3-4 cycles 约1ns
寄存器 1 cycle -

可见CPU读取主存中的数据会比从L1中读取慢了近2个数量级。

缓存行

Cache是由很多个cache line组成的。每个cache line通常是64字节,并且它有效地引用主内存中的一块儿地址。一个Java的long类型变量是8字节,因此在一个缓存行中可以存8个long类型的变量。

CPU每次从主存中拉取数据时,会把相邻的数据也存入同一个cache line。

在访问一个long数组的时候,如果数组中的一个值被加载到缓存中,它会自动加载另外7个。因此你能非常快的遍历这个数组。事实上,你可以非常快速的遍历在连续内存块中分配的任意数据结构。

下面的例子是测试利用cache line的特性和不利用cache line的特性的效果对比。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.meituan.FalseSharing;

/**
* @author gongming
* @description
* @date 16/6/4
*/
public class CacheLineEffect {
//考虑一般缓存行大小是64字节,一个 long 类型占8字节
static long[][] arr;

public static void main(String[] args) {
arr = new long[1024 * 1024][];
for (int i = 0; i < 1024 * 1024; i++) {
arr[i] = new long[8];
for (int j = 0; j < 8; j++) {
arr[i][j] = 0L;
}
}
long sum = 0L;
long marked = System.currentTimeMillis();
for (int i = 0; i < 1024 * 1024; i+=1) {
for(int j =0; j< 8;j++){
sum = arr[i][j];
}
}
System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");

marked = System.currentTimeMillis();
for (int i = 0; i < 8; i+=1) {
for(int j =0; j< 1024 * 1024;j++){
sum = arr[j][i];
}
}
System.out.println("Loop times:" + (System.currentTimeMillis() - marked) + "ms");
}
}

在2G Hz、2核、8G内存的运行环境中测试,速度差一倍。

结果:

Loop times:30ms Loop times:65ms

什么是伪共享

ArrayBlockingQueue有三个成员变量: - takeIndex:需要被取走的元素下标 - putIndex:可被元素插入的位置的下标 - count:队列中元素的数量

这三个变量很容易放到一个缓存行中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果。

图4 ArrayBlockingQueue伪共享示意图

图4 ArrayBlockingQueue伪共享示意图

如上图所示,当生产者线程put一个元素到ArrayBlockingQueue时,putIndex会修改,从而导致消费者线程的缓存中的缓存行无效,需要从主存中重新读取。

这种无法充分使用缓存行特性的现象,称为伪共享。

对于伪共享,一般的解决方案是,增大数组元素的间隔使得由不同线程存取的元素位于不同的缓存行上,以空间换时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.meituan.FalseSharing;

public class FalseSharing implements Runnable{
public final static long ITERATIONS = 500L * 1000L * 100L;
private int arrayIndex = 0;

private static ValuePadding[] longs;
public FalseSharing(final int arrayIndex) {
this.arrayIndex = arrayIndex;
}

public static void main(final String[] args) throws Exception {
for(int i=1;i<10;i++){
System.gc();
final long start = System.currentTimeMillis();
runTest(i);
System.out.println("Thread num "+i+" duration = " + (System.currentTimeMillis() - start));
}

}

private static void runTest(int NUM_THREADS) throws InterruptedException {
Thread[] threads = new Thread[NUM_THREADS];
longs = new ValuePadding[NUM_THREADS];
for (int i = 0; i < longs.length; i++) {
longs[i] = new ValuePadding();
}
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new FalseSharing(i));
}

for (Thread t : threads) {
t.start();
}

for (Thread t : threads) {
t.join();
}
}

public void run() {
long i = ITERATIONS + 1;
while (0 != --i) {
longs[arrayIndex].value = 0L;
}
}

public final static class ValuePadding {
protected long p1, p2, p3, p4, p5, p6, p7;
protected volatile long value = 0L;
protected long p9, p10, p11, p12, p13, p14;
protected long p15;
}
public final static class ValueNoPadding {
// protected long p1, p2, p3, p4, p5, p6, p7;
protected volatile long value = 0L;
// protected long p9, p10, p11, p12, p13, p14, p15;
}
}

在2G Hz,2核,8G内存, jdk 1.7.0_45 的运行环境下,使用了共享机制比没有使用共享机制,速度快了4倍左右。

结果:

  • Thread num 1 duration = 447
  • Thread num 2 duration = 463
  • Thread num 3 duration = 454
  • Thread num 4 duration = 464
  • Thread num 5 duration = 561
  • Thread num 6 duration = 606
  • Thread num 7 duration = 684
  • Thread num 8 duration = 870
  • Thread num 9 duration = 823

把代码中ValuePadding都替换为ValueNoPadding后的结果:

  • Thread num 1 duration = 446
  • Thread num 2 duration = 2549
  • Thread num 3 duration = 2898
  • Thread num 4 duration = 3931
  • Thread num 5 duration = 4716
  • Thread num 6 duration = 5424
  • Thread num 7 duration = 4868
  • Thread num 8 duration = 4595
  • Thread num 9 duration = 4540

备注:在jdk1.8中,有专门的注解@Contended来避免伪共享,更优雅地解决问题。

Disruptor的设计方案

Disruptor通过以下设计来解决队列速度慢的问题:

  • 环形数组结构

为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

  • 元素位置定位

数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

  • 无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

下面忽略数组的环形结构,介绍一下如何实现无锁设计。整个过程通过原子变量CAS,保证操作的线程安全。

一个生产者

写数据

生产者单线程写数据的流程比较简单:

  1. 申请写入m个元素;
  2. 若是有m个元素可以入,则返回最大的序列号。这儿主要判断是否会覆盖未读的元素;
  3. 若是返回的正确,则生产者开始写入元素。

图5 单个生产者生产过程示意图

图5 单个生产者生产过程示意图

多个生产者

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是,每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。

但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。

下面分读数据和写数据两种情况介绍。

读数据

生产者多线程写入的情况会复杂很多:

  1. 申请读取到序号n;
  2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置;
  3. 消费者读取元素。

如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer相应位置写数据,写线程被分配到的最大元素下标是11。

读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。

然后,消费者读取下标从3到6共计4个元素。

图6 多个生产者情况下,消费者消费过程示意图

图6 多个生产者情况下,消费者消费过程示意图

写数据

多个生产者写入的时候:

  1. 申请写入m个元素;
  2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间;
  3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。

如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。

Writer1写入下标3位置的元素,同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。

图7 多个生产者情况下,生产者生产过程示意图

图7 多个生产者情况下,生产者生产过程示意图

防止不同生产者对同一段空间写入的代码,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public long tryNext(int n) throws InsufficientCapacityException
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}

long current;
long next;

do
{
current = cursor.get();
next = current + n;

if (!hasAvailableCapacity(gatingSequences, n, current))
{
throw InsufficientCapacityException.INSTANCE;
}
}
while (!cursor.compareAndSet(current, next));

return next;
}

通过do/while循环的条件cursor.compareAndSet(current, next),来判断每次申请的空间是否已经被其他生产者占据。假如已经被占据,该函数会返回失败,While循环重新执行,申请写入空间。

消费者的流程与生产者非常类似,这儿就不多描述了。

总结

Disruptor通过精巧的无锁设计实现了在高并发情形下的高性能。

在美团内部,很多高并发场景借鉴了Disruptor的设计,减少竞争的强度。其设计思想可以扩展到分布式场景,通过无锁设计,来提升服务性能。

使用Disruptor比使用ArrayBlockingQueue略微复杂,为方便读者上手,增加代码样例。

代码实现的功能:每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端。详细逻辑请细读代码。

以下代码基于3.3.4版本的Disruptor包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.meituan.Disruptor;

/**
* @description disruptor代码样例。每10ms向disruptor中插入一个元素,消费者读取数据,并打印到终端
*/
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.ThreadFactory;


public class DisruptorMain
{
public static void main(String[] args) throws Exception
{
// 队列中的元素
class Element {

private int value;

public int get(){
return value;
}

public void set(int value){
this.value= value;
}

}

// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory(){
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};

// RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};

// 处理Event的handler
EventHandler<Element> handler = new EventHandler<Element>(){
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch)
{
System.out.println("Element: " + element.get());
}
};

// 阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();

// 指定RingBuffer的大小
int bufferSize = 16;

// 创建disruptor,采用单生产者模式
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

// 设置EventHandler
disruptor.handleEventsWith(handler);

// 启动disruptor的线程
disruptor.start();

RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

for (int l = 0; true; l++)
{
// 获取下一个可用位置的下标
long sequence = ringBuffer.next();
try
{
// 返回可用位置的元素
Element event = ringBuffer.get(sequence);
// 设置该位置元素的值
event.set(l);
}
finally
{
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
}

性能

以下面这些模式测试性能:

img

吞吐量测试数据(每秒的数量)如下。

环境: - CPU:Intel Core i7 860 @ 2.8 GHz without HT - JVM:Java 1.6.0_25 64-bit - OS:Windows 7

- ABQ Disruptor
Unicast: 1P – 1C 5,339,256 25,998,336
Pipeline: 1P – 3C 2,128,918 16,806,157
Sequencer: 3P – 1C 5,539,531 13,403,268
Multicast: 1P – 3C 1,077,384 9,377,871
Diamond: 1P – 3C 2,113,941 16,143,613

环境:

  • CPU:Intel Core i7-2720QM
  • JVM:Java 1.6.0_25 64-bit
  • OS:Ubuntu 11.04
- ABQ Disruptor
Unicast: 1P – 1C 4,057,453 22,381,378
Pipeline: 1P – 3C 2,006,903 15,857,913
Sequencer: 3P – 1C 2,056,118 14,540,519
Multicast: 1P – 3C 260,733 10,860,121
Diamond: 1P – 3C 2,082,725 15,295,197

依据并发竞争的激烈程度的不同,Disruptor比ArrayBlockingQueue吞吐量快4~7倍。

按照Pipeline: 1P – 3C的连接模式测试延迟,生产者两次写入之间的延迟为1ms。

运行环境:

  • CPU:2.2GHz Core i7-2720QM
  • Java: 1.6.0_25 64-bit
  • OS:Ubuntu 11.04.
- Array Blocking Queue (ns) Disruptor (ns)
99% observations less than 2,097,152 128
99.99% observations less than 4,194,304 8,192
Max Latency 5,069,086 175,567
Mean Latency 32,757 52
Min Latency 145 29

可见,平均延迟差了3个数量级。

等待策略

生产者的等待策略

暂时只有休眠1ns。

1
LockSupport.parkNanos(1);

消费者的等待策略

名称 措施 适用场景
BlockingWaitStrategy 加锁 CPU资源紧缺,吞吐量和延迟并不重要的场景
BusySpinWaitStrategy 自旋 通过不断重试,减少切换线程导致的系统调用,而降低延迟。推荐在线程绑定到固定的CPU的场景下使用
PhasedBackoffWaitStrategy 自旋 + yield + 自定义策略 CPU资源紧缺,吞吐量和延迟并不重要的场景
SleepingWaitStrategy 自旋 + yield + sleep 性能和CPU资源之间有很好的折中。延迟不均匀
TimeoutBlockingWaitStrategy 加锁,有超时限制 CPU资源紧缺,吞吐量和延迟并不重要的场景
YieldingWaitStrategy 自旋 + yield + 自旋 性能和CPU资源之间有很好的折中。延迟比较均匀

Log4j 2应用场景

Log4j 2相对于Log4j 1最大的优势在于多线程并发场景下性能更优。该特性源自于Log4j 2的异步模式采用了Disruptor来处理。 在Log4j 2的配置文件中可以配置WaitStrategy,默认是Timeout策略。下面是Log4j 2中对WaitStrategy的配置官方文档:

System Property Default Value Description
AsyncLogger.WaitStrategy Timeout Valid values: Block, Timeout, Sleep, Yield. Block is a strategy that uses a lock and condition variable for the I/O thread waiting for log events. Block can be used when throughput and low-latency are not as important as CPU resource. Recommended for resource constrained/virtualised environments. Timeout is a variation of the Block strategy that will periodically wake up from the lock condition await() call. This ensures that if a notification is missed somehow the consumer thread is not stuck but will recover with a small latency delay (default 10ms). Sleep is a strategy that initially spins, then uses a Thread.yield(), and eventually parks for the minimum number of nanos the OS and JVM will allow while the I/O thread is waiting for log events. Sleep is a good compromise between performance and CPU resource. This strategy has very low impact on the application thread, in exchange for some additional latency for actually getting the message logged. Yield is a strategy that uses a Thread.yield() for waiting for log events after an initially spinning. Yield is a good compromise between performance and CPU resource, but may use more CPU than Sleep in order to get the message logged to disk sooner.

性能差异

loggers all async采用的是Disruptor,而Async Appender采用的是ArrayBlockingQueue队列。

由图可见,单线程情况下,loggers all async与Async Appender吞吐量相差不大,但是在64个线程的时候,loggers all async的吞吐量比Async Appender增加了12倍,是Sync模式的68倍。

图8 Log4j 2各个模式性能比较

图8 Log4j 2各个模式性能比较

美团在公司内部统一推行日志接入规范,要求必须使用Log4j 2,使普通单机QPS的上限不再只停留在几千,极高地提升了服务性能。

参考文档

  1. http://brokendreams.iteye.com/blog/2255720
  2. http://ifeve.com/dissecting-disruptor-whats-so-special/
  3. https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
  4. https://lmax-exchange.github.io/disruptor/
  5. https://logging.apache.org/log4j/2.x/manual/async.html

Hello World

Welcome to Hexo! This is your very first post. Check documentation for more info. If you get any problems when using Hexo, you can find the answer in troubleshooting or you can ask me on GitHub.

Quick Start

Create a new post

test

1
$ hexo new "My New Post"

More info: Writing

Run server

1
$ hexo server

More info: Server

Generate static files

1
2
3
$ hexo generate
# 创建layout为 `post-java` 且路径为 java/schedule 文件名为chedule
$ hexo new post-java -p "java/schedule" schedule

More info: Generating

Deploy to remote sites

1
$ hexo deploy

More info: Deployment

schedule

并发编程领域中定时器 相关内容经常被一些介绍并发编程书籍所遗忘,属于并发编程学习优先级较低的知识点。在JDK源码中有两种定时器实现,一种是JDK1.3引入的*Timer,它是一种基于单线程操作的简单任务调度器,虽然存在较多设计缺陷,但仍有很多应用场景和使用案例;另一种JDK1.5引入的*ScheduledThreadPoolExecutor**类,是一种基于线程池操作的较复杂任务调度器,同时也是官方推荐的任务调度器实现。

定时器Timer,也称简单任务调度器。它由以下四个类组成,

  • 定时任务(TimerTask类)
  • 任务队列(TaskQueue类)
  • 定时线程(TimerThread类)
  • 定时器(Timer类)

定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class TimerTask implements Runnable {
final Object lock = new Object();

//任务状态
int state = VIRGIN;
static final int VIRGIN = 0;
static final int SCHEDULED = 1;
static final int EXECUTED = 2;
static final int CANCELLED = 3;

//下次执行时间
long nextExecutionTime;
//调度至执行间隔时间
long period = 0;
}

抽象类TimerTask实现Runnable接口,表明该类作为定时任务模版,用户可以根据业务场景定义具体任务。TimerTask类要维护任务状态 (state)、任务下次执行时间(nextExecutionTime)和任务调度至执行的间隔时间(period)。

任务状态

定时器任务生命周期中可能处于下表所示的4种不同的状态,在给定的时刻定时器任务只能处于其中一种状态。

timer task

执行任务

TimerTask类的抽象方法run来自Runnable接口,TimerTask并未实现该接口,延迟至子类实现。用户可在派生类中自定义任务逻辑。

1
public abstract void run();

抽象类TimerTask的run方法并不一定要来源于Runnable接口,它并未接受线程调度,而是由TimerThread线程从TimerQueue中消费任务,然后直接调用TimerTask.run()执行任务。基于这种理解,TimerTask类完全可以像这样定义:

1
2
3
4
5
public abstract class TimerTask {  // 舍去implement Runnable

//由抽象类自己定义,而非来自Runnable接口
public abstract void run();
}

TimerTask类这种写法可以理解为被过度设计了,读者可思之。

取消任务

如果当前任务正处于SCHEDULED状态,允许撤销当前任务,置任务为CANCELLED状态,返回true表示任务撤销成功;若任务处于其它状态,也置任务为CANCELLED状态,并返回false表示任务撤销失败。

1
2
3
4
5
6
7
8
public boolean cancel() {
synchronized(lock) {
boolean result = (state == SCHEDULED);
//实际上所有任务都能被取消
state = CANCELLED;
return result;
}
}

调用TimerTask.cancel(),虽然对不同状态有不同的返回值,但不管什么状态都能够被取消。设计逻辑匪夷所思,我认为这种设计不合理,读者可思之。

调度执行时间

scheduledExecutionTime方法获取任务被调度后最近的开始执行时间点,保证调度时间在下次执行时间之前。

1
2
3
4
5
6
public long scheduledExecutionTime() {
synchronized(lock) {
return (period < 0 ?
nextExecutionTime + period : nextExecutionTime - period);
}
}

定时线程

从优先级队列里异步消费任务的操作由单线程完成。TimerThread是单线程,因此需要mainLoop循环逻辑来轮询消费任务队列。

1
2
3
4
5
6
7
8
9
10
11
class TimerThread extends Thread {

boolean newTasksMayBeScheduled = true;

//内部维护一个队列
private TaskQueue queue;

TimerThread(TaskQueue queue) {
this.queue = queue;
}
}

轮询任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Override
public void run() {
try {
//循环执行逻辑
mainLoop();
} finally {
synchronized(queue) {
newTasksMayBeScheduled = false;
//清空任务队列. 在结束循环后可能仍有任务被加入到队列,因此需要清空.
queue.clear();
}
}
}

private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
//若队列为空且定时器未被撤销,则挂起定时线程直至被唤醒
while (queue.isEmpty() && newTasksMayBeScheduled) {
queue.wait();
}
//若线程被唤醒后队列仍为空,则结束循环. 说明此时定时器被撤销.
if (queue.isEmpty()) {
break;
}

long currentTime, executionTime;
//获取最近执行时间任务
task = queue.getMin();
synchronized(task.lock) {
//任务若被取消,则从队列中移除,并继续轮询
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue;
}

currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
//任务最近要执行
if (taskFired = (executionTime<=currentTime)) {
//若为非重复执行任务,从队列中移除该任务,并设置该任务状态为已执行
if (task.period == 0) {
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else {
//若为重复执行任务,则在指定时刻重新调度该任务
queue.rescheduleMin(
task.period<0 ? currentTime-task.period
: executionTime + task.period);
}
}
//若最近无任务要执行,则等待至要执行任务的指定时刻
if (!taskFired) {
queue.wait(executionTime - currentTime);
}
}
}

//任务已释放,运行任务
if (taskFired) {
task.run();
}
} catch(InterruptedException e) {
}
}
}

任务队列

任务队列是基于完全二叉树实现的小顶堆。队列初始容量为128,由于0位置不存储任务,因此实际初始容量为127,size表示队列的任务数。

1
2
3
4
5
6
7
8
class TaskQueue {

//基于顺序表实现的定时任务队列
private TimerTask[] queue = new TimerTask[128];

//队列任务数
private int size = 0;
}

查询容量

查询队列任务数和判断队列是否为空都直接使用任务队列内部维护的size属性,因此这两个操作的时间复杂度为O(1)。

1
2
3
4
5
/** 队列任务数 */
int size() { return size; }

/** 队列是否为空 */
boolean isEmpty() { return size==0; }

添加任务

主线程向任务队列中注入新任务。如果当前任务队列容量已达极限,则在原容量基础上扩容一倍,并在任务队列末尾追加新任务,并根据任务执行时间作为优先级调整新任务在任务队列中的位置。

1
2
3
4
5
6
7
8
9
10
11
/** 新增任务并调整小顶堆 */
void add(TimerTask task) {
//任务数达到队列最大容量,则扩容一倍
if (size + 1 == queue.length) {
queue = Arrays.copyOf(queue, 2*queue.length);
}
//添加任务
queue[++size] = task;
//向上调整任务
fixUp(size);
}

img

获取任务

从任务队列中获取最近将要执行任务的时间复杂度为O(1);获得指定位置任务的时间复杂度也是O(1)。

1
2
3
4
5
/** 获得下次执行时间最小的任务,即最小堆根结点 */
TimerTask getMin() { return queue[1]; }

/** 获得指定位置的任务 */
TimerTask get(int i) { return queue[i]; }

img

移除任务

1
2
3
4
5
6
/** 移除下次执行时间最小的任务,即移除堆顶任务 */
void removeMin() {
queue[1] = queue[size];
queue[size--] = null;
fixDown(1);
}

img

1
2
3
4
5
6
7
8
/** 快速移除指定位置处任务 */
void quickRemove(int i) {
assert i <= size; //assert生效需要编译器开启断言功能

//指定位置元素直接用最后元素代替,不需要向下调整
queue[i] = queue[size];
queue[size--] = null;
}

img

1
2
3
4
5
6
/** 清空任务队列 */
void clear() {
for (int i=1; i<=size; i++)
queue[i] = null;
size = 0;
}

重新调度任务

重新调度任务不删除堆顶任务,而是将堆顶任务的nextExecutionTime加上period后得到新的nextExecutionTime值,然后根据任务优先级向下调整。

1
2
3
4
void rescheduleMin(long newTime) {
queue[1].nextExecutionTime = newTime;
fixDown(1);
}

img

基础算法

任务队列是优先级队列,基于顺序结构完全二叉树实现的小顶堆。优先级的依据是任务下次执行时间。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** 提升优先级 */
private void fixUp(int k) {
while (k > 1) {
//父结点位置
int j = k >> 1;
//如果父结点的下次任务执行时间小于当前结点下次任务执行时间,结束调整操作
if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) {
break;
}

//调整任务在任务队列中的位置
TimerTask tmp = queue[j];
queue[j] = queue[k];
queue[k] = tmp;
k = j;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** 降低优先级 */
private void fixDown(int k) {
int j;
while ((j = k << 1) <= size && j > 0) {
//选择左右两侧子结点,选择更小的交换位置
if (j < size &&
queue[j].nextExecutionTime > queue[j+1].nextExecutionTime) {
j++;
}
if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) {
break;
}

//调整任务在任务队列中的位置
TimerTask tmp = queue[j];
queue[j] = queue[k];
queue[k] = tmp;
k = j;
}
}

调整当前完全二叉树为最小堆。

1
2
3
4
5
6
/** 堆化 */
void heapify() {
for (int i = size/2; i >= 1; i--) {
fixDown(i);
}
}

定时器

一个定时器内部维护一个任务队列和一个定时线程。在Main线程往任务队列注入任务后,由定时线程异步轮询处理任务队列,这种处理方式实质上是异步串行方式,任务处理并发度为1。

1
2
3
4
5
6
7
8
public class Timer {

/** 任务队列 */
private final TaskQueue queue = new TaskQueue();

/** 定时线程 */
private final TimerThread thread = new TimerThread(queue);
}

构造器

新建Timer实例,同时也新建了任务队列和定时线程,并启动定时线程。启动定时线程前可指定定时线程的名称,以及指定为后台线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public Timer() {
this("Timer-" + serialNumber());
}
public Timer(boolean isDaemon) {
this("Timer-" + serialNumber(), isDaemon);
}
public Timer(String name) {
thread.setName(name);
thread.start();
}
public Timer(String name, boolean isDaemon) {
thread.setName(name);
thread.setDaemon(isDaemon);
thread.start();
}

//单机序列号生成
private final static AtomicInteger nextSerialNumber = new AtomicInteger(0);
private static int serialNumber() {
return nextSerialNumber.getAndIncrement();
}

定间隔调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** 延迟调度 */
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");

//从当前时间开始延时delay毫秒后调度
sched(task, System.currentTimeMillis()+delay, 0);
}

/** 定时调度 */
public void schedule(TimerTask task, Date time) {

//从指定时刻出开始调度
sched(task, time.getTime(), 0);
}

/** 延时周期性调度 */
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}

/** 定时周期性调度 */
public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}

Timer.schedule()侧重period时间的一致性,保证执行任务的间隔时间相同。

img

定频率调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** 延时周期性定速调度 */
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}

/** 定时周期性定速调度 */
public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), period);
}

Timer.scheduleAtFixedRate()侧重执行频率的一致性,任务执行时间加period时间的和相等。

img

核心调度算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;

synchronized(queue) {

//保证定时器未被取消
if (!thread.newTasksMayBeScheduled) {
throw new IllegalStateException("Timer already cancelled.");
}

synchronized(task.lock) {
//保证任务最初处于未使用状态
if (task.state != TimerTask.VIRGIN) {
throw new IllegalStateException(
"Task already scheduled or cancelled");
}

//下次任务执行时间
task.nextExecutionTime = time;
//任务执行周期
task.period = period;
//设置任务状态为已调度
task.state = TimerTask.SCHEDULED;
}

//往任务队列中添加任务
queue.add(task);

//如果队列中该任务为最近要执行的任务,则立即唤醒定时线程处理
if (queue.getMin() == task) {
queue.notify();
}
}
}

撤销定时器

1
2
3
4
5
6
7
8
9
10
public void cancel() {
synchronized(queue) {
//撤销定时器
thread.newTasksMayBeScheduled = false;
//清空任务队列
queue.clear();
//唤醒定时线程
queue.notify();
}
}

清理取消状态的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public int purge() {
//从队列中移除的任务数
int result = 0;
synchronized(queue) {
for (int i = queue.size(); i > 0; i--) {
//从队列中移除取消状态任务
if (queue.get(i).state == TimerTask.CANCELLED) {
queue.quickRemove(i);
result++;
}
}
//如果仍有非取消任务,队列重新堆化
if (result != 0)
queue.heapify();
}
return result;
}

总结

读完源码后总结如下,

数据结构

小顶堆实现优先级队列,优先级标准是任务下次执行时间。

任务状态转换

img

定时器架构图

img

架构缺陷

单线程串行消费任务,前置任务消费延迟或失败会直接影响后续任务的消费。如果消费前置任务时抛出异常,线程退出,队列中的任务无法被继续消费,定时器失效。

Open API

#架构图

架构类

框架上使用

spring/mybatis/undertow

服务治理使用

dubbo

缓存服务使用

redis(身份认证状态缓存,临时性字典数据缓存,提升效率)

MQ

RocketMQ(mq做业务解耦,流量削峰,业务驱动,回调通知队列,消息持久化和定时任务补偿保证数据不被丢失)

分布式锁服务

zk

webhook通知服务

okhttp3、forkjoin框架、rocketMq队列、elasticJob补偿

流量控制

nginx、lua、redis、mq

服务稳定性

hystrix 熔断

模块划分

接口模块-----api
    核心模块----core
    管理端模块----manager
    webhook回调通知模块----webhook --> 订阅event --> 消息letter生成 -->入库 -->生成待发送队列(两级队列:内存队列20w,rocketMQ消息队列;一个补偿任务elasticJob)-->

架构

Open API 介绍

OpenAPI 项目是基于REST标准来设计的,为保证统一和安全,全局编码格式为UTF-8,全局使用https。我们的API具有可预测的面向资源的url,返回json编码的响应,并使用标准的HTTP响应代码、身份验证和请求动词。

为了数据准确性和生产环境数据安全,建议在沙盒环境测试这些接口.

版本控制

当我们对API进行向后不兼容的更改时,我们会发布新版本。要使用的版本在URL中指定。当前版本是v1,比如:

https://api.itmarte.com/payments/v1/...

授权认证

在不同的对接场景下Open API存在两种认证方式(用户开发者和第三方应用开发者,通常情况下申请用户开发者),使用http头Authorization做认证:
授权图

用户开发者模式

创建了用户开发者之后,会收到给您返回的developerIdmasterTokenmasterToken能行使用户所有权限,请您务必安全保管)和LLP_RSA_PUB_KEY.pem,身份认证格式如下:

Authorization: Basic &lt;&lt;Base64.encode(developerId:masterToken)&gt;&gt;

第三方应用开发者模式

创建第三方应用开发者之后,会收到clientIdclientSecretLLP_RSA_PUB_KEY.pem,至于accessToken则需要通过OAuth2.0模式向有资源的用户申请,身份认证格式如下:

Authorization: Bearer &lt;&lt;accessToken&gt;&gt;

请求安全

为了请求安全防止重放攻击,要求所有请求都得有签名认证,在http头定义了xxx-Signature字段作为签名信息载体,xxx-Signature头文件中包含了请求包体和响应的epoch时间戳(是指格林威治时间1970年01月01日00时00分00秒起至现在的总秒数)例如:xxx-Signature:t=&lt;&lt;epoch&gt;&gt;,v=&lt;&lt;signature&gt;&gt;,一个请求的有效时间是5分钟。下面介绍下请求的签名格式:

请求签名

  1. HTTP请求方式URI请求epoch时间(单位秒)、请求包体的数据按照一定顺序用字符串“&”做拼接后使用对接方的RSA私钥通过SHA256WithRSA算法做签名并用Base64编码,生成的签名字符串(signature)和epoch时间放入HTTP包头的xxx-Signature标签中,格式为:
xxx-Signature:t=&lt;&lt;epoch&gt;&gt;,v=&lt;&lt;signature&gt;&gt;

第一步: 确定签名payload

如下字段请用&一次连接

  • HTTP_METHOD: 对应实际接口的方法(统一用大写),如POSTPUTGETDELETE等;

  • URI: 请求的URI地址(除去host). 例如https://api.sandbox.itmarte.com/collections/v1/merchants/collections/v1/merchants为URI

  • REQUEST_EPOCH: 是指格林威治时间1970年01月01日00时00分00秒起至现在的总秒数,该值应与t值保持一致

  • REQUEST_PAYLOAD: 请求包体 {"currency":"USD"}

  • QUERY_STRING: 查询字段例如:https://api.sandbox.itmarte.com/collections/v1/merchants?attr1=value1&attr2=value2,其中QUERY_STRING=attr1=value1&attr2=value2格式化为attr1%3Dvalue1%26attr2%3Dvalue2

    payload示例:

    POST&/collections/v1/merchants&19879234&{“currency”:”USD”}&attr1%3Dvalue1%26attr2%3Dvalue2

第二部: 准备 xxx-Signature 签名头

你会用到以下内容:

  • REQUEST_EPOCH (Seconds elapsed since 1970/1/1 00:00:00 GMT as a string)
  • 连接字符串 ,
  • payload(第一步的结果)
  • your_rsa_pri_key:你的RSA私钥
xxx-Signature: t=REQUEST_EPOCH,v=BASE64_ENCODE(SHA256WithRSA.sign(&lt;&lt;payload&gt;&gt;, &lt;&lt;your_rsa_pri_key&gt;&gt;))

请求示例

POST /api/mkt/balance HTTP/1.1 
Host: api.itmarte.com 
Content-Type: application/json 
Authorization: Basic WTgzcHNkcFdqY3J0Vml5eHVveTNyWGp2OWpzMjV3aUs6WTgzcHNkcFdqY3J0Vml5eHVveTNyWGp2OWpzMjV3aUs= 
xxx-Signature: t=1574130344,v=cJKgD/EpqNVnITR7yZ8BIev5j1E0ub0VbG4uGA69gR4T1FFc7NzqbiBoDEOBvkQtJXytQd7dY+WDo0Qm0c6gCnRHqIEyBen6SnBk/PjhIn7H93sHMyUEbesJqB6NAzOHA4uVj+8aTfREQWxKaizkDTT1dnrBUZ7KPxz4KKzRXtZ6tEh48HKsA5xqviedc+kpilaFbFSaoJmFj760TV8FB+mKCkZSrvX1Y+4x0bqTVBXAt2kE2Z8vCH16BDtlWGLZRSlWtZWyvpz6F0a/VWYVhoBEmgNFevnYDeAMGB6VEDBE1pZLMnhxfLfz6yu/p1pv1c2N2Yk5YSahQw4lLLiqQQ== 
Accept: */* 
Cache-Control: no-cache 
Content-Length: 18 
Connection: keep-alive 

{"currency":"USD"} 

请求结果签名验证

  • 若请求成功返回200,包体格式查看具体接口,对响应包体使用连连支付的RSA私钥用SHA256WithRSA做签名并用Base64编码,生成的签名字符串放入HTTP包头xxx-Signature标签中,格式为xxx-Signature: t = response_epoch, v = signature。
    其中:
  • t=响应时间戳(格林威治时间1970年01月01日00时00分00秒起至现在的总秒数)
  • v=BASE64_ENCODE(SHA256WithRSA(RESPONSE_EPOCH&RESPONSE_BODY, LLPAY_RSA_PRIVATE_KEY))

第一步: 确定 payload

如下字段创建payload& 做连接

  • Response Timestamp: 响应时间戳(格林威治时间1970年01月01日00时00分00秒起至现在的总秒数)
  • Response Payload: 响应包体,指定为JSON字符串如: {"currency":"USD"}

payload示例:

19879234&{"currency":"USD"}

第二部: 使用连连的RSA公钥校验签名的有效性

SHA256WithRSA.verify(xxx-Signature, '19879234&{" currency":"USD"}',  LLPAY_RSA_PRIVATE_KEY)

响应结果

成功返回结果示例

连连通过http状态码来判断请求的结果,一个成功的请求的http状态码为2XX,请求结果为相应的objects对象,例如:

1
2
3
4
5
6
7
8
HTTP/1.1 200 
status: 200
Content-Type: application/json
Content-Length: 61
Connection: keep-alive
xxx-Signature:t=1574130398,v=b0VbG4uGA69gR4T1FFc7NzqbiBoDEOBvkQtJXytQd7dY+WDo0QmgR4T1FFc7NzqbiBoDEOBvkQtJXytQpzMjV3aUs6R4T1FFc7NzqbiBoDEOBvWTgzcHNkcFdqY3J0Vml5eHVc6gCnRHqIEyBen6SnBk/PjhIn7H93sHMyUEbesJqB6NAzOHA4uVj+8aTfREQWxKaizkDTT1dnrBUZ7KPxz4KKzRXtZ6tEh48HKsA5xqWGLZRSlWtZWyvpz6F0a/VWYVhoBEmgNFevkE2Z8vCH16VEDBE1pZ6VEDBE1pZ6BDBE1pZ6VEDBE1DtlWGLnYviedc+kpilaFbFSaoJmFj76==

{"currency":"USD","balance":"12.25"}

Errors

一个失败的请求会收到4XX类的http状态码表示已知错误内容(具体错误码API文档给出),5XX的状态码表示未知的错误类型:

Attributes

code number
失败码类型,数字类型,用于快速定位错误类型

message string
失败描述

失败返回结果示例

HTTP/1.1 400
status: 400
Date: Tue, 19 Nov 2019 02:26:38 GMT
Content-Type: application/json
Content-Length: 77
Connection: keep-alive

{"code":"999995","message":"[holderType] is invalid"}

HTTP状态码一览表

CODE DESCRIPTION
400 请求错误,例如:参数错误
401 授权认证失败或者是签名认证失败
403 请求未授权
404 资源未找到,这里的资源指的是实际的Objects对象
500, 502, 503, 504 系统错误

请求幂等保证

实际运行场景中,由于网络原因或者其他原因导致的网络中断是不可避免的,所以连连这边特意设计了请求幂等保证操作,所有的POST、PUT、DELETE请求都可以做幂等校验,幂等请求认证成功之后,会返回最初的请求结果(5XX未知异常类型的错误除外)。

你需要在http头加入Idempotency-Key以便让系统失败你的幂等请求:

Idempotency-Key:&lt;&lt;unique id for client &gt;&gt;

Request IDs

每个API请求都有一个关联的请求标识符。您可以响应头找到Request-Id下这个键值。

字段命名规范

连连所有的字段命名规范为驼峰式:

https://api...com/resource/?filterBy="filter"

{
  "storeName": "My Store",
  "kycStatus": "success"
}

Webhook

你可以配置webhook地址来接收连连这边的回调信息(event),具体的回调信息(event)在相应的接口中定义

对象关系模型

对象关系模型

认识MySQL

什么是MySQL?

MySQL 是一种关系型数据库,在Java企业级开发中非常常用,因为 MySQL 是开源免费的,并且方便扩展。阿里巴巴数据库系统也大量用到了 MySQL,因此它的稳定性是有保障的。MySQL是开放源代码的,因此任何人都可以在 GPL(General Public License) 的许可下下载并根据个性化的需要对其进行修改。MySQL的默认端口号是3306

存储引擎

一些常用命令

查看MySQL提供的所有存储引擎

1
mysql> show engines;

查看MySQL提供的所有存储引擎

从上图我们可以查看出 MySQL 当前默认的存储引擎是InnoDB,并且在5.7版本所有的存储引擎中只有 InnoDB 是事务性存储引擎,也就是说只有 InnoDB 支持事务。

查看MySQL当前默认的存储引擎

我们也可以通过下面的命令查看默认的存储引擎。

1
mysql> show variables like '%storage_engine%';

查看表的存储引擎

1
show table status like "table_name" ;

查看表的存储引擎

MyISAM和InnoDB区别

MyISAM是MySQL的默认数据库引擎(5.5版之前)。虽然性能极佳,而且提供了大量的特性,包括全文索引、压缩、空间函数等,但MyISAM不支持事务和行级锁,而且最大的缺陷就是崩溃后无法安全恢复。不过,5.5版本之后,MySQL引入了InnoDB(事务性数据库引擎),MySQL 5.5版本后默认的存储引擎为InnoDB。

大多数时候我们使用的都是 InnoDB 存储引擎,但是在某些情况下使用 MyISAM 也是合适的比如读密集的情况下。(如果你不介意 MyISAM 崩溃恢复问题的话)。

两者的对比:

  1. 是否支持行级锁 : MyISAM 只有表级锁(table-level locking),而InnoDB 支持行级锁(row-level locking)和表级锁,默认为行级锁。
  2. 是否支持事务和崩溃后的安全恢复: MyISAM 强调的是性能,每次查询具有原子性,其执行速度比InnoDB类型更快,但是不提供事务支持。但是InnoDB 提供事务支持事务,外部键等高级数据库功能。 具有事务(commit)、回滚(rollback)和崩溃修复能力(crash recovery capabilities)的事务安全(transaction-safe (ACID compliant))型表。
  3. 是否支持外键: MyISAM不支持,而InnoDB支持。
  4. 是否支持MVCC :仅 InnoDB 支持。应对高并发事务, MVCC比单纯的加锁更高效;MVCC只在 READ COMMITTEDREPEATABLE READ 两个隔离级别下工作;MVCC可以使用 乐观(optimistic)锁 和 悲观(pessimistic)锁来实现;各数据库中MVCC实现并不统一。推荐阅读:MySQL-InnoDB-MVCC多版本并发控制
  5. ……

《MySQL高性能》上面有一句话这样写到:

不要轻易相信“MyISAM比InnoDB快”之类的经验之谈,这个结论往往不是绝对的。在很多我们已知场景中,InnoDB的速度都可以让MyISAM望尘莫及,尤其是用到了聚簇索引,或者需要访问的数据都可以放入内存的应用。

一般情况下我们选择 InnoDB 都是没有问题的,但是某些情况下你并不在乎可扩展能力和并发能力,也不需要事务支持,也不在乎崩溃后的安全恢复问题的话,选择MyISAM也是一个不错的选择。但是一般情况下,我们都是需要考虑到这些问题的。

字符集及校对规则

字符集指的是一种从二进制编码到某类字符符号的映射。校对规则则是指某种字符集下的排序规则。MySQL中每一种字符集都会对应一系列的校对规则。

MySQL采用的是类似继承的方式指定字符集的默认值,每个数据库以及每张数据表都有自己的默认值,他们逐层继承。比如:某个库中所有表的默认字符集将是该数据库所指定的字符集(这些表在没有指定字符集的情况下,才会采用默认字符集) PS:整理自《Java工程师修炼之道》

详细内容可以参考: MySQL字符集及校对规则的理解

索引

MySQL索引使用的数据结构主要有BTree索引哈希索引 。对于哈希索引来说,底层的数据结构就是哈希表,因此在绝大多数需求为单条记录查询的时候,可以选择哈希索引,查询性能最快;其余大部分场景,建议选择BTree索引。

MySQL的BTree索引使用的是B树中的B+Tree,但对于主要的两种存储引擎的实现方式是不同的。

  • MyISAM: B+Tree叶节点的data域存放的是数据记录的地址。在索引检索的时候,首先按照B+Tree搜索算法搜索索引,如果指定的Key存在,则取出其 data 域的值,然后以 data 域的值为地址读取相应的数据记录。这被称为“非聚簇索引”。
  • InnoDB: 其数据文件本身就是索引文件。相比MyISAM,索引文件和数据文件是分离的,其表数据文件本身就是按B+Tree组织的一个索引结构,树的叶节点data域保存了完整的数据记录。这个索引的key是数据表的主键,因此InnoDB表数据文件本身就是主索引。这被称为“聚簇索引(或聚集索引)”。而其余的索引都作为辅助索引,辅助索引的data域存储相应记录主键的值而不是地址,这也是和MyISAM不同的地方。在根据主索引搜索时,直接找到key所在的节点即可取出数据;在根据辅助索引查找时,则需要先取出主键的值,再走一遍主索引。 因此,在设计表的时候,不建议使用过长的字段作为主键,也不建议使用非单调的字段作为主键,这样会造成主索引频繁分裂。 PS:整理自《Java工程师修炼之道》

更多关于索引的内容可以查看文档首页MySQL目录下关于索引的详细总结。

查询缓存的使用

执行查询语句的时候,会先查询缓存。不过,MySQL 8.0 版本后移除,因为这个功能不太实用

my.cnf加入以下配置,重启MySQL开启查询缓存

1
2
query_cache_type=1
query_cache_size=600000

MySQL执行以下命令也可以开启查询缓存

1
2
set global  query_cache_type=1;
set global query_cache_size=600000;

如上,开启查询缓存后在同样的查询条件以及数据情况下,会直接在缓存中返回结果。这里的查询条件包括查询本身、当前要查询的数据库、客户端协议版本号等一些可能影响结果的信息。因此任何两个查询在任何字符上的不同都会导致缓存不命中。此外,如果查询中包含任何用户自定义函数、存储函数、用户变量、临时表、MySQL库中的系统表,其查询结果也不会被缓存。

缓存建立之后,MySQL的查询缓存系统会跟踪查询中涉及的每张表,如果这些表(数据或结构)发生变化,那么和这张表相关的所有缓存数据都将失效。

缓存虽然能够提升数据库的查询性能,但是缓存同时也带来了额外的开销,每次查询后都要做一次缓存操作,失效后还要销毁。 因此,开启缓存查询要谨慎,尤其对于写密集的应用来说更是如此。如果开启,要注意合理控制缓存空间大小,一般来说其大小设置为几十MB比较合适。此外,还可以通过sql_cache和sql_no_cache来控制某个查询语句是否需要缓存:

1
select sql_no_cache count(*) from usr;

什么是事务?

事务是逻辑上的一组操作,要么都执行,要么都不执行。

事务最经典也经常被拿出来说例子就是转账了。假如小明要给小红转账1000元,这个转账会涉及到两个关键操作就是:将小明的余额减少1000元,将小红的余额增加1000元。万一在这两个操作之间突然出现错误比如银行系统崩溃,导致小明余额减少而小红的余额没有增加,这样就不对了。事务就是保证这两个关键操作要么都成功,要么都要失败。

事物的四大特性(ACID)

事物的特性

  1. 原子性(Atomicity): 事务是最小的执行单位,不允许分割。事务的原子性确保动作要么全部完成,要么完全不起作用;
  2. 一致性(Consistency): 执行事务前后,数据保持一致,多个事务对同一个数据读取的结果是相同的;
  3. 隔离性(Isolation): 并发访问数据库时,一个用户的事务不被其他事务所干扰,各并发事务之间数据库是独立的;
  4. 持久性(Durability): 一个事务被提交之后。它对数据库中数据的改变是持久的,即使数据库发生故障也不应该对其有任何影响。

并发事务带来哪些问题?

在典型的应用程序中,多个事务并发运行,经常会操作相同的数据来完成各自的任务(多个用户对同一数据进行操作)。并发虽然是必须的,但可能会导致以下的问题。

  • 脏读(Dirty read): 当一个事务正在访问数据并且对数据进行了修改,而这种修改还没有提交到数据库中,这时另外一个事务也访问了这个数据,然后使用了这个数据。因为这个数据是还没有提交的数据,那么另外一个事务读到的这个数据是“脏数据”,依据“脏数据”所做的操作可能是不正确的。
  • 丢失修改(Lost to modify): 指在一个事务读取一个数据时,另外一个事务也访问了该数据,那么在第一个事务中修改了这个数据后,第二个事务也修改了这个数据。这样第一个事务内的修改结果就被丢失,因此称为丢失修改。 例如:事务1读取某表中的数据A=20,事务2也读取A=20,事务1修改A=A-1,事务2也修改A=A-1,最终结果A=19,事务1的修改被丢失。
  • 不可重复读(Unrepeatableread): 指在一个事务内多次读同一数据。在这个事务还没有结束时,另一个事务也访问该数据。那么,在第一个事务中的两次读数据之间,由于第二个事务的修改导致第一个事务两次读取的数据可能不太一样。这就发生了在一个事务内两次读到的数据是不一样的情况,因此称为不可重复读。
  • 幻读(Phantom read): 幻读与不可重复读类似。它发生在一个事务(T1)读取了几行数据,接着另一个并发事务(T2)插入了一些数据时。在随后的查询中,第一个事务(T1)就会发现多了一些原本不存在的记录,就好像发生了幻觉一样,所以称为幻读。

不可重复读和幻读区别:

不可重复读的重点是修改比如多次读取一条记录发现其中某些列的值被修改,幻读的重点在于新增或者删除比如多次读取一条记录发现记录增多或减少了。

事务隔离级别有哪些?MySQL的默认隔离级别是?

SQL 标准定义了四个隔离级别:

  • READ-UNCOMMITTED(读取未提交): 最低的隔离级别,允许读取尚未提交的数据变更,可能会导致脏读、幻读或不可重复读
  • READ-COMMITTED(读取已提交): 允许读取并发事务已经提交的数据,可以阻止脏读,但是幻读或不可重复读仍有可能发生
  • REPEATABLE-READ(可重复读): 对同一字段的多次读取结果都是一致的,除非数据是被本身事务自己所修改,可以阻止脏读和不可重复读,但幻读仍有可能发生
  • SERIALIZABLE(可串行化): 最高的隔离级别,完全服从ACID的隔离级别。所有的事务依次逐个执行,这样事务之间就完全不可能产生干扰,也就是说,该级别可以防止脏读、不可重复读以及幻读

隔离级别 脏读 不可重复读 幻影读
READ-UNCOMMITTED
READ-COMMITTED ×
REPEATABLE-READ × ×
SERIALIZABLE × × ×

MySQL InnoDB 存储引擎的默认支持的隔离级别是 REPEATABLE-READ(可重读)。我们可以通过SELECT @@tx_isolation;命令来查看

1
2
3
4
5
6
mysql> SELECT @@tx_isolation;
+-----------------+
| @@tx_isolation |
+-----------------+
| REPEATABLE-READ |
+-----------------+

这里需要注意的是:与 SQL 标准不同的地方在于 InnoDB 存储引擎在 REPEATABLE-READ(可重读) 事务隔离级别下使用的是Next-Key Lock 锁算法,因此可以避免幻读的产生,这与其他数据库系统(如 SQL Server) 是不同的。所以说InnoDB 存储引擎的默认支持的隔离级别是 REPEATABLE-READ(可重读) 已经可以完全保证事务的隔离性要求,即达到了 SQL标准的 SERIALIZABLE(可串行化) 隔离级别。因为隔离级别越低,事务请求的锁越少,所以大部分数据库系统的隔离级别都是 READ-COMMITTED(读取提交内容) ,但是你要知道的是InnoDB 存储引擎默认使用 REPEAaTABLE-READ(可重读) 并不会有任何性能损失。

InnoDB 存储引擎在 分布式事务 的情况下一般会用到 SERIALIZABLE(可串行化) 隔离级别。

锁机制与InnoDB锁算法

MyISAM和InnoDB存储引擎使用的锁:

  • MyISAM采用表级锁(table-level locking)。
  • InnoDB支持行级锁(row-level locking)和表级锁,默认为行级锁

表级锁和行级锁对比:

  • 表级锁: MySQL中锁定 粒度最大 的一种锁,对当前操作的整张表加锁,实现简单,资源消耗也比较少,加锁快,不会出现死锁。其锁定粒度最大,触发锁冲突的概率最高,并发度最低,MyISAM和 InnoDB引擎都支持表级锁。
  • 行级锁: MySQL中锁定 粒度最小 的一种锁,只针对当前操作的行进行加锁。 行级锁能大大减少数据库操作的冲突。其加锁粒度最小,并发度高,但加锁的开销也最大,加锁慢,会出现死锁。

详细内容可以参考: MySQL锁机制简单了解一下:https://blog.csdn.net/qq_34337272/article/details/80611486

InnoDB存储引擎的锁的算法有三种:

  • Record lock:单个行记录上的锁
  • Gap lock:间隙锁,锁定一个范围,不包括记录本身
  • Next-key lock:record+gap 锁定一个范围,包含记录本身

相关知识点:

  1. innodb对于行的查询使用next-key lock
  2. Next-locking keying为了解决Phantom Problem幻读问题
  3. 当查询的索引含有唯一属性时,将next-key lock降级为record key
  4. Gap锁设计的目的是为了阻止多个事务将记录插入到同一范围内,而这会导致幻读问题的产生
  5. 有两种方式显式关闭gap锁:(除了外键约束和唯一性检查外,其余情况仅使用record lock) A. 将事务隔离级别设置为RC B. 将参数innodb_locks_unsafe_for_binlog设置为1

大表优化

当MySQL单表记录数过大时,数据库的CRUD性能会明显下降,一些常见的优化措施如下:

1. 限定数据的范围

务必禁止不带任何限制数据范围条件的查询语句。比如:我们当用户在查询订单历史的时候,我们可以控制在一个月的范围内;

2. 读/写分离

经典的数据库拆分方案,主库负责写,从库负责读;

3. 垂直分区

根据数据库里面数据表的相关性进行拆分。 例如,用户表中既有用户的登录信息又有用户的基本信息,可以将用户表拆分成两个单独的表,甚至放到单独的库做分库。

简单来说垂直拆分是指数据表列的拆分,把一张列比较多的表拆分为多张表。 如下图所示,这样来说大家应该就更容易理解了。

数据库垂直分区

  • 垂直拆分的优点: 可以使得列数据变小,在查询时减少读取的Block数,减少I/O次数。此外,垂直分区可以简化表的结构,易于维护。
  • 垂直拆分的缺点: 主键会出现冗余,需要管理冗余列,并会引起Join操作,可以通过在应用层进行Join来解决。此外,垂直分区会让事务变得更加复杂;

4. 水平分区

保持数据表结构不变,通过某种策略存储数据分片。这样每一片数据分散到不同的表或者库中,达到了分布式的目的。 水平拆分可以支撑非常大的数据量。

水平拆分是指数据表行的拆分,表的行数超过200万行时,就会变慢,这时可以把一张的表的数据拆成多张表来存放。举个例子:我们可以将用户信息表拆分成多个用户信息表,这样就可以避免单一表数据量过大对性能造成影响。

数据库水平拆分

水平拆分可以支持非常大的数据量。需要注意的一点是:分表仅仅是解决了单一表数据过大的问题,但由于表的数据还是在同一台机器上,其实对于提升MySQL并发能力没有什么意义,所以 水平拆分最好分库

水平拆分能够 支持非常大的数据量存储,应用端改造也少,但 分片事务难以解决 ,跨节点Join性能较差,逻辑复杂。《Java工程师修炼之道》的作者推荐 尽量不要对数据进行分片,因为拆分会带来逻辑、部署、运维的各种复杂度 ,一般的数据表在优化得当的情况下支撑千万以下的数据量是没有太大问题的。如果实在要分片,尽量选择客户端分片架构,这样可以减少一次和中间件的网络I/O。

下面补充一下数据库分片的两种常见方案:

  • 客户端代理: 分片逻辑在应用端,封装在jar包中,通过修改或者封装JDBC层来实现。 当当网的 Sharding-JDBC 、阿里的TDDL是两种比较常用的实现。
  • 中间件代理: 在应用和数据中间加了一个代理层。分片逻辑统一维护在中间件服务中。 我们现在谈的 Mycat 、360的Atlas、网易的DDB等等都是这种架构的实现。

详细内容可以参考: MySQL大表优化方案: https://segmentfault.com/a/1190000006158186

解释一下什么是池化设计思想。什么是数据库连接池?为什么需要数据库连接池?

池化设计应该不是一个新名词。我们常见的如java线程池、jdbc连接池、redis连接池等就是这类设计的代表实现。这种设计会初始预设资源,解决的问题就是抵消每次获取资源的消耗,如创建线程的开销,获取远程连接的开销等。就好比你去食堂打饭,打饭的大妈会先把饭盛好几份放那里,你来了就直接拿着饭盒加菜即可,不用再临时又盛饭又打菜,效率就高了。除了初始化资源,池化设计还包括如下这些特征:池子的初始值、池子的活跃值、池子的最大值等,这些特征可以直接映射到java线程池和数据库连接池的成员属性中。这篇文章对池化设计思想介绍的还不错,直接复制过来,避免重复造轮子了。

数据库连接本质就是一个 socket 的连接。数据库服务端还要维护一些缓存和用户权限信息之类的 所以占用了一些内存。我们可以把数据库连接池是看做是维护的数据库连接的缓存,以便将来需要对数据库的请求时可以重用这些连接。为每个用户打开和维护数据库连接,尤其是对动态数据库驱动的网站应用程序的请求,既昂贵又浪费资源。在连接池中,创建连接后,将其放置在池中,并再次使用它,因此不必建立新的连接。如果使用了所有连接,则会建立一个新连接并将其添加到池中。 连接池还减少了用户必须等待建立与数据库的连接的时间。

分库分表之后,id 主键如何处理?

因为要是分成多个表之后,每个表都是从 1 开始累加,这样是不对的,我们需要一个全局唯一的 id 来支持。

生成全局 id 有下面这几种方式:

  • UUID:不适合作为主键,因为太长了,并且无序不可读,查询效率低。比较适合用于生成唯一的名字的标示比如文件的名字。
  • 数据库自增 id : 两台数据库分别设置不同步长,生成不重复ID的策略来实现高可用。这种方式生成的 id 有序,但是需要独立部署数据库实例,成本高,还会有性能瓶颈。
  • 利用 redis 生成 id : 性能比较好,灵活方便,不依赖于数据库。但是,引入了新的组件造成系统更加复杂,可用性降低,编码更加复杂,增加了系统成本。
  • Twitter的snowflake算法 :Github 地址:https://github.com/twitter-archive/snowflake。
  • 美团的Leaf分布式ID生成系统 :Leaf 是美团开源的分布式ID生成器,能保证全局唯一性、趋势递增、单调递增、信息安全,里面也提到了几种分布式方案的对比,但也需要依赖关系数据库、Zookeeper等中间件。感觉还不错。美团技术团队的一篇文章:https://tech.meituan.com/2017/04/21/mt-leaf.html
  • ……

一条SQL语句在MySQL中如何执行的

一条SQL语句在MySQL中如何执行的

MySQL高性能优化规范建议

MySQL高性能优化规范建议

一条SQL语句执行得很慢的原因有哪些?

腾讯面试:一条SQL语句执行得很慢的原因有哪些?—不看后悔系列