0%

深入浅出线程池

前言

鄙人在公司实习时,看见了公司的自定义线程池结合CompletableFuture异步获取结果,对线程池的实现感兴趣,于是花了些时间来解决来弄清楚线程池的执行原理
本文将会探讨
Executor、Executors、ExecutorService、AbstractExecutorService、ThreadPoolExecutor、ForkJoinPool之间的关系
先简单介绍一下之间的关系,Executor是最底层的定义,ExecutorService继承了Executor

AbstractExecutorService是ExecutorService的默认实现,而ThreadPoolExecutor和ForkPool都是AbstrackExecutor的子类

Executor ——> 一切的根源


Executor接口是JUC提供的一个接口,通常用于提交Runnable任务,用于显示替代创建线程。

ExecutorService ——> 线程池管理接口

ExecutorService 是 Java 中用于管理和控制线程池的接口。它提供了管理线程生命周期的方法,并允许提交任务以供执行。换句话说,Executor中只定义了一个execute方法,管理线程池的方法全都是在这个接口中定义的

使用示例

// 创建一个固定大小的线程池  
    ExecutorService executor = Executors.newFixedThreadPool(10);  
  
    // 提交一个异步任务  
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {  
        // 模拟长时间运行的任务  
        try {  
            Thread.sleep(2000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }        return "Hello, World!";  
    }, executor);  
  
    // 处理异步任务的结果  
    future.thenAccept(result -> {  
        System.out.println("Result: " + result);  
    });  
    // 关闭线程池  
    executor.shutdown();  
}

本文仅仅探究线程池的使用,关于CompletableFuture的部分,鄙人还有另外一篇源码解析的文章,欢迎观看!
这里简单提一下,CompletableFuture实际上是使用我们传进去的线程池参数来执行任务的,会调用线程池Executor中定义的execute方法,而这个地方这个方法的实现者是ThreadPoolExecutor,后文会详细分析,这里先简单介绍。

源码解析

public interface ExecutorService extends Executor {

    // 启动有序关闭,已提交任务会执行,但不接受新任务。
    void shutdown();

    // 尝试停止所有正在执行的任务,返回等待执行的任务列表。
    List<Runnable> shutdownNow();

    // 返回此执行器是否已关闭。
    boolean isShutdown();

    // 返回所有任务是否在关闭后已完成。
    boolean isTerminated();

    // 阻塞直到所有任务在关闭请求后完成,或超时,或当前线程中断。
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    // 提交一个返回值的任务用于执行,并返回表示任务结果的Future。
    <T> Future<T> submit(Callable<T> task);

    // 提交一个Runnable任务用于执行,并返回表示任务的Future。
    <T> Future<T> submit(Runnable task, T result);

    // 提交一个Runnable任务用于执行,并返回表示任务的Future。
    Future<?> submit(Runnable task);

    // 执行给定的任务,返回表示其状态和结果的Future列表。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    // 执行给定的任务,返回表示其状态和结果的Future列表。在所有任务完成或超时后返回。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    // 执行给定的任务,返回一个已成功完成的任务结果。未完成的任务会被取消。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    // 执行给定的任务,返回一个已成功完成的任务结果。在成功完成或超时前返回,未完成的任务会被取消。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

可以发现,ExecutorService定义了一系列线程池管理方法

AbstractExecutorService ——> ExecutorService 接口的默认实现

源码

/**
 * 提供了 ExecutorService 的默认实现
 * 实现了 submit, invokeAny 和 invokeAll 方法
 */
public abstract class AbstractExecutorService implements ExecutorService {

    // 构造函数
    public AbstractExecutorService() {}

    // 返回一个 RunnableFuture,用于包装 Runnable 任务和默认值
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    // 返回一个 RunnableFuture,用于包装 Callable 任务
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    // 提交一个 Runnable 任务
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    // 提交一个 Runnable 任务,并返回一个 Future
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    // 提交一个 Callable 任务
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    // 执行一组任务,返回第一个成功完成的任务结果
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 省略具体步骤
    }

    // 执行一组任务,返回第一个成功完成的任务结果
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        // 省略具体步骤
    }

    // 执行一组任务,返回第一个成功完成的任务结果,带超时
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 省略具体步骤
    }

    // 执行一组任务,返回所有任务的 Future 列表
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        // 省略具体步骤
    }

    // 执行一组任务,返回所有任务的 Future 列表,带超时
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        // 省略具体步骤
    }

    // 取消所有 Future
    private static <T> void cancelAll(ArrayList<Future<T>> futures) {
        // 省略具体步骤
    }

    // 取消从索引 j 开始的所有 Future
    private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
        // 省略具体步骤
    }
}

源码中实现并定义了一些线程池的管理和操作,有一个地方是比较有意思的

AbstractExecutorService并没有实现execute方法,从而这个地方的调用实际上是调用的AbstractExecutorService的子类实现的方法,这中设计方法叫做模板方法模式

模板方法模式

由父类定义一个算法的骨架,将一些步骤实现延迟到子类中,子类可以重写这些步骤来实特定的行为,但是算法的整体逻辑由父类来设计

回到上面的示例代码中

ExecutorService executor = Executors.newFixedThreadPool(10);  
  
// 提交一个异步任务  
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {  
    // 模拟长时间运行的任务  
    try {  
        Thread.sleep(2000);  
    } catch (InterruptedException e) {  
        e.printStackTrace();  
    }        return "Hello, World!";  
}, executor);

当我们进入到Executors中,我们就能找到真相
原来如此,execute实际调用的是ThreadPoolExecutor中的实现!

ThreadPoolExecutor ——> 灵活的方式来创建和管理线程池

使用示例

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize, // 核心线程数
    maximumPoolSize, // 最大线程数
    keepAliveTime, // 线程空闲时间
    TimeUnit.MILLISECONDS, // 时间单位
    new LinkedBlockingQueue<Runnable>(), // 任务队列
    new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
executor.execute(task); // 提交任务
executor.shutdown(); // 关闭线程池

核心参数

参数 意义
corePoolSize 线程池中的核心线程数
workQueue 存放提交的task
maximumPoolSize 线程池中允许的最大线程数
threadFactory 线程工厂, 用来创建线程, 由Executors#defaultThreadFactory实现
keepAliveTime 空闲线程存活时间(默认是临时线程, 也可设置为核心线程)
unit 空闲线程存活时间单位枚举

工作队列相关

线程池的队列主要有以下几种:

ArrayBlockingQueue

有界阻塞队列,性能高,不能动态扩展,高并发下可能会导致线程阻塞

LinkedBlockingQueue

Executors.newFixedThreadPool默认的队列就是这个队列,容易造成内存耗尽,
所以不建议直接使用Executors来new一个线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

链表实现的有界或者无界队列,容量可以动态扩展,高并发下性能好,缺点:内存占用大,有可能会导致资源耗尽

SynchronousQueue

不存储元素的队列,每个插入操作必须等待一个相应的删除操作,适用于任务交给工作线程来。
缺点:
高并发场景下,可能会造成线程阻塞,不适合需要缓存任务的场景

PriorityBlockingQueue

优点:基于优先级的无界阻塞队列,元素按照优先级顺序出队,适用于需要任务优先级调度的场景。
缺点:插入和删除操作的时间复杂度为 O(log n),性能较低;无界队列可能导致内存溢出。

DelayQueue

优点:基于优先级队列的无界阻塞队列,元素只有在其延迟期满后才能出队,适用于需要延迟执行任务的场景。
缺点:插入和删除操作的时间复杂度为 O(log n),性能较低;无界队列可能导致内存溢出。

拒绝策略

CallerRunsPolicy

如果线程池没关闭将其提交到线程池当中,直接执行execute
如果线程池关闭,则直接丢弃

AbortPolicy

拒绝并抛弃任务,抛出异常

DiscardPolicy

直接抛弃任务,做任何操作。

DiscardOldestPolicy

如果线程池没有关闭,丢弃最老的任务,然后将任务提交并执行
如果线程池关闭,就直接丢弃

关于线程创建后任务会如何执行

  1. 线程数小于核心线程数时,直接创建线程来执行任务
  2. 大于核心线程且工作队列没满就将任务提交给工作队列
  3. 大于核心线程数且队列已满,直接创建一个临时线程来处理任务
  4. 大于最大线程数,工作队列已满,执行拒绝策略

源码解析

当我们进入到这个类中,会发现有好多代码,这里就不贴出来了,我们只挑重点的来解析,我们来顺着上面的示例代码来深入

定义和属性

public class ThreadPoolExecutor extends AbstractExecutorService {
	// 用于同时存储线程池状态和工作线程数量的原子整数
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	
	// 用于存储线程数量的位数
	private static final int COUNT_BITS = Integer.SIZE - 3

	// 用于提取线程数量部分的掩码
	private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
	
	// runState 存储在高位
	// 线程池正在运行,可以接受新任务并处理队列中的任务
	private static final int RUNNING    = -1 << COUNT_BITS;
	
	// 线程池关闭状态,不再接受新任务,但会处理队列中的任务
	private static final int SHUTDOWN   =  0 << COUNT_BITS;
	
	// 线程池停止状态,不再接受新任务,也不处理队列中的任务,并中断正在执行的任务
	private static final int STOP       =  1 << COUNT_BITS;
	
	// 线程池整理状态,所有任务都已终止,工作线程数量为 0
	private static final int TIDYING    =  2 << COUNT_BITS;
	
	// 线程池终止状态,terminated() 方法已完成
	private static final int TERMINATED =  3 << COUNT_BITS;
		

	// 阻塞队列,主要用于生产者-消费者模式,在多线程环境中非常有用。它提供了线程安全的方式来在队列中添加和移除元素,并且在队列为空或已满时,线程可以被阻塞,直到有空间或元素可用。
	private final BlockingQueue<Runnable> workQueue;
	
	// 访问 workers 集合和相关簿记的锁
	private final ReentrantLock mainLock = new ReentrantLock();
	
	// 包含线程池中所有工作线程的集合
	private final HashSet<Worker> workers = new HashSet<>();
	
	// 支持 awaitTermination 的条件
	private final Condition termination = mainLock.newCondition();
	
	// 记录达到的最大线程池大小
	private int largestPoolSize;
	
	// 已完成任务的计数器
	private long completedTaskCount;
	
	// 所有用户控制参数都声明为 volatile,以便基于最新值进行操作
	
	// 用于创建新线程的工厂
	private volatile ThreadFactory threadFactory;
	
	// 执行时饱和或关闭时调用的处理程序
	private volatile RejectedExecutionHandler handler;
	
	// 空闲线程等待工作的超时时间(纳秒)
	private volatile long keepAliveTime;
	
	// 如果为 false(默认),核心线程即使在空闲时也保持存活
	// 如果为 true,核心线程使用 keepAliveTime 超时等待工作
	private volatile boolean allowCoreThreadTimeOut;
	
	// 核心线程池大小
	private volatile int corePoolSize;
	
	// 最大线程池大小
	private volatile int maximumPoolSize;
}

我们点进去new ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          RejectedExecutionHandler handler) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
         Executors.defaultThreadFactory(), handler);  
}

从这里我们可以看见有一个参数是Executors中的工厂方法,这里我们稍后再提
顺着this去翻找
发现这里仅仅进行了一些赋值,emm,线索好像断了。没关系,还记得我们上文提到的execute方法吗?ThreadPoolExecutor中需要实现这个方法,让我们去看看吧

public void execute(Runnable command) {
    // 如果传入的任务为空,则抛出 NullPointerException
    if (command == null)
        throw new NullPointerException();
    
    // 获取当前的控制状态 ctl
    int c = ctl.get();
    
    // 如果当前运行的线程数少于 corePoolSize,尝试添加一个新的工作线程来执行任务
    if (workerCountOf(c) < corePoolSize) {
        // 如果成功添加了工作线程,则返回
        if (addWorker(command, true))
            return;
        // 否则,重新获取当前的控制状态 ctl
        c = ctl.get();
    }
    
    // 如果线程池正在运行,并且任务可以成功加入到工作队列中
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次检查线程池的运行状态
        int recheck = ctl.get();
        // 如果线程池不再运行且任务成功移除,则拒绝任务
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 如果没有运行的工作线程,尝试添加一个新的工作线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } 
    // 如果任务无法加入队列,尝试添加一个新的工作线程
    else if (!addWorker(command, false))
        // 如果添加工作线程失败,则拒绝任务
        reject(command);
}

这里就是我们上文提到的提交任务时会遇到的情况:

  1. 运行中的线程数 < 核心线程数,就尝试添加一个新的工作线程来执行任务
  2. 大于核心线程数,就尝试将任务加到工作队列中
  3. 如果无法加入队列就尝试添加一个新的工作线程
  4. 再失败就拒绝任务
    接着来看addWorker
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // 仅在必要时检查队列是否为空
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 如果工作线程数量超过限制,返回 false
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // 尝试增加工作线程数量
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // 重新获取 ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // 否则 CAS 失败,重试内部循环
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // 创建新的 Worker
            final Thread t = w.thread; // 获取 Worker 的线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); // 加锁
                try {
                    // 再次检查线程池的运行状态
                    int c = ctl.get();
    
                    // 如果线程池正在运行,或者线程池未停止且任务为空
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w); // 添加 Worker 到集合
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s; // 更新最大池大小
                    }
                } finally {
                    mainLock.unlock(); // 解锁
                }
                if (workerAdded) {
                    t.start(); // 启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w); // 如果线程启动失败,执行失败处理
        }
        return workerStarted;
    }
    我们到这里就很清楚了,所谓的线程池不过是管理的一堆线程的工具,实际上就是帮助我们判断合适去新增线程何时复用线程。
    最后让我们去看看线程复用是如何实现的!

线程复用的奥秘

ThreadPoolExecutor中有一个静态内部类Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;

    // 运行此 Worker 的线程。如果工厂创建线程失败,则为 null
    @SuppressWarnings("serial") // 不太可能是可序列化的
    final Thread thread;

    // 要运行的初始任务。可能为 null
    @SuppressWarnings("serial") // 静态类型为非可序列化
    Runnable firstTask;

    // 每个线程的任务计数器
    volatile long completedTasks;

    // 创建具有给定初始任务和线程工厂的 Worker
    Worker(Runnable firstTask) {
        setState(-1); // 在 runWorker 之前禁止中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // 将主运行循环委托给外部的 runWorker 方法
    public void run() {
        runWorker(this);
    }

    // 锁定以防止中断任务
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 尝试获取锁
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // 尝试释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // 锁定此 Worker
    public void lock() {
        acquire(1);
    }

    // 尝试锁定此 Worker
    public boolean tryLock() {
        return tryAcquire(1);
    }

    // 解锁此 Worker
    public void unlock() {
        release(1);
    }

    // 检查当前线程是否持有锁
    public boolean isLocked() {
        return isHeldExclusively();
    }

    // 中断此 Worker 的线程
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker是定义了一个线程和要执行的初始任务,而线程复用要关注到这行

跳转之后,发现是调用的ThreadPoolExecutor中定义的runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 如果线程池正在停止,确保线程被中断;
            // 如果没有,确保线程没有被中断。这需要在第二种情况下重新检查,
            // 以处理 shutdownNow 竞赛,同时清除中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

发现这里的逻辑:

  1. 获取当前线程、获取Worker中的初始任务并清空,解锁Worker允许中断
  2. 只要任务不为null或者从getTask,也就是不停的从wokerQueue中获取到新任务就一直循环执行,从而达到线程复用
  3. 检测线程池状态,进行一些钩子方法
    到这里我们才真正找到线程池的奥秘,原来是通过循环来不断从任务队列中获取任务执行,而不是执行完任务之后就终止线程!!!

线程池中线程销毁的原理

本来没打算写这个的,可是在牛课上看见了问这个的,但是我好像答不出来
看到这一行代码:

当woker执行过一次之后,task = null ,也就是这个循环是否进行就全看getTask()函数了
让我们关注到getTask()的源码

private Runnable getTask() {
    boolean timedOut = false; // 上一次进行poll()任务出列是否超时

    for (;;) {
        int c = ctl.get();
		/**
		* 这是ctl的定义,他是一个AtomicInteger类型的变量,是原子操作的,用来控制线程池的状态和工作线程的数量
		* private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
		*/
        // 仅在必要时检查队列是否为空
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
		// 当前线程池中工作线程的数量
        int wc = workerCountOf(c);

	        //worker工作线程会不会被回收 
	        // alloW是控制核心线程是否会被销毁的 
            //1. allowCoreThreadTimeOut == true,核心线程和非核心线程空闲keepAliveTime被销毁,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)  
            //2. allowCoreThreadTimeOut == false,非核心线程空闲keepAliveTIme销毁,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 
            //  ,核心线程会被阻塞,不会被销毁workQueue.take()
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
		// 如果允许核心线程被淘汰或者存活线程数大于核心线程数并且超时了,或者线程数大于最大线程数,开始淘汰线程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 根据是否允许超时,选择poll或take方法获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

所以简单总结一下:
workQueue为空,线程就会被销毁,具体的实现是通过阻塞队列workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)来控制线程是否销毁的,如果使用poll就会在指定时间内等待元素可用否则返回null,从而等待keepAlive来决定是否销毁
两种情景:

  1. allowCoreThreadTimeOut == true,也就是允许核心线程销毁,两种线程使用poll来进行调用,一段时间后销毁
  2. allowCoreThreadTimeOut == false,也就是非核心线程会被销毁,核心线程永远不会销毁,非核心线程会调用poll,而核心线程会调用take,从而一直阻塞这个线程来做到永不销毁
    然后这个逻辑之后我们回到上面的调用的代码
    之后会走这个流程,不过销毁的过程我们就不看了。

ForkJoinPool ——> 递归拆分!让并行嗨起来!并行流的原理!

篇幅有限简单介绍一下

主要组件

  1. ForkJoinTask:任务的基本单位,提供fork、join,用于任务的分解和合并
  2. ForkJoinWorkerThread:执行ForkJoinTask的工作线程
  3. WorkQueue:工作线程用于存储任务
  4. ForkJoinPool:管理和调度ForkJoinTask的执行

核心原理

工作窃取法:

  • 工作窃取:当一个工作线程完成了自己的任务队列中的任务后,它会尝试从其他线程的队列中窃取任务来执行。
  • 双端队列:每个工作线程的任务队列是一个双端队列,线程从队列的一端(通常是头部)获取任务执行,而其他线程从队列的另一端(通常是尾部)窃取任务。

并行流

  • 使用stream()方法将数据源转换为顺序流,或使用parallelStream()方法将数据源转换为并行流。
  • 使用中间操作(如filter、map、sorted等)和终端操作(如forEach、collect、reduce等)定义流的处理逻辑。
  • 执行并行处理:
  • 并行流会自动使用ForkJoinPool来并行处理数据。每个操作会被拆分成多个子任务,并由ForkJoinPool中的工作线程并行执行。

常见八股

为什么使用抽象类来实现接口

  1. 因为普通类去实现接口中的方法,子类在继承的时候必须重写父类中的方法不利于代码的复用
  2. 抽象类可以提供一些默认实现,从而使得子类直接使用即可
  3. 可以强制子类去实现某个方法(abstract)