一篇文章带你搞懂Java线程池实现原理

Kande ·
更新时间:2024-11-10
· 15 次阅读

目录

1. 为什么要使用线程池

2. 线程池的使用

3. 线程池核心参数

4. 线程池工作原理

5. 线程池源码剖析

5.1 线程池的属性

5.2 线程池状态

5.3 execute源码

5.4 worker源码

5.5 runWorker源码

1. 为什么要使用线程池

使用线程池通常由以下两个原因:

频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程。

使用线程池可以更容易管理线程,线程池可以动态管理线程个数、具有阻塞队列、定时周期执行任务、环境隔离等。

2. 线程池的使用 /** * @author 一灯架构 * @apiNote 线程池示例 **/ public class ThreadPoolDemo { public static void main(String[] args) { // 1. 创建线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // 2. 往线程池中提交3个任务 for (int i = 0; i < 3; i++) { threadPoolExecutor.execute(() -> { System.out.println(Thread.currentThread().getName() + " 关注公众号:一灯架构"); }); } // 3. 关闭线程池 threadPoolExecutor.shutdown(); } }

线程池的使用非常简单:

调用new ThreadPoolExecutor()构造方法,指定核心参数,创建线程池。

调用execute()方法提交Runnable任务

使用结束后,调用shutdown()方法,关闭线程池。

再看一下线程池构造方法中核心参数的作用。

3. 线程池核心参数

线程池共有七大核心参数:

参数名称参数含义
int corePoolSize核心线程数
int maximumPoolSize最大线程数
long keepAliveTime线程存活时间
TimeUnit unit时间单位
BlockingQueue workQueue阻塞队列
ThreadFactory threadFactory线程创建工厂
RejectedExecutionHandler handler拒绝策略

1.corePoolSize 核心线程数

当往线程池中提交任务,会创建线程去处理任务,直到线程数达到corePoolSize,才会往阻塞队列中添加任务。默认情况下,空闲的核心线程并不会被回收,除非配置了allowCoreThreadTimeOut=true。

2.maximumPoolSize 最大线程数

当线程池中的线程数达到corePoolSize,阻塞队列又满了之后,才会继续创建线程,直到达到maximumPoolSize,另外空闲的非核心线程会被回收。

3.keepAliveTime 线程存活时间

非核心线程的空闲时间达到了keepAliveTime,将会被回收。

4.TimeUnit 时间单位

线程存活时间的单位,默认是TimeUnit.MILLISECONDS(毫秒),可选择的有:

TimeUnit.NANOSECONDS(纳秒)

TimeUnit.MICROSECONDS(微秒)

TimeUnit.MILLISECONDS(毫秒)

TimeUnit.SECONDS(秒)

TimeUnit.MINUTES(分钟)

TimeUnit.HOURS(小时)

TimeUnit.DAYS(天)

5.workQueue 阻塞队列

当线程池中的线程数达到corePoolSize,再提交的任务就会放到阻塞队列的等待,默认使用的是LinkedBlockingQueue,可选择的有:

LinkedBlockingQueue(基于链表实现的阻塞队列)

ArrayBlockingQueue(基于数组实现的阻塞队列)

SynchronousQueue(只有一个元素的阻塞队列)

PriorityBlockingQueue(实现了优先级的阻塞队列)

DelayQueue(实现了延迟功能的阻塞队列)

6.threadFactory 线程创建工厂

用来创建线程的工厂,默认的是Executors.defaultThreadFactory(),可选择的还有Executors.privilegedThreadFactory()实现了线程优先级。当然也可以自定义线程创建工厂,创建线程的时候最好指定线程名称,便于排查问题。

7.RejectedExecutionHandler 拒绝策略

当线程池中的线程数达到maximumPoolSize,阻塞队列也满了之后,再往线程池中提交任务,就会触发执行拒绝策略,默认的是AbortPolicy(直接终止,抛出异常),可选择的有:

AbortPolicy(直接终止,抛出异常)

DiscardPolicy(默默丢弃,不抛出异常)

DiscardOldestPolicy(丢弃队列中最旧的任务,执行当前任务)

CallerRunsPolicy(返回给调用者执行)

4. 线程池工作原理

线程池的工作原理,简单理解如下:

当往线程池中提交任务的时候,会先判断线程池中线程数是否核心线程数,如果小于,会创建核心线程并执行任务。

如果线程数大于核心线程数,会判断阻塞队列是否已满,如果没有满,会把任务添加到阻塞队列中等待调度执行。

如果阻塞队列已满,会判断线程数是否小于最大线程数,如果小于,会继续创建最大线程数并执行任务。

如果线程数大于最大线程数,会执行拒绝策略,然后结束。

5. 线程池源码剖析 5.1 线程池的属性 public class ThreadPoolExecutor extends AbstractExecutorService { // 线程池的控制状态,Integer长度是32位,前3位用来存储线程池状态,后29位用来存储线程数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 线程个数所占的位数 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程池的最大容量,2^29-1,约5亿个线程 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 独占锁,用来控制多线程下的并发操作 private final ReentrantLock mainLock = new ReentrantLock(); // 工作线程的集合 private final HashSet<Worker> workers = new HashSet<>(); // 等待条件,用来响应中断 private final Condition termination = mainLock.newCondition(); // 是否允许回收核心线程 private volatile boolean allowCoreThreadTimeOut; // 线程数的历史峰值 private int largestPoolSize; /** * 以下是线程池的七大核心参数 */ private volatile int corePoolSize; private volatile int maximumPoolSize; private volatile long keepAliveTime; private final BlockingQueue<Runnable> workQueue; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; }

线程池的控制状态ctl用来存储线程池状态和线程个数,前3位用来存储线程池状态,后29位用来存储线程数量。

设计者多聪明,用一个变量存储了两块内容。

5.2 线程池状态

线程池共有5种状态:

状态名称状态含义状态作用
RUNNING运行中线程池创建后默认状态,接收新任务,并处理阻塞队列中的任务。
SHUTDOWN已关闭调用shutdown方法后处于该状态,不再接收新任务,处理阻塞队列中任务。
STOP已停止调用shutdownNow方法后处于该状态,不再新任务,并中断所有线程,丢弃阻塞队列中所有任务。
TIDYING处理中所有任务已完成,所有工作线程都已回收,等待调用terminated方法。
TERMINATED已终止调用terminated方法后处于该状态,线程池的最终状态。

5.3 execute源码

看一下往线程池中提交任务的源码,这是线程池的核心逻辑:

// 往线程池中提交任务 public void execute(Runnable command) { // 1. 判断提交的任务是否为null if (command == null) throw new NullPointerException(); int c = ctl.get(); // 2. 判断线程数是否小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 3. 把任务包装成worker,添加到worker集合中 if (addWorker(command, true)) return; c = ctl.get(); } // 4. 判断如果线程数不小于corePoolSize,并且可以添加到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { // 5. 重新检查线程池状态,如果线程池不是运行状态,就移除刚才添加的任务,并执行拒绝策略 int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); // 6. 判断如果线程数是0,就创建非核心线程(任务是null,会从阻塞队列中拉取任务) else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 7. 如果添加阻塞队列失败,就创建一个Worker else if (!addWorker(command, false)) // 8. 如果创建Worker失败说明已经达到最大线程数了,则执行拒绝策略 reject(command); }

execute方法的逻辑也很简单,最终就是调用addWorker方法,把任务添加到worker集合中,再看一下addWorker方法的源码:

// 添加worker private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 1. 检查是否允许提交任务 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // 2. 使用死循环保证添加线程成功 for (; ; ) { int wc = workerCountOf(c); // 3. 校验线程数是否超过容量限制 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 4. 使用CAS修改线程数 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // 5. 如果线程池状态变了,则从头再来 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 6. 把任务和新线程包装成一个worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 7. 加锁,控制并发 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 8. 再次校验线程池状态是否异常 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 9. 如果线程已经启动,就抛出异常 if (t.isAlive()) throw new IllegalThreadStateException(); // 10. 添加到worker集合中 workers.add(w); int s = workers.size(); // 11. 记录线程数历史峰值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 12. 启动线程 t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }

方法虽然很长,但是逻辑很清晰。就是把任务和线程包装成worker,添加到worker集合,并启动线程。

5.4 worker源码

再看一下worker类的结构:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 工作线程 final Thread thread; // 任务 Runnable firstTask; // 创建worker,并创建一个新线程(用来执行任务) Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } } 5.5 runWorker源码

再看一下run方法的源码:

// 线程执行入口 public void run() { runWorker(this); } // 线程运行核心方法 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // 1. 如果当前worker中任务是null,就从阻塞队列中获取任务 while (task != null || (task = getTask()) != null) { // 加锁,保证thread不被其他线程中断(除非线程池被中断) w.lock(); // 2. 校验线程池状态,是否需要中断当前线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { // 3. 执行run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 解锁 w.unlock(); } } completedAbruptly = false; } finally { // 4. 从worker集合删除当前worker processWorkerExit(w, completedAbruptly); } }

runWorker方法逻辑也很简单,就是不断从阻塞队列中拉取任务并执行。

再看一下从阻塞队列中拉取任务的逻辑:

// 从阻塞队列中拉取任务 private Runnable getTask() { boolean timedOut = false; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 1. 如果线程池已经停了,或者阻塞队列是空,就回收当前线程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 2. 再次判断是否需要回收线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 3. 从阻塞队列中拉取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

以上就是一篇文章带你搞懂Java线程池实现原理的详细内容,更多关于Java线程池的资料请关注软件开发网其它相关文章!



JAVA 程池

需要 登录 后方可回复, 如果你还没有账号请 注册新账号