This article mainly studies how parallelStream uses a custom thread pool
ForkJoinPool
java/util/concurrent/ForkJoinPool.java
public class ForkJoinPool extends AbstractExecutorService { public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); } private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = new DefaultCommonPoolForkJoinWorkerThreadFactory(); else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); } }
parallelStream uses common ForkJoinPool by default, and parallelism, etc. can be set through system properties.
ForkJoinPoolFactoryBean
org/springframework/scheduling/concurrent/ForkJoinPoolFactoryBean.java
public class ForkJoinPoolFactoryBean implements FactoryBean<ForkJoinPool>, InitializingBean, DisposableBean { private boolean commonPool = false; private int parallelism = Runtime.getRuntime().availableProcessors(); private ForkJoinPool.ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory; @Nullable private Thread.UncaughtExceptionHandler uncaughtExceptionHandler; private boolean asyncMode = false; private int awaitTerminationSeconds = 0; @Nullable private ForkJoinPool forkJoinPool; //...... @Override public void destroy() { if (this.forkJoinPool != null) { // Ignored for the common pool. this.forkJoinPool.shutdown(); // Wait for all tasks to terminate - works for the common pool as well. if (this.awaitTerminationSeconds > 0) { try { this.forkJoinPool.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } } }
spring3.1 provides ForkJoinPoolFactoryBean, which can be used to create and host forkJoinPool
Example
Configuration
@Configuration public class ForkJoinConfig { @Bean public ForkJoinPoolFactoryBean forkJoinPoolFactoryBean() { ForkJoinPoolFactoryBean factoryBean = new ForkJoinPoolFactoryBean(); factoryBean.setCommonPool(false); // NOTE LIFO_QUEUE FOR working steal from tail of queue factoryBean.setAsyncMode(true); // NOTE true FIFO_QUEUE, false LIFO_QUEUE factoryBean.setParallelism(10); // factoryBean.setUncaughtExceptionHandler(); factoryBean.setAwaitTerminationSeconds(60); return factoryBean; } }
use
@Autowired ForkJoinPoolFactoryBean forkJoinPoolFactoryBean; public void streamParallel() throws ExecutionException, InterruptedException { List<TodoTask> result = forkJoinPoolFactoryBean.getObject().submit(new Callable<List<TodoTask>>() { @Override public List<TodoTask> call() throws Exception { return IntStream.rangeClosed(1, 20).parallel().mapToObj(i -> { log.info("thread:{}", Thread.currentThread().getName()); return new TodoTask(i, "name"+i); }).collect(Collectors.toList()); } }).get(); result.stream().forEach(System.out::println); }
The common workerName prefix is ForkJoinPool.commonPool-worker -the customized workerName prefix defaults to ForkJoinPool- nextPoolId() -worker-
summary
parallelStream uses commonPool by default, which is the default initialization of static code blocks. For individual scenarios, you can customize ForkJoinPool and throw parallelStream into it as a task, so that it will not affect the default commonPool.