ThreadPoolExecutor

ThreadPoolExecutor 处理流程

画了一个大概的处理流程
threadPoolExecutor

Executors 通用的线程池创建方法

newFixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
// LinkedBlockingQueue 这里的大小默认是Integer.MAX_VALUE,相当于无界,nThreads满了后会放入Queue中,有空闲线程后工作线程才会从队列中获取任务
// max pool size 只有当有界队列时候才发挥作用
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

newSingleThreadExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// LinkedBlockingQueue 这里的大小默认是Integer.MAX_VALUE,依然相当于无界 core size为1,则正在执行的只有一个任务,而且后续进来的任务都会放入到Queue中
// max size=1这里不起作用,这个的好处是保证所有的任务按先进先出顺序执行
// FinalizableDelegatedExecutorService 为代理
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

// 以下三者的区别
1、 job.run() // 当前线程下直接执行方法
2new Thread(job).start() // 自己新建一个线程执行方法
3、 Executors.newSingleThreadExecutor().submit(job); // executor方法,保证只有一个任务在运行,后续任务按先进先出执行

newCachedThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
// Core size = 0, Max size 为Integer.MAX_VALUE无限大,一有任务就会由SynchronousQueue做同步移交,无线程则创建线程执行
// 60秒的超时时间,如果有空闲线程没有任务执行,则在超时时间后会进行关闭,整体更加弹性
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

newScheduledThreadPool

1
2
3
4
5
6
7
8
// 定时器调度线程池,见[定时器](...), 使用时间作为比较条件的优先级阻塞队列
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

自定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 继承ThreadPoolExecutor,调用其构造函数,实现自己的线程池内部大小

// 覆盖方法,对每个任务进行监控统计操作
@Override
public void execute(Runnable command) {
// do something (任务执行之前)
super.execute(command);
}

@Override
public void afterExecute(Runnable r, Throwable t) {
// do something (任务执行完毕后)
super.afterExecute(r, t);
}

ExecutorService

  • submit & execute

    父接口Executor有execute 方法,ExecutorService有submit方法,经常会提到两者的区别
    1、 execute 没有返回值,submit会返回Future
    Future的作用:
    a、Cancel the task prematurely, with the cancel method.
    b、Wait for the task to finish executing, with get
    2、 execute 会抛出异常,submit会吃掉异常
    submit中怎么处理异常:
    a、Wrap your Callable or Runnable code in try{} catch{} block
    OR
    b、Keep future.get() call in try{} catch{} block
    OR
    c、implement your own ThreadPoolExecutor and override afterExecute method

  • showdown & shutdownNow

    showdown: 等待已经提交的任务执行完毕,不再接收新任务,然后进行关闭
    shutdownNow: 即时关闭,尝试中止运行中的任务,停止处理等待中的任务,将等待执行的任务返回后,进行关闭(速度更快,但风险更大)

  • invokeAll & invokeAny

    Use invokeAll if you want to wait for all submitted tasks to complete.
    Use invokeAny if you are looking for successful completion of one task out of N submitted tasks. In this case, tasks in progress will be cancelled if one of the tasks completes successfully.

ThreadFactory

Executors中每种线程池的创建都有重载自定义ThreadFactory的方法
Executors.defaultThreadFactory() 创建DefaultThreadFactory
用户自定义可以:定义线程名称,记录线程创建和销毁的日志,方便进行统计监控
统计信息使用atomic类进行处理

1
2
3
4
5
6
7
8
9
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable runnable) {
return new MyThread(runnable, poolName); // MyThread 省略
}
}

线程的一些概念

Daemon Thread & User Thread

  • 两者的差别在于,对JVM退出的影响,当一个User线程退出时,JVM会检查其他线程是否都是守护线程,是则JVM会正常退出,守护线程不会影响JVM退出
  • JVM 退出,守护线程会被抛弃,不会执行finally代码块,这个特性导致守护线程不能作为业务逻辑线程进行执行,只用于一些内部调度、内部监控,能被丢弃的任务
  • JVM 启动时,除了主线程,其他都是守护线程
  • 通过设置setDaemon(boolean on)方法设置守护线程,在thread.start()之前设置
  • 守护线程的场景:web服务器中的Servlet,容器启动时后台初始化一个服务线程,即调度线程,负责处理http请求,然后每个请求过来调度线程从线程池中取出一个工作者线程来处理该请求,从而实现并发控制的目的
  • 守护线程与操作系统的守护进程类似,但有点不同

###