300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > Java并发编程学习记录

Java并发编程学习记录

时间:2021-09-07 14:17:56

相关推荐

Java并发编程学习记录

Java并发编程汇总

并发问题的分解多线程并发的特性volatile在并发编程中可能出现的问题:管程wait() 的正确姿势notify() 何时可以使用在使用多线程编程的时候,开启多少线程呢为什么局部变量是线程安全的?递归栈溢出的原因?解决并发问题的步骤?Java的 synchronized 也是管程的一种实现,既然 Java 从语言层面已经实现了管程了,那为什么还要在 SDK 里提供另外一种实现呢?Lock如何保证可见性?公平锁与非公平锁用锁的三大最佳实践信号量信号量的模型如何使用信号量读写锁 ReadWriteLock什么是读写锁StampedLockCountDownLatch 和 CyclicBarrier并发容器ListMapQueue原子类原子化的基本数据类型原子化的对象引用类型原子化数组原子化对象属性更新器原子化的累加器java 线程池获取任务的执行结果使用CompletableFutureFork/Join

并发问题的分解

synchronized、wait()、notify() 不过是操作系统领域里管程模型的一种实现。

并发编程可以总结成三个核心问题:分工、同步、互斥。

分工:指的是如何高效地拆解任务并分配给线程,而同步指的是线程之间如何协作,互斥则是保证同一时刻只允许一个线程访问共享资源。。Java SDK 并发包很大部分内容都是按照这三个维度组织的,例如 Fork/Join 框架就是一种分工模式,CountDownLatch 就是一种典型的同步方式,而可重入锁则是一种互斥手段。

同步:的同步,主要指的就是线程间的协作,本质上和现实生活中的协作没区别,不过是一个线程执行完了一个任务,如何通知执行后续任务的线程开工而已。Java SDK 里提供的 CountDownLatch、CyclicBarrier、Phaser、Exchanger 也都是用来解决线程协作问题的。管程是解决并发问题的万能钥匙。

互斥,指的是同一时刻,只允许一个线程访问共享变量。Java SDK 里提供的 ReadWriteLock、StampedLock就可以优化读多写少场景下锁的性能。还可以使用无锁的数据结构,例如 Java SDK 里提供的原子类都是基于无锁技术实现的。还有一些其他的方案,原理是不共享变量或者变量只允许读。这方面,Java 提供了 Thread Local 和 final 关键字,还有一种 Copy-on-write 的模式。

多线程并发的特性

一个线程对共享变量的修改,另外一个线程能够立刻看到,我们称为可见性。

我们把一个或者多个操作在 CPU 执行的过程中不被中断的特性称为原子性。

Java中语句执行的顺序,可能会会被编译器重新排列的特性被称为有序性。

缓存导致的可见性问题,线程切换带来的原子性问题,编译优化带来的有序性问题,

volatile

volatile 禁用CPU缓存和编译优化来保证可见性和有序性。Java 内存模型对 final 类型变量的重排进行了约束。

Happens-Before规则来保证前面一个操作的结果对后续操作是可见的。:Happens-Before 约束了

编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守 Happens-Before 规则。具体规则如下所示:

程序的顺序性规则:这条规则是指在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。volatile变量规则:指对一个 volatile 变量的写操作, Happens-Before 于后续对这个 volatile 变量的读操作。传递性:是指如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C。管程中锁的规则:一个锁的解锁 Happens-Before 于后续对这个锁的加锁。管程是一种通用的同步原语,在Java 中指的就是 synchronized,synchronized 是 Java 里对管程的实现。线程 start() 规则:主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程B前的操作。如果线程 A 调用线程 B 的 start() 方法(即在线程 A 中启动线程 B),那么该 start() 操作 Happens-Before 于线程 B 中的任意操作。线程 join() 规则主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程 B的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作。线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事

件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。对象终结规则:一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的

在并发编程中可能出现的问题:

竞态条件:指的是程序的执行结果依赖线程执行的顺序活跃性问题:“死锁”就是一种典型的活跃性问题,当然除了死锁外,还有两种情况,分别是“活锁”和“饥饿”。有时线程虽然没有发生阻塞,但仍然会存在执行不下去的情况,这就是所谓的“活锁”,解决“活锁”的方案很简单,调度的时候尝试等待一个随机的时间就可以了。

所谓“饥饿”指的是线程因无法访问所需资源而无法执行下去的情况。解决“饥饿”问题的方案很简单,有三种方案:一是保证资源充足,二是公平地分配资源,三就是避免持有锁的线程长时间执行。这三个方案中,方案一和方案三的适用场景比较有限,因为很多场景下,资源的稀缺性是没办法解决的,持有锁的线程执行的时间也很难缩短。倒是方案二的适用场景相对来说更多一些。性能 吞吐量、延迟和并发量

管程

管程:管理共享变量以及对共享变量的操作过程,让他们支持并发。

管程解决互斥问题的思路很简单,就是将共享变量及其对共享变量的操作统一封装起来。管程 X 将共享变量 queue 这个队列和相关的操作入队 enq()、出队 deq() 都封装起来了;线程 A 和线程 B 如果想访问共享变量 queue,只能通过调用管程提供的 enq()、deq() 方法来实现;enq()、deq() 保证互斥性,只允许一个线程进入管程。

wait() 的正确姿势

Hasen 模型、Hoare 模型和 MESA 模型的一个核心区别就是当条件满足后,如何通知相关线程。

Hasen 模型里面,要求 notify() 放在代码的最后,这样 T2 通知完 T1 后,T2 就结束了,然后 T1 再执行,这样就能保证同一时刻只有一个线程执行。Hoare 模型里面,T2 通知完 T1 后,T2 阻塞,T1 马上执行;等 T1 执行完,再唤醒T2,也能保证同一时刻只有一个线程执行。但是相比 Hasen 模型,T2 多了一次阻塞唤醒操作。MESA 管程里面,T2 通知完 T1 后,T2 还是会接着执行,T1 并不立即执行,仅仅是从条件变量的等待队列进到入口等待队列里面。这样做的好处是 notify() 不用放到代码的最后,T2 也没有多余的阻塞唤醒操作。但是也有个副作用,就是当 T1 再次执行的时候,可能曾经满足的条件,现在已经不满足了,所以需要以循环方式检验条件变量。

Mesa管程模型特有的编程范式是

while(条件不满足) {wait();}

notify() 何时可以使用

除非经过深思熟虑,否则尽量使用 notifyAll()。那什么时候可以使用 notify() 呢?需

要满足以下三个条件:

所有等待线程拥有相同的等待条件;所有等待线程被唤醒后,执行相同的操作;只需要唤醒一个线程。

在使用多线程编程的时候,开启多少线程呢

cpu密集型的计算场景理论上是线程的数量=cpu的核心数量。但是在工程上线程的数量一般会设置为cpu核数+1.这样的话,当线程以为偶尔的内存也失效或者其他原因导致阻塞时,这个额外线程可以顶上,从而保障cpu的利用率

IO密集型的计算场景 可以使用如下公式来进行计算:

线程数=[ 1+(io耗时/cpu耗时)]* cpu核数

为什么局部变量是线程安全的?

因为每个线程都有自己的调用栈,局部变量保存在线程各自的调用栈里面,不会共享,所以自然也就没有并发问题。再次重申一遍:没有共享,就没有伤害。使用局部变量来解决并发问题也叫线程封闭技术。

采用线程封闭技术的案例非常多,例如从数据库连接池里获取的连接 Connection,在JDBC 规范里并没有要求这个 Connection 必须是线程安全的。数据库连接池通过线程封闭技术,保证一个 Connection 一旦被一个线程获取之后,在这个线程关闭 Connection 之前的这段时间里,不会再分配给其他线程,从而保证了 Connection 不会有并发问题。

递归栈溢出的原因?

因为每调用一个方法就会在栈上创建一个栈帧,方法调用结束后就会弹出该栈帧,而栈的大小不是无限的,所以递归调用次数过多的话就会导致栈溢出。而递归调用的特点是每递归一次,就要创建一个新的栈帧,而且还要保留之前的环境(栈帧),直到遇到结束条件。所以递归调用一定要明确好结束条件,不要出现死循环,而且要避免栈太深。

解决并发问题的步骤?

封装共享变量 将共享变量作为对象属性封装在内部,对所有公共方法制定并发访问策略。对于这些不会发生变化的共享变量,建议你用 final 关键字来修饰。识别共享变量间的约束条件 一定要识别出所有共享变量之间的约束条件,如果约束条件识别不足,很可能导致制定的并发访问策略南辕北辙。制定并发访问策略 避免共享:避免共享的技术主要是利于线程本地存储以及为每个任务分配独立的线程。不变模式:这个在 Java 领域应用的很少,但在其他领域却有着广泛的应用,例如 Actor模式、CSP 模式以及函数式编程的基础都是不变模式。管程及其他同步工具:Java 领域万能的解决方案是管程,但是对于很多特定场景,使用Java 并发包提供的读写锁、并发容器等同步工具会更好。

要注意的问题:

4. 优先使用成熟的工具类:Java SDK 并发包里提供了丰富的工具类,基本上能满足你日常的需要,建议你熟悉它们,用好它们,而不是自己再“发明轮子”,毕竟并发工具类不是随随便便就能发明成功的。

5. 迫不得已时才使用低级的同步原语:低级的同步原语主要指的是 synchronized、Lock、Semaphore 等,这些虽然感觉简单,但实际上并没那么简单,一定要小心使用。

6. 避免过早优化:安全第一,并发程序首先要保证安全,出现性能瓶颈后再优化。在设计期和开发期,很多人经常会情不自禁地预估性能的瓶颈,并对此实施优化,但残酷的现实却是:性能瓶颈不是你想预估就能预估的。

Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。

Java的 synchronized 也是管程的一种实现,既然 Java 从语言层面已经实现了管程了,那为什么还要在 SDK 里提供另外一种实现呢?

因为SDK里面的锁具有以下优势:

能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么

线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞

状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能

够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个

错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线

程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

Lock如何保证可见性?

它是利用了 volatile 相关的 Happens-Before 规则。Java SDK里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写state 的值;解锁的时候,也会读写 state 的值

class SampleLock {volatile int state;// 加锁lock() {// 省略代码无数state = 1;}// 解锁unlock() {// 省略代码无数state = 0;}}```## 什么是可重入锁所谓可重入锁,顾名思义,指的是线程可以重复获取同一把锁。当线程 T1 执行到 ① 处时,已经获取到了锁 rtl ,当在① 处调用 get() 方法时,会在 ② 再次对锁 rtl 执行加锁操作。此时,如果锁 rtl 是可重入的,那么线程 T1 可以再次加锁成功;如果锁 rtl 是不可重入的,那么线程 T1 此时会被阻塞。```javaclass X {private final Lock rtl =new ReentrantLock();int value;public int get() {// 获取锁rtl.lock(); ②try {return value;} finally {// 保证锁能释放rtl.unlock();}}public void addOne() {// 获取锁rtl.lock();try {value = 1 + get(); ①} finally {// 保证锁能释放rtl.unlock();}}}

公平锁与非公平锁

ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。

锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。

用锁的三大最佳实践

永远只在更新对象的成员变量时加锁永远只在访问可变的成员变量时加锁永远不在调用其他对象的方法时加锁

信号量

信号量的模型

信号量模型就类似操作系统的PV操作,在初始的时候有一个计数器,down操作的时候,将计数器进行减一,如果此时计数器的值小于0,则当前线程被阻塞,否则当前线程可以继续执行. up操作就是将计数器进行加一,如果此时的计数器的值大于等于0,就唤醒等待队列中的一个线程。用代码来表示的话就是如下所示:

class Semaphore{// 计数器int count;// 等待队列Queue queue;// 初始化操作Semaphore(int c){this.count=c;}//void down(){this.count--;if(this.count<0){// 将当前线程插入等待队列// 阻塞当前线程}}void up(){this.count++;if(this.count<=0) {// 移除等待队列中的某个线程 T// 唤醒线程 T}}}

在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore实现的,Semaphore 这个类能够保证这三个方法都是原子操作。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。

如何使用信号量

static int count;// 初始化信号量static final Semaphore s = new Semaphore(1);// 用信号量保证互斥static void addOne() {s.acquire();try {count+=1;} finally {s.release();}}

读写锁 ReadWriteLock

ReadWriteLock 主要是针对读多写少的情况下使用的。是一种读写锁

什么是读写锁

所有的读写锁都遵守以下三条基本原则:

1. 允许多个线程同时读共享变量;

2. 只允许一个线程写共享变量;

3. 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

class Cache<K,V> {final Map<K, V> m =new HashMap<>();final ReadWriteLock rwl =new ReentrantReadWriteLock();// 读锁final Lock r = rwl.readLock();// 写锁final Lock w = rwl.writeLock();// 读缓存V get(K key) {r.lock();try {return m.get(key); }finally {r.unlock(); }}// 写缓存V put(String key, Data v) {w.lock();try {return m.put(key, v); }finally {w.unlock(); }}}

读写锁类似于 ReentrantLock,也支持公平模式和非公平模式。读锁和写锁都实现了java.util.concurrent.locks.Lock 接口,所以除了支持 lock() 方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出UnsupportedOperationException 异常。

StampedLock

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁、悲观读锁和乐观读。其中,写锁、悲观读锁的语义和 ReadWriteLock的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式,乐观读这个操作是无锁的。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。

class Point {private int x, y;final StampedLock sl =new StampedLock();// 计算到原点的距离int distanceFromOrigin() {// 乐观读long stamp =sl.tryOptimisticRead();// 读入局部变量,// 读的过程数据可能被修改int curX = x, curY = y;// 判断执行读操作期间,// 是否存在写操作,如果存在,// 则 sl.validate 返回 falseif (!sl.validate(stamp)){// 升级为悲观读锁stamp = sl.readLock();try {curX = x;curY = y;} finally{// 释放悲观读锁sl.unlockRead(stamp);}}return Math.sqrt(curX * curX + curY * curY);}}

StampedLock 在命名上并没有增加 Reentrant,所以 StampedLock 应该是不可重入的。StampedLock 的悲观读锁、写锁都不支持条件变量

StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly(),不要使用interrupt()来进行线程中断,不然就会出现CPU飙升100%的现象。

CountDownLatch 和 CyclicBarrier

CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

并发容器

List

List 里面只有一个实现类就是CopyOnWriteArrayList。CopyOnWrite,顾名思义就是写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。

CopyOnWriteArrayList 内部维护了一个数组,成员变量 array 就指向这个内部数组,所有的读操作都是基于 array 进行的,如下图所示,迭代器 Iterator 遍历的就是 array 数组。如果在遍历 array 的同时,还有一个写操作,例如增加元素,CopyOnWriteArrayList 会将 array 复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后再将 array 指向这个新的数组。

CopyOnWriteArrayList 仅适用于写操作非常少的场景,而且能够容忍读写的短暂不一致。CopyOnWriteArrayList 迭代器是只读的,不支持增删改。因为迭代器遍历的仅仅是一个快照,而对快照进行增删改是没有意义的。

Map

Map 接口的两个实现是 ConcurrentHashMap 和 ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap 的 key 是无序的,而ConcurrentSkipListMap 的 key 是有序的。所以如果你需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。

ConcurrentSkipListMap 里面的 SkipList 本身就是一种数据结构,中文一般都翻译为“跳表”。跳表插入、删除、查询操作平均的时间复杂度是 O(log n),理论上和并发线程数没有关系,所以在并发度非常高的情况下,若你对 ConcurrentHashMap 的性能还不满意,可以尝试一下 ConcurrentSkipListMap

Queue

Java 并发包里面 Queue 这类并发容器是最复杂的,可以从以下两个维度来分类。一个维度是阻塞与非阻塞,所谓阻塞指的是当队列已满时,入队操作阻塞;当队列已空时,出队操作阻塞。另一个维度是单端与双端,单端指的是只能队尾入队,队首出队;而双端指的是队首队尾皆可入队出队。Java 并发包里阻塞队列都用 Blocking 关键字标识,单端队列使用 Queue 标识,双端队列使用 Deque 标识。

1.单端阻塞队列:其实现有 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue 和DelayQueue。内部一般会持有一个队列,这个队列可以是数组(其实现是ArrayBlockingQueue)也可以是链表(其实现是 LinkedBlockingQueue);甚至还可以不持有队列(其实现是 SynchronousQueue),此时生产者线程的入队操作必须等待消费

者线程的出队操作。而 LinkedTransferQueue 融合 LinkedBlockingQueue 和SynchronousQueue 的功能,性能比 LinkedBlockingQueue 更好;PriorityBlockingQueue 支持按照优先级出队;DelayQueue 支持延时出队。

2.双端阻塞队列:其实现是 LinkedBlockingDeque

3.单端非阻塞队列:其实现是 ConcurrentLinkedQueue。

4.双端非阻塞队列:其实现是 ConcurrentLinkedDeque

上面的这些 Queue 中,只有 ArrayBlockingQueue 和LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。

原子类

原子类是使用无锁的方案来实现原子性的。无锁方案相对互斥锁方案,最大的好处就是性能,原子类利用CPU提供的cas指令来解决并发问题,(CAS,全称是 Compare And Swap,即“比较并交换”)。CAS 指令包含 3 个参数:共享变量的内存地址 A、用于比较的值 B 和共享变量的新值 C;并且只有当内存中地址 A 处的值等于 B 时,才能将内存中地址 A 处的值更新为新值 C。作为一条 CPU 指令,CAS 指令本身是能够保证原子性的。

使用 CAS 来解决并发问题,一般都会伴随着自旋。

原子化的基本数据类型

相关实现有 AtomicBoolean、AtomicInteger 和 AtomicLong。

getAndIncrement() // 原子化 i++getAndDecrement() // 原子化的 i--incrementAndGet() // 原子化的 ++idecrementAndGet() // 原子化的 --i// 当前值 +=delta,返回 += 前的值getAndAdd(delta)// 当前值 +=delta,返回 += 后的值addAndGet(delta)//CAS 操作,返回是否成功compareAndSet(expect, update)// 以下四个方法// 新值可以通过传入 func 函数来计算getAndUpdate(func)updateAndGet(func)getAndAccumulate(x,func)accumulateAndGet(x,func)

原子化的对象引用类型

原子化的对象引用类型相关实现有 AtomicReference、AtomicStampedReference 和

AtomicMarkableReference,利用它们可以实现对象引用的原子化更新。AtomicReference 提供的方法和原子化的基本数据类型差不多,这里不再赘述。不过需要注意的是,对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference 和AtomicMarkableReference 这两个原子类可以解决 ABA 问题。

解决 ABA 问题的思路其实很简单,增加一个版本号维度就可以了。AtomicStampedReference 实现的 CAS 方法就增加了版本号参数,

boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp

原子化数组

相关实现有 AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray,利用这些原子类,我们可以原子化地更新数组里面的每一个元素。这些类提供的方法和原子化的基本数据类型的区别仅仅是:每个方法多了一个数组的索引参数。

原子化对象属性更新器

相关实现有 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和

AtomicReferenceFieldUpdater,利用它们可以原子化地更新对象的属性,这三个方法都

是利用反射机制实现的,创建更新器的方法如下:

public static <U>AtomicXXXFieldUpdater<U>newUpdater(Class<U> tclass,String fieldName)

对象属性必须是 volatile 类型的,只有这样才能保证可见性;如果对象属性不是 volatile 类型的,newUpdater() 方法会抛出 IllegalArgumentException 这个运行时异常。

原子化的累加器

DoubleAccumulator、DoubleAdder、LongAccumulator 和 LongAdder,这四个类仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持compareAndSet() 方法。如果你仅仅需要累加操作,使用原子化的累加器性能会更好。

java 线程池

创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。

Java 并发包里提供的线程池,最核心的是ThreadPoolExecutor,它强调的是 Executor,而不是一般意义上的池化资源。

ThreadPoolExecutor 的构造函数非常复杂,如下面代码所示,这个最完备的构造函数有 7

个参数。

ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤

了,至少要留 corePoolSize 个人坚守阵地。

maximumPoolSize:表示线程池创建的最大线程数。

keepAliveTime & unit:一个线程如果在一段时间内,都没有执行任务,说明很闲,

keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线

程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,

那么这个空闲的线程就要被回收了。

workQueue:工作队列.

threadFactory:通过这个参数你可以自定义如何创建线程.

handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙

碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就

会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定.

ThreadPoolExecutor 已经提供了以下 4 种策略。

CallerRunsPolicy:提交任务的线程自己去执行该任务。AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。DiscardPolicy:直接丢弃任务,没有任何异常抛出。DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列

考虑到 ThreadPoolExecutor 的构造函数实在是有些复杂,所以 Java 并发包里提供了一个

线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂

的编码规范中基本上都不建议使用 Executors 了。

不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。

使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute()

方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。

try {// 业务逻辑} catch (RuntimeException x) {// 按需处理} catch (Throwable x) {// 按需处理}

获取任务的执行结果

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来

支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方

法签名如下。

// 提交 Runnable 任务Future<?> submit(Runnable task);// 提交 Callable 任务<T> Future<T> submit(Callable<T> task);// 提交 Runnable 任务及结果引用<T> Future<T> submit(Runnable task, T result);

Future 接口有 5 个方法,我都列在下面了,它们分别是

取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。

这 3 个 submit() 方法之间的区别在于方法参数不同:

提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable

接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task)

这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable

接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的

Future 对象可以通过调用其 get() 方法来获取任务的执行结果。提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法

很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit()

方法的参数 result。这个方法该怎么用呢?下面这段示例代码展示了它的经典用法。需

要你注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result

r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对

result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可

以共享数据。

ExecutorService executor= Executors.newFixedThreadPool(1);// 创建 Result 对象 rResult r = new Result();r.setAAA(a);// 提交任务Future<Result> future =executor.submit(new Task(r), r);Result fr = future.get();// 下面等式成立fr === r;fr.getAAA() === a;fr.getXXX() === x;class Task implements Runnable{Result r;// 通过构造函数传入 resultTask(Result r){this.r = r;}void run() {// 可以操作 resulta = r.getAAA();r.setXXX(x);}}

Future 是一个接口,而FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果

// 创建 FutureTaskFutureTask<Integer> futureTask= new FutureTask<>(()-> 1+2);// 创建线程池ExecutorService es =Executors.newCachedThreadPool();// 提交 FutureTaskes.submit(futureTask);// 获取计算结果Integer result = futureTask.get();

import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;import java.util.concurrent.TimeUnit;public class FutureTaskTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建任务 T2 的 FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 创建任务 T1 的 FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 线程 T1 执行任务 ft1Thread T1 = new Thread(ft1);T1.start();// 线程 T2 执行任务 ft2Thread T2 = new Thread(ft2);T2.start();// 等待线程 T1 执行结果System.out.println(ft1.get());}// T2Task 需要执行的任务:// 洗茶壶、洗茶杯、拿茶叶static class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2: 洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2: 洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2: 拿茶叶...");TimeUnit.SECONDS.sleep(1);return " 龙井 ";}}// T1Task 需要执行的任务:// 洗水壶、烧开水、泡茶static class T1Task implements Callable<String>{FutureTask<String> ft2;T1Task(FutureTask<String> ft2){this.ft2 = ft2;}@Overridepublic String call() throws Exception {System.out.println("T1: 洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1: 烧开水...");TimeUnit.SECONDS.sleep(15);// 获取 T2 线程的茶叶String tf = ft2.get();System.out.println("T1: 拿到茶叶:"+tf);System.out.println("T1: 泡茶...");return " 上茶:" + tf;}}}

使用CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

我们以获取股票价格为例,看看如何使用CompletableFuture:

public class Main {public static void main(String[] args) throws Exception {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);// 如果执行成功:cf.thenAccept((result) -> {System.out.println("price: " + result);});// 如果执行异常:cf.exceptionally((e) -> {e.printStackTrace();return null;});// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:Thread.sleep(200);}static Double fetchPrice() {try {Thread.sleep(100);} catch (InterruptedException e) {}if (Math.random() < 0.3) {throw new RuntimeException("fetch price failed!");}return 5 + Math.random() * 20;}}

可见CompletableFuture的优点是:

异步任务结束时,会自动回调某个对象的方法;异步任务出错时,会自动回调某个对象的方法;

主- 线程设置好回调后,不再关心异步任务的执行。

如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行.除了串行执行外,多个CompletableFuture还可以并行执行。主要是由于CompletableFuture实现了CompletionStage接口,该接口提供了有串行关系、并行关系、汇聚关系等的工作流实现。 描述串行关系

CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun

和 thenCompose 这四个系列的接口。描述 AND 汇聚关系

CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,CompletionStage 接口里面描述 OR 汇聚关系

主要是 applyToEither、acceptEither 和runAfterEither 系列的接口

当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性。

CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

Fork/Join

Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个

大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的

MapReduce。

Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,

能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的

ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算

的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很

慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。

ForkJoinPool 本质上也是一个生产者 - 消费者的实现,ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。

ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务。如此一来,所有的工作线程都不会闲下来了。ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争.

static void main(String[] args){// 创建分治任务线程池ForkJoinPool fjp =new ForkJoinPool(4);// 创建分治任务Fibonacci fib =new Fibonacci(30);// 启动分治任务Integer result =fjp.invoke(fib);// 输出结果System.out.println(result);}// 递归任务static class Fibonacci extends RecursiveTask<Integer>{final int n;Fibonacci(int n){this.n = n;}protected Integer compute(){if (n <= 1)return n;Fibonacci f1 =new Fibonacci(n - 1);// 创建子任务f1.fork();Fibonacci f2 =new Fibonacci(n - 2);// 等待子任务结果,并合并结果return pute() + f1.join();}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。