Almost every programmer has used a thread pool, directly or indirectly. It is a double-edged sword: used well, it gets twice the result with half the effort; used incorrectly, it can bring the server down. So we need to understand thread pools in order to use them better. This article is mainly based on Java and SpringBoot.
1 Thread pool
1.1 Why use thread pool
- The creation and destruction of threads is time-consuming, and pooling technology can reduce these costs.
- The thread pool generally maintains several core threads waiting for tasks to be executed, which can improve response speed.
- A thread pool also makes threads easier to manage, for example by limiting their number and monitoring their execution status.
1.2 How to create a thread pool
The java.util.concurrent (JUC) package provides the Executors class. For convenience, you may create a pool directly with the static methods of this class. Almost all of these static methods are built on ThreadPoolExecutor (newWorkStealingPool, covered below, uses ForkJoinPool instead). Let's take a look at them one by one.
1.2.1 ThreadPoolExecutor
The most important thing about a thread pool is to be thoroughly familiar with its parameters and its overall execution flow. Source code analysis will be covered in a separate post later. Let's look at the constructor with the most complete parameter list.
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
Parameter explanation:
- corePoolSize: number of core threads. The number of threads to keep in the pool even when they are idle. It is not reduced unless allowCoreThreadTimeOut is set. prestartCoreThread and prestartAllCoreThreads can be called to pre-create one or all core threads.
- maximumPoolSize: maximum number of threads. The maximum number of threads allowed in the thread pool.
- keepAliveTime, unit: idle keep-alive time and its time unit. When the number of threads is greater than the number of core threads, a thread is terminated once it has been idle for longer than this value.
- workQueue: work queue. A blocking queue that stores tasks that have not yet been executed.
  - ArrayBlockingQueue: array-based bounded blocking queue.
  - LinkedBlockingQueue: blocking queue based on a linked list. Throughput is usually higher than ArrayBlockingQueue. If no capacity is given, the capacity is Integer.MAX_VALUE.
  - SynchronousQueue: a blocking queue that does not store elements. Each insertion must wait for another thread to perform a removal.
  - PriorityBlockingQueue: unbounded blocking queue with priority ordering.
- threadFactory: thread factory, used to create threads. Its main job is to set a thread name prefix so that pools can be told apart. It is generally built with com.google.common.util.concurrent.ThreadFactoryBuilder from Google's guava package.
- handler: rejection policy. When the thread pool has reached its maximum processing capacity (the queue is full and maximumPoolSize threads are busy), the rejection policy is executed. This is a standard strategy pattern.
  - CallerRunsPolicy: if the executor has not been shut down, the calling thread executes the rejected task itself.
  - AbortPolicy: rejects the current task and throws RejectedExecutionException. This is the default.
  - DiscardPolicy: discards the current task silently and does nothing else.
  - DiscardOldestPolicy: if the executor has not been shut down, discards the oldest task in the queue and then retries the current task.
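To make the seven parameters concrete, here is a minimal sketch that builds a pool by hand. The class name, the namedFactory helper, the "order-worker" prefix and all the numbers are hypothetical choices for illustration; the helper is a plain-JDK stand-in for guava's ThreadFactoryBuilder so that the example has no external dependency.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPoolExample {

    // Hypothetical helper: names threads "<prefix>-<n>" so they are easy to spot in logs and thread dumps.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return task -> new Thread(task, prefix + "-" + counter.getAndIncrement());
    }

    // All seven constructor parameters spelled out; the numbers are illustrative, not recommendations.
    static ThreadPoolExecutor newPool(String namePrefix) {
        return new ThreadPoolExecutor(
                4,                                          // corePoolSize
                8,                                          // maximumPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime, unit
                new ArrayBlockingQueue<>(100),              // workQueue (bounded)
                namedFactory(namePrefix),                   // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
    }

    // Runs one task and reports the name of the thread that executed it.
    static String firstWorkerName() {
        ThreadPoolExecutor pool = newPool("order-worker");
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName());
    }
}
```

A bounded queue together with CallerRunsPolicy means that overload slows the submitter down instead of growing memory without limit; whether that trade-off fits depends on the workload.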
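Overall execution process:
- First, if fewer than corePoolSize threads are running, a new thread is created to run the task, even if other threads are idle.
- Otherwise, the task is offered to workQueue.
- If the queue is full and fewer than maximumPoolSize threads are running, a new non-core thread is created to run the task.
- If that is not possible either, the task is handed to the rejection handler.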
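The order in which execute() uses core threads, then the queue, then extra threads, and finally the rejection handler can be observed with a deliberately tiny pool. The sketch below assumes one core thread, a maximum of two threads and a queue of capacity one; the class and method names are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolFlowDemo {

    // Submits four tasks that all block, and records the pool state after each submission.
    static List<String> observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocked);
                log.add("task " + i + ": threads=" + pool.getPoolSize()
                        + " queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                log.add("task " + i + ": rejected");
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

Task 1 starts the core thread, task 2 waits in the queue, task 3 finds the queue full and starts a second thread, and task 4 is rejected because both the queue and the pool are full.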
1.2.2 Executors built-in static methods
FixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
Features:
- The number of core threads is the same as the maximum number of threads
- Keep-alive time: 0. Because the number of core threads and the maximum number of threads are the same, no thread ever exceeds the core count, so this parameter has no effect
- The blocking queue is a LinkedBlockingQueue created without a capacity, so it can be considered an unbounded queue
Once the number of threads reaches the maximum, all subsequent tasks are pushed into the blocking queue. If tasks arrive faster than they are processed, the queue keeps growing and an OOM exception may be triggered.
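The unbounded queue can be checked directly, and a bounded equivalent is easy to write by hand. This is a sketch under one assumption worth stating: the cast to ThreadPoolExecutor relies on an implementation detail of the JDK's newFixedThreadPool, which the factory code above shows but the ExecutorService return type does not promise. The class name and the capacity of 500 are illustrative.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedPoolQueueCheck {

    // Reports how many more tasks the queue of a fresh fixed pool would accept.
    static int fixedPoolQueueCapacity() {
        // Implementation detail: newFixedThreadPool returns a plain ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    // Same thread behaviour as a fixed pool, but the queue is bounded and overflow is rejected.
    static ThreadPoolExecutor boundedFixedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
    }

    public static void main(String[] args) {
        System.out.println("fixed pool queue capacity: " + fixedPoolQueueCapacity());
        System.out.println("bounded pool queue capacity: "
                + boundedFixedPool(2, 500).getQueue().remainingCapacity());
    }
}
```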
SingleThreadExecutor
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService(
                new ThreadPoolExecutor(1, 1,
                                       0L, TimeUnit.MILLISECONDS,
                                       new LinkedBlockingQueue<Runnable>(),
                                       threadFactory));
    }
Features:
- Almost equivalent to newFixedThreadPool(1)
- The object actually returned is a FinalizableDelegatedExecutorService
    private static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }

        @SuppressWarnings("deprecation")
        protected void finalize() {
            super.shutdown();
        }
    }
As can be seen, super.shutdown() is called inside the finalize() method. That is, when the wrapper is garbage collected, the underlying executor is shut down. However, it is still recommended to shut it down promptly when it is no longer in use.
Because it uses an unbounded LinkedBlockingQueue, it may also trigger an OOM exception.
CachedThreadPool
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
Features:
- The number of core threads is 0, and the maximum number of threads is Integer.MAX_VALUE (can be considered unlimited)
- Keep-alive time: 60 seconds
- The blocking queue is a SynchronousQueue, whose capacity is 0
Each submitted task is handed to an idle thread if one exists; otherwise a new thread is created for it immediately. A burst of tasks may therefore create a very large number of threads and trigger an OOM exception.
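The thread growth is easy to reproduce. In the sketch below (class and method names are made up, and the cast to ThreadPoolExecutor again relies on the JDK implementation shown above), every task blocks, so no thread ever becomes idle and each submission forces a new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class CachedPoolGrowth {

    // Submits the given number of blocking tasks and returns how many threads the pool created.
    static int threadsAfter(int concurrentTasks) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < concurrentTasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // simulate a slow task that never frees its thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();
        release.countDown(); // let all tasks finish so the pool can shut down
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println("threads created: " + threadsAfter(50));
    }
}
```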
ScheduledThreadPool, SingleThreadScheduledExecutor
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService(
                new ScheduledThreadPoolExecutor(1, threadFactory));
    }
ScheduledThreadPoolExecutor extends ThreadPoolExecutor and expands its functionality: a submitted task can be delayed for a certain period of time or executed periodically.
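A minimal sketch of periodic execution, assuming a hypothetical helper that lets a task run a given number of times at a fixed rate and then stops the scheduler. The names and the 50 ms period are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledDemo {

    // Runs a counter task at a fixed rate until it has run at least `times` times, then stops.
    static int runPeriodically(int times, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        // Initial delay 0, then one run every periodMillis milliseconds.
        scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            done.countDown();
        }, 0L, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // periodic tasks never end on their own
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runPeriodically(3, 50L));
    }
}
```

Note that if a periodic task throws an exception, its subsequent executions are suppressed, so the task body usually needs its own try/catch.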
WorkStealingPool
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool(parallelism,
                                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                                null, true);
    }
Introduced in 1.8, it uses ForkJoinPool as its implementation. The pool maintains enough threads to support the given level of parallelism and may use multiple queues to reduce contention. The actual number of threads can grow and shrink dynamically. The execution order of submitted tasks is not guaranteed.
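Because the execution order is not guaranteed, results are best collected through the returned futures rather than through the order in which tasks happen to finish. A small sketch, with a made-up class name and a parallelism of 4 chosen arbitrarily:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WorkStealingSum {

    // Computes 1^2 + 2^2 + ... + n^2 with one small task per term.
    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newWorkStealingPool(4);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long value = i;
                tasks.add(() -> value * value);
            }
            long total = 0;
            // invokeAll waits for every task and returns the futures in submission order.
            for (Future<Long> result : pool.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // 385
    }
}
```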
1.2.3 Submitting tasks to the thread pool
- execute: the common way to submit a task; it has no return value.
- submit: suitable for tasks that need a return value. A Future object is returned, through which you can check whether the task has finished and obtain its result.
Future was introduced in 1.5, and CompletableFuture was added in 1.8. I will cover CompletableFuture in a separate article later.
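A short sketch of what submit adds over execute: the Future carries the result, and it also carries the failure. An exception thrown inside a submitted task does not surface in the worker thread; it is rethrown wrapped in an ExecutionException when get() is called. The class and method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVsExecute {

    // submit(Callable) returns a Future whose get() blocks until the result is ready.
    static int submitAndGet() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<Integer> future = pool.submit(() -> 6 * 7);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // A failing task: the exception is stored in the Future and surfaces only on get().
    static String failureMessage() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<Integer> future = pool.submit(failing);
            future.get();
            return "no failure";
        } catch (ExecutionException e) {
            return e.getCause().getMessage(); // the original exception is the cause
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet());
        System.out.println(failureMessage());
    }
}
```

If the Future of a submitted task is never inspected, its exception goes unnoticed, which is a common source of silently lost errors.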
1.2.4 Closing the thread pool
- shutdownNow: sets the thread pool to the STOP state, attempts to stop all executing tasks by interrupting their threads, no longer processes waiting tasks, and returns the list of tasks that were waiting to be executed.
- shutdown: sets the thread pool to the SHUTDOWN state, interrupts threads that are not executing tasks, and no longer accepts new tasks, but previously submitted tasks are still executed.
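The two methods are often combined: ask politely with shutdown, wait a bounded time, then fall back to shutdownNow. The sketch below follows that general pattern; the class name, method names and timeouts are assumptions for illustration. The second method shows that shutdownNow hands back the tasks that never started.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    // Graceful first, forceful second. Returns true if the pool terminated in time.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks, let submitted ones finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt running tasks, drop queued ones
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // One task occupies the only thread, two more wait in the queue and are returned unexecuted.
    static int queuedTasksReturnedByShutdownNow() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.execute(() -> {
            try {
                Thread.sleep(10_000); // interrupted by shutdownNow
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { });
        pool.execute(() -> { });
        List<Runnable> neverStarted = pool.shutdownNow();
        return neverStarted.size();
    }

    public static void main(String[] args) {
        System.out.println("tasks never started: " + queuedTasksReturnedByShutdownNow());
    }
}
```

shutdownNow can only interrupt; a task that ignores interruption keeps running until it finishes on its own.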
1.3 Monitoring of thread pool
ThreadPoolExecutor itself exposes many getters that can be used for monitoring:
- getPoolSize: the current number of threads in the thread pool
- getCorePoolSize: Number of thread pool core threads
- getActiveCount: Number of threads executing tasks
- getCompletedTaskCount: Approximate total number of completed tasks (the status of tasks and threads may change during the calculation process)
- getTaskCount: Approximate total number of all tasks
- getQueue: the task queue of the current thread pool
- getLargestPoolSize: the largest number of threads that have ever been in the pool at the same time
- getMaximumPoolSize: The maximum number of threads allowed in the thread pool
- getKeepAliveTime: thread pool thread survival time
- isShutdown: whether the thread pool has been shut down (it has left the RUNNING state, i.e. SHUTDOWN or later)
- isTerminated: Whether the thread pool is in the TERMINATED state
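As a sketch of how these getters can be combined into one log line (the class name, the field labels and the pool sizes are illustrative assumptions):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolStats {

    // Formats the most useful counters of a pool into a single line.
    static String snapshot(ThreadPoolExecutor pool) {
        return "poolSize=" + pool.getPoolSize()
                + " core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " active=" + pool.getActiveCount()
                + " queued=" + pool.getQueue().size()
                + " completed=" + pool.getCompletedTaskCount()
                + " total=" + pool.getTaskCount()
                + " largest=" + pool.getLargestPoolSize();
    }

    // Runs some trivial tasks to completion and returns the final snapshot.
    static String runThenSnapshot(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100));
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            // The counters are approximate while tasks run, so wait for termination first.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot(pool);
    }

    public static void main(String[] args) {
        System.out.println(runThenSnapshot(3));
    }
}
```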
The thread pool also provides several protected hook methods with empty implementations. By subclassing, you can override beforeExecute(Thread t, Runnable r), afterExecute(Runnable r, Throwable t) and terminated() to observe the state of the pool at these extension points.
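A minimal sketch of such a subclass, assuming we want to count finished tasks and accumulate their running time. The class name, the counters and the queue capacity of 1000 are hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class TimingThreadPool extends ThreadPoolExecutor {

    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong finishedTasks = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    TimingThreadPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000));
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs in the worker thread, just before the task
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            // Runs in the same worker thread, so the ThreadLocal holds this task's start time.
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finishedTasks.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        System.out.println("finished=" + finishedTasks.get() + " totalNanos=" + totalNanos.get());
        super.terminated();
    }

    long finishedTasks() {
        return finishedTasks.get();
    }

    // Runs the given number of trivial tasks and returns how many the hooks counted.
    static long runAndCount(int tasks) {
        TimingThreadPool pool = new TimingThreadPool(2);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.finishedTasks();
    }

    public static void main(String[] args) {
        System.out.println(runAndCount(5));
    }
}
```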
1.4 Exposing monitoring metrics with Prometheus
The thread pool in this example is mainly used to improve concurrency, but tasks must not be delayed: if no thread is available to run a task, the submitting thread runs it itself (a SynchronousQueue combined with CallerRunsPolicy).
The overall logic is very simple. An internal SingleThreadScheduledExecutor collects the thread pool data at a fixed interval and exposes it to the outside world through io.micrometer.core.instrument.Metrics.
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import io.micrometer.core.instrument.Metrics;
    import io.micrometer.core.instrument.Tag;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;

    import java.util.Collections;
    import java.util.concurrent.*;

    @Slf4j
    @EnableAsync
    @Configuration
    public class ExecutorConfig implements InitializingBean, DisposableBean {

        public static final String THREAD_POOL_NAME_FUTURE = "futureExecutor";

        private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        private static final Iterable<Tag> TAG = Collections.singletonList(Tag.of("thread.pool.name", THREAD_POOL_NAME_FUTURE));

        @Bean(name = THREAD_POOL_NAME_FUTURE)
        public Executor futureExecutor() {
            return new ThreadPoolExecutor(
                    20, 80, 10, TimeUnit.MINUTES,
                    new SynchronousQueue<>(),
                    new ThreadFactoryBuilder().setNameFormat(THREAD_POOL_NAME_FUTURE + "-%d").build(),
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
        }

        @Override
        public void afterPropertiesSet() {
            // First collection after 30 seconds, then every 5 seconds.
            scheduledExecutor.scheduleAtFixedRate(doCollect(), 30L, 5L, TimeUnit.SECONDS);
        }

        @Override
        public void destroy() {
            try {
                scheduledExecutor.shutdown();
            } catch (Exception ignored) {}
        }

        private Runnable doCollect() {
            return () -> {
                try {
                    // The bean method returns a plain ThreadPoolExecutor, so cast to that type;
                    // casting to Spring's ThreadPoolTaskExecutor would fail with a ClassCastException.
                    ThreadPoolExecutor exec = (ThreadPoolExecutor) futureExecutor();
                    Metrics.gauge("thread.pool.core.size", TAG, exec, ThreadPoolExecutor::getCorePoolSize);
                    Metrics.gauge("thread.pool.max.size", TAG, exec, ThreadPoolExecutor::getMaximumPoolSize);
                    Metrics.gauge("thread.pool.keepalive.seconds", TAG, exec, e -> e.getKeepAliveTime(TimeUnit.SECONDS));
                    Metrics.gauge("thread.pool.active.size", TAG, exec, ThreadPoolExecutor::getActiveCount);
                    Metrics.gauge("thread.pool.thread.count", TAG, exec, ThreadPoolExecutor::getPoolSize);
                    Metrics.gauge("thread.pool.queue.size", TAG, exec, e -> e.getQueue().size());
                    Metrics.gauge("thread.pool.task.count", TAG, exec, ThreadPoolExecutor::getTaskCount);
                    Metrics.gauge("thread.pool.task.completed.count", TAG, exec, ThreadPoolExecutor::getCompletedTaskCount);
                } catch (Exception ex) {
                    log.warn("doCollect ex => {}", ex.getLocalizedMessage());
                }
            };
        }
    }
1.5 Thread pool in SpringBoot
In Spring projects, @EnableAsync and @Async are often used to implement asynchronous tasks. By default, ThreadPoolTaskExecutor is used.
    public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
            implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

        private final Object poolSizeMonitor = new Object();

        private int corePoolSize = 1;

        private int maxPoolSize = Integer.MAX_VALUE;

        private int keepAliveSeconds = 60;

        private int queueCapacity = Integer.MAX_VALUE;

        private boolean allowCoreThreadTimeOut = false;

        private boolean prestartAllCoreThreads = false;
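The default rejection policy is ThreadPoolExecutor.AbortPolicy. With the field defaults shown above (one core thread and a queue capacity of Integer.MAX_VALUE), the queue practically never fills up, so the pool never grows beyond a single thread. So generally speaking, we need to customize the thread pool.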
2 Things to note
2.1 Alibaba Java Development Manual
- [Mandatory] Please specify a meaningful thread name when creating a thread or thread pool to facilitate backtracking when an error occurs.
  - This is easy to understand. With a name like http-nio-8103-exec-1, you can tell at a glance that it is an HTTP web related thread.
- [Mandatory] Thread resources must be provided through the thread pool, and explicit creation of threads in the application is not allowed.
  - Refer to the benefits of thread pools in section 1.1.
- [Mandatory] Thread pools must not be created with Executors, but through ThreadPoolExecutor. This makes the operating rules of the thread pool explicit to whoever writes the code and avoids the risk of resource exhaustion.
2.2 Others
There is no fixed formula for the number of core threads, the maximum number of threads, the keep-alive time, or the type and capacity of the blocking queue.
Under normal circumstances, a distinction is made based on the type of task.
- CPU intensive tasks. In this case, the number of threads is generally set smaller. If there are too many threads, context switching may take a high proportion of time.
- IO intensive tasks. In this case, more threads can be configured, because most of them are waiting most of the time. The ratio between request waiting time (WT) and service time (ST) can be roughly estimated, and the thread pool size is set to N*(1+WT/ST), where N is the number of CPU cores.
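The sizing rule N*(1+WT/ST) is simple enough to put into a helper. This is a sketch with hypothetical names; it takes N to be the number of CPU cores, rounds up, and expects the wait and service times to be measured in the same unit. The result is a starting point for tuning, not a guarantee.

```java
class PoolSizing {

    // Estimated pool size for IO-bound work: N * (1 + WT / ST), rounded up.
    static int ioBoundPoolSize(int cpuCores, double waitTime, double serviceTime) {
        if (cpuCores <= 0 || waitTime < 0 || serviceTime <= 0) {
            throw new IllegalArgumentException("cpuCores and serviceTime must be positive, waitTime non-negative");
        }
        return (int) Math.ceil(cpuCores * (1 + waitTime / serviceTime));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Example: each request waits 90 ms on IO and computes for 10 ms.
        System.out.println("suggested size: " + ioBoundPoolSize(cores, 90, 10));
    }
}
```

With 8 cores, 90 ms of waiting and 10 ms of computing per request, the estimate is 8 * (1 + 9) = 80 threads; with no waiting at all it falls back to the core count.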
However, it is best to monitor indicators and continuously adjust and optimize according to business scenarios. The initial value can be determined by doing some stress testing in a test environment.
3 Postscript
Only by using thread pools sensibly can you get twice the result with half the effort. Always treat them with respect, guard against problems before they happen, and keep a close eye on the monitoring metrics.
Two topics are still left to be covered:
- [ ] ThreadPoolExecutor source code
- [ ] CompletableFuture related