Java 线程池主要有7个核心线程数,最大线程数,最大线程存活时间,单位,等待线程,线程工厂,拒绝策略。在请求到来时,优先在核心线程中取线程给消费者(核心线程是一直开启的),核心线程全部分配出去后,消费者的请求会放到等待队列中。如果等待队列也满了,则会开启到最大线程数处理请求(最大线程数的线程在空闲存活时间后会被回收),如果前三者都满了,则会触发拒绝策略(抛弃等待队列队首或者队尾的请求,抛出或者不抛出异常)。

线程池参数源码

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize 核心线程数

指的是核心线程大小,线程池中维护一个最小的线程数量,即使这些线程处于空闲状态,也一直存在池中,除非设置了核心线程超时时间。

maximunPoolSize

指的是线程池中允许的最大线程数量。当线程池中核心线程都处理执行状态,有新请求的任务:
1、工作队列未满:新请求的任务加入工作队列
2、工作队列已满:线程池会创建新线程,来执行这个任务。当然,创建新线程不是无限制的,因为会受到 maximumPoolSize 最大线程数量的限制。

keepAliveTime

指的是空闲线程存活时间。具体说,当线程数大于核心线程数时,空闲线程在等待新任务到达的最大时间,如果超过这个时间还没有任务请求,该空闲线程就会被销毁。

unit

指空闲线程存活时间的单位。keepAliveTime的计量单位。枚举类型TimeUnit类

workQueue

ArrayBlockingQueue

基于数组的有界阻塞队列,特点FIFO(先进先出)。
当线程池中已经存在最大数量的线程时候,再请求新的任务,这时就会将任务加入工作队列的队尾,一旦有空闲线程,就会取出队头执行任务。
因为是基于数组的有界阻塞队列,所以可以避免系统资源的耗尽。如果出现有界队列已满,最大数量的所有线程都处于执行状态,这时又有新的任务请求,会采用Handler拒绝策略,对请求的任务进行处理。

LinkedBlockingQueue

基于链表的无界阻塞队列,默认最大容量Integer.MAX_VALUE( 2^{32}-1​),可认为是无限队列,特点FIFO。
关于maximumPoolSize参数在工作队列为LinkedBlockingQueue时候,是否起作用这个问题,我们需要视情况而定!

情况①:如果指定了工作队列大小,比如core=2,max=3,workQueue=2,任务数task=5,这种情况的最大线程数量的限制是有效的。
情况②:如果工作队列大小默认2^32 -1,这时maximumPoolSize不起作用,因为新请求的任务一直可以加到队列中。

PriorityBlockingQueue

优先级无界阻塞队列,优先级阻塞队列可以通过参数Comparator实现对任务进行排序。

SynchronousQueue

不缓存任务的阻塞队列,它实际上不是真正的队列,因为它没有提供存储任务的空间。生产者一个任务请求到来,会直接执行,也就是说这种队列在消费者充足的情况下更加适合。

threadFactory

线程工厂,创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等。

Java线程分两种:用户线程和守护线程。
守护线程,是指在程序运行的时,后台提供一种通用服务的线程。比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。
守护线程和用户线程的没有本质的区别,不同之处在于虚拟机的离开;若用户线程已全部退出运行,只剩守护线程存在,虚拟机也即退出。 因没有了被守护者,守护线程也就无工作可做,也就没有继续运行程序的必要了。
将线程转换为守护线程可以通过调用Thread对象的setDaemon(true)方法来实现。在使用守护线程时需要注意一下几点:
(1)thread.setDaemon(true)须在thread.start()之前设置,不能把正在运行的常规线程设置为守护线程,否则会抛出IllegalThreadStateException异常。
(2) 在Daemon线程中创建的新线程也是Daemon的。
(3) 守护线程应该永远不去访问固有资源,如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。
(4)java多线程编程,偏向使用java自带的多线程框架,比如ExecutorService,但是java的线程池会将守护线程转换为用户线程,所以如果要使用后台线程就不能用java的线程池。

/**
 * The default thread factory
 */
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

handler

Java 并发超出线程数和工作队列时候的任务请求处理策略,使用了策略设计模式。

ThreadPoolExecutor.AbortPolicy

在默认的处理策略。该处理在拒绝时抛出RejectedExecutionException,拒绝执行。

public static class AbortPolicy implements RejectedExecutionHandler {
     /**
      * Creates an {@code AbortPolicy}.
      */
     public AbortPolicy() { }

     /**
      * Always throws RejectedExecutionException.
      *
      * @param r the runnable task requested to be executed
      * @param e the executor attempting to execute this task
      * @throws RejectedExecutionException always
      */
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
         throw new RejectedExecutionException("Task " + r.toString() +
                                              " rejected from " +
                                              e.toString());
     }
 }

ThreadPoolExecutor.CallerRunsPolicy

调用 execute 方法的线程本身运行任务。这提供了一个简单的反馈控制机制,可以降低新任务提交的速度。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

ThreadPoolExecutor.DiscardOldestPolicy

如果执行程序未关闭,则删除工作队列头部的任务,然后重试执行(可能再次失败,导致重复执行)。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

ThreadPoolExecutor.DiscardPolicy

无法执行的任务被简单地删除,将会丢弃当前任务,通过源码可以看出,该策略不会执行任务操作。

public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}