【多线程高并发编程】三 Callable源码分析

Shanon ·
更新时间:2024-11-11
· 691 次阅读

程序猿学社的GitHub,欢迎Star
https://github.com/ITfqyd/cxyxs
本文已记录到github,形成对应专题。

文章目录前言1.Callable的前世今生1.1 Callable的前世Runnable测试结果1.2 Callable的今生通过FutureTask+Thread调用ExecutorService+Future调用实现2.源码分析2.1 第一步,实现Callable接口2.2 FutureTask类结构图2.3 RunableFuture接口2.4 Runnable接口2.5 Future接口2.6 FutureTask源码分析后记 前言

通过上一章实现多线程有几种方式,我们已经了解多线程实现的方式。但是,一般大家知道的就两种,继承Thread类和实现runnable接口。对Callable接口实现多线程很陌生,我们看通过源码学习,了解callable是怎么一回事。

1.Callable的前世今生

通过runnable接口或者继承Thread实现一个多线程,我们想要获取线程的运行结果,可以通过共享变量的方式来获取。为了解决这个问题,java在jdk1.5中引入callable、Future接口。

1.1 Callable的前世Runnable package com.cxyxs.thread.three; import java.util.Date; /** * Description:转发请注明来源 程序猿学社 - https://ithub.blog.csdn.net/ * Author: 程序猿学社 * Date: 2020/2/17 21:43 * Modified By: */ public class MyThreadRunnable implements Runnable{ //测试账号 private int count=0; @Override public void run() { for (int i = 0; i < 5; i++) { try { Thread.sleep(1000); count++; } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyThreadRunnable run = new MyThreadRunnable(); Thread thread = new Thread(run); thread.start(); try { //等待线程执行完毕,不然获取不到最终的结果 thread.join(); System.out.println(run.count); } catch (InterruptedException e) { e.printStackTrace(); } } } 测试结果

在这里插入图片描述

通过这种方式,我们是不是实现了,获取线程的结果。现在我们总结一下通过runnable方式,是如何实现获取多线程的返回接口的。 定义一个类实现Runnable接口 定义一个成员变量,用来接收run方法运行后,我们需要的结果。 调用时,实例化该类,作为形参传入Thread中,调用start方法启动线程 调用线程的join方法 ,等待线程执行完毕,这样我们才能取到线程执行完后的结果5,不然就是0. 1.2 Callable的今生 通过FutureTask+Thread调用 package com.cxyxs.thread.three; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; /** * Description:转发请注明来源 程序猿学社 - https://ithub.blog.csdn.net/ * Author: 程序猿学社 * Date: 2020/2/19 0:12 * Modified By: */ public class MyThreadCallable { public static void main(String[] args) throws Exception{ //第一步 Callable callable = new Callable() { @Override public Integer call() throws Exception { int count=0; for (int i = 0; i < 5; i++) { Thread.sleep(1200); count++; } return count; } }; FutureTask task = new FutureTask(callable); Thread thread = new Thread(task); thread.start(); Integer result = task.get(); System.out.println("获取多线程的值:"+result); } } 为了偷懒,这里第一步简写了,一般都是类实现Callable接口,重写call,我这里使用简写的方法实现。 实例化一个FutureTask,把实现Callable的类,作为构造方法的形参传入进去。 实例化一个线程,把FutureTask这个对象,作为构造方法的形参传入。 调用FutureTask对象的get方法。就可以获取值。 ExecutorService+Future调用实现 public void two() throws Exception{ Callable callable = new Callable() { @Override public Integer call() throws Exception { int count=0; for (int i = 0; i < 5; i++) { Thread.sleep(1200); count++; } return count; } }; ExecutorService e= Executors.newFixedThreadPool(10); Future f1=e.submit(callable); Integer result = f1.get(); System.out.println("获取多线程的值:"+result); }

建议,使用这种方式,通过线程池管理多线程。
引出一个问题,为什么要使用线程池?
为了减少创建和销毁线程的次数,让每个线程可以多次使用。
这里了解即可。知道有这么一回事。后续会有相关的文章进行线程池的相关介绍。
在这里插入图片描述

2.源码分析

话不多说,就按照我们刚刚总结的通过Callable如何实现一个多线程的步骤。我们对ExecutorService+Future调用实现的方式进行分析,下面我们开始一步步得到查看他的源码。有闲情的社友,可以自己试试看看FutureTask+Thread这种方式调用的源码。

2.1 第一步,实现Callable接口

Callable接口

@FunctionalInterface public interface Callable { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }

Runnable接口

@FunctionalInterface public interface Runnable { public abstract void run(); }

看了源码后,各位社友,能说一说Callable和Runnable接口的区别吗?

方法名不同,Callable是call,Runnable是run Callable是方法,Runnable是抽象方法 Callable抛出异常,Runnable没有抛出异常 Callable有返回值V,Runnable没有返回值。 Callable可以定义泛型,这样我们通过get调用时,无需拆箱装箱。
而抛出异常和有返回值,就是Callable很重要的一个优点,也就是为了解决Runnable一些痛点问题。 2.2 FutureTask类结构图

首先我们先看看futureTask的类结构。在idea中右键,选择digrams->show digrams(idea版本
在这里插入图片描述
通过这个图,我们能很好的看出对应的关系,C表示是类,I是接口
@FunctionalInterface是函数式接口。
我们根据这个图,一个个进行源码分析。
通过图,我们可以发现FutureTask实现了RunnableFuture接口,让我们一探究竟。

2.3 RunableFuture接口 public interface RunnableFuture extends Runnable, Future { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }

看到这里,我们也就知道我们之前为什么说Callable是Runnable的plus版本。实际上就是在就是在Runnable的基础上做了一些优化。

2.4 Runnable接口 @FunctionalInterface public interface Runnable { public abstract void run(); }

这个因为之前有阐述过,这里就不过多的描述。

2.5 Future接口

通过查看类或者接口下的所有方法(idea中使用alt+7)
在这里插入图片描述

cancel方法:还没计算完,可以取消计算过程, isCancelled方法:是否取消 isDone方法:是否完成 get方法:获取计算结果(如果还没计算完,也是必须等待的) get(long,TimeUnit) 获取计算结果,可以指定一个超时时间,如果在规定时间内,没有返回结果,会抛出一个TimeOutException的异常 2.6 FutureTask源码分析

<img src="https://img-blog.csdnimg.cn/20200219092545537.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzE2ODU1MDc3,size_16,color_FFFFFF,t_70" alt="FutureTask task = new FutureTask(callable);">
通过代码,我们发现我们调用FutureTask的构造方法,把callable的实例对象,作为形参,传入进去咯。

public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }

如果没有传Callable就抛出NullPointerException.我们发现这里有一个变量state的值为NEW

/* * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; NEW(初始化) COMPLETING(运行中) NORMAL(完成状态) EXCEPTIONAL(任务因异常而结束) CANCELLED(还未运行就调用cancel(true)方法取消) INTERRUPTING(正在运行中就调用cancel(true)方法被取消) INTERRUPTED(被取消状态,由INTERRUPTING转换成INTERRUPTED)
源码注释部分,就是任务的几种状态变化。

一般都是直接把FutureTask对象,作为参数传给Thread类的构造方法传入。
在这里插入图片描述
正常情况下NEW -> COMPLETING -> NORMAL
调用new FutureTask把状态改为New
调用FutureTask的run方法改为COMPLETING
中途不发现异常,执行完run方法,就会返回NORMAL

调用task.get可以获取线程执行的结果 public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }

COMPLETING标识运行中,判断这个任务是否已经在运行中,也就是已调用run方法。说明任务还未完成,我们需要等待,所以这里调用了awaitDone方法。

返回结果是调用的report方法 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

如果状态为已完成(NORMAL),直接返回运行结果。

后记

根据社长,一起了解Callable源码的探索,是不是发现源码分析,也没有我们所想的那么困难。源码分析的每一个步骤不要都了解,知道来龙去脉是怎么一回事就行。

程序猿学社的GitHub,欢迎Star
https://github.com/ITfqyd/cxyxs
本文已记录到github,形成对应专题。


作者:程序猿学社



并发编程 并发 多线程 线程

需要 登录 后方可回复, 如果你还没有账号请 注册新账号