BlockingQueue

BlockingQueue即阻塞队列,当一个线程要对一个空的队列进行出队操作或对一个满的队列进行入队操作时,该线程会被阻塞。
当我们在程序中需要自己实现一个阻塞队列时,我们需要去考虑不同的线程对队列进行入队出队操作时如何正确的进行高效的协作以及线程安全问题等等细节,但是有了J.U.C为我们提供的BlockingQueue接口以及它的各种实现类,事情就变得简单多了。
BlockingQueue接口提供了以下方法
BlockingQueue
下面挑选几个方法介绍它们的作用:

  • add:加入一个元素到队列中,如果队列已满则无法插入,并立即抛出一个IllegalStateException
  • offer:分为不带timeout参数和带timeout参数的两个版本。不带timeout参数的offer方法会在队列已满时返回fasle,而带timeout参数的offer方法只会让线程阻塞指定的时间,到达超时时间后,则会根据是否入队成功返回boolean
  • put:加入一个元素到队列中,如果队列已满则阻塞线程
  • take:取出队头元素,如果队列为空则阻塞线程
  • poll:该方法带有tiemout参数,作用是取出队头元素,如果队列为空,则让线程阻塞指定的时间
  • remove:移除队列中的指定元素
  • contains:判断队列中是否有指定元素存在

知道了BlockingQueue接口中的方法的作用,我们来看它的实现类中是如何实现这些方法的。下面选择ArrayBlockingQueueLinkedBlockingQueue来分析,源码基于jdk1.8.0_66

ArrayBlockingQueue

顾名思义,ArrayBlockingQueue是一个基于数组实现的阻塞队列。先来看它有哪些成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 存放队列元素的数组
final Object[] items;

// 下一次出队元素的索引
int takeIndex;

// 下一次入队元素的索引
int putIndex;

// 队列中的元素个数
int count;

// 一个可重入锁来保证线程安全
final ReentrantLock lock;

// 队列不为空的条件
private final Condition notEmpty;

// 队列不为满的条件
private final Condition notFull;

由此我们可以知道,ArrayBlockingQueue基于数组实现,有两个索引分别指向下次出队及入队的元素的位置,并使用ReentrantLock进行并发控制,两个Condition对线程之间的协作进行控制。
下面来看它的三个构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 仅指定队列的长度
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

/**
* 指定队列的长度及锁的类型
* 如果fair为true,则表示使用公平锁,即会按线程的阻塞顺序唤醒线程
* fair默认为false,唤醒的顺序是不确定的
*/

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

// 还支持传入一个集合来初始化队列
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c)
{

this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

下面来看入队的方法,由于入队的方法在思路上大同小异,下面只选了两种方法来分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void put(E e) throws InterruptedException {
// 检查元素是否为null,会抛出NullPointerException
checkNotNull(e);
final ReentrantLock lock = this.lock;
/**
* 获取可中断的锁,线程在被阻塞时允许被中断
* 对几种锁的区别还不是很了解,之后会写一篇文章进行学习,先挖个坑
*/

lock.lockInterruptibly();
try {
while (count == items.length)
// 当队列已满,线程阻塞,等待队列不为满的条件
notFull.await();
// 条件成立,进行入队操作
enqueue(e);
} finally {
lock.unlock();
}
}

public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {


checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
// 如果队列已满,等待阻塞timeout指定的时间,如果notFull的条件一直没有成立,则直接返回false
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}

接下来看看enqueue方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
private void enqueue(E x) {
final Object[] items = this.items;
// 将元素放到putIndex的位置
items[putIndex] = x;
// putIndex自增,如果下次入队的位置已经等于队列的容量,那么putIndex置为0,类似一个循环队列
if (++putIndex == items.length)
putIndex = 0;
// 元素数量 + 1
count++;
// 入队操作后,队列必不为空,因此可以唤醒因等待队列非空条件而被阻塞的线程
notEmpty.signal();
}

下面来看出队的一系列方法,同样选取两个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E poll() {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
try {
// 队列为空,直接返回null,否则进行出队操作
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取可中断的锁
lock.lockInterruptibly();
try {
// 如果第一列为空,阻塞线程,等待非空条件成立
while (count == 0)
notEmpty.await();
// 出队操作
return dequeue();
} finally {
lock.unlock();
}
}

下面来看dequeue方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private E dequeue() {
final Object[] items = this.items;
// 获取takeIndex位置的元素
E x = (E) items[takeIndex];
// 置为null
items[takeIndex] = null;
// takeIndex自增,如果和队列容量相同,则将takeIndex置为0
if (++takeIndex == items.length)
takeIndex = 0;
// 元素个数 - 1
count--;
if (itrs != null)
itrs.elementDequeued();
// 进行出队操作后,队列必不为满,因此可以唤醒因等待队列不为满条件而被阻塞的线程
notFull.signal();
return x;
}

ArrayBlockingQueue的分析先到这里。

LinkedBlockingQueue

同样的,先来看有哪些成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 队列的容量,若不指定,则默认为Integer.MAX_VALUE
private final int capacity;

/**
* 我们注意到,不像ArrayBlockingQueue使用int来作为队列的计数器,
* LinkedBlockingQueue使用的是AtomicInteger,
* 这样做的原因是ArrayBlockingQueue只用1个锁来进行并发控制,
* 线程对count的操作都在获取同一个锁的前提下,因此不会有线程安全问题,
* 而LinkedBlockingQueue使用了两个锁,不同的线程对count的增减不在同一个锁对象的控制之下,
* 所以使用AtomicInteger来确保线程安全。
*/

private final AtomicInteger count = new AtomicInteger();

// 链表头
transient Node<E> head;

// 链表尾
private transient Node<E> last;

// 元素出队列时线程持有的锁
private final ReentrantLock takeLock = new ReentrantLock();

// 出队列时,队列非空条件
private final Condition notEmpty = takeLock.newCondition();

// 元素入队列时线程持有的锁
private final ReentrantLock putLock = new ReentrantLock();

// 入队列时,队列不为满条件
private final Condition notFull = putLock.newCondition();

再来看链表中节点的结构

1
2
3
4
5
6
7
8
// 静态内部类,非常简单,一个值域,一个指向下一个节点的域
static class Node<E> {
E item;

Node<E> next;

Node(E x) { item = x; }
}

下面来看构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 没有指定队列长度时,默认队列长度为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

// 指定队列长度,初始化链表,链头和链尾都为null
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

// 还可以传入一个集合进行队列的初始化
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}

下面来看put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public void put(E e) throws InterruptedException {
// 插入的元素为null,抛出NullPointerException
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
/**
* 获取入队列的锁对象
* 获取count
*/

final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 持有锁
putLock.lockInterruptibly();
try {
// 队列已满,线程阻塞,等待队列不为满的条件成立
while (count.get() == capacity) {
notFull.await();
}
// 入队列操作,enqueue不再分析,就是对链表的操作
enqueue(node);
// count原子性自增
// 此时c的值为元素入队列前队列的元素个数
c = count.getAndIncrement();
// c + 1,如果此时元素个数小于队列容量,则唤醒因等待队列不为满而被阻塞的要执行入队列操作的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
/**
* c为0说明该元素入队前,队列为空,执行出队列操作的线程全都被阻塞,
* 次数入队列一个元素,队列不为空,那么就可以唤醒被阻塞的线程了
*/

if (c == 0)
signalNotEmpty();
}

private void signalNotEmpty() {
// 获取出队列时的锁对象
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lock();
try {
// 唤醒因等待notEmpty条件而被阻塞的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

再来看poll方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public E poll() {
// 队列元素个数计数器
final AtomicInteger count = this.count;
// 元素个数为0则返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// 出队列操作的锁对象
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lock();
try {
// 元素个数大于0,则进行出队操作
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
// c大于1说明在出队列操作之前,队列中有元素,则唤醒因等待notEmpty条件而被阻塞的进行出队列操作的线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 出队列操作之前,队列中的元素等于容量,此时出队列操作完成,则队列必不为满,则可以唤醒因等待notFull条件而被阻塞的进行入队列操作的线程
if (c == capacity)
signalNotFull();
return x;
}

private void signalNotFull() {
// 入队列操作的锁对象
final ReentrantLock putLock = this.putLock;
// 加锁
putLock.lock();
try {
// 唤醒因等待notFull条件而被阻塞的进行入队列操作的线程
notFull.signal();
} finally {
putLock.unlock();
}
}

对于LinkedBlockingQueue的分析先到这里。两种队列的入队出队方法很多,但是实现上大同小异,仅仅是语义上的不同,每个方法都要分析十分繁琐。

总结

最后总结一下ArrayBlockingQueue和LinkedBlockingQueue的异同

  • 相同点:都实现了BlockingQueue接口,因此他们的各种出队列、入队列的方法在语义上都是相同的
  • 不同点
    1. ArrayBlockingQueue基于数组实现,LinkedBlockingQueue基于链表实现
    2. ArrayBlockingQueue必须要在初始化时指定队列的长度,那么可以认为他是一个有界的队列,它的性能可预测性更强。而LinkedBlockingQueue可以不指定队列长度,可以视为一个无界的队列,则它的可伸缩性更强,但是有可能会因为不停的入队而发生内存泄漏
    3. ArrayBlockingQueue用同一个锁控制出队和入队,而LinkedBlockingQueue分别使用入队的锁和出队的锁,其入队和出队操作可以同时进行,因此吞吐量更大