多线程学习篇之线程池基础

Harmony ·
更新时间:2024-11-14
· 840 次阅读

1.并发编程之线程池基础 1.1 线程与进程区别

进程:是执行中一段程序,即一旦程序被载入到内存中并准备执行,它就是一个进程。进程是表示资源分配的的基本概念,又是调度运行的基本单位,是系统中的并发执行的单位。
线程:单个进程中执行中每个任务就是一个线程。线程是进程中执行运算的最小单位。
一个线程只能属于一个进程,但是一个进程可以拥有多个线程。多线程处理就是允许一个进程中在同一时刻执行多个任务。

1.2 多线程创建方式 1.2.1.继承Thread类

下面展示一些 内联代码片

/** * 使用继承Thread 方式创建线程池 * @作者 陈晨辉 * @创建日期 2020/3/18 15:31 **/ public class Demo1 { public static void main(String[] args) { // 创建线程对象 MyThread myThread = new MyThread(); // 启动线程 myThread.start(); } // 继承Thread类 实现run方法 static class MyThread extends Thread{ @Override public void run() { for (int i = 0; i < 1000; i++) { System.out.println("输出打印"+i); } } } } 1.2.2.实现Runnable接口

下面展示一些 内联代码片

/** * 使用实现Runnable接口方式 * 创建线程 * @作者 陈晨辉 * @创建日期 2020/3/18 15:31 **/ public class Dmeo2{ public static void main(String[] args) { // 创建线程对象 传入要执行的任务 Thread thread = new Thread(()->{ // do something }); // 调用线程.start方法 thread.start(); } // 实现Runnable接口 实现run方法 static class MyRunnable implements Runnable { @Override public void run() { for (int i = 0; i < 1000; i++) { System.out.println("输出:"+i); } } } } 使用匿名内部类

下面展示一些 内联代码片

public static void main(String[] args) { //使用匿名内部类方式创建Runnable实例 Thread t1 = new Thread(new Runnable(){ @Override public void run() { for (int i = 0; i { for (int i = 0; i < 1000; i++) { System.out.println("输出"+i); } }); t2.start(); } 1.2.3.实现Callable接口

下面展示一些 内联代码片

/** * 实现Callable的接口 * 实现一个带返回值的任务 * @作者 陈晨辉 * @创建日期 2020/3/18 15:31 **/ public class Demo3{ public static void main(String[] args) { //FutureTask包装我们的任务,FutureTask可以用于获取执行结果 FutureTask ft = new FutureTask(new MyCallable()); //创建线程执行线程任务 Thread thread = new Thread(ft); thread.start(); try { //得到线程的执行结果 Integer num = ft.get(); System.out.println("得到线程处理结果:" + num); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 实现Callable接口,实现带返回值的任务 static class MyCallable implements Callable { @Override public Integer call() throws Exception { int num = 0; for (int i = 0; i < 1000; i++) { System.out.println("输出"+i); num += i; } return num; } } } 1.3 多线程运行状态及生命周期

线程的状态

新建状态(NEW)
当一个线程刚new出来,例如new Thread(),线程还没有开始运行,此时线程处于新建状态。当一个线程处于新生状态时,程序还没有运行线程中的代码
就绪状态
一个新建的线程并不会自动执行,要执行线程,必须调用线程的start()方法。当线程对象调用start方法即启动线程,开始调度线程开始执行run方法。当start方法返回后,线程就处于就绪状态。
处于就绪状态的线程并不会立执执行,而是要同其他线程进行竟争,只有当获得cpu时间片后线程才会开始执行任务。因为在单cpu的环境中不可能同一时候执行多条线程,因此此时可能有多条线程处于就绪状态。对多个处于就绪状态的线程就需要由运行的cpu线程调度程序(thread scheduler)来调度
运行状态 (RUNNABLE)
当线程获得cpu时间后线程就处于运行时间
阻塞状态 (WAITING)(TIMED_WAITING)(BLOCKED)
线程运行过程中,可能由于各种原因进入阻塞状态:
1.当调用了sleep方法(线程休眠)
2.线程调用一个在I/O被阻塞的方法,即该操作在输入输出操作完成之前不会返回到它的调用者;
3.线程试图得到一个锁,而这个锁被其他线程占有(死锁)
4.线程在等待某个触发条件
死亡状态 (TERMINATED)
线程执行完run方法自然死亡
未捕获的异常使线程意外终止(线程猝死)
为了确定线程在当前是否存活着(就是要么是可运行的,要么是被阻塞了),需要使用isAlive方法。如果是可运行或被阻塞,这个方法返回true; 如果线程仍旧是new状态且不是可运行的, 或者线程死亡了,则返回false.

线程的生命周期
线程的生命周期

通过线程的生命周期,我们可以看到,当线程处于就绪的状态等待CPU的调用执行,那么当多个线程都处于就绪状态时,它们会被同时调用执行吗?
我们的CPU处理器有内核线程的概念,
比如:
单核单线程
代表计算机的CPU有一个核心内核,同一时间只能执行一个线程任务
4核8线程:
代表计算机的CPU有4个核心内核,每个内核同一时间能执行两个线程任务
那如果我们在单核单线程的情况下,创建了多个线程的时候,CPU处理器该如何执行这些线程任务呢?
通过JVM的线程调度机制!!!
所有的Java虚拟机都有一个线程调度器,用来确定哪个时刻运行那个线程。
线程调度器: 抢占式线程调度 和 协调式线程调度
在抢占模式下,操作系统负责分配CPU时间给各个进程,一旦当前的进程使用完分配给自己的CPU时间,操作系统将决定下一个占用CPU时间的是哪一个线程。因此操作系统将定期的中断当前正在执行的线程,将CPU分配给在等待队列的下一个线程。所以任何一个线程都不能独占CPU。每个线程占用CPU的时间取决于进程和操作系统。进程分配给每个线程的时间很短,以至于我们感觉所有的线程是同时执行的。实际上,系统运行每个进程的时间有2毫秒,然后调度其他的线程。它同时他维持着所有的线程和循环,分配很少量的CPU时间给线程。线程的的切换和调度是如此之快,以至于感觉是所有的线程是同步执行的

2 并发编程之J.U.C - 线程池 2.1 java中的线程池

线程池是为突然大量爆发的线程设计的,通过有限的几个固定线程为大量的操作服务,通过对线程的复用减少了创建和销毁线程所需的时间,从而提高效率。
合理地使用线程池能够带来3个好处:
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

2.2 线程池体系结构

线程池的体系结构:
java.util.concurrent.Executor 负责线程的使用和调度的根接口
|–ExecutorService 子接口: 线程池的主要接口
|–ThreadPoolExecutor 线程池的实现类
|–ScheduledExceutorService 子接口: 负责线程的调度
|–ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService
工具类 : Executors 提供快捷的创建线程池的方法

关键类或接口 含义
Executor 是一个接口,它是Executor框架的基础,
它将任务的提交与任务的执行分离开来
ExecutorService 线程池的主要接口,是Executor的子接口
ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务
ScheduledThreadPoolExecutor 另一个关键实现类,可以进行延迟或者定期执行任务。ScheduledThreadPoolExecutor比Timer定时器更灵活,功能更强大
Future接口与FutureTask实现类 代表异步计算的结果
Runnable接口和Callable接口的实现类: 都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行的任务
Executors 线程池的工具类,可以快捷的创建线程池
Executor

线程池就是线程的集合,线程池集中管理线程,以实现线程的重用,降低资源消耗,提高响应速度等。 线程用于执行异步任务,单个的线程既是工作单元也是执行机制,从JDK1.5开始,为了把工作单元与执行机制分离开,Executor框架诞生了,它是一个用于统一创建任务与运行任务的接口。框架就是异步执行任务的线程池框架。

ThreadPoolExecutor

Executor框架的最核心实现是ThreadPoolExecutor类,通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池.

核心构造器参数
组件 含义
int corePoolSize 核心线程池的大小
int maximunSize 最大线程池的大小
BlokingQueue wordQueue 用来暂时保存任务的工作队列
long keepAlineTime 空闲线程保持存活时间
TimeUnit 时间格式
ThreadFactory 指定创建线程的线程工厂
线程池在三种队列 1.SynchronsQueue

SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作。

2.LinkedBlockingQueue

LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

3.ArrayBlockingQueue

ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。

队列示列

package com.chen; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * 2020年3月29日12:13:41 * 陈晨辉 */ public class Test6 { public static void main(String[] args) { /*ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时, 多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时, 加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时, 再有新的元素尝试加入ArrayBlockingQueue时会报错*/ BlockingQueue queue = new ArrayBlockingQueue(10); //SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。 //使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作 //BlockingQueue queue = new SynchronousQueue(); /*LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。 (所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步, 即在高并发的情况下可以并行操作队列中的数据。*/ //BlockingQueue queue = new LinkedBlockingQueue(); //生产者线程 Thread t1=new Thread(()->{ for (int i = 0; i { while (true){ System.out.println("1秒后取数据"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { System.out.println(queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); } } 2.3 线程池原理剖析 /** * 创建一个线程 * 通过对任务的执行 * 演示线程池的运行原理 * @作者 陈晨辉 * @创建日期 2020/3/17 9:27 **/ public class Demo7{ public static volatile boolean flag = true; public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 5, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(10), Executors.defaultThreadFactory(), //饱和策略 /*ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务 ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务*/ new ThreadPoolExecutor.DiscardOldestPolicy()); for (int i = 0; i { // 接收控制台参数 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String s = scanner.nextLine(); // 如果控制输入stop 将flag设置false, 所有任务都会执行完毕 if ("stop".equals(s)){ flag = false; } if("show".equals(s)){ System.out.println("活跃线程数量==>"+executor.getActiveCount()); } } }).start(); } static class MyRunnable implements Runnable{ private String name; public MyRunnable(String name) { this.name = name; } @Override public void run() { System.out.println(name); while (flag){ //flag是一个开关,为true时线程任务会一直执行让线程一直执行 } } } }

在这里插入图片描述

提交一个任务到线程池中,线程池的处理流程如下:

流程1 判断核心线程数
判断正在运行的工作线程是否小于 设置的核心线程数,小于尝试创建一个
新的工作线程,如果不小于进入下一流程

流程2 判断任务队列
判断当前线程池的任务队列是否已满,未满的话将任务加入任务队列,如果满了,进入下一个流程

流程3 判断最大线程数
判断当前线程池的工作线程是否小于 设置的最大线程数,小于尝试创建一个新的临时工作线程,如果不小于进入下一流程

流程4 判断 饱和/拒绝 策略
到此流程,说明当前线程池已经饱和,需要进行拒绝策略,根据设置的拒绝策略进行处理

2.4 源码分析 线程池中的Worker源码分析 // 存放工作线程 worker的set集合 private final HashSet workers = new HashSet(); // Worker工作者 代表一个工作线程 // Worker 是线程池的内部类, 它实现了Runnable接口 private final class Worker extends AbstractQueuedSynchronizer implements Runnable // Worker中的两个重要属性 final Thread thread;// 工作线程 Runnable firstTask;// 第一次要执行的任务 // Worker中的构造器 Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask;// 创建worker时,传入第一次要运行的任务 this.thread = getThreadFactory().newThread(this);// 通过线程工厂的newThread方法创建线程,并传入参数this } // 在线程池工厂中, new了一个线程 ,把worker对象作为任务,传入 public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); return t; } // 也就是说如果线程启动.start(),那么worker类的run方法会被执行 // worker的run方法中直接调用了 runWorker(this); this代表worker对象 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 第一次要执行的任务 Runnable task = w.firstTask; ... try { // 任务执行完毕后,会获取新的任务 while (task != null || (task = getTask()) != null) { w.lock(); ... try { ... try { // 执行任务 task.run(); } ... } finally { // 清空已经执行完的任务 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } // Worker线程启动后,会不断的使用getTask()方法获取任务执行 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { try { // 在workQueue任务队列中 获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } 线程池的execute执行方法分析: // 执行任务 任务 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // AtomicInteger ctl 是一个integer的原子类对象 // 主要作用: 1.记录线程池的状态信息 2.记录线程池工作线程的数量 int c = ctl.get(); // workerCountOf(c) : 工作线程的数量 // isRunning(c): 线程池是否运行状态 // 流程1: 工作线程的数量如果小于 核心线程的数量 if (workerCountOf(c) = (core ? corePoolSize : maximumPoolSize)) return false; ... } Worker w = null; try { // 通过构造器得到worker对象 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将worker对象添加到workers集合中 workers.add(w); ... } } finally { mainLock.unlock(); } // 如果添加worker成功 if (workerAdded) { // 启动该worker中的线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
作者:你好!0和1



学习 程池 线程池 多线程 线程

需要 登录 后方可回复, 如果你还没有账号请 注册新账号