java并发容器和框架
ConcurrentHashMap的实现原理和使用
ConcurrentHashMap是线程安全且高效的HashMap
为什么使用ConcurrentHashMap
HashMap可能导致死循环,而HashTable效率非常低下
线程不安全的HashMap
在多线程环境下,使用HashMap进行put操作会导致死循环。
final HashMap<String, String> map = new HashMap<>();
Thread thread = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
new Thread(() -> {
map.put(UUID.randomUUID().toString(), "");
}, "ftf" + i).start();
}
}, "ftf");
thread.start();
thread.join();
原因在于多线程会导致HashMap的Entry链表形成环形数据结构。而形成环形数据结构,Entry的next节点永远不为空,就会死循环
效率低下的HashTable
HashTable使用Synchronized保证线程安全,当一个线程访问HashTable的同步方法,其他线程就不能访问HashTable的的方法
ConcurrentHashMap使用锁分段技术提高访问效率
ConcurrentHashMap将数据分成一段一段的保存,每一个段数据配一把锁,当一个线程占用锁的数据时候,其他段的数据还是能被其他线程访问
ConcurrentHashMap的数据结构
ConcurrentHashMap由Segment数组和HashEntry数组组成。Segment是可重入锁。HashEntry是存储键值对的数据。一个ConcurrentHashMap包含一个Segment数组,每一个Segment包含一个HashEntry数组,当对HashEntry修改时,需要先获取对一个的Segment锁
ConcurrentLinkedQueue
线程安全的队列。
如果要实现线程安全的队列有两种方式:阻塞算法和非阻塞算法。阻塞算法的独列可以使用一个锁(入队和出队是同一把锁)或两个锁。另一种是非阻塞算法,非阻塞算法的实现方式是使用循环CAS的方式来实现的。ConcurrentLinkedQueue是使用非阻塞的方式实现线程安全队列的
ConcurrentLinkedQueue的结构
ConcurrentLinkedQueue由head和tail节点构成,每个节点包含节点元素和下一个节点(next)构成。
入队列
入队列就是将队列节点添加到队列的尾部。
入队主要做两件事情:将入队节点设置为当前队尾节点的下一个节点。更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点。
public boolean offer(E e) {
checkNotNull(e);
// 创建一个入队节点
final Node<E> newNode = new Node<E>(e);
// 死循环,入队不成功反复入队
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 判断tail节点的next是否为空,为空则将入队节点设置成tail节点的next节点
if (q == null) {
// p is last node
// 更新tail节点的next节点
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// tail节点的next节点不为空,则将入队节点设置成tail节点
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
出队列
初队列就是从队列中返回一个节点元素,并清空该节点对元素的引用。
并不是每次出队都会更新head节点,当head节点有元素时,直接弹出head节点里的元素,而不更新head节点。当head节点没有元素时,出队操作才会更新head节点。
这样的作法也是通过hops变量减少使用CAS更新head节点的消耗,从而提高出对效率。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
// 获取head节点的元素
E item = p.item;
// 如果元素不为空,使用CAS设置p节点引用的元素为null
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 如果head节点的下一个节点空,则说明这个队列为空
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
java中的阻塞队列
什么是阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:意思是队列为空时,获取元素的线程会等待队列变成非空。
- 阻塞队列常用于生产者和消费者的场景,生产者向队列里加元素,消费者西安队列中获取元素。阻塞队列是获取存放元素的容器。
方法/处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove(e) | poll() | take() | poll(time,unit) |
检查方法 | element() | peek | 不可用 | 不可用 |
- 抛出异常:当队列满时,插入元素会报错,队列空时,获取元素也会报错。
- 返回特殊值:插入元素,返回元素是否插入成功,成功为true
java中的阻塞队列
ArrayBlockingQueue:一个由数组结构构成的有界阻塞队列
LinkedBlockingQueue:一个由链表结构组成的游街阻塞队列
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
DelayQueue:一个使用优先级队列实现的无界阻塞队列
SynchronousQueue:一个不存储元素的阻塞队列
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列
阻塞队列的实现原理
使用通知模式实现
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
当生产者往满的队列中添加元素时,会阻塞住生产者,当消费者消费了一个队列中的元素时,会通知生产者当前队列可用。
Fork/join框架
什么是Fork/Join框架?
- fork/join 可以将一个大人物分割成若干小惹怒我,最终汇总每个小任务结果后得到大任务结果的框架
工作窃取算法
工作窃取算法是指某个线程从其他队列中窃取任务来执行。假如我们需要做一个比较大的任务,可以将任务分割成若干会不依赖的小任务,为了减少线程中的竞争,把这些子任务分别放到不同的队列中。并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。有的线程干得快,有的干得慢,于是干得快的线程就去干得慢的线程队列里获取任务,为了减少竞争,通常使用双端队列,被窃取任务的线程从头部获取任务,窃取任务的线程从尾部获取任务。
fork/join框架的设计
- 分割任务。需要有一个fork类来把分割任务。
- 执行任务并合并结果。分割的子任务放到双端队列中,然后启动线程分别从双端队列中获取任务执行。子任务执行完的结果统一放到一个队列中。启动一个线程从队列中获取数据,合并数据。
fork/join使用两个类完成这个事情。
- ForkJoinTask
- 两个子类:
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于有结果返回的任务
使用fork/join框架
public class CountTask extends RecursiveTask<Integer> {
private static final int THREADSHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THREADSHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i ;
}
} else {
// 如果任务大于阈值,就分裂成两个任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle, end);
leftTask.fork();
rightTask.fork();
// 等待子任务执行完得到结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算
CountTask task = new CountTask(1, 4);
ForkJoinTask<Integer> result = forkJoinPool.submit(task);
try {
// 获得结果
System.out.println(result.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
fork/join的框架异常处理
fork/join 可能在执行时抛出异常,但是我们没办法在主线程里直接捕获异常,所以可以通过ForkJoinTask::isCompletedAbormally方法检查是否抛出异常。