Some knowledge about thread pools

Basically every programmer has used thread pools, directly or indirectly. They are a double-edged sword: used well, you get twice the result with half the effort; used incorrectly, they can bring the server down. So we need to understand thread pools in order to use them better. This article is mainly based on Java and Spring Boot.

1 thread pool

1.1 Why use a thread pool

  • The creation and destruction of threads is time-consuming, and pooling technology can reduce these costs.
  • The thread pool generally maintains several core threads waiting for tasks to be executed, which can improve response speed.
  • A thread pool also makes it easier to manage threads, such as controlling their number and monitoring execution status.

1.2 How to create a thread pool

The Java j.u.c (java.util.concurrent) package provides the Executors class. For convenience, its static methods are often used directly to create thread pools. All of these static methods are built on top of the ThreadPoolExecutor class. Let's look at them one by one.

1.2.1 ThreadPoolExecutor

The top priority with thread pools is to be very familiar with the constructor parameters and the overall execution flow. Source code analysis will be covered in a separate post later. Let's look at the constructor with the most complete set of parameters.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

Parameter explanation:

  • corePoolSize: number of core threads. The number of threads to keep in the pool even when they are idle; they are not reclaimed unless allowCoreThreadTimeOut is set. prestartCoreThread and prestartAllCoreThreads can be called to pre-create one or all core threads.
  • maximumPoolSize: maximum number of threads. The maximum number of threads allowed in the thread pool.
  • keepAliveTime, unit: keep-alive time. When the number of threads exceeds the number of core threads, an idle thread whose idle time exceeds this value is terminated.
  • workQueue: work queue. Blocking queue stores tasks that have not yet been executed.
    • ArrayBlockingQueue: array-based bounded blocking queue.
    • LinkedBlockingQueue: linked-list-based blocking queue; its throughput is usually higher than ArrayBlockingQueue.
    • SynchronousQueue: a blocking queue that does not store elements; each insert must wait for another thread to perform a remove.
    • PriorityBlockingQueue: unbounded blocking queue with priority ordering.
  • threadFactory: thread factory. Used to create threads; the main thing to set is a thread name prefix for easier identification. This is usually built with com.google.common.util.concurrent.ThreadFactoryBuilder from Google's Guava (see the construction sketch after this list).
  • handler: rejection policy. Executed when the thread pool has reached its maximum processing capacity. This is a textbook example of the Strategy pattern.
    • CallerRunsPolicy: When the current executor is not closed, the calling thread executes the rejected task.
    • AbortPolicy: rejects the current task and throws a RejectedExecutionException.
    • DiscardPolicy: Discard the current task directly and do nothing.
    • DiscardOldestPolicy: if the executor is not shut down, discard the oldest task in the queue and retry the submission.
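
To make the parameters concrete, here is a minimal construction sketch; the pool sizes, queue capacity, and name prefix are illustrative assumptions rather than recommendations, and ThreadFactoryBuilder is the Guava class mentioned above.

// Minimal sketch: a bounded pool with a named thread factory and an explicit rejection policy.
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        4,                                  // corePoolSize: threads kept even when idle
        8,                                  // maximumPoolSize: upper bound on pool size
        60L, TimeUnit.SECONDS,              // keepAliveTime + unit for idle non-core threads
        new ArrayBlockingQueue<>(100),      // workQueue: bounded, so the backlog cannot grow without limit
        new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), // threadFactory: readable thread names
        new ThreadPoolExecutor.CallerRunsPolicy());                       // handler: the caller runs the task when saturated

executor.execute(() -> System.out.println(Thread.currentThread().getName()));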

Overall execution process:

[Figure: Thread pool – overall execution process]
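
Since the flow diagram cannot be reproduced here, the following small demo illustrates the same flow in code; the pool sizes, queue capacity, and sleep duration are chosen purely to make each branch observable.

// Tiny pool: core=1, max=2, queue capacity=1, default AbortPolicy.
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));

Runnable sleepy = () -> {
    try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
};

pool.execute(sleepy); // 1st task: pool size < corePoolSize, a core thread is created
pool.execute(sleepy); // 2nd task: core thread busy, the task goes into the queue
pool.execute(sleepy); // 3rd task: queue full, pool size < maximumPoolSize, a non-core thread is created
try {
    pool.execute(sleepy); // 4th task: pool and queue both full, the rejection policy fires
} catch (RejectedExecutionException e) {
    System.out.println("rejected: " + e.getMessage()); // default AbortPolicy throws
}
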
1.2.2 Executors built-in static methods

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

Features:

  • The number of core threads is the same as the maximum number of threads
  • Keep-alive time: 0. Since the core and maximum thread counts are equal, this parameter is effectively meaningless
  • The blocking queue is LinkedBlockingQueue, which can be considered an unbounded queue

When the number of threads reaches the maximum, subsequent tasks are pushed into the blocking queue. Because the queue is unbounded, tasks can pile up and eventually trigger an OutOfMemoryError (OOM).

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

Features:

  • Almost equivalent to newFixedThreadPool(1)
  • What is actually returned is a FinalizableDelegatedExecutorService
private static class FinalizableDelegatedExecutorService
        extends DelegatedExecutorService {
    FinalizableDelegatedExecutorService(ExecutorService executor) {
        super(executor);
    }
    @SuppressWarnings("deprecation")
    protected void finalize() {
        super.shutdown();
    }
}

As you can see, super.shutdown() is called inside the finalize() method. That is, when the executor is garbage collected, the underlying executor is shut down automatically. Even so, it is still recommended to shut it down promptly when it is no longer needed.

Because it also uses a LinkedBlockingQueue, it may trigger an OOM error as well.

CachedThreadPool

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

Features:

  • The number of core threads is 0, and the maximum number of threads is Integer.MAX_VALUE (effectively unbounded)
  • Keep-alive time: 60 seconds
  • The blocking queue is SynchronousQueue, which does not store tasks

Each submitted task gets a thread for execution immediately, which can lead to a very large number of threads and trigger an OOM error.

ScheduledThreadPool, SingleThreadScheduledExecutor

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}

ScheduledThreadPoolExecutor inherits from ThreadPoolExecutor and extends its functionality: submitted tasks can be executed after a given delay or periodically.
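
A minimal usage sketch of the scheduling API; the delays, period, and task bodies are placeholders.

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

// Run once, 5 seconds from now.
scheduler.schedule(() -> System.out.println("delayed task"), 5, TimeUnit.SECONDS);

// Run every 10 seconds, after an initial 1-second delay.
scheduler.scheduleAtFixedRate(() -> System.out.println("periodic task"), 1, 10, TimeUnit.SECONDS);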

WorkStealingPool

public  static ExecutorService newWorkStealingPool ( int parallelism) {
     return  new  ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null , true );
}

Introduced in Java 1.8 and implemented with ForkJoinPool. The pool maintains enough threads to support the given level of parallelism and uses multiple queues to reduce contention. The actual number of threads may grow and shrink dynamically, and the execution order of submitted tasks is not guaranteed.

1.2.3 Submitting tasks to the thread pool
  • execute: the common way to submit a task; it has no return value.
  • submit: suitable when a return value is needed. It returns a Future object, through which you can check whether the task has finished and obtain its result.

Future was introduced in Java 1.5, and CompletableFuture in 1.8. CompletableFuture will be covered in a separate article later.
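
A minimal sketch contrasting the two submission styles; the pool size and task bodies are placeholders.

ExecutorService pool = Executors.newFixedThreadPool(2);

// execute: fire-and-forget, no handle to the result.
pool.execute(() -> System.out.println("running without a result"));

// submit: returns a Future that can be inspected and joined.
Future<Integer> future = pool.submit(() -> 1 + 1);
System.out.println(future.isDone());  // whether the task has completed
System.out.println(future.get());     // blocks until the result (2) is ready; get() throws checked exceptions

pool.shutdown();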

1.2.4 Closing the thread pool
  • shutdownNow: sets the pool to the STOP state, attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns the list of tasks that were awaiting execution.
  • shutdown: sets the pool to the SHUTDOWN state; previously submitted tasks are still executed, but no new tasks are accepted, and idle worker threads are interrupted.
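
A common two-phase shutdown idiom, along the lines of the pattern shown in the ExecutorService Javadoc; the 30-second timeout is a placeholder.

void shutdownGracefully(ExecutorService pool) {
    pool.shutdown();                          // stop accepting new tasks; queued tasks still run
    try {
        // Give previously submitted tasks a bounded amount of time to finish.
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            pool.shutdownNow();               // cancel tasks that are still running or queued
        }
    } catch (InterruptedException e) {
        pool.shutdownNow();                   // cancel if the waiting thread itself is interrupted
        Thread.currentThread().interrupt();   // preserve the interrupt status
    }
}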

1.3 Monitoring the thread pool

The thread pool itself provides getters for many properties:

  • getPoolSize: the current number of threads in the thread pool
  • getCorePoolSize: Number of thread pool core threads
  • getActiveCount: Number of threads executing tasks
  • getCompletedTaskCount: Approximate total number of completed tasks (the status of tasks and threads may change during the calculation process)
  • getTaskCount: Approximate total number of all tasks
  • getQueue: the task queue of the current thread pool
  • getLargestPoolSize: The maximum number of threads in the thread pool ever
  • getMaximumPoolSize: The maximum number of threads allowed in the thread pool
  • getKeepAliveTime: the keep-alive time of threads in the pool
  • isShutdown: Whether the thread pool is closed (SHUTDOWN state)
  • isTerminated: Whether the thread pool is in the TERMINATED state

The thread pool also provides several protected methods with empty implementations: beforeExecute(Thread t, Runnable r), afterExecute(Runnable r, Throwable t) and terminated(). By subclassing and overriding them, you can hook into the thread pool at these extension points and observe its state.
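
As a rough sketch of how these hooks can be used, the subclass below times each task; the class name and log output are made up for illustration, loosely following the pattern in the ThreadPoolExecutor Javadoc.

// Subclass that measures how long each task runs via the beforeExecute/afterExecute hooks.
class TimedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<>();

    TimedThreadPoolExecutor(int core, int max, long keepAlive, TimeUnit unit,
                            BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startTime.set(System.nanoTime());          // record the start time on the worker thread
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long elapsedMs = (System.nanoTime() - startTime.get()) / 1_000_000;
        startTime.remove();
        System.out.println("task took " + elapsedMs + " ms, error=" + t);
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        System.out.println("pool terminated");
        super.terminated();
    }
}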

1.4 Exposing monitoring metrics to Prometheus

In this example, the main purpose of the thread pool is to improve concurrency, but tasks should not be queued up and delayed: if there is currently no idle thread to execute a task, the submitting thread executes it itself (CallerRunsPolicy).

The overall logic is very simple: a single-threaded ScheduledExecutorService is started internally to collect thread pool data at fixed intervals and expose it through io.micrometer.core.instrument.Metrics.

@Slf4j
@EnableAsync
@Configuration
public class ExecutorConfig implements InitializingBean, DisposableBean {
    public static final String THREAD_POOL_NAME_FUTURE = "futureExecutor";

    // Single-threaded scheduler that periodically samples the pool and publishes gauges.
    private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    private static final Iterable<Tag> TAG = Collections.singletonList(Tag.of("thread.pool.name", THREAD_POOL_NAME_FUTURE));

    @Bean(name = THREAD_POOL_NAME_FUTURE)
    public Executor futureExecutor() {
        return new ThreadPoolExecutor(
                20,
                80,
                10,
                TimeUnit.MINUTES,
                new SynchronousQueue<>(),
                new ThreadFactoryBuilder().setNameFormat(THREAD_POOL_NAME_FUTURE + "-%d").build(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    @Override
    public void afterPropertiesSet() {
        scheduledExecutor.scheduleAtFixedRate(doCollect(), 30L, 5L, TimeUnit.SECONDS);
    }

    @Override
    public void destroy() {
        try {
            scheduledExecutor.shutdown();
        } catch (Exception ignored) {}
    }

    private Runnable doCollect() {
        return () -> {
            try {
                // The bean is a plain ThreadPoolExecutor, so read its metrics directly.
                ThreadPoolExecutor exec = (ThreadPoolExecutor) futureExecutor();
                // Static pool configuration
                Metrics.gauge("thread.pool.core.size", TAG, exec, ThreadPoolExecutor::getCorePoolSize);
                Metrics.gauge("thread.pool.max.size", TAG, exec, ThreadPoolExecutor::getMaximumPoolSize);
                Metrics.gauge("thread.pool.keepalive.seconds", TAG, exec, e -> e.getKeepAliveTime(TimeUnit.SECONDS));
                // Current thread usage
                Metrics.gauge("thread.pool.active.size", TAG, exec, ThreadPoolExecutor::getActiveCount);
                Metrics.gauge("thread.pool.thread.count", TAG, exec, ThreadPoolExecutor::getPoolSize);
                // Queued tasks
                Metrics.gauge("thread.pool.queue.size", TAG, exec, e -> e.getQueue().size());
                // Task counters
                Metrics.gauge("thread.pool.task.count", TAG, exec, ThreadPoolExecutor::getTaskCount);
                Metrics.gauge("thread.pool.task.completed.count", TAG, exec, ThreadPoolExecutor::getCompletedTaskCount);
            } catch (Exception ex) {
                log.warn("doCollect ex => {}", ex.getLocalizedMessage());
            }
        };
    }
}

1.5 Thread pools in Spring Boot

In Spring projects, @EnableAsync and @Async are often used to implement asynchronous tasks. By default, a ThreadPoolTaskExecutor is used.

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
        implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

    private final Object poolSizeMonitor = new Object();

    private int corePoolSize = 1;

    private int maxPoolSize = Integer.MAX_VALUE;

    private int keepAliveSeconds = 60;

    private int queueCapacity = Integer.MAX_VALUE;

    private boolean allowCoreThreadTimeOut = false;

    private boolean prestartAllCoreThreads = false;

    // ...

The default rejection policy is ThreadPoolExecutor.AbortPolicy(), and as the fields above show, the default maximum pool size and queue capacity are both Integer.MAX_VALUE. So generally speaking, we need to customize the thread pool, for example as sketched below.
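
A minimal customization sketch; the bean name asyncExecutor, the class names, and the pool sizes are placeholders to be tuned per workload.

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean("asyncExecutor")
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);                 // bounded core size instead of the default 1
        executor.setMaxPoolSize(50);                  // bounded maximum instead of Integer.MAX_VALUE
        executor.setQueueCapacity(200);               // bounded queue instead of Integer.MAX_VALUE
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-exec-");  // meaningful thread names
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;                              // Spring calls initialize() during bean setup
    }
}

// In another class, reference the custom executor by its bean name:
@Service
public class ReportService {

    @Async("asyncExecutor")
    public void generateReportAsync() {
        // long-running work here
    }
}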

2 Things to note

2.1 Alibaba Java Development Manual

  • [Mandatory] Specify a meaningful thread name when creating a thread or thread pool, so that problems are easier to trace back.
    • This is easy to understand. With a name like http-nio-8103-exec-1, you can tell at a glance that it is an HTTP web-related thread.
  • [Mandatory] Thread resources must be provided through a thread pool; explicitly creating threads in the application is not allowed.
    • See the benefits of thread pools above.
  • [Mandatory] Thread pools must not be created with Executors; create them with ThreadPoolExecutor instead. This way the author is forced to be explicit about how the pool operates, avoiding the risk of resource exhaustion.

2.2 Others

There is no fixed formula for choosing the number of core threads, the maximum number of threads, the keep-alive time, or the type and capacity of the blocking queue.

Under normal circumstances, a distinction is made based on the type of task:

  • CPU-intensive tasks: the number of threads is generally set small, since with too many threads context switching can take up a high proportion of time.
  • IO-intensive tasks: more threads can be configured. Roughly estimate the ratio of request wait time (WT) to service time (ST) and set the pool size to N * (1 + WT / ST), where N is usually taken as the number of CPU cores (see the worked example below).
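
For example, with purely hypothetical numbers: on an 8-core machine (N = 8) where each request spends roughly 90 ms waiting on IO (WT) and 10 ms on the CPU (ST), the estimate is 8 * (1 + 90 / 10) = 80 threads. Treat this only as a starting point for the stress tests mentioned below.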

However, it is best to rely on monitoring metrics and keep adjusting and optimizing for the actual business scenario. Initial values can be determined by doing some stress testing in a test environment.

3 Postscript

Only by using thread pools sensibly can you get twice the result with half the effort. Always treat them with a degree of caution: take preventive measures ahead of time and keep a close eye on monitoring metrics.

Two follow-up topics remain:

  • [ ] ThreadPoolExecutor source code
  • [ ] CompletableFuture related