0%

java并发容器和框架

java并发容器和框架

ConcurrentHashMap的实现原理和使用

ConcurrentHashMap是线程安全且高效的HashMap

为什么使用ConcurrentHashMap

HashMap可能导致死循环,而HashTable效率非常低下

线程不安全的HashMap

在多线程环境下,使用HashMap进行put操作会导致死循环。

final HashMap<String, String> map = new HashMap<>();
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                new Thread(() -> {
                    map.put(UUID.randomUUID().toString(), "");
                }, "ftf" + i).start();
            }

        }, "ftf");

        thread.start();
        thread.join();

原因在于多线程会导致HashMap的Entry链表形成环形数据结构。而形成环形数据结构,Entry的next节点永远不为空,就会死循环

效率低下的HashTable

HashTable使用Synchronized保证线程安全,当一个线程访问HashTable的同步方法,其他线程就不能访问HashTable的的方法

ConcurrentHashMap使用锁分段技术提高访问效率

ConcurrentHashMap将数据分成一段一段的保存,每一个段数据配一把锁,当一个线程占用锁的数据时候,其他段的数据还是能被其他线程访问

ConcurrentHashMap的数据结构

ConcurrentHashMap由Segment数组和HashEntry数组组成。Segment是可重入锁。HashEntry是存储键值对的数据。一个ConcurrentHashMap包含一个Segment数组,每一个Segment包含一个HashEntry数组,当对HashEntry修改时,需要先获取对一个的Segment锁

ConcurrentLinkedQueue

线程安全的队列。

如果要实现线程安全的队列有两种方式:阻塞算法和非阻塞算法。阻塞算法的独列可以使用一个锁(入队和出队是同一把锁)或两个锁。另一种是非阻塞算法,非阻塞算法的实现方式是使用循环CAS的方式来实现的。ConcurrentLinkedQueue是使用非阻塞的方式实现线程安全队列的

ConcurrentLinkedQueue的结构

ConcurrentLinkedQueue由head和tail节点构成,每个节点包含节点元素和下一个节点(next)构成。

入队列

入队列就是将队列节点添加到队列的尾部。

入队主要做两件事情:将入队节点设置为当前队尾节点的下一个节点。更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点。

public boolean offer(E e) {
        checkNotNull(e);
        // 创建一个入队节点
        final Node<E> newNode = new Node<E>(e);
        // 死循环,入队不成功反复入队
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // 判断tail节点的next是否为空,为空则将入队节点设置成tail节点的next节点
            if (q == null) {
                // p is last node
                // 更新tail节点的next节点
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // tail节点的next节点不为空,则将入队节点设置成tail节点
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

出队列

初队列就是从队列中返回一个节点元素,并清空该节点对元素的引用。

并不是每次出队都会更新head节点,当head节点有元素时,直接弹出head节点里的元素,而不更新head节点。当head节点没有元素时,出队操作才会更新head节点。

这样的作法也是通过hops变量减少使用CAS更新head节点的消耗,从而提高出对效率。

public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                // 获取head节点的元素
                E item = p.item;
                // 如果元素不为空,使用CAS设置p节点引用的元素为null
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 如果head节点的下一个节点空,则说明这个队列为空
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

java中的阻塞队列

什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:意思是队列为空时,获取元素的线程会等待队列变成非空。
  • 阻塞队列常用于生产者和消费者的场景,生产者向队列里加元素,消费者西安队列中获取元素。阻塞队列是获取存放元素的容器。
方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove(e) poll() take() poll(time,unit)
检查方法 element() peek 不可用 不可用
  • 抛出异常:当队列满时,插入元素会报错,队列空时,获取元素也会报错。
  • 返回特殊值:插入元素,返回元素是否插入成功,成功为true

java中的阻塞队列

  • ArrayBlockingQueue:一个由数组结构构成的有界阻塞队列

  • LinkedBlockingQueue:一个由链表结构组成的游街阻塞队列

  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列

  • DelayQueue:一个使用优先级队列实现的无界阻塞队列

  • SynchronousQueue:一个不存储元素的阻塞队列

  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列

  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

阻塞队列的实现原理

使用通知模式实现

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

当生产者往满的队列中添加元素时,会阻塞住生产者,当消费者消费了一个队列中的元素时,会通知生产者当前队列可用。

Fork/join框架

什么是Fork/Join框架?

  • fork/join 可以将一个大人物分割成若干小惹怒我,最终汇总每个小任务结果后得到大任务结果的框架

工作窃取算法

工作窃取算法是指某个线程从其他队列中窃取任务来执行。假如我们需要做一个比较大的任务,可以将任务分割成若干会不依赖的小任务,为了减少线程中的竞争,把这些子任务分别放到不同的队列中。并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。有的线程干得快,有的干得慢,于是干得快的线程就去干得慢的线程队列里获取任务,为了减少竞争,通常使用双端队列,被窃取任务的线程从头部获取任务,窃取任务的线程从尾部获取任务。

fork/join框架的设计

  • 分割任务。需要有一个fork类来把分割任务。
  • 执行任务并合并结果。分割的子任务放到双端队列中,然后启动线程分别从双端队列中获取任务执行。子任务执行完的结果统一放到一个队列中。启动一个线程从队列中获取数据,合并数据。

fork/join使用两个类完成这个事情。

  • ForkJoinTask
  • 两个子类:
    • RecursiveAction:用于没有返回结果的任务
    • RecursiveTask:用于有结果返回的任务

使用fork/join框架

public class CountTask extends RecursiveTask<Integer> {

    private static final int THREADSHOLD = 2;

    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // 如果任务足够小就计算任务
        boolean canCompute = (end - start) <= THREADSHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i ;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个任务计算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle, end);
            leftTask.fork();
            rightTask.fork();
            // 等待子任务执行完得到结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            sum = leftResult + rightResult;
        }

        return sum;
    }

    public static void main(String[] args) {

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 生成一个计算任务,负责计算
        CountTask task = new CountTask(1, 4);
        ForkJoinTask<Integer> result = forkJoinPool.submit(task);
        try {
            // 获得结果
            System.out.println(result.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

fork/join的框架异常处理

fork/join 可能在执行时抛出异常,但是我们没办法在主线程里直接捕获异常,所以可以通过ForkJoinTask::isCompletedAbormally方法检查是否抛出异常。

~~