进程:是执行中一段程序,即一旦程序被载入到内存中并准备执行,它就是一个进程。进程是表示资源分配的的基本概念,又是调度运行的基本单位,是系统中的并发执行的单位。
线程:单个进程中执行中每个任务就是一个线程。线程是进程中执行运算的最小单位。
一个线程只能属于一个进程,但是一个进程可以拥有多个线程。多线程处理就是允许一个进程中在同一时刻执行多个任务。
下面展示一些 内联代码片
。
/**
* 使用继承Thread 方式创建线程池
* @作者 陈晨辉
* @创建日期 2020/3/18 15:31
**/
public class Demo1 {
public static void main(String[] args) {
// 创建线程对象
MyThread myThread = new MyThread();
// 启动线程
myThread.start();
}
// 继承Thread类 实现run方法
static class MyThread extends Thread{
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println("输出打印"+i);
}
}
}
}
1.2.2.实现Runnable接口
下面展示一些 内联代码片
。
/**
* 使用实现Runnable接口方式
* 创建线程
* @作者 陈晨辉
* @创建日期 2020/3/18 15:31
**/
public class Dmeo2{
public static void main(String[] args) {
// 创建线程对象 传入要执行的任务
Thread thread = new Thread(()->{
// do something
});
// 调用线程.start方法
thread.start();
}
// 实现Runnable接口 实现run方法
static class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println("输出:"+i);
}
}
}
}
使用匿名内部类
下面展示一些 内联代码片
。
public static void main(String[] args) {
//使用匿名内部类方式创建Runnable实例
Thread t1 = new Thread(new Runnable(){
@Override
public void run() {
for (int i = 0; i {
for (int i = 0; i < 1000; i++) {
System.out.println("输出"+i);
}
});
t2.start();
}
1.2.3.实现Callable接口
下面展示一些 内联代码片
。
/**
* 实现Callable的接口
* 实现一个带返回值的任务
* @作者 陈晨辉
* @创建日期 2020/3/18 15:31
**/
public class Demo3{
public static void main(String[] args) {
//FutureTask包装我们的任务,FutureTask可以用于获取执行结果
FutureTask ft = new FutureTask(new MyCallable());
//创建线程执行线程任务
Thread thread = new Thread(ft);
thread.start();
try {
//得到线程的执行结果
Integer num = ft.get();
System.out.println("得到线程处理结果:" + num);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 实现Callable接口,实现带返回值的任务
static class MyCallable implements Callable {
@Override
public Integer call() throws Exception {
int num = 0;
for (int i = 0; i < 1000; i++) {
System.out.println("输出"+i);
num += i;
}
return num;
}
}
}
1.3 多线程运行状态及生命周期
线程的状态
新建状态(NEW)
当一个线程刚new出来,例如new Thread(),线程还没有开始运行,此时线程处于新建状态。当一个线程处于新生状态时,程序还没有运行线程中的代码
就绪状态
一个新建的线程并不会自动执行,要执行线程,必须调用线程的start()方法。当线程对象调用start方法即启动线程,开始调度线程开始执行run方法。当start方法返回后,线程就处于就绪状态。
处于就绪状态的线程并不会立执执行,而是要同其他线程进行竟争,只有当获得cpu时间片后线程才会开始执行任务。因为在单cpu的环境中不可能同一时候执行多条线程,因此此时可能有多条线程处于就绪状态。对多个处于就绪状态的线程就需要由运行的cpu线程调度程序(thread scheduler)来调度
运行状态 (RUNNABLE)
当线程获得cpu时间后线程就处于运行时间
阻塞状态 (WAITING)(TIMED_WAITING)(BLOCKED)
线程运行过程中,可能由于各种原因进入阻塞状态:
1.当调用了sleep方法(线程休眠)
2.线程调用一个在I/O被阻塞的方法,即该操作在输入输出操作完成之前不会返回到它的调用者;
3.线程试图得到一个锁,而这个锁被其他线程占有(死锁)
4.线程在等待某个触发条件
死亡状态 (TERMINATED)
线程执行完run方法自然死亡
未捕获的异常使线程意外终止(线程猝死)
为了确定线程在当前是否存活着(就是要么是可运行的,要么是被阻塞了),需要使用isAlive方法。如果是可运行或被阻塞,这个方法返回true; 如果线程仍旧是new状态且不是可运行的, 或者线程死亡了,则返回false.
线程的生命周期
通过线程的生命周期,我们可以看到,当线程处于就绪的状态等待CPU的调用执行,那么当多个线程都处于就绪状态时,它们会被同时调用执行吗?
我们的CPU处理器有内核线程的概念,
比如:
单核单线程
代表计算机的CPU有一个核心内核,同一时间只能执行一个线程任务
4核8线程:
代表计算机的CPU有4个核心内核,每个内核同一时间能执行两个线程任务
那如果我们在单核单线程的情况下,创建了多个线程的时候,CPU处理器该如何执行这些线程任务呢?
通过JVM的线程调度机制!!!
所有的Java虚拟机都有一个线程调度器,用来确定哪个时刻运行那个线程。
线程调度器: 抢占式线程调度 和 协调式线程调度
在抢占模式下,操作系统负责分配CPU时间给各个进程,一旦当前的进程使用完分配给自己的CPU时间,操作系统将决定下一个占用CPU时间的是哪一个线程。因此操作系统将定期的中断当前正在执行的线程,将CPU分配给在等待队列的下一个线程。所以任何一个线程都不能独占CPU。每个线程占用CPU的时间取决于进程和操作系统。进程分配给每个线程的时间很短,以至于我们感觉所有的线程是同时执行的。实际上,系统运行每个进程的时间有2毫秒,然后调度其他的线程。它同时他维持着所有的线程和循环,分配很少量的CPU时间给线程。线程的的切换和调度是如此之快,以至于感觉是所有的线程是同步执行的
线程池是为突然大量爆发的线程设计的,通过有限的几个固定线程为大量的操作服务,通过对线程的复用减少了创建和销毁线程所需的时间,从而提高效率。
合理地使用线程池能够带来3个好处:
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
线程池的体系结构:
java.util.concurrent.Executor 负责线程的使用和调度的根接口
|–ExecutorService 子接口: 线程池的主要接口
|–ThreadPoolExecutor 线程池的实现类
|–ScheduledExceutorService 子接口: 负责线程的调度
|–ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService
工具类 : Executors 提供快捷的创建线程池的方法
关键类或接口 | 含义 |
---|---|
Executor | 是一个接口,它是Executor框架的基础, 它将任务的提交与任务的执行分离开来 |
ExecutorService | 线程池的主要接口,是Executor的子接口 |
ThreadPoolExecutor | 是线程池的核心实现类,用来执行被提交的任务 |
ScheduledThreadPoolExecutor | 另一个关键实现类,可以进行延迟或者定期执行任务。ScheduledThreadPoolExecutor比Timer定时器更灵活,功能更强大 |
Future接口与FutureTask实现类 | 代表异步计算的结果 |
Runnable接口和Callable接口的实现类: | 都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行的任务 |
Executors | 线程池的工具类,可以快捷的创建线程池 |
线程池就是线程的集合,线程池集中管理线程,以实现线程的重用,降低资源消耗,提高响应速度等。 线程用于执行异步任务,单个的线程既是工作单元也是执行机制,从JDK1.5开始,为了把工作单元与执行机制分离开,Executor框架诞生了,它是一个用于统一创建任务与运行任务的接口。框架就是异步执行任务的线程池框架。
ThreadPoolExecutorExecutor框架的最核心实现是ThreadPoolExecutor类,通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池.
核心构造器参数组件 | 含义 |
---|---|
int corePoolSize | 核心线程池的大小 |
int maximunSize | 最大线程池的大小 |
BlokingQueue wordQueue | 用来暂时保存任务的工作队列 |
long keepAlineTime | 空闲线程保持存活时间 |
TimeUnit | 时间格式 |
ThreadFactory | 指定创建线程的线程工厂 |
SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作。
LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。
3.ArrayBlockingQueueArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,再有新的元素尝试加入ArrayBlockingQueue时会报错。
队列示列
package com.chen;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 2020年3月29日12:13:41
* 陈晨辉
*/
public class Test6 {
public static void main(String[] args) {
/*ArrayBlockingQueue是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于corePoolSize时,
多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执行,当ArrayBlockingQueue已满时,
加入ArrayBlockingQueue失败,会开启新的线程去执行,当线程数已经达到最大的maximumPoolSizes时,
再有新的元素尝试加入ArrayBlockingQueue时会报错*/
BlockingQueue queue = new ArrayBlockingQueue(10);
//SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
//使用SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界,避免线程拒绝执行操作
//BlockingQueue queue = new SynchronousQueue();
/*LinkedBlockingQueue是一个无界缓存等待队列。当前执行的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列里等待。
(所以在使用此阻塞队列时maximumPoolSizes就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,
即在高并发的情况下可以并行操作队列中的数据。*/
//BlockingQueue queue = new LinkedBlockingQueue();
//生产者线程
Thread t1=new Thread(()->{
for (int i = 0; i {
while (true){
System.out.println("1秒后取数据");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t1.start();
t2.start();
}
}
2.3 线程池原理剖析
/**
* 创建一个线程
* 通过对任务的执行
* 演示线程池的运行原理
* @作者 陈晨辉
* @创建日期 2020/3/17 9:27
**/
public class Demo7{
public static volatile boolean flag = true;
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10),
Executors.defaultThreadFactory(),
//饱和策略
/*ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务*/
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i {
// 接收控制台参数
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String s = scanner.nextLine();
// 如果控制输入stop 将flag设置false, 所有任务都会执行完毕
if ("stop".equals(s)){
flag = false;
}
if("show".equals(s)){
System.out.println("活跃线程数量==>"+executor.getActiveCount());
}
}
}).start();
}
static class MyRunnable implements Runnable{
private String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(name);
while (flag){
//flag是一个开关,为true时线程任务会一直执行让线程一直执行
}
}
}
}
提交一个任务到线程池中,线程池的处理流程如下:
流程1 判断核心线程数
判断正在运行的工作线程是否小于 设置的核心线程数,小于尝试创建一个
新的工作线程,如果不小于进入下一流程
流程2 判断任务队列
判断当前线程池的任务队列是否已满,未满的话将任务加入任务队列,如果满了,进入下一个流程
流程3 判断最大线程数
判断当前线程池的工作线程是否小于 设置的最大线程数,小于尝试创建一个新的临时工作线程,如果不小于进入下一流程
流程4 判断 饱和/拒绝 策略
到此流程,说明当前线程池已经饱和,需要进行拒绝策略,根据设置的拒绝策略进行处理
线程池中的Worker源码分析
// 存放工作线程 worker的set集合
private final HashSet workers = new HashSet();
// Worker工作者 代表一个工作线程
// Worker 是线程池的内部类, 它实现了Runnable接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
// Worker中的两个重要属性
final Thread thread;// 工作线程
Runnable firstTask;// 第一次要执行的任务
// Worker中的构造器
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;// 创建worker时,传入第一次要运行的任务
this.thread = getThreadFactory().newThread(this);// 通过线程工厂的newThread方法创建线程,并传入参数this
}
// 在线程池工厂中, new了一个线程 ,把worker对象作为任务,传入
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
return t;
}
// 也就是说如果线程启动.start(),那么worker类的run方法会被执行
// worker的run方法中直接调用了 runWorker(this); this代表worker对象
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 第一次要执行的任务
Runnable task = w.firstTask;
...
try {
// 任务执行完毕后,会获取新的任务
while (task != null || (task = getTask()) != null) {
w.lock();
...
try {
...
try {
// 执行任务
task.run();
}
...
} finally {
// 清空已经执行完的任务
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// Worker线程启动后,会不断的使用getTask()方法获取任务执行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
try {
// 在workQueue任务队列中 获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池的execute执行方法分析:
// 执行任务 任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// AtomicInteger ctl 是一个integer的原子类对象
// 主要作用: 1.记录线程池的状态信息 2.记录线程池工作线程的数量
int c = ctl.get();
// workerCountOf(c) : 工作线程的数量
// isRunning(c): 线程池是否运行状态
// 流程1: 工作线程的数量如果小于 核心线程的数量
if (workerCountOf(c) = (core ? corePoolSize : maximumPoolSize))
return false;
...
}
Worker w = null;
try {
// 通过构造器得到worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将worker对象添加到workers集合中
workers.add(w);
...
}
} finally {
mainLock.unlock();
}
// 如果添加worker成功
if (workerAdded) {
// 启动该worker中的线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}