Java并发Executor框架

Aba ·
更新时间:2024-09-21
· 762 次阅读

  1、Executor框架简介   从JDK5开始,工作单元和执行机制隔离开来,工作单元包括Runnable和Callable,执行机制由Executor提供。   调用关系:Java线程一对一映射到本地操作系统的系统线程,当多线程程序分解若干任务,使用用户级的调度器(Executor框架)将任务映射为固定数量的线程,底层,操作系统吧、内核将这些线程映射到硬件处理器上。   2、EXecutor结构成员

  Executor是一个接口,它将任务的提交与任务的执行分离开来。   ThreadPoolExecutor是线程池的核心实现类,执行被提交的任务   ScheduledThreadPoolExecutor是一个实现类,在给定的延迟后运行或定期执行命令   Future接口和实现Future接口的FutureTask类,代表异步计算的结果   Runable接口和Callable接口的实现类,可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行   2.1 使用流程   (1)创建任务对象   创建实现Runnable接口或Callable接口的任务对象。Runnable不会返回结果,Callable可以返回结果   可以使用工厂类Executors把Runnable封装成一个Callable   public static Callable<Object> callable(Runnable task)   Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result)   (2)对象提交执行   把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command); ExecutorService.submit(Runnable task);ExecutorService.submit(Callable<T> task))。execute()方法用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功。submit()用于提交需要返回值的任务。   (3)返回值   如果执行ExecutorService.submit(...),将返回一个实现Future接口的对象FutureTask对象,主线程可以执行FutureTask.get()方法等待任务执行完成,也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消任务的执行。   2.2 框架成员   ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors。   2.2.1 线程池介绍   public  ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)   corePoolSize:线程池的基本大小,新任务到来时会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,当需要执行的任务数大于线程池基本大小则不创建。如果调用线程池的prestartAllCoreThread()方法,线程池会提前创建并启动所有基本线程。   maximumPoolSize:线程池大数量,线程池允许创建的大线程数,如果队列满了,并且已创建的线程数小于大线程数则创建新线程,对于无界队列该参数无效。   KeepAliveTime:线程活动保持时间,工作线程空闲后保持存活的时间,如果任务多每个任务执行时间短可以调大时间,提高线程利用率   TimeUnit:线程活动保持时间单位,可选单位,DAYS,HOURS,MINUTES,MILLISECONDS,MICROSECONDS,NANOSECONDS   BlockingQueue<Runnable>:任务队列,等待执行任务的阻塞队列,ArrayBlockingQueue:基于数组结构的有界阻塞队列,按照FIFO排序,LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序,吞吐量高于前者,Executors.newFixedThreadPool()使用该队列。SysnchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量高于前者,Executors.newCachedThreadPool()使用该队列,PriorityBlockingQueue:一个具有优先级的无线阻塞队列。   ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个线程设置有意义的命名,可默认设置。   RejectedExecutionHandler:饱和策略,当队列和线程池满了,必须采取一种策略处理新提交的任务。默认AbortPolicy:直接抛出异常。CallerRunsPolicy:只用调用者所在线程来运行任务;DiscardOldestPolicy:丢弃队列里近的一个任务,并执行当前任务;DiscardPolicy:不处理,丢弃掉。2.2.1.1 线程池处理流程   1)线程池判断核心线程池里的线程是否都在执行任务。否,创建一个新的线程执行任务,如果都在执行任务进入下一步;   2)线程池判断工作队列是否已经满了,没有则新提交的任务存储在工作队列里,满了进入下一步;   3)线程池判断线程池的线程是否处于工作状态。没有,创建新的工作线程来执行任务,满了交给饱和策略处理。   ThreadPoolExecutor执行execute()方法流程:   1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(该步需要获取全局锁)   2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue;   3)如果队列已满无法将任务加入队列,则创建新的线程来处理任务(该步骤获取全局锁)   4)如果创建新线程将使得当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandle.rejectedExecution()方法。   2.2.2 ThreadPoolExecutor   FixedThreadPool,创建使用固定线程数,为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适用于负载比较重的服务器   public static ExecutorService newFixedThreadPool(int nThreads) {   return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());   }   SingleThreadPool,创建使用单个线程,适用于保证顺序执行各个任务,并且在任意时间点都不会有多个线程活动的场景,可用于处理共享资源问题   public static ExecutorService newSingleThreadExecutor(){   return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());   }   CachedThreadPool,创建会根据需要创建新线程,初始化一定数量的线程,当新任务到来时没有空闲线程则创建新线程,有空闲重用以前的线程   public static ExecutorService newCachedThreadPool(){   return new ThreadPoolExecutor(0,Integer.Max_VALUE,60L,TimeUnit.SECONDS,new SysnchronousQueue<Runnable>());   }超过60秒空闲线程将会终止   2.2.3 ScheduledThreadPoolExecutor   ScheduledThreadPoolExecutor继承ThreadPoolExecutor,用于在给定的延迟以后执行任务或者定期执行任务,比Timer灵活,Timer对应单个后台线程而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。   ScheduledExecutorService中至少有2个方法可用于周期性执行任务。   scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)   我们可以使用该方法延迟执行任务,设置任务的执行周期。时间周期从线程池中首先开始执行的线程算起,所以假设period为1s,线程执行了5s,那么下一个线程在第一个线程运行完后会很快被执行。   比如下面的代码   for (int i = 0; i < 3; i++) {   Thread.sleep(1000);   WorkerThread worker = new WorkerThread("do heavy processing");   // schedule task to execute at fixed rate   scheduledThreadPool.scheduleAtFixedRate(worker, 0, 10,   TimeUnit.SECONDS);   输出 Current Time = Tue Oct 29 16:10:00 IST 2013 pool-1-thread-1 Start. Time = Tue Oct 29 16:10:01 IST 2013 pool-1-thread-2 Start. Time = Tue Oct 29 16:10:02 IST 2013 pool-1-thread-3 Start. Time = Tue Oct 29 16:10:03 IST 2013 pool-1-thread-1 End. Time = Tue Oct 29 16:10:06 IST 2013 pool-1-thread-2 End. Time = Tue Oct 29 16:10:07 IST 2013 pool-1-thread-3 End. Time = Tue Oct 29 16:10:08 IST 2013 pool-1-thread-1 Start. Time = Tue Oct 29 16:10:11 IST 2013 pool-1-thread-4 Start. Time = Tue Oct 29 16:10:12 IST 2013   scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)   该方法可被用于延迟周期性执行任务,delaytime是线程停止执行到下一次开始执行之间的延迟时间,假设有下面的代码   for (int i = 0; i < 3; i++) {   Thread.sleep(1000);   WorkerThread worker = new WorkerThread("do heavy processing");   scheduledThreadPool.scheduleWithFixedDelay(worker, 0, 1,   TimeUnit.SECONDS);   }   输出结果 Current Time = Tue Oct 29 16:14:13 IST 2013 pool-1-thread-1 Start. Time = Tue Oct 29 16:14:14 IST 2013 pool-1-thread-2 Start. Time = Tue Oct 29 16:14:15 IST 2013 pool-1-thread-3 Start. Time = Tue Oct 29 16:14:16 IST 2013 pool-1-thread-1 End. Time = Tue Oct 29 16:14:19 IST 2013 pool-1-thread-2 End. Time = Tue Oct 29 16:14:20 IST 2013 pool-1-thread-1 Start. Time = Tue Oct 29 16:14:20 IST 2013 pool-1-thread-3 End. Time = Tue Oct 29 16:14:21 IST 2013 pool-1-thread-4 Start. Time = Tue Oct 29 16:14:21 IST 2013



JAVA executor

需要 登录 后方可回复, 如果你还没有账号请 注册新账号