300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > 并发编程五:java并发线程池底层原理详解和源码分析

并发编程五:java并发线程池底层原理详解和源码分析

时间:2023-10-08 06:14:57

相关推荐

并发编程五:java并发线程池底层原理详解和源码分析

文章目录

java并发线程池底层原理详解和源码分析线程和线程池性能对比Executors创建的三种线程池分析自定义线程池分析线程池源码分析继承关系ThreadPoolExecutor源码分析拒绝策略线程池流程图

java并发线程池底层原理详解和源码分析

线程和线程池性能对比

上篇分析了java线程。现在来分析java线程池。在分析线程池之前,先来思考下我们的线程是创建的越多越好吗,显然不是,我们为什么要使用线程池,用下面的例子来看下

/**** 使用线程的方式去执行程序*/public class ThreadTest {public static void main(String[] args) throws InterruptedException {Long start = System.currentTimeMillis();final Random random = new Random();final List<Integer> list = new ArrayList<Integer>();for (int i = 0; i < 100000; i++) {Thread thread = new Thread() {@Overridepublic void run() {list.add(random.nextInt());}};thread.start();thread.join();}System.out.println("时间:" + (System.currentTimeMillis() - start));System.out.println("大小:" + list.size());}}

/**** 线程池执行*/public class ThreadPoolTest {public static void main(String[] args) throws InterruptedException {Long start = System.currentTimeMillis();final Random random = new Random();final List<Integer> list = new ArrayList<Integer>();ExecutorService executorService = Executors.newSingleThreadExecutor();for (int i = 0; i < 100000; i++) {executorService.execute(new Runnable() {@Overridepublic void run() {list.add(random.nextInt());}});}executorService.shutdown();executorService.awaitTermination(1, TimeUnit.DAYS);System.out.println("时间:"+(System.currentTimeMillis() - start));System.out.println("大小:"+list.size());}}

上面两份代码都是执行十万次把随机数添加到集合,不同的是第一份代码每次循环都要创建线程执行任务,第二份代码通过线程池的方式执行任务。那个会比较快。答案是线程池的要快很多。

为什么,在上篇有说到java创建线程是重量级的,涉及到从用户态到内核态,同时过多的线程导致cpu不断的上下文切换。

在第一份代码中创建了10万个对象,创建了10万零一个线程。第二代码中同样创建了十万个对象,但只创建了两个线程。为什么说只创建了两个线程呢,后面会分析这个newSingleThreadExecutor。

那么线程池性能就一定好吗?看看下面的例子

public class ThreadPoolDemo {public static void main(String[] args) {ExecutorService executorService1 = Executors.newCachedThreadPool();//快ExecutorService executorService2 = Executors.newFixedThreadPool(10);//慢ExecutorService executorService3 = Executors.newSingleThreadExecutor();//最慢for (int i = 1; i <= 100; i++) {executorService1.execute(new MyTask(i));}}}/**** 项目*/class MyTask implements Runnable {int i = 0;public MyTask(int i) {this.i = i;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "程序员做第" + i + "个项目");try {Thread.sleep(3000L);//业务逻辑} catch (Exception e) {e.printStackTrace();}}}

上述代码很简单,就是不同的线程池执行MyTask 的run方法,这个run方法就输出一句话,然后阻塞三秒。那个线程池会快呢,答案已经在注释上面了。

在不同的场景使用不同的线程得到的性能是不一样的。

Executors创建的三种线程池分析

平时有关注阿里巴巴开发手册的话应该知道,不推荐使用Executors创建的三种线程池。那么为什么不推荐,下面来一个个分析。

先来看下线程池几个参数的含义

corePoolsize:核心线程数量,创建线程池时就含有的线程数量

maximumPoolSize最大的线程数量,当核心线程数量不够时,最多可以创建多少个线程(包含核心线程)

keepAliveTime和 TimeUnit非核心线程的存活时间长度和时间单位

BlockingQueue<Runnable>队列(队列会在后续博客中单独分析)

ThreadFactory线程工厂,在上篇有说过

RejectedExecutionHandler拒绝策略

newCachedThreadPool分析

ExecutorService executorService1 = Executors.newCachedThreadPool()

newCachedThreadPool的核心线程数为0,最大线程数为最大值,线程池里面的线程存活时间是60秒。队列采用的是SynchronousQueue同步队列。

根据上面的案例来分析下,

首先newCachedThreadPool没有核心线程,所以当接收到任务时会放入到同步队列中(同步队列是典型的生产和消费模式,当同步队列中有任务,必须要先消费这个任务才能接收其他任务),此时会创建线程1去执行任务1。那么同步队列的任务被消费了,就能接收第二个任务,同样会创建线程2去执行任务2。那么又会接收任务3,此时如果线程1执行完了任务1的话,并且空闲时间在60秒内(存活时间是60秒),那么任务3会分配给线程1,此时线程1执行了任务1和任务3。这是线程复用。如果接任务3的时候没有空闲线程,那么就会创建线程3来执行,这就是newCachedThreadPool线程池的流程。

newFixedThreadPool分析

ExecutorService executorService2 = Executors.newFixedThreadPool(10)

newFixedThreadPool的核心线程数是10,最大线程数是10,非核心线程存活时间为0,最大线程数和核心线程数相等,也没办法创建其他非核心线程了。队列是LinkedBlockingQueue无界队列,先简单说是可以无限存储数据。队列是一个数据结构,有着FIFO,就是先进先出。根据上面案例分析:

首先newFixedThreadPool有10个核心线程,那么一开始就可以接收10个任务,这10个任务不需要放入队列。从第11个任务开始就会放入到队列中,每个核心线程任务执行完后,会从队列中获取任务。

这也就是上面案例使用newFixedThreadPool执行的时候为什么是10个10个来打印语句的原因。

如果说ExecutorService executorService2 = Executors.newFixedThreadPool(100)的话,那么上面案例执行效率会和newCachedThreadPool是一样的。

newSingleThreadExecutor分析

ExecutorService executorService3 = Executors.newSingleThreadExecutor()

newSingleThreadExecutornewFixedThreadPool差不多,只不过核心线程和最大线程都是1,当接收到任务是核心线程会执行任务1,那么任务2开始到任务100都会放入到队列中,等待核心线程执行。因此这个在上述案例中时执行效率最慢的。

根据上面的分析,不同的任务不同场景就要使用不同的线程池参数。那为什么阿里巴巴的开发手册不推荐使用这三种线程池。

首先最根本的原因是开发者不一定知道线程池参数的含义,或者说开发者根本不知道newFixedThreadPool、newFixedThreadPool、newCachedThreadPool底层这些参数,只是单纯的想创建一个线程池,那么这种时候就会出现问题。刚才也分析,在不同场景不同的业务使用到的线程池参数是不同,使用不当就会造成性能下降。

其次呢,这三个线程池有着不同的问题。例如newSingleThreadExecutornewFixedThreadPool底层使用的队列是LinkedBlockingQueue无界队列。

可以看到LinkedBlockingQueue容量的最大值是Integer的最大值,就是说当任务过多的时候有可能导致OOM。不过LinkedBlockingQueue是可以指定大小的,但是newSingleThreadExecutornewFixedThreadPool的底层都没有指定大小。因此这两个线程池有可能倒是OOM,就算没有导致OOM,容量过大也会导致频繁GC。

对于newCachedThreadPool使用的是SynchronousQueue,这是同步队列,不会导致OOM,但是它的最大线程数是多少,是Integer.MAX_VALUE。如果无限的创建线程会导致什么问题,CPU100%。

所以三种线程池都各自有各自的问题。不过对于中小项目来说,由于量不够,不会导致这些问题。使用也是没有关系的。具体情况具体分析。

那么不推荐使用上面的三种线程池,那用什么线程池呢?推荐使用自定义线程池

自定义线程池分析

在上述案例中,上面三种线程好像都不是很满意,要么创建的线程数太多了,要么队列空间太大了,要么线程数太少了。这时候使用自定义线程池。

public class ThreadPoolDemo {public static void main(String[] args) {ExecutorService executorService1 = Executors.newCachedThreadPool();//快ExecutorService executorService2 = Executors.newFixedThreadPool(10);//慢ExecutorService executorService3 = Executors.newSingleThreadExecutor();//最慢ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(10));for (int i = 1; i <= 100; i++) {poolExecutor.execute(new MyTask(i));}}}

看看执行结果

这里抛出的异常是因为队列的长度不够,而且是在第31个任务的时候抛出的异常,为什么会在第31个任务抛出异常?还有从结果上来看还有一个问题就是顺序的问题。从上图来看执行任务是从1-10,然后20-30,在到11-20。我们假想的执行顺序是1-10,10-20,20-30。为什么会出现这种情况呢?

这里涉及到两个知识点:提交优先级、执行优先级。这些在源码当中都有体现。

现在开始分析源码来解答上述出现的问题。

线程池源码分析

继承关系

先来看下类继承关系图:

Executor只有一个execute的空方法

ExecutorService继承了Executor 同时提供了submit接口。ExecutorService是一个接口所以下面这些方法都是空方法。

经常会问submit方法和execute方法有什么不同,后面分析的时候也会说到。

AbstractExecutorService是一个抽象类,实现了ExecutorService方法并且实现了submit方法

线程池ThreadPoolExecutor继承了AbstractExecutorService。源码分析的核心。重写了execute方法。

ThreadPoolExecutor源码分析

AbstractExecutorService实现了ExecutorService方法并且实现了submit方法

submit底层调用了execute方法。所以execute和submit有什么区别。submit底层调用execute,第一个区别就是execute是没有返回值的,submit是有返回值的。

当我们使用线程池执行任务时候poolExecutor.execute(new MyTask(i));调用了execute方法来执行任务。之前说过有提交优先级和执行优先级。虽然execute翻译过来是执行的意思,但是execute的源码是提交优先级,在执行前得先提交任务。

现在来看看execute方法

public void execute(Runnable command) {//判断Runnable 是否为空,为空抛空指针异常if (command == null)throw new NullPointerException();//ctl是一个CAS操作AtomicInteger类型 包含了 workerCount 和 runState 具体看ctl的注释//这里获取ctl的int值int c = ctl.get();//workerCountOf(c) 获取线程池的工作线程数//判断当前的线程数是否小于corePoolSize 如果是通过addWord方法创建一个新的线程//如果能完成新线程创建exexute方法结束,成功提交任务;if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//isRunning(c) 判断线程池的状态,//workQueue.offer 类似队列的一个add方法 把任务添加到队列中//这个if判断状态如果是运行状态,并且能够把任务添加到队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//这里做了双重检测//如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了)if (! isRunning(recheck) && remove(command))//调用拒绝策略reject(command);//如果双重检测成功了 并且线程池中的数量==0else if (workerCountOf(recheck) == 0)//这里为什么是null 在addWorker源码的时候在分析// 先理解为创建了一个线程给了一个空任务addWorker(null, false);}else if (!addWorker(command, false)) //这里的addWorker(command, false) 意思是创建非核心线程 如果创建失败了返回false//当addWorker返回false时 调用拒绝策略reject(command);}

源码中的ctl是ThreadPoolExecutor定义的private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

workQueue(队列)的offer方法和add方法什么不同?workQueue的offer和add方法其实调用的是AbstractQueue的offer和add

add方法底层也是调用了offer方法,如果队列满了的话add会抛出IllegalStateException异常,而offer只会返回false。并且offer会抛出三种异常,add会抛出四种异常,多出来的就是IllegalStateException异常。

上述的execute的源码流程在注释中写有,现在通过上面的案例,再梳理一遍流程。在自定义线程池分析的案例中,通过线程池循环100次执行任务。整个过程是这样的:

当第一个任务进来就是第一次执行execute的时候,判断任务是否为空,不为空,判断当前线程数是否小于核心线程数,案例中定义的核心线程数是10。当前没有线程那么肯定小于10,那么通过addwork方法创建了一个线程,此时线程是核心线程,并且创建成功后return 返回。这个线程认为是从0开始的计数的,小于10也是10个核心线程。所以1-10的任务就交给了10个核心线程。然后第11个任务进来了。此时核心线程不小于10了。判断线程状态是否是运行状态(默认就是运行状态),然后通过offer方法添加到队列中,成功以后此时第11个任务就放到了队列中,然后进行二次判断,再次判断线程运行状态,如果不是运行状态并且能够把该任务从队列中移除掉则调用拒绝策略。如果二次判断成功并且线程池的数量等于0那么调用addwork方法传入空参数。由于案例中队列的容量是10所以从11-20的任务都能添加到队列中,并且由于创建了核心线程所以线程池的数量此时不等于0,也就不会执行addwork(null,false);方法然后第21个任务进来。此时队列已经满了offer方法返回false,因此执行了else if的方法通过addwork创建非核心线程,如果创建成功那么第21个任务就交给了非核心线程;如果创建失败了,调用拒绝策略。由于案例中最大线程数是20,除去核心线程10个还能创建10个非核心线程数。所以21到30的任务就交给了非核心线程。当第31个任务进来,核心线程满了,队列也满了offer放回false 执行else if中的addwork方法,由于最大线程数是20,此时已经有20个了没法在创建线层。则调用了拒绝策略抛出了异常。

这就是自定义线程池案例中为什么是第31个任务抛出的异常。同时核心线程执行1-10的任务,非核心线程执行21-30的任务。只有当执行完任务的线程才能够从队列中获取任务执行,队列中的任务是11-20,这既是为什么先执行1-10的任务,在执行21-30的任务,最后才执行11-20的任务的原因。

下面就来分析下addWorker的源码

private boolean addWorker(Runnable firstTask, boolean core) {retry: //使用了java不推荐的goto语句for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//判断如果线程非运行,并且非SHUTDOWN状态下任务为空,队列非空就不能再增加线程if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {//获取当前线程数int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))//core 为true 时如果当线程数大于核心线程数 不增加线程//core 为false 时 如果当前线程数大于最大线程数 不增加线程return false;//compareAndIncrementWorkerCount(c) ctl+1 工作线程数+1 如果成功//这里只是线程数+1,并没有真正创建新线程,创建工作在后面if (compareAndIncrementWorkerCount(c))//跳出循环break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建一个worker对象 可以暂时理解为这一个线程 并传入执行任务w = new Worker(firstTask);//从worker对象中获取线程final Thread t = w.thread;if (t != null) {//加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//获取线程池状态int rs = runStateOf(ctl.get());//小于shutdown就是running状态//或者SHUTDOWN 和firstTask 为空是从队列中处理任务 那就可以放到集合中if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//判断线程还没start 就alive就直接异常if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//一切正常的话就把worker加到workers中 它是HashSet集合workers.add(w);//获取当前worker的数量int s = workers.size();if (s > largestPoolSize)//记录worker的数量 相当于记录线程的数量largestPoolSize = s;//标志线程添加成功workerAdded = true;}} finally {mainLock.unlock();}//如果线程添加成功if (workerAdded) {//执行线程start方法t.start();//标志线程开始执行workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

addWorker大致流程上面有注释,在进一步分析之前,先来解决一些问题。首先这个Worker是什么。看下worker的源码

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {//线程final Thread thread;// 第一个任务Runnable firstTask;//执行了多少个任务volatile long completedTasks;//有参构造 这里创建了线程 并设置任务Worker(Runnable firstTask) {setState(-1); // 初始化的过程中不允许中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//实现 Runnable 的run 方法public void run() {runWorker(this);}}

以上就是Worker 比较重要的源码展示。

再来解决一个问题。在addWorker和execute方法中都有int c = ctl.get();这个c有时候既表示了线程池状态,又表示线程池的工作线程数。这是怎么做到。这里做一个简单的讲解。看下线程池ThreadPoolExecutor的定义

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 高三位表示线程池的状态private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// 通过ctl的高三位获取到线程池状态private static int runStateOf(int c){return c & ~CAPACITY; }//通过ctl的其他29位 获取到线程池的工作线程的个数private static int workerCountOf(int c) {return c & CAPACITY; }//计算ctl的值private static int ctlOf(int rs, int wc) {return rs | wc; }

其中COUNT_BITS是 int 位数 private static final int COUNT_BITS = Integer.SIZE - 3; //Integer.SIZE=32 所以实际 COUNT_BITS = 29, 用上面的5个常量表示线程池的状态,实际上是使用32位中的高3位表示

解决这两个问题后,现在来通过上面的案例来案分析addwork方法。

当任务进来执行了execute方法,继而调用了addWorker(command, true)方法。进行第一个for循环的判断,判断线程池状态,队列非空等等;通过以后进行第二个for循环的判断,判断当前线程数有没有大于核心线程数,并且再次判断线程状态。都通过以后ctl+1,然后退出循环。通过创建worker对象创建了线程并且赋予了任务(具体看上面的worker源码)。然后上锁,再次判断线程状态,如果是运行状态或者shutdown状态并且任务为空,那么就添加到workers这个HashSet集合。记录此时的线程数。并且调用start方法,worker实现了Runnable 就会调用run方法。那么当第1个任务进来执行了addWorker(command, true);通过第一个for循环的判断,此时没有线程所以也能通过第二个for循环的判断。开始创建worker对象,通过worker的构造方法把第一个任务提交给worker同时创建了线程。然后又经过判断把worker对象添加到workers,然后调用线程的start方法。前十个任务就对应是个核心线程第11个任务进来在execute方法中放入队列不会执行addWorker,因此不会创建线程。所以11-20的任务就会在队列中21个任务进来同理执行了addWorker(command, false);方法同理创建了线程,此时第二个for循环判断的不再是核心线程数,而是最大线程数

在addwork中最后执行了start();方法,由于Worker实现了Runnable 接口,其实start();方法就是调用了Worker的run方法。这个run方法又调用了runWorker(this);所以还要看runWorker的源码

final void runWorker(Worker w) {Thread wt = Thread.currentThread();//获取当前线程Runnable task = w.firstTask;//获取worker中的任务w.firstTask = null;//将worker的任务设为nullw.unlock(); // allow interruptsboolean completedAbruptly = true;try {//如果任务为不是null 执行这个任务,如果任务是null从队列中获取任务while (task != null || (task = getTask()) != null) {w.lock();//判断线程状态如果是stop 就立即中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);//任务执行前置方法 这是空方法 Throwable thrown = null;try {task.run();//开始执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);//任务执行后置方法 这是空方法 }} finally {task = null; //将任务设为null 下次就从队列中获取pletedTasks++;w.unlock();}}completedAbruptly = false;} finally {//这里是线程复用processWorkerExit(w, completedAbruptly);}}

runWorker的源码意思很简单,获取当前线程和任务,然后执行,不论如何都会将任务设为null,为了下次从队列中获取任务。重点是(task != null || (task = getTask()) != null)之前说过,提交优先级和执行优先级。execute代码阐述了提交优先级,那么这一行代码就是执行优先级,先判断worker对象有没有任务,有就执行;没有通过getTask();从队列中获取对象。

还有一个知识点就是beforeExecute和afterExecute这两个是空方法,如果有必要可以自己实现。

getTask();方法的源码就不分析了,就是从队列中获取任务。然后来分析下processWorkerExit的源码

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += pletedTasks;//重点删除掉workerworkers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}//重点 重新创建了worker对象而且任务为nulladdWorker(null, false);}}

processWorkerExit 的代码是用来实现线程复用,在源码中显示remove执行完成的worker,然后addWorker(null, false);那么创建出来的worker 就会从队列中获取到任务。

到这里线程池的源码就分析结束。

拒绝策略

ThreadPoolExecutor内部有实现4个拒绝策略:

CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务;AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务;DiscardPolicy,直接抛弃任务,不做任何处理;DiscardOldestPolicy,去除任务队列中的第一个任务(最旧的),重新提交;

一般来说通常不会使用到内部提供的拒绝策略,而是自己实现拒绝策略,然后做业务处理,比如说记录数据库,记录日志等等。怎么使用呢?

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 5, 1L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(10), new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("业务处理,记录日志");}});

线程池流程图

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。