Java的线程池-创新互联
一、简介
什么是线程池
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过大值。超过大值的线程可以排队,但他们要等到其他线程完成后才启动。
在未央等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供成都网站制作、网站建设 网站设计制作按需定制开发,公司网站建设,企业网站建设,品牌网站制作,营销型网站,外贸网站制作,未央网站建设费用合理。线程池的功能
应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以轮循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,.NET框架为每个进程提供了一个线程池,一个线程池有若干个等待操作状态,当一个等待操作完成时,线程池中的辅助线程会执行回调函数。线程池中的线程由系统管理,程序员不需要费力于线程管理,可以集中精力处理应用程序任务。
二、Executor 框架
Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。
核心 API 概述
Executor 框架核心 API 如下:
Executor
- 运行任务的简单接口。ExecutorService
- 扩展了Executor
接口。扩展能力:- 支持有返回值的线程;
- 支持管理线程的生命周期。
ScheduledExecutorService
- 扩展了ExecutorService
接口。扩展能力:支持定期执行任务。AbstractExecutorService
-ExecutorService
接口的默认实现。ThreadPoolExecutor
- Executor 框架最核心的类,它继承了AbstractExecutorService
类。ScheduledThreadPoolExecutor
-ScheduledExecutorService
接口的实现,一个可定时调度任务的线程池。Executors
- 可以通过调用Executors
的静态工厂方法来创建线程池并返回一个ExecutorService
对象。
Executor
Executor
接口中只定义了一个 execute
方法,用于接收一个 Runnable
对象。
public interface Executor {
void execute(Runnable command);
}
ExecutorService
ExecutorService
接口继承了 Executor
接口,它还提供了 invokeAll
、invokeAny
、shutdown
、submit
等方法。
public interface ExecutorService extends Executor {
void shutdown();
List shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
Future submit(Callable task);
Future submit(Runnable task, T result);
Future> submit(Runnable task);
List> invokeAll(Collection extends Callable> tasks)
throws InterruptedException;
List> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException;
T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
从其支持的方法定义,不难看出:相比于 Executor
接口,ExecutorService
接口主要的扩展是:
- 支持有返回值的线程 -
sumbit
、invokeAll
、invokeAny
方法中都支持传入Callable
对象。 - 支持管理线程生命周期 -
shutdown
、shutdownNow
、isShutdown
等方法。
ScheduledExecutorService
ScheduledExecutorService
接口扩展了 ExecutorService
接口。
它除了支持前面两个接口的所有能力以外,还支持定时调度线程。
public interface ScheduledExecutorService extends ExecutorService {
public ScheduledFuture> schedule(Runnable command,
long delay, TimeUnit unit);
public ScheduledFuture schedule(Callable callable,
long delay, TimeUnit unit);
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
其扩展的接口提供以下能力:
schedule
方法可以在指定的延时后执行一个Runnable
或者Callable
任务。scheduleAtFixedRate
方法和scheduleWithFixedDelay
方法可以按照指定时间间隔,定期执行任务。
三、ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor
类是 Executor
框架中最核心的类。所以,本文将着重讲述一下这个类。
重要字段
ThreadPoolExecutor
有以下重要字段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
参数说明:
ctl
- 用于控制线程池的运行状态和线程池中的有效线程数量。它包含两部分的信息:- 线程池的运行状态 (
runState
) - 线程池内有效线程的数量 (
workerCount
) - 可以看到,
ctl
使用了Integer
类型来保存,高 3 位保存runState
,低 29 位保存workerCount
。COUNT_BITS
就是 29,CAPACITY
就是 1 左移 29 位减 1(29 个 1),这个常量表示workerCount
的上限值,大约是 5 亿。
- 线程池的运行状态 (
- 运行状态 - 线程池一共有五种运行状态:
RUNNING
- 运行状态。接受新任务,并且也能处理阻塞队列中的任务。SHUTDOWN
- 关闭状态。不接受新任务,但可以处理阻塞队列中的任务。- 在线程池处于
RUNNING
状态时,调用shutdown
方法会使线程池进入到该状态。 finalize
方法在执行过程中也会调用shutdown
方法进入该状态。
- 在线程池处于
STOP
- 停止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于RUNNING
或SHUTDOWN
状态时,调用shutdownNow
方法会使线程池进入到该状态。TIDYING
- 整理状态。如果所有的任务都已终止了,workerCount
(有效线程数) 为 0,线程池进入该状态后会调用terminated
方法进入TERMINATED
状态。TERMINATED
- 已终止状态。在terminated
方法执行完后进入该状态。默认terminated
方法中什么也没有做。进入TERMINATED
的条件如下:- 线程池不是
RUNNING
状态; - 线程池状态不是
TIDYING
状态或TERMINATED
状态; - 如果线程池状态是
SHUTDOWN
并且workerQueue
为空; workerCount
为 0;- 设置
TIDYING
状态成功。
- 线程池不是
构造方法
ThreadPoolExecutor
有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
参数说明:
corePoolSize
- 核心线程数量。当有新任务通过execute
方法提交时 ,线程池会执行以下判断:- 如果运行的线程数少于
corePoolSize
,则创建新线程来处理任务,即使线程池中的其他线程是空闲的。 - 如果线程池中的线程数量大于等于
corePoolSize
且小于maximumPoolSize
,则只有当workQueue
满时才创建新的线程去处理任务; - 如果设置的
corePoolSize
和maximumPoolSize
相同,则创建的线程池的大小是固定的。这时如果有新任务提交,若workQueue
未满,则将请求放入workQueue
中,等待有空闲的线程去从workQueue
中取任务并处理; - 如果运行的线程数量大于等于
maximumPoolSize
,这时如果workQueue
已经满了,则使用handler
所指定的策略来处理任务; - 所以,任务提交时,判断的顺序为
corePoolSize
=>workQueue
=>maximumPoolSize
。
- 如果运行的线程数少于
maximumPoolSize
- 大线程数量。- 如果队列满了,并且已创建的线程数小于大线程数,则线程池会再创建新的线程执行任务。
- 值得注意的是:如果使用了***的任务队列这个参数就没什么效果。
keepAliveTime
:线程保持活动的时间。- 当线程池中的线程数量大于
corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
。 - 所以,如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
- 当线程池中的线程数量大于
unit
-keepAliveTime
的时间单位。有 7 种取值。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。workQueue
- 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。ArrayBlockingQueue
- 有界阻塞队列。- 此队列是基于数组的先进先出队列(FIFO)。
- 此队列创建时必须指定大小。
LinkedBlockingQueue
- ***阻塞队列。- 此队列是基于链表的先进先出队列(FIFO)。
- 如果创建时没有指定此队列大小,则默认为
Integer.MAX_VALUE
。 - 吞吐量通常要高于
ArrayBlockingQueue
。 - 使用
LinkedBlockingQueue
意味着:maximumPoolSize
将不起作用,线程池能创建的大线程数为corePoolSize
,因为任务等待队列是***队列。 Executors.newFixedThreadPool
使用了这个队列。
SynchronousQueue
- 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。- 每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
- 吞吐量通常要高于
LinkedBlockingQueue
。 Executors.newCachedThreadPool
使用了这个队列。
PriorityBlockingQueue
- 具有优先级的***阻塞队列。
threadFactory
- 线程工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。handler
- 饱和策略。它是RejectedExecutionHandler
类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:AbortPolicy
- 丢弃任务并抛出异常。这也是默认策略。DiscardPolicy
- 丢弃任务,但不抛出异常。DiscardOldestPolicy
- 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。CallerRunsPolicy
- 只用调用者所在的线程来运行任务。- 如果以上策略都不能满足需要,也可以通过实现
RejectedExecutionHandler
接口来定制处理策略。如记录日志或持久化不能处理的任务。
execute 方法
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
提交任务可以使用 execute
方法,它是 ThreadPoolExecutor
的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
execute
方法工作流程如下:
- 如果
workerCount < corePoolSize
,则创建并启动一个线程来执行新提交的任务; - 如果
workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中; - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务; - 如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
其他重要方法
在 ThreadPoolExecutor
类中还有一些重要的方法:
submit
- 类似于execute
,但是针对的是有返回值的线程。submit
方法是在ExecutorService
中声明的方法,在AbstractExecutorService
就已经有了具体的实现。ThreadPoolExecutor
直接复用AbstractExecutorService
的submit
方法。shutdown
- 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。- 将线程池切换到
SHUTDOWN
状态; - 并调用
interruptIdleWorkers
方法请求中断所有空闲的 worker; - 最后调用
tryTerminate
尝试结束线程池。
- 将线程池切换到
shutdownNow
- 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。与shutdown
方法类似,不同的地方在于:- 设置状态为
STOP
; - 中断所有工作线程,无论是否是空闲的;
- 取出阻塞队列中没有被执行的任务并返回。
- 设置状态为
isShutdown
- 调用了shutdown
或shutdownNow
方法后,isShutdown
方法就会返回 true。isTerminaed
- 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed
方法会返回 true。setCorePoolSize
- 设置核心线程数大小。setMaximumPoolSize
- 设置大线程数大小。getTaskCount
- 线程池已经执行的和未执行的任务总数;getCompletedTaskCount
- 线程池已完成的任务数量,该值小于等于taskCount
;getLargestPoolSize
- 线程池曾经创建过的大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
;getPoolSize
- 线程池当前的线程数量;getActiveCount
- 当前线程池中正在执行任务的线程数量。
使用示例
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(new MyThread());
String info = String.format("线程池中线程数目:%s,队列中等待执行的任务数目:%s,已执行玩别的任务数目:%s",
threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getQueue().size(),
threadPoolExecutor.getCompletedTaskCount());
System.out.println(info);
}
threadPoolExecutor.shutdown();
}
static class MyThread implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
}
}
四、Executors
JDK 的 Executors
类中提供了几种具有代表性的线程池,这些线程池 都是基于 ThreadPoolExecutor
的定制化实现。
在实际使用线程池的场景中,我们往往不是直接使用 ThreadPoolExecutor
,而是使用 JDK 中提供的具有代表性的线程池实例。
newSingleThreadExecutor
创建一个单线程的线程池。
只会创建唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它 。
单工作线程大的特点是:可保证顺序地执行各个任务。
示例:
public class SingleThreadExecutorDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}
}
newFixedThreadPool
创建一个固定大小的线程池。
每次提交一个任务就会新创建一个工作线程,如果工作线程数量达到线程池大线程数,则将提交的任务存入到阻塞队列中。
FixedThreadPool
是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
示例:
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}
}
newCachedThreadPool
创建一个可缓存的线程池。
- 如果线程池长度超过处理任务所需要的线程数,就会回收部分空闲的线程;
- 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为 1 分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
- 此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说 JVM)能够创建的大线程大小。 因此,使用
CachedThreadPool
时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
示例:
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
});
}
executorService.shutdown();
}
}
newScheduleThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
schedule();
scheduleAtFixedRate();
}
private static void schedule() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
}, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}
private static void scheduleAtFixedRate() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 100; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行");
}
}, 1, 1, TimeUnit.SECONDS);
}
executorService.shutdown();
}
}
创新互联www.cdcxhl.cn,专业提供香港、美国云服务器,动态BGP最优骨干路由自动选择,持续稳定高效的网络助力业务部署。公司持有工信部办法的idc、isp许可证, 机房独有T级流量清洗系统配攻击溯源,准确进行流量调度,确保服务器高可用性。佳节活动现已开启,新人活动云服务器买多久送多久。
分享标题:Java的线程池-创新互联
网站网址:http://pwwzsj.com/article/dhodgj.html