0%

使用示例

Callable & FutureTask

Callable<String> callableTask = () -> {  
    TimeUnit.SECONDS.sleep(2);  
    return "Callable Task Result";  
};  
  
FutureTask<String> futureTask = new FutureTask<>(callableTask);  
Thread thread = new Thread(futureTask);  
thread.start();

CompletableFuture

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {  
    try {  
        TimeUnit.SECONDS.sleep(1);  
    } catch (InterruptedException e) {  
        throw new IllegalStateException(e);  
    }  
    return "CompletableFuture Result";  
});

深入使用

以下示例来自:CompletableFuture原理与实践-外卖商家端API的异步化 - 美团技术团队 (meituan.com)
CF的使用是基于构造依赖树的,一个CompletableFuture的使用会触发另外一系列依赖它的CF执行

服务依赖

// 无依赖
ExecutorService executor = Executors.newFixedThreadPool(5);  
// 直接发起异步调用  
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "result1",executor);  
// 直接返回一个已经完成的CompletableFuture  
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");

// 单个依赖
CompletableFuture<String> cf3 = cf1.thenApply(result -> {  
    System.out.println("result1: " + result);  
    return "result3";  
});  
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {  
    //result1和result2分别为cf1和cf2的结果  
    return "result4";  
});
// 多依赖
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);  
CompletableFuture<String> result = cf6.thenApply(v -> {  
    //这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全部完成时,才会执行 。  
    result3 = cf3.join();  
    result4 = cf4.join();  
    result5 = cf5.join();  
    //根据result3、result4、result5组装最终result;  
    return "result";  
});
// 或者使用anyOf,只要有一个依赖完成即可完成

源码解读(基于JDK1.8)

Callable

Callable是用于定义和返回结果,并且可能抛出异常的任务,类似于Runnable

@FunctionalInterface  
public interface Callable<V> {  
    /**  
     * call 方法是 Callable 接口中的唯一方法,用于执行任务并返回结果。它与 Runnable 接口的 run 方法类似,但 call 方法可以返回一个结果并且可以抛出受检异常
     */    
     V call() throws Exception;  
}

FutureTask

先梳理一下继承关系,Future是一个接口,定义了一些属性,然后

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

所以FutureTask就是一个Future的实际实现,是基于Runnable实现的

public class FutureTask<V> implements RunnableFuture<V> {
	// 任务的运行状态,初始为 NEW。状态只在 set、setException 和 cancel 方法中转换为终止状态。
	private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

	// 重要的属性
	// 底层的callable,运行后置为null
	private Callable<V> callable;
	// get()方法返回的结果或者抛出的异常
    private Object outcome; // non-volatile, protected by state reads/writes
	// 运行callable的线程,在run()期间CAS设置
    private volatile Thread runner;
	// 等待线程的Triber栈
    private volatile WaitNode waiters;
	// 返回结果或者返回异常 
	private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    // 构造函数,接收callable作为参数来执行任务
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    // 获取执行结果
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
		// 如果还处于未完成的状态则调用awaitDone
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
		// 返回执行结果或者异常
        return report(s);
    }
    // 核心执行
    // 因为封装了Runnable接口,所以可以直接执行使用主线程
    public void run() {
	    // 如果任务状态不是NEW或者使用CAS无法将runner设置为当前线程,直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
					// 调用callable的call方法去执行callable封装的任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
					// 设置返回结果
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
			// 线程置为null,防止并发调用线程
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    // 停止执行任务实际上就是将执行任务的写线程从runner属性中取出然后调用interupt信号,这里省略
    ...
    // 
    
}

通过分析源码,我们发现Future获取结果的方式是通过阻塞实现的,会阻塞当前线程,如何避免阻塞呢?

CompletableFuture(以下简称CF)

为何会选择CF呢?

  • 可组合:可以将多个依赖操作通过不同的方式进行编排
  • 操作融合:将数据流中使用的多个操作符以某中方式组合起来从而降低开销
  • 延迟执行
  • 学习成本低
  • 相比于只能通过阻塞或者轮询获得结果而且不支持回调方法的Future,CF支持回调的方式进行处理结果,同时支持组合操作支持进一步的编排

定义

让我们首先关注它的定义

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
	...
}

可以发现奥秘就在CompletionStage这个接口中,CompletionStage用于标识执行过程中的一个步骤(Stage),从而实现了服务编排

CompletionStage

定义了一系列任务的步骤,具体实现看CompletableFuture是如何实现的

基本属性

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
	// 用于存储当前CF的结果	
	volatile Object result;      
	// stack用于表示当前CF完成后需要触发的依赖动作
    volatile Completion stack;    
}

从这里我们看出,使用了类似于”观察者”模式设计思想,被观察者是CompletableFuture,而观察者是这些需要回调的依赖动作

Completion的定义

Completion是定义在CompletableFuture里的

abstract static class Completion extends ForkJoinTask<Void>
       implements Runnable, AsynchronousCompletionTask {
       volatile Completion next;      // Treiber stack link

       /**
        * Performs completion action if triggered, returning a
        * dependent that may need propagation, if one exists.
        *
        * @param mode SYNC, ASYNC, or NESTED
        */
       abstract CompletableFuture<?> tryFire(int mode);

       /** Returns true if possibly still triggerable. Used by cleanStack. */
       abstract boolean isLive();

       public final void run()                { tryFire(ASYNC); }
       public final boolean exec()            { tryFire(ASYNC); return false; }
       public final Void getRawResult()       { return null; }
       public final void setRawResult(Void v) {}
   }

依赖的流程

public <U> CompletableFuture<U> thenApply(  
    Function<? super T,? extends U> fn) {  
    return uniApplyStage(null, fn);  
}
private <V> CompletableFuture<V> uniApplyStage(  
    Executor e, Function<? super T,? extends V> f) {  
    if (f == null) throw new NullPointerException();  
    Object r;  
    // 如果已经任务已经完成了,直接调用uniApplyNow
    if ((r = result) != null)  
        return uniApplyNow(r, e, f);  
    // 创建一个新的未完成CompletableFuture
    CompletableFuture<V> d = newIncompleteFuture();  
	// 将这个新的UniApply压栈 
    unipush(new UniApply<T,V>(e, d, this, f));  
    return d;  
}

private <V> CompletableFuture<V> uniApplyNow(  
    Object r, Executor e, Function<? super T,? extends V> f) {  
    Throwable x;  
	// 创建一个新的CompletableFuture
    CompletableFuture<V> d = newIncompleteFuture();  
    if (r instanceof AltResult) {  
        if ((x = ((AltResult)r).ex) != null) {  
            d.result = encodeThrowable(x, r);  
            return d;  
        }        r = null;  
    }    try {  
        if (e != null) {  
			// 不是异常就执行新的UniApply任务
            e.execute(new UniApply<T,V>(null, d, this, f));  
        } else {  
            @SuppressWarnings("unchecked") T t = (T) r;  
            d.result = d.encodeValue(f.apply(t));  
        }    } catch (Throwable ex) {  
        d.result = encodeThrowable(ex);  
    }    return d;  
}

 final void unipush(Completion c) {
        if (c != null) {
			// 尝试将c压栈,知道成功或者任务完成为止
            while (!tryPushStack(c)) {
                if (result != null) {
                    NEXT.set(c, null);
                    break;
                }
            }
            if (result != null)
                c.tryFire(SYNC);
        }
    }
  • 简单理解一下就是,在CF完成任务时,会去观察者链中出栈一个,然后执行,并且返回一个新的CompletableFuture,然后后续继续去完成后续依赖任务
  • 如果原始任务还没完成,那么就会将新的任务推入栈中,等待原始任务完成

执行过程

以 supplyAsync为例

// 有自定义线程池的调用函数
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,  
                                                   Executor executor) {  
    return asyncSupplyStage(screenExecutor(executor), supplier);  
}

// 没有自定义线程池会使用自带的线程池
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?  
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {  
    return asyncSupplyStage(ASYNC_POOL, supplier);  
}
// 实际执行者
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,  
                                                 Supplier<U> f) {  
    if (f == null) throw new NullPointerException();  
    CompletableFuture<U> d = new CompletableFuture<U>();  
    // 调用线程池去执行
    e.execute(new AsyncSupply<U>(d, f));  
    return d;  
}

通过阅读源码,我们可以发现,原来CompletableFuture执行任务是会使用我们提交给它的线程池或者它自己默认去生成一个线程池去执行的,相比FutureTask通过阻塞去等待结果,确实是提升了性能。
关于线程池的源码和原理解析,以及一些八股知识,欢迎翻看鄙人的另一篇《深入浅出线程池》博客观看

原理

ThreadLocal实际起作用的是ThreadLocal内部的ThreadLocalMap类,对于每一个Tread,都有一个ThreadLocalMap属性,从而实现线程隔离,然后set的时候就通过获取当前线程然后给他的ThreadLcoalMap属性添加值

public  
class Thread implements Runnable {
	ThreadLocal.ThreadLocalMap threadLocals = null;
}

ThreadLocalMap

static class ThreadLocalMap {  
  
    /**  
     * The entries in this hash map extend WeakReference, using     * its main ref field as the key (which is always a     * ThreadLocal object).  Note that null keys (i.e. entry.get()     * == null) mean that the key is no longer referenced, so the     * entry can be expunged from table.  Such entries are referred to     * as "stale entries" in the code that follows. */
     // Entry是节点,继承了WeakReferentce,定义了一个value
     static class Entry extends WeakReference<ThreadLocal<?>> { 
		  Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
		}
	    public void set(T value) {
			    // 获取当前线程
		        Thread t = Thread.currentThread();
				// 通过当前线程获取线程中维护的ThreadLocalMap对象
		        ThreadLocalMap map = getMap(t);
		        if (map != null) {
		            map.set(this, value);
		        } else {
		            createMap(t, value);
		        }
		    }
}

为何使用弱引用

因为ThreadLocalMap的key是Thread,如果像普通的key-value的定义,只要线程未销毁
就会一直存在,所以使用弱引用就可以在不存在强引用的时候就将这个销毁。

前言

鄙人在公司实习时,看见了公司的自定义线程池结合CompletableFuture异步获取结果,对线程池的实现感兴趣,于是花了些时间来解决来弄清楚线程池的执行原理
本文将会探讨
Executor、Executors、ExecutorService、AbstractExecutorService、ThreadPoolExecutor、ForkJoinPool之间的关系
先简单介绍一下之间的关系,Executor是最底层的定义,ExecutorService继承了Executor

AbstractExecutorService是ExecutorService的默认实现,而ThreadPoolExecutor和ForkPool都是AbstrackExecutor的子类

Executor ——> 一切的根源


Executor接口是JUC提供的一个接口,通常用于提交Runnable任务,用于显示替代创建线程。

ExecutorService ——> 线程池管理接口

ExecutorService 是 Java 中用于管理和控制线程池的接口。它提供了管理线程生命周期的方法,并允许提交任务以供执行。换句话说,Executor中只定义了一个execute方法,管理线程池的方法全都是在这个接口中定义的

使用示例

// 创建一个固定大小的线程池  
    ExecutorService executor = Executors.newFixedThreadPool(10);  
  
    // 提交一个异步任务  
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {  
        // 模拟长时间运行的任务  
        try {  
            Thread.sleep(2000);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }        return "Hello, World!";  
    }, executor);  
  
    // 处理异步任务的结果  
    future.thenAccept(result -> {  
        System.out.println("Result: " + result);  
    });  
    // 关闭线程池  
    executor.shutdown();  
}

本文仅仅探究线程池的使用,关于CompletableFuture的部分,鄙人还有另外一篇源码解析的文章,欢迎观看!
这里简单提一下,CompletableFuture实际上是使用我们传进去的线程池参数来执行任务的,会调用线程池Executor中定义的execute方法,而这个地方这个方法的实现者是ThreadPoolExecutor,后文会详细分析,这里先简单介绍。

源码解析

public interface ExecutorService extends Executor {

    // 启动有序关闭,已提交任务会执行,但不接受新任务。
    void shutdown();

    // 尝试停止所有正在执行的任务,返回等待执行的任务列表。
    List<Runnable> shutdownNow();

    // 返回此执行器是否已关闭。
    boolean isShutdown();

    // 返回所有任务是否在关闭后已完成。
    boolean isTerminated();

    // 阻塞直到所有任务在关闭请求后完成,或超时,或当前线程中断。
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    // 提交一个返回值的任务用于执行,并返回表示任务结果的Future。
    <T> Future<T> submit(Callable<T> task);

    // 提交一个Runnable任务用于执行,并返回表示任务的Future。
    <T> Future<T> submit(Runnable task, T result);

    // 提交一个Runnable任务用于执行,并返回表示任务的Future。
    Future<?> submit(Runnable task);

    // 执行给定的任务,返回表示其状态和结果的Future列表。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    // 执行给定的任务,返回表示其状态和结果的Future列表。在所有任务完成或超时后返回。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    // 执行给定的任务,返回一个已成功完成的任务结果。未完成的任务会被取消。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    // 执行给定的任务,返回一个已成功完成的任务结果。在成功完成或超时前返回,未完成的任务会被取消。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

可以发现,ExecutorService定义了一系列线程池管理方法

AbstractExecutorService ——> ExecutorService 接口的默认实现

源码

/**
 * 提供了 ExecutorService 的默认实现
 * 实现了 submit, invokeAny 和 invokeAll 方法
 */
public abstract class AbstractExecutorService implements ExecutorService {

    // 构造函数
    public AbstractExecutorService() {}

    // 返回一个 RunnableFuture,用于包装 Runnable 任务和默认值
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    // 返回一个 RunnableFuture,用于包装 Callable 任务
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    // 提交一个 Runnable 任务
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    // 提交一个 Runnable 任务,并返回一个 Future
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    // 提交一个 Callable 任务
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    // 执行一组任务,返回第一个成功完成的任务结果
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 省略具体步骤
    }

    // 执行一组任务,返回第一个成功完成的任务结果
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        // 省略具体步骤
    }

    // 执行一组任务,返回第一个成功完成的任务结果,带超时
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 省略具体步骤
    }

    // 执行一组任务,返回所有任务的 Future 列表
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        // 省略具体步骤
    }

    // 执行一组任务,返回所有任务的 Future 列表,带超时
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        // 省略具体步骤
    }

    // 取消所有 Future
    private static <T> void cancelAll(ArrayList<Future<T>> futures) {
        // 省略具体步骤
    }

    // 取消从索引 j 开始的所有 Future
    private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
        // 省略具体步骤
    }
}

源码中实现并定义了一些线程池的管理和操作,有一个地方是比较有意思的

AbstractExecutorService并没有实现execute方法,从而这个地方的调用实际上是调用的AbstractExecutorService的子类实现的方法,这中设计方法叫做模板方法模式

模板方法模式

由父类定义一个算法的骨架,将一些步骤实现延迟到子类中,子类可以重写这些步骤来实特定的行为,但是算法的整体逻辑由父类来设计

回到上面的示例代码中

ExecutorService executor = Executors.newFixedThreadPool(10);  
  
// 提交一个异步任务  
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {  
    // 模拟长时间运行的任务  
    try {  
        Thread.sleep(2000);  
    } catch (InterruptedException e) {  
        e.printStackTrace();  
    }        return "Hello, World!";  
}, executor);

当我们进入到Executors中,我们就能找到真相
原来如此,execute实际调用的是ThreadPoolExecutor中的实现!

ThreadPoolExecutor ——> 灵活的方式来创建和管理线程池

使用示例

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize, // 核心线程数
    maximumPoolSize, // 最大线程数
    keepAliveTime, // 线程空闲时间
    TimeUnit.MILLISECONDS, // 时间单位
    new LinkedBlockingQueue<Runnable>(), // 任务队列
    new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
executor.execute(task); // 提交任务
executor.shutdown(); // 关闭线程池

核心参数

参数 意义
corePoolSize 线程池中的核心线程数
workQueue 存放提交的task
maximumPoolSize 线程池中允许的最大线程数
threadFactory 线程工厂, 用来创建线程, 由Executors#defaultThreadFactory实现
keepAliveTime 空闲线程存活时间(默认是临时线程, 也可设置为核心线程)
unit 空闲线程存活时间单位枚举

工作队列相关

线程池的队列主要有以下几种:

ArrayBlockingQueue

有界阻塞队列,性能高,不能动态扩展,高并发下可能会导致线程阻塞

LinkedBlockingQueue

Executors.newFixedThreadPool默认的队列就是这个队列,容易造成内存耗尽,
所以不建议直接使用Executors来new一个线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

链表实现的有界或者无界队列,容量可以动态扩展,高并发下性能好,缺点:内存占用大,有可能会导致资源耗尽

SynchronousQueue

不存储元素的队列,每个插入操作必须等待一个相应的删除操作,适用于任务交给工作线程来。
缺点:
高并发场景下,可能会造成线程阻塞,不适合需要缓存任务的场景

PriorityBlockingQueue

优点:基于优先级的无界阻塞队列,元素按照优先级顺序出队,适用于需要任务优先级调度的场景。
缺点:插入和删除操作的时间复杂度为 O(log n),性能较低;无界队列可能导致内存溢出。

DelayQueue

优点:基于优先级队列的无界阻塞队列,元素只有在其延迟期满后才能出队,适用于需要延迟执行任务的场景。
缺点:插入和删除操作的时间复杂度为 O(log n),性能较低;无界队列可能导致内存溢出。

拒绝策略

CallerRunsPolicy

如果线程池没关闭将其提交到线程池当中,直接执行execute
如果线程池关闭,则直接丢弃

AbortPolicy

拒绝并抛弃任务,抛出异常

DiscardPolicy

直接抛弃任务,做任何操作。

DiscardOldestPolicy

如果线程池没有关闭,丢弃最老的任务,然后将任务提交并执行
如果线程池关闭,就直接丢弃

关于线程创建后任务会如何执行

  1. 线程数小于核心线程数时,直接创建线程来执行任务
  2. 大于核心线程且工作队列没满就将任务提交给工作队列
  3. 大于核心线程数且队列已满,直接创建一个临时线程来处理任务
  4. 大于最大线程数,工作队列已满,执行拒绝策略

源码解析

当我们进入到这个类中,会发现有好多代码,这里就不贴出来了,我们只挑重点的来解析,我们来顺着上面的示例代码来深入

定义和属性

public class ThreadPoolExecutor extends AbstractExecutorService {
	// 用于同时存储线程池状态和工作线程数量的原子整数
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	
	// 用于存储线程数量的位数
	private static final int COUNT_BITS = Integer.SIZE - 3

	// 用于提取线程数量部分的掩码
	private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
	
	// runState 存储在高位
	// 线程池正在运行,可以接受新任务并处理队列中的任务
	private static final int RUNNING    = -1 << COUNT_BITS;
	
	// 线程池关闭状态,不再接受新任务,但会处理队列中的任务
	private static final int SHUTDOWN   =  0 << COUNT_BITS;
	
	// 线程池停止状态,不再接受新任务,也不处理队列中的任务,并中断正在执行的任务
	private static final int STOP       =  1 << COUNT_BITS;
	
	// 线程池整理状态,所有任务都已终止,工作线程数量为 0
	private static final int TIDYING    =  2 << COUNT_BITS;
	
	// 线程池终止状态,terminated() 方法已完成
	private static final int TERMINATED =  3 << COUNT_BITS;
		

	// 阻塞队列,主要用于生产者-消费者模式,在多线程环境中非常有用。它提供了线程安全的方式来在队列中添加和移除元素,并且在队列为空或已满时,线程可以被阻塞,直到有空间或元素可用。
	private final BlockingQueue<Runnable> workQueue;
	
	// 访问 workers 集合和相关簿记的锁
	private final ReentrantLock mainLock = new ReentrantLock();
	
	// 包含线程池中所有工作线程的集合
	private final HashSet<Worker> workers = new HashSet<>();
	
	// 支持 awaitTermination 的条件
	private final Condition termination = mainLock.newCondition();
	
	// 记录达到的最大线程池大小
	private int largestPoolSize;
	
	// 已完成任务的计数器
	private long completedTaskCount;
	
	// 所有用户控制参数都声明为 volatile,以便基于最新值进行操作
	
	// 用于创建新线程的工厂
	private volatile ThreadFactory threadFactory;
	
	// 执行时饱和或关闭时调用的处理程序
	private volatile RejectedExecutionHandler handler;
	
	// 空闲线程等待工作的超时时间(纳秒)
	private volatile long keepAliveTime;
	
	// 如果为 false(默认),核心线程即使在空闲时也保持存活
	// 如果为 true,核心线程使用 keepAliveTime 超时等待工作
	private volatile boolean allowCoreThreadTimeOut;
	
	// 核心线程池大小
	private volatile int corePoolSize;
	
	// 最大线程池大小
	private volatile int maximumPoolSize;
}

我们点进去new ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          RejectedExecutionHandler handler) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
         Executors.defaultThreadFactory(), handler);  
}

从这里我们可以看见有一个参数是Executors中的工厂方法,这里我们稍后再提
顺着this去翻找
发现这里仅仅进行了一些赋值,emm,线索好像断了。没关系,还记得我们上文提到的execute方法吗?ThreadPoolExecutor中需要实现这个方法,让我们去看看吧

public void execute(Runnable command) {
    // 如果传入的任务为空,则抛出 NullPointerException
    if (command == null)
        throw new NullPointerException();
    
    // 获取当前的控制状态 ctl
    int c = ctl.get();
    
    // 如果当前运行的线程数少于 corePoolSize,尝试添加一个新的工作线程来执行任务
    if (workerCountOf(c) < corePoolSize) {
        // 如果成功添加了工作线程,则返回
        if (addWorker(command, true))
            return;
        // 否则,重新获取当前的控制状态 ctl
        c = ctl.get();
    }
    
    // 如果线程池正在运行,并且任务可以成功加入到工作队列中
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次检查线程池的运行状态
        int recheck = ctl.get();
        // 如果线程池不再运行且任务成功移除,则拒绝任务
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 如果没有运行的工作线程,尝试添加一个新的工作线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } 
    // 如果任务无法加入队列,尝试添加一个新的工作线程
    else if (!addWorker(command, false))
        // 如果添加工作线程失败,则拒绝任务
        reject(command);
}

这里就是我们上文提到的提交任务时会遇到的情况:

  1. 运行中的线程数 < 核心线程数,就尝试添加一个新的工作线程来执行任务
  2. 大于核心线程数,就尝试将任务加到工作队列中
  3. 如果无法加入队列就尝试添加一个新的工作线程
  4. 再失败就拒绝任务
    接着来看addWorker
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // 仅在必要时检查队列是否为空
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 如果工作线程数量超过限制,返回 false
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // 尝试增加工作线程数量
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // 重新获取 ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // 否则 CAS 失败,重试内部循环
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // 创建新的 Worker
            final Thread t = w.thread; // 获取 Worker 的线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); // 加锁
                try {
                    // 再次检查线程池的运行状态
                    int c = ctl.get();
    
                    // 如果线程池正在运行,或者线程池未停止且任务为空
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w); // 添加 Worker 到集合
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s; // 更新最大池大小
                    }
                } finally {
                    mainLock.unlock(); // 解锁
                }
                if (workerAdded) {
                    t.start(); // 启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w); // 如果线程启动失败,执行失败处理
        }
        return workerStarted;
    }
    我们到这里就很清楚了,所谓的线程池不过是管理的一堆线程的工具,实际上就是帮助我们判断合适去新增线程何时复用线程。
    最后让我们去看看线程复用是如何实现的!

线程复用的奥秘

ThreadPoolExecutor中有一个静态内部类Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    private static final long serialVersionUID = 6138294804551838833L;

    // 运行此 Worker 的线程。如果工厂创建线程失败,则为 null
    @SuppressWarnings("serial") // 不太可能是可序列化的
    final Thread thread;

    // 要运行的初始任务。可能为 null
    @SuppressWarnings("serial") // 静态类型为非可序列化
    Runnable firstTask;

    // 每个线程的任务计数器
    volatile long completedTasks;

    // 创建具有给定初始任务和线程工厂的 Worker
    Worker(Runnable firstTask) {
        setState(-1); // 在 runWorker 之前禁止中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // 将主运行循环委托给外部的 runWorker 方法
    public void run() {
        runWorker(this);
    }

    // 锁定以防止中断任务
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 尝试获取锁
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // 尝试释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // 锁定此 Worker
    public void lock() {
        acquire(1);
    }

    // 尝试锁定此 Worker
    public boolean tryLock() {
        return tryAcquire(1);
    }

    // 解锁此 Worker
    public void unlock() {
        release(1);
    }

    // 检查当前线程是否持有锁
    public boolean isLocked() {
        return isHeldExclusively();
    }

    // 中断此 Worker 的线程
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker是定义了一个线程和要执行的初始任务,而线程复用要关注到这行

跳转之后,发现是调用的ThreadPoolExecutor中定义的runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // 允许中断
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 如果线程池正在停止,确保线程被中断;
            // 如果没有,确保线程没有被中断。这需要在第二种情况下重新检查,
            // 以处理 shutdownNow 竞赛,同时清除中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

发现这里的逻辑:

  1. 获取当前线程、获取Worker中的初始任务并清空,解锁Worker允许中断
  2. 只要任务不为null或者从getTask,也就是不停的从wokerQueue中获取到新任务就一直循环执行,从而达到线程复用
  3. 检测线程池状态,进行一些钩子方法
    到这里我们才真正找到线程池的奥秘,原来是通过循环来不断从任务队列中获取任务执行,而不是执行完任务之后就终止线程!!!

线程池中线程销毁的原理

本来没打算写这个的,可是在牛课上看见了问这个的,但是我好像答不出来
看到这一行代码:

当woker执行过一次之后,task = null ,也就是这个循环是否进行就全看getTask()函数了
让我们关注到getTask()的源码

private Runnable getTask() {
    boolean timedOut = false; // 上一次进行poll()任务出列是否超时

    for (;;) {
        int c = ctl.get();
		/**
		* 这是ctl的定义,他是一个AtomicInteger类型的变量,是原子操作的,用来控制线程池的状态和工作线程的数量
		* private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
		*/
        // 仅在必要时检查队列是否为空
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
		// 当前线程池中工作线程的数量
        int wc = workerCountOf(c);

	        //worker工作线程会不会被回收 
	        // alloW是控制核心线程是否会被销毁的 
            //1. allowCoreThreadTimeOut == true,核心线程和非核心线程空闲keepAliveTime被销毁,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)  
            //2. allowCoreThreadTimeOut == false,非核心线程空闲keepAliveTIme销毁,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 
            //  ,核心线程会被阻塞,不会被销毁workQueue.take()
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
		// 如果允许核心线程被淘汰或者存活线程数大于核心线程数并且超时了,或者线程数大于最大线程数,开始淘汰线程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 根据是否允许超时,选择poll或take方法获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

所以简单总结一下:
workQueue为空,线程就会被销毁,具体的实现是通过阻塞队列workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)来控制线程是否销毁的,如果使用poll就会在指定时间内等待元素可用否则返回null,从而等待keepAlive来决定是否销毁
两种情景:

  1. allowCoreThreadTimeOut == true,也就是允许核心线程销毁,两种线程使用poll来进行调用,一段时间后销毁
  2. allowCoreThreadTimeOut == false,也就是非核心线程会被销毁,核心线程永远不会销毁,非核心线程会调用poll,而核心线程会调用take,从而一直阻塞这个线程来做到永不销毁
    然后这个逻辑之后我们回到上面的调用的代码
    之后会走这个流程,不过销毁的过程我们就不看了。

ForkJoinPool ——> 递归拆分!让并行嗨起来!并行流的原理!

篇幅有限简单介绍一下

主要组件

  1. ForkJoinTask:任务的基本单位,提供fork、join,用于任务的分解和合并
  2. ForkJoinWorkerThread:执行ForkJoinTask的工作线程
  3. WorkQueue:工作线程用于存储任务
  4. ForkJoinPool:管理和调度ForkJoinTask的执行

核心原理

工作窃取法:

  • 工作窃取:当一个工作线程完成了自己的任务队列中的任务后,它会尝试从其他线程的队列中窃取任务来执行。
  • 双端队列:每个工作线程的任务队列是一个双端队列,线程从队列的一端(通常是头部)获取任务执行,而其他线程从队列的另一端(通常是尾部)窃取任务。

并行流

  • 使用stream()方法将数据源转换为顺序流,或使用parallelStream()方法将数据源转换为并行流。
  • 使用中间操作(如filter、map、sorted等)和终端操作(如forEach、collect、reduce等)定义流的处理逻辑。
  • 执行并行处理:
  • 并行流会自动使用ForkJoinPool来并行处理数据。每个操作会被拆分成多个子任务,并由ForkJoinPool中的工作线程并行执行。

常见八股

为什么使用抽象类来实现接口

  1. 因为普通类去实现接口中的方法,子类在继承的时候必须重写父类中的方法不利于代码的复用
  2. 抽象类可以提供一些默认实现,从而使得子类直接使用即可
  3. 可以强制子类去实现某个方法(abstract)

前言

面试中经常被问到动态代理相关内容,每次都答得不够完美,又因近期在尝试手写RPC框架,了解到不少的动态代理相关的内容,顺道总结一下,于是就有了这篇文章

静态代理? 动态代理!

区别

静态代理是在编译期间就生成了实际的字节码和对应的class文件,而动态代理可以在运行中动态生成代理类。

  • 静态代理示例
    public interface Service {
        void perform();
    }
    // 实际类
    public class RealService implements Service {
        @Override
        public void perform() {
            System.out.println("Performing service...");
        }
    }
    // 代理类也要实现和被代理的类实现的同一个接口才能进行代理
    public class StaticProxy implements Service {
    	// 将需要代理的类封装到自己的内部
        private final Service realService;
    	
        public StaticProxy(Service realService) {
            this.realService = realService;
        }
    	// 实际就是在实现的方法中进行代理操作,然后执行被代理的类的方法
        @Override
        public void perform() {
            System.out.println("Static Proxy: Before performing service");
            realService.perform();
            System.out.println("Static Proxy: After performing service");
        }
    }
    实际上就是通过将被代理类封装到内部,然后调用。缺点很明显:没新增一个代理都需要重新新建一个类然后重新写一个包装。
  • 动态代理实例(JDK代理)
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    
    public interface Service {
        void perform();
    }
    
    public class RealService implements Service {
        @Override
        public void perform() {
            System.out.println("Performing service...");
        }
    }
    
    public class DynamicProxyHandler implements InvocationHandler {
        private final Object target;
    
        public DynamicProxyHandler(Object target) {
            this.target = target;
        }
    	// 
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            System.out.println("Dynamic Proxy: Before performing service");
            Object result = method.invoke(target, args);
            System.out.println("Dynamic Proxy: After performing service");
            return result;
        }
    }
    
    public class DynamicProxyDemo {
        public static void main(String[] args) {
            Service realService = new RealService();
            Service proxyInstance = (Service) Proxy.newProxyInstance(
                    realService.getClass().getClassLoader(),
                    realService.getClass().getInterfaces(),
                    new DynamicProxyHandler(realService)
            );
    
            proxyInstance.perform();
        }
    }

源码解读

参考

解密Proxy

(以下是基于Java8的源码)
我们在使用JDK动态代理的时候使用的最多的就是Proxy.newProxyInstance()方法,接下来我们就来解密这个方法是如何实现我们的动态代理。

@CallerSensitive
   public static Object newProxyInstance(ClassLoader loader,
                                         Class<?>[] interfaces,
                                         InvocationHandler h)
       throws IllegalArgumentException
   {
    // 检查空指针异常
       Objects.requireNonNull(h);

       final Class<?>[] intfs = interfaces.clone();
       // 安全检查
       final SecurityManager sm = System.getSecurityManager();
       if (sm != null) {
           checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
       }

       // 生成代理类
       Class<?> cl = getProxyClass0(loader, intfs);

       /*
        * Invoke its constructor with the designated invocation handler.
        */
       try {
           if (sm != null) {
               checkNewProxyPermission(Reflection.getCallerClass(), cl);
           }

           final Constructor<?> cons = cl.getConstructor(constructorParams);
           final InvocationHandler ih = h;
           if (!Modifier.isPublic(cl.getModifiers())) {
               AccessController.doPrivileged(new PrivilegedAction<Void>() {
                   public Void run() {
                       cons.setAccessible(true);
                       return null;
                   }
               });
           }
           return cons.newInstance(new Object[]{h});
       } catch (IllegalAccessException|InstantiationException e) {
           throw new InternalError(e.toString(), e);
       } catch (InvocationTargetException e) {
           Throwable t = e.getCause();
           if (t instanceof RuntimeException) {
               throw (RuntimeException) t;
           } else {
               throw new InternalError(t.toString(), t);
           }
       } catch (NoSuchMethodException e) {
           throw new InternalError(e.toString(), e);
       }
   }

参数解析:

  • loader:类加载器,用于自定义加载类
  • interface 接口数组,代理类将实现这些接口
  • h 调用处理器,处理代理实例上的方法调用
    这段代码的主要意思是获取调用者类和代理类构造函数。
    接着关注getProxyClass0方法
    private static Class<?> getProxyClass0(ClassLoader loader,
                                              Class<?>... interfaces) {
           // 规定对实现的接口数量不得超过这个数量
           if (interfaces.length > 65535) {
               throw new IllegalArgumentException("interface limit exceeded");
           }
    
           // If the proxy class defined by the given loader implementing
           // the given interfaces exists, this will simply return the cached copy;
           // otherwise, it will create the proxy class via the ProxyClassFactory
           return proxyClassCache.get(loader, interfaces);
       }
    接着看proxyClassCache
    // 使用WeakCache降低出现内存泄漏的概率
    private static final WeakCache<ClassLoader, Class<?>[], Class<?>>  
        proxyClassCache = new WeakCache<>(new KeyFactory(), new ProxyClassFactory());
    核心是ProxyClassFactroy,让我们关注它
    // 核心是实现BiFunction
    /**
     * ProxyClassFactory 是一个工厂类,用于生成和定义代理类。
     * 它实现了 BiFunction 接口,接受 ClassLoader 和接口数组作为输入,
     * 返回生成的代理类。
     */
    private static final class ProxyClassFactory
        implements BiFunction<ClassLoader, Class<?>[], Class<?>> {
    
        // 所有代理类名称的前缀
        private static final String proxyClassNamePrefix = "$Proxy";
    
        // 用于生成唯一代理类名称的下一个数字
        private static final AtomicLong nextUniqueNumber = new AtomicLong();
    
        /**
         * 生成并定义代理类。
         *
         * @param loader 用于定义代理类的��加载器
         * @param interfaces 代理类要实现的接口数组
         * @return 生成的代理类
         * @throws IllegalArgumentException 如果接口数组中的任何接口不可见或不是接口,或接口重复
         */
        @Override
        public Class<?> apply(ClassLoader loader, Class<?>[] interfaces) {
    
            // 使用 IdentityHashMap 检查接口是否重复
            Map<Class<?>, Boolean> interfaceSet = new IdentityHashMap<>(interfaces.length);
            for (Class<?> intf : interfaces) {
                /*
                 * 验证类加载器是否将此接口的名称解析为相同的 Class 对象。
                 */
                Class<?> interfaceClass = null;
                try {
    	            // 对于每一个使用指定的加载器去加载他的对象
                    interfaceClass = Class.forName(intf.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                }
                if (interfaceClass != intf) {
                    throw new IllegalArgumentException(
                        intf + " is not visible from class loader: " + loader);
                }
                /*
                 * 验证 Class 对象是否实际代表一个接口。
                 */
                if (!interfaceClass.isInterface()) {
                    throw new IllegalArgumentException(
                        interfaceClass.getName() + " is not an interface");
                }
                /*
                 * 验证此接口是否重复。
                 */
                if (interfaceSet.put(interfaceClass, Boolean.TRUE) != null) {
                    throw new IllegalArgumentException(
                        "repeated interface: " + interfaceClass.getName());
                }
            }
    
            String proxyPkg = null;     // 定义代理类的包
            int accessFlags = Modifier.PUBLIC | Modifier.FINAL;
    
            /*
             * 记录非公共代理接口的包,以便代理类将在同一包中定义。
             * 验证所有非公共代理接口是否在同一包中。
             */
            for (Class<?> intf : interfaces) {
                int flags = intf.getModifiers();
                if (!Modifier.isPublic(flags)) {
                    accessFlags = Modifier.FINAL;
                    String name = intf.getName();
                    int n = name.lastIndexOf('.');
                    String pkg = ((n == -1) ? "" : name.substring(0, n + 1));
                    if (proxyPkg == null) {
                        proxyPkg = pkg;
                    } else if (!pkg.equals(proxyPkg)) {
                        throw new IllegalArgumentException(
                            "non-public interfaces from different packages");
                    }
                }
            }
    
            if (proxyPkg == null) {
                // 如果没有非公共代理接口,使用 com.sun.proxy 包
                proxyPkg = ReflectUtil.PROXY_PACKAGE + ".";
            }
    
            /*
             * 选择要生成的代理类的名称。
             */
            long num = nextUniqueNumber.getAndIncrement();
            String proxyName = proxyPkg + proxyClassNamePrefix + num;
    
            /*
             * 生成指定的代理类。
             */
            byte[] proxyClassFile = ProxyGenerator.generateProxyClass(
                proxyName, interfaces, accessFlags);
            try {
                return defineClass0(loader, proxyName,
                                    proxyClassFile, 0, proxyClassFile.length);
            } catch (ClassFormatError e) {
                /*
                 * 这里的 ClassFormatError 意味着(除非代理类生成代码中有错误),
                 * 否则是代理类创建时提供的参数存在其他无效方面(例如虚拟机限制超出)。
                 */
                throw new IllegalArgumentException(e.toString());
            }
        }
    }
    BiFunction 是 Java 8 引入的一个函数式接口,位于 java.util.function 包中。它代表一个接受两个输入参数并返回一个结果的函数。BiFunction 接口定义如下:
    @FunctionalInterface
    public interface BiFunction<T, U, R> {
        R apply(T t, U u);
    
        // 其他默认方法
    }
    然后我们顺着往下找,通过generateProxyClass ->generateClassFile ->generateMethod
    代码过长这里只给出比较重要的部分
    {
    	// 获取InvocationHandler字段
    	 out.writeByte(opc_getfield);
                out.writeShort(cp.getFieldRef(
                    superclassName,
                    handlerFieldName, "Ljava/lang/reflect/InvocationHandler;"));
    	// 加载Method对象
                code_aload(0, out);
    
                out.writeByte(opc_getstatic);
                out.writeShort(cp.getFieldRef(
                    dotToSlash(className),
                    methodFieldName, "Ljava/lang/reflect/Method;"));
    	// 后面紧跟处理方法参数的部分,这部分省略
    
    	// 重点!
    	// 调用InvocationHandler.invoke方法
    	 out.writeByte(opc_invokeinterface);
                out.writeShort(cp.getInterfaceMethodRef(
                    "java/lang/reflect/InvocationHandler",
                    "invoke",
                    "(Ljava/lang/Object;Ljava/lang/reflect/Method;" +
                        "[Ljava/lang/Object;)Ljava/lang/Object;"));
                out.writeByte(4);
                out.writeByte(0);
        // 后面设置返回值和异常处理等信息,最后返回方法的信息
    	return minfo;
    }
    到这里我们才终于发现JDK动态代理的原理:
    原来就是在代理方法中通过Proxy引用了自定义的InvocationHandler!!
    通过Proxy.newProxyInstance()方法将invocationHandler传入,然后生成代理类来继承Proxy类,从而拿到InvocationHandler,最后在代理类中调用invoke()方法

深入浅出MySQL

以下内容参考自 《MySQL是怎样运行的:从根儿上理解MySQL》一书,强烈推荐

存储引擎

对于不同的表可以设置不同的存储引擎

CREATE TABLE tableName (
	xxxx
) ENGINE = 引擎名称;
# 修改
ALTER TABLE tableName ENGINE = xxx;

编码格式

mysql中的utf8默认的是使用的自定义的1~3字节表示的uft8mb3,对于一些特殊的字符,比如emoji,需要我们指定为utf8mb4才能够存储。

CREATE / ALTER DATABASE 数据库名
 CHARACTER SET 字符集名称
 COLLATE 比较规则名称
# 或者对于表来修改
	CREATE TABLE tableName(
	)
	CHARACTER SET 字符集
	COLLATE 比较规则
ALTER tableName CHARACTER SET 字符集名称
## 或者对于某一列
CREATE TABLE 表名(
列名 字符串类型 [CHARACTER SET 字符集名称] [COLLATE 比较规则名称],
其他列...
);
ALTER TABLE 表名 MODIFY 列名 字符串类型 [CHARACTER SET 字符集名称] [COLLATE 比较规则名称];

InnoDB

物理存储结构


  1. 将数据划分为页,以页为单位作为磁盘和内存交互的单位,默认页大小为16KB
  2. 行结构
    记录的单位是行。

行格式

行/记录格式有很多,可以在建表的时候指定行格式

CREATE TABLE tableName(
xxxx;
) ROW_FORMAT=COMPACT;
  1. COMPACT格式 (重点,最常用)
  • 变长字段长度列表,只会存放变长字段的长度
    支持VARCHAR等变成字段类型的,结构
  1. 真正的数据内容
  2. 占用的字节数
    所有变长字段的真实占用长度,按照列顺序的逆序来进行存放。
    这个长度占用的字节数:
    如果可变字段允许的最大字节数超过255字节,并且真实存储的字节数超过127字节,就使用两个字节来表示这个长度,否则使用一个字节来表示。
  • NULL值列表
    Compact将列中的NULL值统一进行管理。而不是放在真实数据里面,从而减少存储占用
    进行的流程:
  1. 统计允许存储NULL值的列有哪些,如果不存在,NULL值列表就不存在了
  2. 表示形式:使用1表示为NULL值,0表示不为NULL,按照逆序排序。
  3. 要求NULL值列表必须使用整个字节的位来表示,如果不足位数,就在最前面补0
  • 记录头信息

  • 记录的真实数据

    记录的真实数据除了会有我们定义的数据,还会有MySQL为每一条记录添加的一些隐藏列

    1. row_id 行id,唯一标识一条记录
      2.(trx_id) transaction_id 事务ID,MVCC中会使用
    2. roll_pointer 回滚指针,事务回滚会用到,undo_log相关
  • 主键选取:
    优先使用用户定义的主键,如果没设置就选择一个Unique的键作为主键,如果不存在这种,就生成一个隐藏的row_id作为主键,所以 row_id不是 必须的

对于CHAR(M),MySQL会为分配大于这个值的空间,并且要求至少占用M个字节,即使存的是一个空字符串也会占用M个字节,而VARCHAR(M)没有这个要求。

目的是:如果后续更新CHAR(M)的大小,就无需分配一个额外的记录空间,直接在原记录上进行更新即可。就不会造成碎片空间。

  • Redundant行格式(老东西,不常用了)

溢出数据存储

可变数据类型需要占用3部分的存储空间:

  1. 真实数据
  2. 真实数据占用字节的长度
  3. NULL值标识,如果该列有NOT NULL属性则可以没有这部分存储空间
    如果要存储的列非常大 ,只会保存实际真实数据的一部分,把剩余的数据分散在几个其他页中。

数据页(重点,重中之重)


大小16KB

  • File Header 文件头部,记录页的一些通用信息
  • Page Header 页面头部,数据页专有
  • Infimum + Supremum 最小记录和最大记录, 虚拟行记录
  • User Records 用户记录 实际存储的行记录内容,一开始没有,每次从Free Space中分配空间
  • Free Space 空闲空间 页中未使用的空间
  • PageDirectory 页面目录 页中某些记录的相对位置
  • File Trailer 文件尾部 检验页是否完整

记录头信息

  1. delete_mask 标记记录是否被删除,先做一个标记实际还在磁盘里还没删除。所有删除掉的记录会组成一个垃圾链表,如果后续有新的记录插入表中,可能会直接覆盖这些被删除的记录空间
  2. min_rec_mask B+树的每层非叶子节点中的最小记录都会添加该标记
  3. n_owned 表示当前记录拥有的记录数
  4. heap_no 表示当前记录在记录堆的位置信息。
  5. record_type 记录类型 0普通记录, 1 B+树非叶子节点记录 2最小记录 3最大记录
  6. next_record 表示从当前记录的真实数据到下一条记录的真实数据的地址偏移量。 指向这个位置,向左就是记录头信息,向右读就是真实数据,同时因为变长字段列表和NULL值列表都是逆序存放的 ,所以可以使得真实数据和他们对应的长度在内存中的地址更近,能够提高缓存的命中率

最小记录和最大记录

最小记录heap_no = 0
最大记录heap_no = 1

这两个记录是自动生成的,所以不放在User_Record中
链表中的节点是按照主键值从小到大的顺序连接起来的

MySQL是如何进行查找的

对于主键查找记录:

select * from user where id = 3;

我们知道MySQL中的记录是根据链表从小到大连接起来的,那么如何快速从不支持随机访问的链表中找到我们需要的数据呢?

Page Directory

MySQL中的设计:

  1. 将所有的正常记录(不包括最小和最大记录)划分为多个组
  2. 每个组的最后一条信息记录的n_owned记录这个组中有多少条记录
  3. 将每个组的最后一条记录的地址偏移量单独提取出出来按照顺序存储到靠近 页 尾部的地方,也就是Page Directory页目录,这些偏移量被称为Slot 槽
  4. 最小记录为单独一个组,最大记录所在的分组条数只能在18条,其余分组只能有48条记录

查找过程

  1. 根据槽列表通过二分法来计算中间值,默认low 是最小记录的值,也就是0, high是最大记录的偏移量
  2. 通过对比中间槽的偏移量的值快速定位到所在的记录的位置,比对这条记录的主键值
  3. 对比之后接着通过二分法反复定位,直到 heigh - low = 1时,也就确认所需要的记录的数据所在的组
  4. 通过遍历链表找到该槽中的主键值值最小的那条记录,也就是上一个槽所对应的那条记录的下一条
  5. 通过next_record即可遍历该槽所在的组的各个记录

Page Header 页面头部

All problems in computer science can be solved by another level of indirection
计算机科学中的所有问题都可以通过增加一个间接层来解决

在上文中,我们已经可以在一个数据页内部快速定位到我们所需要的记录,但是一张表中不仅仅存在一个数据页,如何快速定位到我们所需要的数据页呢?
答案是再加一层抽象:对于每一个下层,我们都对其进行抽象,屏蔽掉其内部细节,方便上层使用。

Page Header 位于页结构的第二部分,用于存储页总的各种信息,比如第一条记录的地址是什么,本页存储了多少地址,页目录存储了多少个槽

主要的信息:

  • FIL_PAGE_SPAE_OR_CHECKSUM:校验和,通过算法来计算一个值,方便我们去比较
  • FIL_PAGE_OFFSET:页号,用于定位页
  • FIL_PAGE_TYPE:页类型,存放记录的页就是索引页,也是数据页
  • FIL_PAGE_PREV和FIL_PAGE_NEXT:用于组成双向链表

File Trailer

  1. 存储页的校验和:与Header部分中的校验和对应,如果同步磁盘中同步到一半就失败了,那么Header中的校验和就会变成已经修改后的校验和,而Trailer的校验和还代表着原来的校验和,从而确定同步出现错误
  2. 页面最后被修改时对应的日志序列位置LSN

数据页结构总结

  • File Header
  • Page Header
  • Infimum + Supremum
  • User Records
  • Free Space
  • Page Directory
  • File Trailer

索引!

以下部分是重中之重,可以说这篇文章就是为了这里才有写的必要的
指面试的时候因为忘了而被疯狂拷打

回顾

  1. InnoDB中数据页通过双向链表连接起来
  2. 数据页内部的记录通过双向链表连接起来,并且按照主键从小到大进行排序
  3. Page Directory 保存了每组记录最后一条记录的偏移量,方便我们快速定位到每个组

索引为什么会出现

  1. 对于主键查找,我们可以使用设计快速定位,而对于其他列,我们没有这种方便的方式可以快速定位
  2. 对于不同的页,我们也没办法快速定位到满足查询条件的记录所在的页在哪里,从而只能一个一个进行查找

索引的结构与实现原理

  1. 数据页链表中,后一个数据页中的主键值必须大于上一个页中用户记录的主键值。
  2. 仿照Page Directory,将数据页中的链表项建立目录
  3. key为数据页中最小的主键,page_no为页号

MySQL中的实现

  1. 使用数据页来存储目录项,通过record_type(0用户记录,1目录项记录,2最小记录,3最大记录) 来与用户记录进行区分
  2. 一个数据页中存储的记录是有限的,所以需要使用链表的形式将目录串起来,保证后一个页中的页号要大于前一个
  3. 为了方便我们使用二分查找从目录链表中快速定位,我们可以再次将其使用目录记录
  4. 反复之后就形成了B+树

B+树和B树的区别:

  1. B+树只在最底层的节点上存储真实数据,其余都是用来存储目录项的,B树的任何一个节点都能保存数据
  2. B+所有叶子节点之间有链表连接,方便范围查询

聚簇索引(重点)

定义:

  1. 使用主键值的大小进行记录和页的排序:
    • 页内记录按照主键大小的单向链表
    • 存放用户记录的页根据主键大小排成双向链表
    • 存放目录项的页也是同一层次排成双向链表
  2. 叶子节点存储的是完整记录(包括隐藏列)
    索引即数据,数据即索引。

二级索引

  1. 使用非主键值作为排序标准
    • 同聚簇索引,不过标准是我们使用的非主键
  2. B+树的叶子节点存储的是 这个列 + 主键两个列的值
  3. 目录项记录存的是 这个列 + 页号 + 主键
    查询过程:
    例如查询c2的值为4的记录:
  4. 根据页44, 2 < 4 < 9,可以定位到页42
  5. c2没有唯一约束,所以4可能在多个记录中,由此对比可以确定应该在页34和页35,因为 2 <4 < 5
  6. 定位到具体的记录
  7. 通过具体记录中的主键值进行回表操作—> 也就是根据主键值去聚簇索引中再查找一遍完整的用户记录

回表的好坏

优点:
可以不用重新存储完整的数据,减少空间占用
缺点:
回表会浪费额外的时间

联合索引(重点,加个书签)

比如对c2、c3建立联合索引

  1. 记录和页按照c2进行排序
  2. 在这个基础上对c3进行排序
  3. 叶子节点存储的是c2、c3和主键的值
  4. 仍然是一个二级索引,也是需要回表的

MyISAM索引方案的差别

  1. InnoDB使用索引即数据,而MyISAM将索引和数据分开存储。
  2. 将数据存在一个文件中,使用行号来快速访问,而不是使用主键值进行排序,无法进行二分查找
  3. 索引文件中存储的是主键值 + 行号,也就是所有查询都需要回表,全是二级索引
  4. 联合索引存储的也是行号 + 相应的列

创建和删除索引的sql

CREATE TABLE 表名(
	列信息,
	KEY / INDEX 索引名 (需要被索引的单个列或者多个列,联合索引使用 , 隔开列)
)
ALTER TABLE 表名 ADD INDEX / KEY 索引名 (列)
ALTER TABLE 表名 DROP INDEX / KEY 索引名

索引的一些常见八股和如何合理使用B+树索引

如何避免回表?

需要回表记录越多,使用二级索引的性能就越低。
覆盖索引:
在查询列表中只包含索引列,即可避免回表。所以一般不建议使用 * 作为查询列表,最好把需要查询的列都依次标明

访问方法

全表扫描
索引扫描

p139,接着看

最适合建立索引的场景

  1. 全值匹配:搜索条件的列和索引列一致,可以快速使用索引
  2. 联合索引最左匹配原则
  3. 前缀匹配 abc%
  4. 索引列的范围查找
  5. 对查找出的索引列数据进行排序:索引列本来就是基于排序的,所以可以不需要再内存或文件中进行排序

索引失效的场景

  1. LiIKE操作符以通配符开头例如 “%xx”或 “%xx%” ,但是 “xx%” 可以使用索引。
  2. 对索引使用函数或者表达式操作 select * from t_user where length(name)=6 , 因为索引存的是原始值
  3. 对索引隐式转换,如果查询条件中的类型和列的类型不匹配,MySQL可能会进行类型转换,索引就会失效。因为索引存的是原始值
  4. 联合索引非最左匹配,多个普通字段组合在一起创建的索引叫做联合索引,不遵循最左优先的方式就会失效。
  5. where子句中使用了OR,如果OR后的条件不是索引列就会失效
  6. 出现NULL值:不一定不走索引 ,需要看查询的cost和优化器的选择。
  7. 联合索引没有对最左列进行范围查找
    为何失效
  8. like %xx:我们索引是根据列的值大小进行排列的,也就是说,例如我们对 小写字母建立索引,那么我们的索引全是基于 a -> z的大小顺序排列的,同时我们是按照字典序的方式进行排序,也就是说如果前一个相同才会去根据后面的字母进行排序,也就是说当我们匹配abc%xxx的时候,我们可以顺着这个索引去查询,而我们匹配 %abc时,我们无法确认%之前的的字母是什么,于是我们必须走一次全表匹配所以无法走索引去去匹配
  9. 使用函数修改的列不满足最原始的排序了
  10. 联合索引,通过上文,我们可以很清楚的明白联合索引实际上就是先按照列的顺序去建立索引,只有前一个列相同,我们才会根据后一个列进行排序,也就是说,我们只有这种顺序能够保证是按照大小排序。如果我们没有遵循最左匹配,也就是将顺序倒置,那么后面的列是不能满足大小排序的,也就无法走索引
  11. 联合索引只有最左列是完全按照大小排序的,同时数据是基于链表,也就是我们可以很轻易将最左列的范围查找取出。

建立索引时应该考虑什么?重点

  1. 只为用于搜索、排序、分组的列建立索引,出现在查询列表中的列就没必要建立索引了,因为走不走索引还是看查询条件、排序条件、分组条件,而不是查询的列
  2. 考虑列的基数(不重复的数据的个数),为基数大的列建立索引效果更好
  3. 索引的类型尽量小,因为数据类型越小,查询时占用的存储空间越少,查询速度越快,数据页中可以放下更多的记录。
  4. 对于一些较长的字符串,可以只对其前缀建立索引
    CREATE TABLE person_info(
     name VARCHAR(100) NOT NULL,
     birthday DATE NOT NULL,
     phone_number CHAR(11) NOT NULL,
     country varchar(100) NOT NULL,
     KEY idx_name_birthday_phone_number (name(10), birthday, phone_number)
    );
  5. 让索引列在比较表达式中单独出现:这个和使用函数去修饰索引列是一个问题,会改变索引列的原来的形式,从而不能够走索引。例如:
    WHERE c1 * 2 < 4
    替换为
    WHERE c1 < 4 / 2
  6. 插入数据时应该注意主键的顺序,因为当数据页满了,再插入记录时,会导致页分裂->将本页中的一些记录移动到新创建的野种,从而需要将记录转移,带来性能消耗,所以推荐让主键具有AUTO_INCREMENT

连接

循环嵌套连接

两表连接,驱动表只访问一次,而被驱动表要访问多次。
左外连接左边的是驱动表。右外连接右边的表是驱动表
然后每一个被驱动表其实都是一次循环,多个表循环嵌套,驱动表的每一行都要去遍历被驱动表。这种连接被称为嵌套循环连接。

事务

redo log

redo log 会把事务在执行过程中对数据库所做的所有的修改都记录下来,之后系统崩溃重启后可以把事务所做的任何操作都回复出来。

格式

  • type:表示该redo日志的类型
  • space ID:表空间ID
  • page number:页号
  • data:该条redo 日志的具体内容
    刷盘时机:
  • 当log buffer空间不足时,当redo log占满了log buffer的一半左右
  • 事务提交时
  • 后台线程自动刷新,约是每秒一次
  • 正常关闭服务器时

redo log 如何保证事务的完整性的

undo log

undo log 是单独存储的

  • trx_id事务id
    被删除的记录也会通过记录头信息中的next_record组成一个单项链表,然后指向PAGE_FREE空间
    事务提交之前,被删除的记录的delete_mask会被设置为1,但是不会被加入垃圾链表中,也就是会处于中间态

MVCC

只靠MVCC不能解决幻读,需要额外使用锁

实现原理

  1. 每一行的数据都有多个版本,更新时不会覆盖原来的数据,而是生成新的版本。
  2. 读操作根据ReadView去读
  3. 写操作,旧的版本不会被删除,而是放在垃圾链表,将原来的数据写入undo log之后,通过roll_id指向这一行的undo log

事务隔离等级和几个并发会出现的问题

并发常见到的问题

  1. 脏读:一个事务读到了另一个未提交事务修改过的数据
  2. 不可重复读:一个事务两次读取读取到的数据不同,也就是两次读取之间被其他事务修改了数据
  3. 幻读:相同的查询,查出来的结果不同
    几个等级:
    脏写 < 脏读 < 不可重复读 < 幻读

事务隔离等级

  1. 读未提交:允许一个事务读取另一个事务未提交的数据 阻止不了上面几种
  2. 读已提交:只允许一个事务读取另一个事务已经提交的修改,可以阻止脏读
  3. 可重复读:保证同一个事务多次读取获得的数据是相同的,避免了脏读、不可重复读
  4. 可串行化:完全隔离事务,确保事务按照顺序执行,彷佛他们是串行执行的

版本链

rolle_pointer:每次对某条聚簇索引进行改动时,会将原来的旧版本写入到undo日志中,然后这个隐藏列就相当于一个指针,可以通过它来找到该记录修改之前的信息。
每次对记录进行更新后,都会将旧值放到一条undo日志中,就算是该记录中的一个旧版本,随着更新次数的增多,所有的版本都会被role_pointer连接成一个链表,版本链的头节点就是当前记录的最新值,另外,每个版本还包含该版本时对应的事务id

ReadView

如何判断一条记录是否可以被某个事务可见。
ReadView的内容

  • m_ids:表示生成ReadView时当前系统中活跃读写事务id列表
  • min_trx_id:表示生成ReadView时,当前系统中活跃的最小事务中的最小事务id,也就是m_ids的最小值
  • max_trx_id:生成ReadView时系统应该分配给下一个事务的id值。
  • creator_trx_id:表示生成该ReadView的事务的事务id
    当事务访问某条记录时,进行一下判断访问是否可见:
  • 被访问版本的trx_id == ReadView中的creator_trx_id时,意味着当前事务访问的是自己修改过的记录,可以被当前事务访问
  • trx_id < ReadView中的min_trx_id,表示该版本是在当前事务之前旧已经提交了,可以访问
  • trx_id > max_trx_id,表示当前记录的事务是在当前事务之后才开始的,不能访问
  • min_trx_id < trx_id < max_trx_id,说明创建ReadView时,该版本的事务还是活跃的,需要从m_ids中判断是否存在,如果存在就不能访问,否则就可以访问
    如果某个版本的数据对于当前的事务不可见,就沿着版本链去找下一个版本的数据,如果版本链中所有的数据都不可见,才意味着这条记录对当前事务不可见,查询条件就不会包括这条记录。

读已提交和可重复读的区别

区别就是二者的ReadView生成时机不同:

  • 读已提交在每次执行查询语句时生成一个ReadView,此后不会再重复生成了
  • 可重复读只在第一次进行查询语句时生成一个ReadView,此后查询操作都重复使用这个ReadView,从而做到保证多次读取读取到相同的数据。

锁的结构


对于每一条记录,都会有一个锁对应。当一个事务相对这条记录做改动时,首先会看看内存中是否有与这条记录想关联的锁结构,如果没有就需要生成一个锁结构与之关联。
所结构中的最重要的两个属性:

  • trx信息:这个锁结构是由哪个事务生成的
  • is_waiting:代表当前事务是否在等待
  1. 获取锁成功/加锁:当事务修改这条记录时,如果不存在锁,就会在修改记录之后生成一个锁机构1,trx是这个事务的id,is_waiting是false。
  2. 获取锁失败:当其他事务尝试修改这条记录时发现存在了如果发现已经存在了这个锁1,那么就会给自己生成一个锁2,但是is_waiting是true,表示需要等待。
  3. 当事务1正常提交之后会把该事务生成的锁结构1删除,然后查看是否还有其他别的事务在等待获取锁,发现事务2在等待,于是就将锁2的is_waiting设置为false,之后将这个事务的线程唤醒,继续执行。

一致性读

事务使用MVCC进行读取被称为一致性读,或者一致性无锁读,也叫快照读。

锁定读

行级锁
对于读写冲突问题,MySQL使用了MVCC+锁来解决

  1. 共享锁:Shared Locks S锁,读取记录时需要先获取该记录的S锁
  2. 独占锁:也叫排他锁, X锁,修改记录时需要鲜活的该记录的X锁。
    主动加锁的sql
    SELECT ... LOCK IN SHARE MODE; # 加S锁
    SELECT ... FOR UPDATE; # 加X锁

多粒度锁

上文提到的都是行级锁

  • 意向共享锁 IS锁:表级锁
  • 意向独占锁 IX锁:表级锁
    这两个锁时为了之后对表添加表级别的S锁和X锁时可以快判断表中的记录是否被上锁,以免需要使用遍历来确定是否有记录被上锁了。

InnoDB中的锁

表级锁

  • 表级的S锁、X锁:
  • 表级的IS锁、IX锁:
  • 表级的AUTO-INC锁:自增AUTO_INCREMENT修饰的列递增赋值

行级锁(重点)

  • Record Locks:正经记录锁,分为S锁和X锁
  • Gap Locks:间隙锁,加锁方案解决幻读的关键
  • Next-Key锁:简单来说就是 record locks + gap locks
  • Insert Intention Locks:事务在等待的时候也需要在内存中生成一个锁结构,表明有事务想在某个间隙中插入新记录。叫做插入意向锁,目的是避免插入操作的相互阻塞

Gap Locks

对于幻读中出现的幻影记录,我们没法加锁,因为他们还未存在。
间隙锁就是所著一个范围内所有的间隙,从而阻止其事务在这些间隙中插入数据

八股

InnoDB为什么是默认引擎

  1. 唯一支持事务的引擎
  2. 行级锁
  3. 支持外键约束
  4. 崩溃恢复,redolog和undolog

为什么InnoDB使用B+树

  1. 支持快速查找:平衡多路查找树,高度较低
  2. 有序性:翻遍范围查找分组
  3. 插入和删除更搞笑:使用双向链表
  4. 适应磁盘存储:节点的大小控制在磁盘页面大小范围内,减少I/O操作
  5. 支持有序查找和范围查找

MVCC

多版本并发控制

  1. 每一行的记录都维护多个版本,每一行更新时,不会覆盖原来的数据,而是会生成新的版本
  2. 读操作只读事务创建之前的

MVCC + 间隙锁无法解决幻读的场景

(这里我把net-key locks也作为间隙锁了)
一般情况下MVCC + 间隙锁就能够解决幻读了,但是有些场景下任然是不够的。原因和间隙锁的作用范围有关:

  1. 间隙锁只能锁定特定范围内的间隙,如果查询条件不匹配索引,可能会出现没有锁定的间隙,从而允许插入新记录
  2. 某些隔离级别下(读已提交),间隙锁可能不会被使用,从而无法避免幻读
  3. 复合查询:当查询设计多个间隙和索引时,间隙锁可能无法覆盖所有的间隙

MySQL执行一条sql语句的流程

主要的组件:

  1. 连接器
  2. 查询缓存
  3. 分析器
  4. 优化器
  5. 执行器

慢索引如何优化

  1. 使用EXPLAIN语句来查看执行计划,看看是使用了全表扫描,或者索引未命中
  2. 优化SQL:1. 加索引 2. 重写查询,使用子查询或者,联合查询 3. 添加过滤条件等

Kafka

常见问题
## Java API
- 一个独立的Kafka服务器被称作broker,broker是集群的组成部分,每个集群都有一个broker同时充当了集群控制器的角色
- kafka的主题被分为多个分区,分区存储在磁盘中
- kafka的消息特点:消息会保留一段时间,即使应用程序下线,消息仍然会保存在kafka里,是基于磁盘的数据存储。
- API使用:
- producer
- record的参数,topic , key ,value key可以省略,当省略时就是一个没有key的value,key一般用于把相同key的数据写入同一个分区里
- send默认是发送并忘记,send会返回一个Future对象使用.get方法得到RecordMetadata对象,可以获取消息的偏移量
- 异步发送.send(record,回调函数类),回调函数需要实现Callback类,并且重写其中的方法
- 配置设置:
1. acks = 0 生产者无需等待服务器的响应,但消息丢失时不会知晓, = 1,只要集群的首领节点收到即可,= all需要所有参与复制的节点都受到消息才会受到服务器的响应,不建议
2. buffer.memory设置producer内存缓冲区大小,
3. compression.type设置消息的压缩格式,默认不会压缩
4. retries重试次数
5. batch.size 当多个消息被发送到同一个分区时,producer会把他们放在一起,当作一个批次,这个参数指定一个批次可以使用的最大内存
6. max.in.flight.requests.per.connection 生产者在接收到服务器响应之前可以发送多少个消息,设置为 1 时可以保证消息是按照顺序写入的,适合在银行等严格要求顺序的时候使用
- 自定义分区:
public class DemoPartitionser implements Partitioner {

    private String myKey;
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取分区列表
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        // 获取分区数
        int partitionNum = partitionInfos.size();
        if((keyBytes == null) || (!(key instanceof String))){
            throw new IllegalArgumentException("key is null or not a string");
        }
        // 如果key为key,则分配到最后一个分区
        if("key".equals(key)){
            return partitionNum;
        }
        //其余的消息都分配到最后一个分区
        return Math.abs(key.hashCode()) % (partitionNum - 1);
    }

    @Override
    public void close() {

    }

    /*
     * @description:
     * @author bronya
     * @date: 2024/4/20 14:50
     * @param map
     */
    @Override
    public void configure(Map<String, ?> map) {
        myKey = (String) map.get("key");
    }
}

- consumer:
- API:
1. consumer类的创建方式和producer一致,但是推荐指定 group.id来指定属于哪个群组,
2. consumer.subscirbe()订阅相关的topic,同时支持正则表达式 consumer.subscribe(Arrays.asList("topic1", "topic2"));
3. consumer.poll(xxx) 参数是轮询的阻塞时间,会等待broker返回数据
4. 记得.close()
5. 建议一个线程中只有一个消费者
- 配置:
- 大部分和producer类似
- fetch.min.bytes消费者从服务器获取记录的最小字节数,当数据量大于等于这个值才会被返回给消费者
- 提交和偏移量:消费者往_consumer_offset特殊主题发送消息,消息包含每个分区的偏移量,当触发再均衡时,消费者会读取每个分区组后一次提交的偏移量,然后从偏移量指定的地方开始处理
- 提交方式:
1. 自动提交:每隔一段时间自动提交一次
2. 提交当前偏移量:设置auto.commit.offset为false.然后使用commitSync()提交偏移量,会提交最新的一次由poll获得的偏移量
3. 异步提交:commitAsync(),也可使用回调函数来处理
4. 可以提交特定偏移量,而不是最后一次poll得到的偏移量
- 通过实现 ConsumerRebalanceListener接口来定义在consumer失去对分区的所有权时需要处理的事件,可在这里使用.seek()方法加上自定义的函数来实现从数据库中获得偏移量
## Kafka本体
### 优势
高吞吐,高性能,持久化(将消息持久化到磁盘,通过将数据持久化硬盘,以及follower节点来防止数据丢失)
缺点是异步的不适合电商场景
### 架构设计
- Producer
- Consumer
- Topic 主题,由用户定义并配置在Kafka服务器,建立Producer和Consumer之间的订阅关系,身缠这发送消息到特定的Topic下,消费者从这个Topic下消费消息。逻辑概念,相当于数据库中的表
- Partition 消息分区,一个Topic可以分为多个partition,partition是一个有序的队列,partition的每条消息都会被分配一个有序的id(offset) 物理实际概念,每一个partition对应一个log,producer生产的数据会不断地追加到该log文件末端,且每条数据都有自己的offset。系哦啊飞着组中的每个消费者都会实时记录自己消费到哪个offset
- Broker 一台Kafka就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic
- ConsumerGroup 消费者组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者获取部分消息。
- 一个Partition对应一个唯一的文件夹,文件夹下使用的是Segment File的存储方式进行存储。将大文件拆成小文件,分为索引未见和数据文件
### 基本流程
- producer先从zookeeper的broker/**/state节点找到该partiton的leader
- producer将消息发送给该leader
- leader将消息写出本地log
- follower从leader pull 消息
- 写入本地log,后向leader发送ACK
- leader收到所有ISR中的replication的ACK,增加HT(high watermark ,最后commit 的offset)并向producer发送ACK
#### 生产过程
1. Producer创建时,先创建一个Sender线程并且设置守护线程
2. 生产的消息经过拦截器->序列化器->分区器,将消息存在缓冲区
3. 批量发送的条件:缓冲区数据大小达到batch.size或者linger.ms达到上限
4. 发往指定分区,最后到达broker
- acks = 0,消息放到缓冲区就认为发送完成
- acks = 1消息写到主分区即可完成,如果主分区收到消息之后宕机,副本分区来不及同步消息,消息就会丢失
- acks = all 等待所有的ISR副本的缺人记录
5. 如果设置了重试次数并且大于0,就会进行重试
6. 成功,返回元数据给生产者
ISR(In-Sync Replicas)是指与领导者副本保持同步的副本集合
#### 生产者Offset
消息写入的时候,每一个分区都有一个offset,即每个分区的最新最大的offset。
#### 消费者Offset
不同消费组中的消费者可以针对一个分区存储不同的Offset,互不影响。
#### LogSegment
日志文件的组成部分
#### Leader选举
- Kafka会在Zookeeper上针对每个Topic维护一个成为ISR的集合
- 当集合中副本都跟Leader同的副本同步之后,kafka才会认为消息已提交
- 只有这些跟Leader保持同步的Follower才应该被选作新的Leader

写在前面

本文不是git入门,但后续会补一些入门知识,主要是写给工作室的学弟学妹快速排障使用。主要是被某些同学整无语了才想着记录一下
观看前提:会上github

git后悔药

世上没有后悔药

但是git有( ̄︶ ̄*))
还在担心上班时脑子一热写下臭骂老板的话导致绩效奖没有吗,还在担心提交记录写的丑吗,还在担心远程push的时候不会搞吗,这篇文章用来记录如何解决这些问题。

基本概念

先介绍一些概念:

  1. 工作区:工作区就是我们修改的文件但是还没有进行git add
  2. 暂存区:我们使用git add 之后但是还么有进行git commit
  3. 已提交:进行git commit 之后
  4. 已经推送到远程仓库:git push之后
  5. HEAD:Git中的一个特殊的指针,指向了当前所在分支的最新提交。可以使用 ^ 来当作指针修饰符,HEAD^代表前一个提交, HEAD^^或者HEAD2代表上两层提交,同理HEADn 当前提交的第n级父提交

工作区

在工作区进行修改之后,发现不想要了可以使用
git checkout -- <文件名>来撤回修改
不过现在git新版本推荐使用
git restore <文件名> 来撤回修改

暂存区

git reset HEAD <文件名> 使用这个命令可以将这个文件放回工作区,之后如果不想要了可以结合工作区的命令来撤回修改

已提交

  1. 撤回提交但是保留修改:也就是回到工作区 : git reset --soft HEAD^
  2. 撤回提交并且撤回修改:也就是回到暂存区,git rest --mixed HEAD^可以简写为 git rest HEAD^
  3. 撤回修改并且丢失所有的修改:也就是将工作区和暂存区的修改都给撤回:git reset --hard HEAD^

已经推送到远程仓库

当你到达这一步的时候,你肯定已经慌了,绩效奖多半是没了(;´д`)ゞ
git revert HEAD
使用这个可以将你修改的东西撤回,然后你将这个新的提交上去就能够覆盖掉你原来的提交了,但是你原来的那个提交还是在记录中的,仍然可以访问的。

git开发注意

如何进去github

由于不可抗力,github需要一些魔法才能进去。不过工作室目前使用的是工作室服务器上搭建的gitea进行代码的管理,操作和github上基本一致。但是仓库目前由于学校锁ip,必须使用校园网才能进去,所以当成员不在校时,可以使用github来进行开发。

基本操作

如何提交代码

  1. 在仓库上中建立一个你英文名字的分支或者forked一个到自己仓库中(工作室一般采用第一种)
  2. 本地打开terminal,输入git push origin 你要提交的本地分支:你要提交到的远程分支名字,示例:git push origin main:huangzhenwei
  3. 点开仓库,找到Pull Request (pr)

面试常问

[黑马JVM面试](‌⁠​​‍​‍​‌​​​​​⁠​‌​‌‌​‍​​​​​⁠​‬‍​​​​‍​‍​​‌‌​‬面试篇 - 飞书云文档 (feishu.cn))
👍Java虚拟机8-11双版本 -JVM高频面试题👍 (yuque.com)

new的过程/创建对象的步骤

  1. 类加载检查,先去检查是否能在常量池中定位到这个类的符号引用来判断是否已经创建了
  2. 分配内存(指针碰撞,空闲列表)
  3. 初始化零值
  4. 设置对象头:
  5. 执行init方法

静态变量存储在哪里呢?

  • JDK6及之前的版本中,静态变量是存放在方法区中的,也就是永久代。
  • JDK7及之后的版本中,静态变量是存放在堆中的Class对象中,脱离了永久代。

组成

|525

  • 编译器:不属于Java虚拟机的一部分,负责将源代码文件编译成字节码文件。
  • 类加载子系统,负责将字节码文件读取、解析并保存到内存中。其核心就是类加载器。
  • 运行时数据区,管理JVM使用到的内存。
  • 执行引用,分为解释器 解释执行字节码指令;即时编译器 优化代码执行性能; 垃圾回收器 将不再使用的对象进行回收。
  • 本地接口,保存了本地已经编译好的方法,使用C/C++语言实现。

运行时数据区的组成

|900

  • 堆,可分为新生代和老年代,新生代可分为Eden,Surviver1,Surviver2
  • 方法区:永久区,存储已经被Java虚拟机架子啊的类信息等,jdk1.8之后称为元空间。
  • 虚拟机栈:线程私有,内有多个栈帧,方法在执行时会创建栈帧,包含(局部变量表,操作数栈,动态链接,返回地址等)
    • 局部变量表:存放方法参数,局部变量
    • 操作数栈:记录一个方法在执行过程中,字节码指令向操作数栈进行入栈和出栈的过程。
    • 动态链接:字节码中的符号链接,一部分在类加载过程中转化为直接引用(静态解析),还有一部分在运行时转化为直接引用,也就是动态链接。
    • 返回地址:当前方法执行过程中,当前方法需要返回的位置。
  • 本地方法栈:服务的是Native方法
  • PC程序计数器:线程私有

Java虚拟机栈

每一个方法的调用使用一个栈帧来保存,每一个线程都有一个自己的虚拟机栈,生命周期和线程相同
主要包括

  1. 局部变量表:方法执行过程中存放所有的局部变量
  2. 操作数栈:虚拟机在执行指令过程中用来存放临时数据的一块区域
  3. 帧数据:主要包括动态链接、方法出口、异常表等内容。

本地方法栈

存储的是native本地方法的栈
本地方法(Native Method)是使用非 Java 语言(通常是 C 或 C++)编写的方法。这些方法通过 Java 本地接口(Java Native Interface,JNI)与 Java 代码进行交互。使用本地方法的主要目的是为了实现以下几个功能:

  1. 与操作系统交互:直接调用操作系统的底层功能。
  2. 提高性能:在性能关键的部分使用更高效的本地代码。
  3. 访问硬件:与特定硬件设备进行交互。
  4. 复用现有库:调用已有的用其他语言编写的库。

用来存放创建出来的对象,栈上的局部变量表中,可以存放对上的引用,静态变量也可以存放堆对象的引用,实现对象在线程之间的共享
堆是垃圾回收的最主要部分

类加载器

ClassLoader是Java虚拟机提供给应用程序去实现获取类和接口字节码数据的技术。去获得二进制字节码信息。然后通过JVM调用JNI也就是本地接口方法区创建对象

类加载器的分类

jdk8以及之前
  1. Java代码实现的(扩展类加载器,应用程序类加载器)
  2. Java虚拟机底层代码实现的(启动类加载器)
    使用启动加载类器加载器去加载用户的jar包,可以在虚拟机参数那里添加:
    -Xbootclasspath /a: jar包名
    Java中的加载器: 是一个静态内部类,继承自URLClassLoader,通过目录或者指定jar包将字节码文件加载到内存中
双亲委派机制(jdk8以及之前重点)

如果一个类加载器收到了类加载的请求,它首先不会自己尝试去加载这个类,而是把这个请求委派给父类加载器,每一个层次的类加载器都是加此,因此所有的加载请求最终到达顶层的启动类加载器,只有当父类加载器反馈自己无法完成加载请求时(指它的搜索范围没有找到所需的类),子类加载器才会尝试自己去加载。

可以解决的问题:
重复的类:启动类,根据双亲委派机制,如果同一个类出现在三个类加载器中,会由启动类加载器来加载。
String类能覆盖吗? 不能,会由启动类加载器加载在rt.jar包中的String类
类加载器的关系:应用类加载器的父类加载器识扩展类加载器,扩展类加载器没有父类加载器,但是会委派给启动类加载器。
双亲委派机制的作用:
保证类加载器的安全性,避免重复加载。

  1. 每一个类加载器都有一个父类加载器,在类加载的过程中,每个加载器会先检查是否已经加载了该类,如果已经加载则直接返回否则奖将载请求委派给父类加载器
  2. 如果所有的父类加载器都无法加载,就由当前加载器尝试加载,也就是说,如果父类加载器的加载路径中没有这个类,就会由他自己加载
如何打破双亲委派机制
  1. 自定义类加载器并且重写loadClass方法就可以将双亲委派机制的代码去除。Tomcat通过这种方式实现应用之间类隔离,每一个应用会有一个独立的类加载器加载对应的类
  2. 线程上下文加载器加载类,比如JDBC和JNDI等,JDBC使用DriverManager来管理项目中引入的不同的数据库驱动,DriverManager类位于rt.jar包中,由启动类加载器加载。依赖中的mysql驱动对应的类,由应用程序类加载器来加载,DriverManager属于rt.jar是启动类加载器加载的。而用户jar包中的驱动需要由应用类加载器加载,这就违反了双亲委派机制。定义服务接口:JDBC定义了java.sql.Driver接口。
    服务提供者:各个数据库厂商实现java.sql.Driver接口,并在META-INF/services目录下创建文件java.sql.Driver,文件内容为实现类的全限定名。
    加载驱动:DriverManager类在静态代码块中通过ServiceLoader加载所有实现java.sql.Driver接口的类,并调用Class.forName方法加载驱动类。

    定义服务接口:JDBC定义了java.sql.Driver接口。
    服务提供者:各个数据库厂商实现java.sql.Driver接口,并在META-INF/services目录下创建文件java.sql.Driver,文件内容为实现类的全限定名。
    加载驱动:DriverManager类在静态代码块中通过ServiceLoader加载所有实现java.sql.Driver接口的类,并调用Class.forName方法加载驱动类

  3. Osgi框架的类加载器,允许同级之间委托进行类的加载
    自定义类加载器父类怎么是AppClassLoader呢?
JDK9之后的类加载器

为什么抛弃了拓展类加载器
因为扩展类加载器主要是和加载jre环境下lib下的jar包,需要拓展Java的功能时,需要把jar包能够在ext文件夹下,不安全
JDK9引入的模块化开发取代了他

  1. 启动类加载器使用Java编写,位于jdk.internal.loader.ClassLoaders类中。Java中的BootClassLoader继承自BuiltinClassLoader实现从模块中找到要加载的字节码资源文件。但是还是无法获取到
  2. 扩展类加载器被替换成了平台类加载器,所以继承关系从URLClassLoader变成了BuiltinClassLoader,BuiltinClassLoader实现了从模块中加载字节码文件。平台类加载器的存在更多的是为了与老版本的设计方案兼容,自身没有特殊的逻辑
双亲委派机制总结

|500

类的生命周期

加载、连接、初始化、使用、卸载

加载

简单概括:找到需要加载的类,并把类的信息加载到JVM中,然后在堆中实例化一个java.lang.Class对象,作为方法区中的这个类的信息的入口

  1. 加载过程中是类加载器根据类的全限定名通过不同的渠道以二进制流的形式获取字节码信息
  2. 加载器加载完类之后,Java虚拟机将字节码中的信息保存到方法区中,生成一个InstanceKlass对象,保存类的所有信息,里面还包含实现特定功能比如多态的信息
  3. Java虚拟机在堆中生成一份与方法去中数据类似的java.lang.Class对象,作用是在Java代码中获取类的信息和存储静态字段的数据,jkd8以后将静态字段放在堆区中。 优点是:对于开发者来说只需要访问堆中的Class对象而不需要访问方法区中的所有信息,能够很好的控制开发者访问数据的范围
连接
  1. 验证,校验Java字节码文件是都遵循了约束,一般不需要程序员参与
  2. 为静态变量分配内存并设置初始值,如果使用了final来修饰,就会直接将代码中的值赋给静态变量
  3. 解析,将符号引用替换为直接引用,符号引用是在字节码中使用序号来进行引用,而直接引用就是使用内存中的地址直接进行访问
初始化

当类被直接引用的时候才会出发类的初始化。类被直接引用的情况有->
new,读取或设置类的静态变量,调用类的静态方法,通过反射来执行前三种,初始化子类时会触发父类的初始化,接口实现类初始化时,会出发直接或间接实现的所有接口的初始化
类的初始化只会运行静态部分,而且优先父类

  1. 执行静态代码块中的代码,为静态变量赋值
  2. 初始化阶段会执行字节码中clinit部分的字节码指令
程序中可以直接导致初始化的操作:
  1. 访问一个类的静态变量或者静态方法,final修饰的并且等号在右侧是常量的话不会触发初始化,因为在连接阶段就已经进行赋值了
  2. 调用Class.forName(String className)时会进行初始化
  3. new
  4. 执行Main方法的当前类
    <clinit>是Java中的一个特殊方法,代表初始化器(class initializer)。这个方法不是由程序员显式编写的,而是由Java虚拟机(JVM)自动生成的。当类被首次加载到JVM时,<clinit>方法负责执行类变量的静态初始化和静态初始化块的代码
    clinit 在以下情况不会出现:
  5. 无静态代码块
  6. 有静态变量声明但是没有赋值语句
  7. 静态变量的定义使用final字段,会在准备阶段直接进行初始化。
    访问父类静态变量,只会初始化父类
    new 子类时会先执行父类的clinit方法
    数组的创建不会导致数组中元素的类的初始化。
    final修饰的变量如果不是常量需要执行指令才能得出结果会执行clinit方法进行初始化
使用
卸载

字节码文件

  1. 常量池:避免保存重复的内容,节省空间
  2. 具体的字节码文件分析
    //源码
    int i = 0;  
    i = i ++;
    //字节码
    0 iconst_0  将0放在操作数栈中
    1 istore_1 弹出操作数栈最顶层数据到局部变量1号位
    2 iload_1  复制到操作数栈顶
    3 iinc 1 by 1 将局部变量1号位的数据 + 1
    6 istore_1 弹出,保存在一号位,所以被覆盖了
    7 return
    
组成

基础信息,常量池(保存字符串常量、类或者接口名,主要在字节码指令中使用),字段,方法,属性

运行时数据区

按照线程共享不共享区分:

  • 线程不共享
    1. 程序计数器
    2. Java虚拟机栈
    3. 本地方法栈
  • 线程共享
    1. 方法区
程序计数器

内存溢出:程序在使用某一块内存区域时,存放的数据需要占用的内存大小超过了虚拟机能提供的内存上限
每个线程只需要存储一个固定长度的内存地址,所以程序计数器是不会发生内存溢出的
程序员无需对程序计数器进行任何处理

虚拟机栈

Java虚拟机使用栈来管理方法调用中的基本数据。每一个方法的调用使用一个栈帧来保存,每个线程都包含一个自己的虚拟机栈

栈帧的组成
  • 局部变量表:存放运行中的所有局部变量,包括局部变量表保存的内容有:实例方法的this对象,方法的参数,方法体中声明的局部变量。
  • 操作数栈:存放临时数据
    • 栈帧中的局部变量表是一个数组,数组中每一个位置称之为槽(slot) ,long和double类型占用两个槽,其他类型占用一个槽。
    • 实例方法中的序号为0的位置存放的是this,指的是当前调用方法的对象,运行时会在内存中存放实例对象的地址。
    • 为了节省空间,局部变量表中的槽是可以复用的,一旦某个局部变量不再生效,当前槽就可以再次被使用。
  • 帧数据:包含动态链接、方法出口、异常表的引用
本地方法栈:

存储natice本地方法的栈帧,在Hotspot虚拟机中,Java虚拟机栈和本地方法栈实现上使用了同一个栈空间。

堆内存


一般Java程序中堆内存是空间最大的一块内存区域。创建出来的对象都存在于堆上。栈上的局部变量表中,可以存放堆上对象的引用。静态变量也可以存放堆对象的引用,通过静态变量就可以实现对象在线程之间共享。
堆空间有三个需要关注的值,used、total、max。used指的是当前已使用的堆内存,total是java虚拟机已经分配的可用堆内存,max是java虚拟机可以分配的最大堆内存。
不是当used = max = total的时候,堆内存就溢出


方法区:

方法区存放基础信息的位置,线程共享的,主要包含三部分:

  • 类的元信息,保存所有类的基本信息
  • 运行时常量池,保存了字节码文件中常量池内容
  • 字符串常量池,保存了字符串常量
元信息

一般称之为InstanceKlass对象。在类的加载阶段完成。其中就包含了类的字段、方法等字节码文件中的内容,同时还保存了运行过程中需要使用的虚方法表(实现多态的基础)等信息。

运行时常量池

字节码文件中通过编号查表的方式找到常量,这种常量池称为静态常量池。当常量池加载到内存中之后,可以通过内存地址快速的定位到常量池中的内容,这种常量池称为运行时常量池。

字符串常量池

字符串的拼接操作会创建一个新的字符串对象,而不是使用字符串常量池中的现有对象。
而直接用两个字符串拼接是放在常量池的,不是新建了一个对象

String s1 = new String("abc");
String s2 = "abc";
System.out.println(s1 == s2);
答案是false,因为"abc"是直接放入字符串常量池的,而new 出来的是放在堆内存中,两者所在的位置不同。
1.

/**
 * 字符串常量池案例
 */
public class Demo3 {
    public static void main(String[] args) {
        String a = "1";
        String b = "2";
        String c = "12";
        String d = "1" + "2";
        System.out.println(c == d);
    }
}
答案是true,在编译阶段就把 "1" + "2"连接在一起了
2.
public class Demo2 {
    public static void main(String[] args) {
        String a = "1";
        String b = "2";
        String c = "12";
        String d = a + b;
        System.out.println(c == d);
    }
}
答案是false

jdk8之后运行时常量池放在元空间,而字符串常量池还在堆中。
String.intern()方法是可以手动将字符串放入字符串常量池中

直接内存

直接内存不属于Java运行时的内存区域,NIO机制中使用直接内存来解决

  1. Java堆中的对象如果不再使用要回收,回收时会影响对象的创建和使用。
  2. IO操作比如读文件,需要先把文件读入直接内存(缓冲区)再把数据复制到Java堆中。
    现在直接放入直接内存即可,同时Java堆上维护直接内存的引用,减少了数据复制的开销。写文件也是类似的思路。

执行引擎

本地接口

垃圾收集器GC

GC (Garbage Collection),堆是垃圾回收最主要的区域,所以也被乘坐GC堆。
如何手动触发垃圾回收:使用System.gc(),但是这个方法不会立即进行回收,只是向虚拟机发送一个垃圾回收的请求。

不由GC进行回收的部分

线程不共享的部分不需要GC进行回收,因为随着线程的销毁,对应的方法的栈帧就会自动弹出栈并且释放掉对应的内存。

进行回收的部分

方法区

主要回收不再使用的类,需要满足以下条件

  1. 这个类所有的实例对象都被回收,在队中不存在该类的实例对象以及子类对象
  2. 加载该类的类加载器已经被回收,类加载器的任务完成之后引用被去除后就会被回收
  3. java.lang.Class对象没有在任何地方被引用
对象

Java中的对象能否被回收是根据对象是否被引用来决定的。如果对象被引用就不允许被回收
主要的算法:

引用计数法

为每个对象维护一个指针,对象被引用时+1,取消引用时-d。
缺点是:

  1. 每次引用都要维护计数器,会影响系统的性能
  2. 存在循环引用问题,A引用B,B引用A就无法回收了
可达性分析法

可达性分析法将对象分为:垃圾回收的根对象 (GC Root)和普通对象,对象之间存在引用关系。
可达性分析就是如果某个对象到GC Root是可达的,就不能被回收。
哪些对象被称之为GC Root对象呢?

  • 线程Thread对象,引用线程栈帧中的方法参数、局部变量等。
  • 系统类加载器加载的java.lang.Class对象,引用类中的静态变量。
  • 监视器对象,用来保存同步锁synchronized关键字持有的对象。
  • 本地方法调用时使用的全局对象。

常见的引用对象

可达性算法中描述的对象引用,一般指的是强引用,Java中还设计了几种其他引用方式:

  • 软引用
  • 弱引用
  • 虚引用
  • 终结器引用
软引用

如果一个对象只有软引用关联到他时,程序内存不足时回将其中的数据及逆行回收,软引用常用于缓存中
好处就是用作缓存快速从内存中读取,即使被释放了也可以重新获取,减少内存溢出的可能性
软引用对象本身,也需要被强引用,否则软引用对象也会被回收掉。
使用案例:

/**
 * 软引用案例2 - 基本使用
 */
public class SoftReferenceDemo2 {
    public static void main(String[] args) throws IOException {

        byte[] bytes = new byte[1024 * 1024 * 100];
        SoftReference<byte[]> softReference = new SoftReference<byte[]>(bytes);
        bytes = null;
        System.out.println(softReference.get());

        byte[] bytes2 = new byte[1024 * 1024 * 100];
        System.out.println(softReference.get());
//
//        byte[] bytes3 = new byte[1024 * 1024 * 100];
//        softReference = null;
//        System.gc();
//
//        System.in.read();
    }
}

如果软引用对象中的数据已经被回收了,那么这个对象本身也可以被回收了
SoftReference提供了一套队列机制:

  1. 软引用创建时,通过构造器传入引用队列
  2. 在软引用中包含的对象被回收时,该软引用对象会被放入引用队列
  3. 通过代码遍历引用队列,将SoftReference的强引用删除
    /**
     * 软引用案例3 - 引用队列使用
     */
    public class SoftReferenceDemo3 {
    
        public static void main(String[] args) throws IOException {
    
            ArrayList<SoftReference> softReferences = new ArrayList<>();
            ReferenceQueue<byte[]> queues = new ReferenceQueue<byte[]>();
            for (int i = 0; i < 10; i++) {
                byte[] bytes = new byte[1024 * 1024 * 100];
                SoftReference studentRef = new SoftReference<byte[]>(bytes,queues);
                softReferences.add(studentRef);
            }
    
            SoftReference<byte[]> ref = null;
            int count = 0;
            while ((ref = (SoftReference<byte[]>) queues.poll()) != null) {
                count++;
            }
            System.out.println(count);
    
        }
    }
弱引用

弱引用整体与软引用基本一致,但是弱引用不管内存够不够都会直接被回收

package chapter04.weak;

import java.io.IOException;
import java.lang.ref.WeakReference;

/**
 * 弱引用案例 - 基本使用
 */
public class WeakReferenceDemo2 {
    public static void main(String[] args) throws IOException {

        byte[] bytes = new byte[1024 * 1024 * 100];
        WeakReference<byte[]> weakReference = new WeakReference<byte[]>(bytes);
        bytes = null;
        System.out.println(weakReference.get());

        System.gc();

        System.out.println(weakReference.get());
    }
}
虚引用和终结器引用

这两种引用在常规开发中是不会使用的。

  • 虚引用也叫幽灵引用/幻影引用,不能通过虚引用对象获取到包含的对象。虚引用唯一的用途是当对象被垃圾回收器回收时可以接收到对应的通知。Java中使用PhantomReference实现了虚引用,直接内存中为了及时知道直接内存对象不再使用,从而回收内存,使用了虚引用来实现。
  • 终结器引用指的是在对象需要被回收时,终结器引用会关联对象并放置在Finalizer类中的引用队列中,在稍后由一条由FinalizerThread线程从队列中获取对象,然后执行对象的finalize方法,在对象第二次被回收时,该对象才真正的被回收。在这个过程中可以在finalize方法中再将自身对象使用强引用关联上,但是不建议这样做。
    package chapter04.finalreference;
    
    /**
     * 终结器引用案例
     */
    public class FinalizeReferenceDemo {
        public static FinalizeReferenceDemo reference = null;
    
        public void alive() {
            System.out.println("当前对象还存活");
        }
    
        @Override
        protected void finalize() throws Throwable {
            try{
                System.out.println("finalize()执行了...");
                //设置强引用自救
                reference = this;
            }finally {
                super.finalize();
            }
        }
    
        public static void main(String[] args) throws Throwable {
            reference = new FinalizeReferenceDemo();
           test();
           test();
        }
    
        private static void test() throws InterruptedException {
            reference = null;
            //回收对象
            System.gc();
            //执行finalize方法的优先级比较低,休眠500ms等待一下
            Thread.sleep(500);
            if (reference != null) {
                reference.alive();
            } else {
                System.out.println("对象已被回收");
            }
        }
    }

垃圾回收算法

  1. 找到内存中存活的对象
  2. 释放不再存活对象的内存,使得程序能再次利用这部分空间

垃圾回收算法的历史和分类

1960年John McCarthy发布了第一个GC算法:标记-清除算法。
1963年Marvin L. Minsky 发布了复制算法。

本质上后续所有的垃圾回收算法,都是在上述两种算法的基础上优化而来。

标记清除算法

GCRoot包含的对象:

• 虚拟机栈(栈帧中的局部变量表)中引⽤的对象
• 本地⽅法栈(Native ⽅法)中引⽤的对象
• ⽅法区中类静态属性引⽤的对象
• ⽅法区中常量引⽤的对象
• 所有被同步锁持有的对象
• JNI(Java Native Interface)引⽤的对象

标记清除算法的核心思想分为两个阶段:

  1. 标记阶段,将所有存活的对象进行标记。Java中使用可达性分析算法,从GC Root开始通过引用链遍历出所有存活对象。从GC Root对象开始扫描,将对象A、B、C在引用链上的对象标记出来
  2. 清除阶段,从内存中删除没有被标记也就是非存活对象。将没有标记的对象清理掉,所以对象D就被清理掉了。
    缺点:
  3. 碎片化问题由于内存是连续的,所以在对象被删除之后,内存中会出现很多细小的可用内存单元。如果我们需要的是一个比较大的空间,很有可能这些内存单元的大小过小无法进行分配
  4. 分配速度慢。由于内存碎片的存在,需要维护一个空闲链表,极有可能发生每次需要遍历到链表的最后才能获得合适的内存空间。 我们需要用一个链表来维护,哪些空间可以分配对象,很有可能需要遍历这个链表到最后,才能发现这块空间足够我们去创建一个对象。如下图,遍历到最后才发现有足够的空间分配3个字节的对象了。如果链表很长,遍历也会花费较长的时间。
复制算法
  1. 准备两块空间From空间和To空间,每次在对象分配阶段,只能使用其中一块空间(From空间)。
  2. 在垃圾回收GC阶段,将From中存活对象复制到To空间。
  3. 将两块空间的From和To名字互换。下次依然在From空间上创建对象。
    优点:
  • 吞吐量高,复制算法只需要遍历一次存活对象复制到To空间即可,比标记-整理算法少了一次遍历的过程,因而性能较好,但是不如标记-清除算法,因为标记清除算法不需要进行对象的移动
  • 不会发生碎片化,复制算法在复制之后就会将对象按顺序放入To空间中,所以对象以外的区域都是可用空间,不存在碎片化内存空间。
    缺点:
    内存使用效率低,每次只能让一半的内存空间来为创建对象使用。
标记整理算法

标记整理算法也叫标记压缩算法,是对标记清理算法中容易产生内存碎片问题的一种解决方案。

核心思想分为两个阶段:

  1. 标记阶段,将所有存活的对象进行标记。Java中使用可达性分析算法,从GC Root开始通过引用链遍历出所有存活对象。
  2. 整理阶段,将存活对象移动到堆的一端。清理掉存活对象的内存空间。
    优点:
  • 内存使用效率高,整个堆内存都可以使用,不会像复制算法只能使用半个堆内存
  • 不会发生碎片化,在整理阶段可以将对象往内存的一侧进行移动,剩下的空间都是可以分配对象的有效空间
    缺点:
    整理阶段的效率不高,整理算法有很多种,比如Lisp2整理算法需要对整个堆中的对象搜索3次,整体性能不佳。可以通过Two-Finger、表格算法、ImmixGC等高效的整理算法优化此阶段的性能
分代垃圾回收算法

现代优秀的垃圾回收算法,会将上述描述的垃圾回收算法组合进行使用,其中应用最广的就是分代垃圾回收算法(Generational GC)。
分代垃圾回收将整个内存区域划分为年轻代和老年代:

  1. 分代回收时,创建出来的对象,首先会被放入Eden伊甸园区。
  2. 随着对象在Eden区越来越多,如果Eden区满,新创建的对象已经无法放入,就会触发年轻代的GC,称为Minor GC或者Young GC。Minor GC会把需要eden中和From需要回收的对象回收,把没有回收的对象放入To区。
  3. 接下来,S0会变成To区,S1变成From区。当eden区满时再往里放入对象,依然会发生Minor GC。
  4. 如果Minor GC后对象的年龄达到阈值(最大15,默认值和垃圾回收器有关),对象就会被晋升至老年代。
  5. 当老年代中空间不足,无法放入新的对象时,先尝试minor gc如果还是不足,就会触发Full GC,Full GC会对整个堆进行垃圾回收。如果Full GC依然无法回收掉老年代的对象,那么当对象继续放入老年代时,就会抛出Out Of Memory异常。

为什么分代GC算法要把堆分成年轻代和老年代?首先我们要知道堆内存中对象的特性:

  • 系统中的大部分对象,都是创建出来之后很快就不再使用可以被回收,比如用户获取订单数据,订单数据返回给用户之后就可以释放了。
  • 老年代中会存放长期存活的对象,比如Spring的大部分bean对象,在程序启动之后就不会被回收了。
  • 在虚拟机的默认设置中,新生代大小要远小于老年代的大小。

分代GC算法将堆分成年轻代和老年代主要原因有:

  1. 可以通过调整年轻代和老年代的比例来适应不同类型的应用程序,提高内存的利用率和性能。
  2. 新生代和老年代使用不同的垃圾回收算法,新生代一般选择复制算法,老年代可以选择标记-清除和标记-整理算法,由程序员来选择灵活度较高。
  3. 分代的设计中允许只回收新生代(minor gc),如果能满足对象分配的要求就不需要对整个堆进行回收(full gc),STW时间就会减少。

垃圾回收器 重点

一些好用JVM分析工具

[Arthas](Arthas Install | arthas (aliyun.com))


阿里开发的工具

内存分配

  1. 优先分配在Eden区,如果Eden区没有足够的空间进行分配时,虚拟机会进行一次MinorGC,而那些无需回收的存货对象会进入Survivor的From区,再不满足就进入Old区
  2. 大对象直接进入老年代,避免在Eden区和两个Survivor区之间发生大量的内存拷贝
  3. 长期存货的对象进入老年代,虚拟机为每一个对象定义一个年龄计数器,经过一次MinorCG就会进入Survivor区,此后每经历一次都会年龄+1,到达阈值,对象进入老年区。

Full GC和Minor GC和内存回收算法

大多数情况下,对象在新生代中Eden区分配,当Eden区没有足够的内存进行分配时,回发起一次Minor GC。经过第一次Minor GC仍能够存货,并且能够被Survior容器容纳的话,会被移动到Survivor空间。并且将对象年龄设置为1,之后对象每熬过一次MinorGC,年龄就增加1岁,当它的年龄达到一定程度时,就会被晋级到老年代中。部分垃圾回收器会将大对象直接放入就老代。

Minor GC/ Young GC

只对新生代进行垃圾收集

Major GC / Old GC

只对老年代进行GC,有时候也可以代指Full GC

Full GC

回收整个Java堆和方法区

内存调优

内存泄漏(memory leak):Java中的如果不再使用一个对象,但是该对象依然在CG ROOT的引用链上,这个对象就不会被回收。
绝大数情况都是由堆内存泄露引起的
常见由:

  1. 没有及时删除缓存数据
  2. 分布式任务调度系统等进行任务调度任务结束中出现了内存泄漏。

代码中的内存泄漏

equals和hashCode,不正确使用会导致泄漏

  1. 不一致的 equals() 和 hashCode() 实现
    • 如果两个对象根据 equals() 方法被认为是相等的,那么它们的 hashCode() 值也必须相等。如果这个规则被违反,可能会导致哈希表中的对象无法正确地被访问和删除,从而导致内存泄漏。
  2. 对象无法被正确移除
    • 在使用哈希表时,如果对象的 hashCode() 值在插入后发生变化,可能会导致对象无法被正确移除,因为哈希表依赖于 hashCode() 值来定位对象。
      在定义新类时没有重写正确的equals()和hashCode()方法。在使用HashMap的场景下,如果使用这个类对象作为key,HashMap在判断key是否已经存在时会使用这些方法,如果重写方式不正确,会导致相同的数据被保存多份。
      正常情况:
  3. 以JDK8为例,首先调用hash方法计算key的哈希值,hash方法中会使用到key的hashcode方法。根据hash方法的结果决定存放的数组中位置。
  4. 如果没有元素,直接放入。如果有元素,先判断key是否相等,会用到equals方法,如果key相等,直接替换value;key不相等,走链表或者红黑树查找逻辑,其中也会使用equals比对是否相同。
    异常情况:
  5. hashCode方法不对,导致相同id的学生对象计算出来的hash值不同被放在不同的槽中
  6. equals方法不对,导致即使id相同,也会被认为是不同的对象
    下列代码会重复添加这个对象,导致内存溢出,因为不是相同的对象实例
  class Student {
        Integer id;
        String name;
        public void setId(Integer id) {
            this.id = id;
        }
        public void setName(String name) {
            this.name = name;
        }
        public Integer getId() {
            return id;
        }
        public String getName() {
            return name;
        }
    }
class Main {
	    public static long count = 0;
	    public static Map<Student,Long> map = new HashMap<>();
	    public static void main(String[] args) throws InterruptedException {
	        while (true){
	            Student student = new Student();
	            student.setId(1);
	            student.setName("张三");
	            map.put(student,1L);
	        }
	    }
	}
}

解决方案:

  1. 在定义新实体时,始终重写equals()和hashCode()方法。
  2. 重写时一定要确定使用了唯一标识去区分不同的对象,比如用户的id等。
  3. hashmap使用时尽量使用编号id等数据作为key,不要将整个实体类对象作为key存放。

ThreadLocal的使用

线程池中的线程不被回收导致的ThreadLocal内存泄漏
如果仅仅使用手动创建的线程,就算没有调用ThreadLocal的remove方法清理数据,也不会产生内存泄漏。因为当线程被回收时,ThreadLocal也同样被回收。但是如果使用线程池就不一定了。
解决方案:
线程方法执行完,一定要调用ThreadLocal中的remove方法清理对象。

内部类引用外部类

非静态的内部类和匿名内部类的错误使用导致内存泄漏

  1. 非静态的内部类默认会持有外部类 ,尽管代码上不再使用外部类,所以如果有地方引用了这个非静态内部类,会导致外部类也被引用,垃圾回收时无法回收这个外部类。
  2. 匿名内部类对象如果在非静态方法中被创建,会持有调用者对象,垃圾回收时无法回收调用者。

解决方案:

  1. 使用静态内部类从而不持有外部对象
  2. 使用静态方法,避免匿名内部类持有调用者对象

String的intern方法

由于JDK6中的字符串常量池位于永久代,intern被大量调用并保存产生的内存泄漏

通过静态字段保存对象

大量的数据在静态变量中被引用,但是不再使用,成为了内存泄漏
问题:
如果大量的数据在静态变量中被长期引用,数据就不会被释放,如果这些数据不再使用,就成为了内存泄漏。

解决方案:
1、尽量减少将对象长时间的保存在静态变量中,如果不再使用,必须将对象删除(比如在集合中)或者将静态变量设置为null。
2、使用单例模式时,尽量使用懒加载,而不是立即加载。

package com.itheima.jvmoptimize.leakdemo.demo7;

import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Lazy //懒加载
@Component
public class TestLazy {
    private byte[] bytes = new byte[1024 * 1024 * 1024];
}

资源没有正常关闭

由于资源没有调用close方法正常关闭,导致的内存溢出
连接和流这些资源会占用内存,如果使用完之后没有关闭,这部分内存不一定会出现内存泄漏,但是会导致close方法不被执行。
解决方案:

  1. 为了防止出现这类的资源对象泄漏问题,必须在finally块中关闭不再使用的资源。
  2. 从 Java 7 开始,使用try-with-resources语法可以用于自动关闭资源。
    try (
        BufferedReader br = new BufferedReader(new FileReader("file1.txt"));
        BufferedReader br2 = new BufferedReader(new FileReader("file2.txt"))
    ) {
        // 使用资源
    } catch (IOException e) {
        e.printStackTrace();
    }

并发请求问题

GC调优

GC调优是对垃圾回收进行调优,GC调优的主要目标是避免由垃圾回收引起程序性能下降
可以进行调优的内容:

  1. 通用JVM参数的设置
  2. 特定垃圾回收器的JVM参数的设置
  3. 解决由频繁FULLGC引起的程序性能问题

调优指标:

垃圾回收的吞吐量

  • 吞吐量,一段时间内程序需要完成的业务数量。
    保证高吞吐量的常规手段有两条:
    1、优化业务执行性能,减少单次业务的执行时间
    2、优化垃圾回收吞吐量

垃圾回收吞吐量指的是 CPU 用于执行用户代码的时间与 CPU 总执行时间的比值,即吞吐量 = 执行用户代码时间 /(执行用户代码时间 + GC时间)。吞吐量数值越高,垃圾回收的效率就越高,允许更多的CPU时间去处理用户的业务,相应的业务吞吐量也就越高。

延迟

1延迟指的是从用户发起一个请求到收到响应这其中经历的时间。比如企业中对于延迟的要求可能会是这样的:
所有的请求必须在5秒内返回给用户结果

延迟 = GC延迟 + 业务执行时间,所以如果GC时间过长,会影响到用户的使用。

内存使用量

内存使用量指的是Java应用占用系统内存的最大值,一般通过Jvm参数调整,在满足上述两个指标的前提下,这个值越小越好。

调优工具

jstat工具

Jstat工具是JDK自带的一款监控工具,可以提供各种垃圾回收、类加载、编译信息
等不同的数据。使用方法为:jstat -gc 进程ID 每次统计的间隔(毫秒) 统计次数

C代表Capacity容量,U代表Used使用量
S – 幸存者区,E – 伊甸园区,O – 老年代,M – 元空间
YGC、YGT:年轻代GC次数和GC耗时(单位:秒)
FGC、FGCT:Full GC次数和Full GC耗时
GCT:GC总耗时

Visualvm插件

Prometheus + Grafana

GC日志

分析GC日志 - GCViewer

GraalVM

GraalVM是Oracle官方推出的一款高性能JDK,使用它享受比OpenJDK或者OracleJDK更好的性能。
两种模式:

  • JIT( Just-In-Time )模式 ,即时编译模式,在运行时将热点代码编译为本地机器码,以提高执行效率。
  • AOT(Ahead-Of-Time)模式 ,提前编译模式

如何判断一个对象是否可以被回收?

判断对象存活一般有两种方式:引用计数和可达性分析
GCROOT包括:

  1. 虚拟机栈中的引用对象:局部变量 ,方法参数等
  2. 本地方法栈内引用的对象
  3. 方法区中静态属性引用的对象:Java类中的引用类型静态变量
  4. 方法区中的敞亮引用对象

#todo

  • 常见的OOM场景,加书签

常见的OOM场景

  1. 堆内存溢出->对象太多了
  2. 栈内存OOM->线程创建太多有可能会导致,或者递归过多

https://javaguide.cn/distributed-system/distributed-process-coordination/zookeeper/zookeeper-plus.html

ZooKeeper是什么

ZooKeeper是一个开源的分布式协调服务,设计目标是将哪些复杂且容易出错的分布式一致性服务封装起来,构成一个高效的原语(原语的执行必须连续且不可分割)集,并以一系列简单易用的接口提供给用户时使用

常用场景

  1. 命名服务:通过ZooKeepe的顺序节点生成全局唯一ID
  2. 数据发布/订阅:通过Watcher机制可以很方便的实现数据发布/订阅。其他机器可以通过监听ZooKeeper上的节点变化来实现配置的动态更新
  3. 分布式锁:通过创建唯一节点获得分布式锁,当获得锁的乙方执行完相关的代码或者挂掉后就释放,也需要使用Watcher机制

一些可以使用的场景

[[手写rpc#注册中心|使用ZooKeeper作为注册中心手写RPC]]

使用ZooKeeper作为分布式锁

重要概念

Data model

ZooKeeper数据模型使用层次化的多叉树形结构,每个节点上都可以存储数据,而且数据可以是数字、字符串、二进制序列。每个节点可以有N个子节点。每个数据节点叫做znode,是数据的最小单元,每个znode都有唯一的路径标识
znode存储的数据大小上线为1M,避免将大数据保存在ZooKeeper中

znode

分类:

  • 持久(PERSISTENT)节点:一旦创建就存在即使ZooKeeper集群宕机,知道将其删除
  • 临时(EPHEMERAL)节点:临时节点的声明周期与客户端会话绑定。会话节点小时则节点消失,临时节点只能作为叶子节点,不可创建属于自己的子节点。
  • 持久顺序节点:除了具有持久节点外,子节点的名称还有顺序性。

znode的组成:

  • stat:状态信息
  • data:节点存放数据的具体内容

版本:

ZooKeeper会为每一个znode维护一个叫做Stat的数据结构
Stat中记录了三个znode相关的版本:

  • dataVersion:当前znode节点的版本号
  • cversion:当前znode子节点的版本
  • aclVersion:当前znode的ACL的版本

ACL:权限控制

  • CREATE:创建子节点
  • READ:获取节点数据和列出其子节点
  • WRITE:设置/更新节点数据
  • DELETE:删除子节点
  • ADMIN:设置节点ACL的权限
    身份认证有四种方式:
  • world:默认,任何用户都可以无条件访问
  • auth : 不适用任何id,代表任何已经认证的用户
  • digest:用户名:密码的方式
  • ip:对指定ip进行限制

Watcher(事件监听器)

ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发时,ZooKeeper服务器会将事件通知到感兴趣的客户端上。

Session

ZooKeeper服务端与客户端之间的一个TCP长连接。客户端可以通过它进行心跳检测与服务器保持有效的会话,也能够向ZooKeeper服务器发送请求并接受响应。同时也能接收来在服务器的Watcher事件通知

集群

ZooKeeper将集群中节点的角色分为三类:

  • Leader 为客户端提供读写服务,负责投票的发起和决议,更新系统状态
  • Follower:只读,将写服务转发给Leader,参与选举过程的头票
  • Observer:只读,写服务转发给Leader,不参与选举中的投票,也不参与”过半写成功”策略。 在不影响写性能的情况下提升集群的读性能。

Leader的选举过程

条件:
当Leader服务器出现网络中断、崩溃退出与重启等异常情况是,会进去Leader选举过程。
流程:

  1. Leader election 选举阶段:开始投票,只要有一个节点获得过半节点的票数即可作为准Leader
  2. Discovery发现阶段:followers跟准节点leader进行通信,同步followers最近接收的事务提议
  3. Synchronization 同步阶段:利用leader前一阶段获得的最新提议历史,同步集群中所有的副本,同步之后,准leader成为正式节点
  4. Broadcast 广播阶段:Zookeeper集群正式对外提供事务服务,leader及逆行消息广播,如果有新的节点加入,还需要进行同步
    节点的状态:
  • LOOKING:寻找Leader
  • LEADING:Leader状态,对应的节点成为Leader
  • FOLLOWING:对应的节点成为Follower
  • OBSERVING:对应的节点成为OBSERVING
    脑裂问题通过过半机制解决

ZAB协议

ZooKeeper Atomic Broadcast,原子广播
###3 模式

  • 崩溃恢复:启动或者出现异常状况时,进行崩溃恢复状态,当选举出新的Leader节点,并且已经进行状态同步之后,退出恢复状态。
  • 消息广播:当集群中已经完成状态通过,整个服务架构进入消息广播模式。新加入的节点会自觉进入数据恢复状态。

常见的Java API

curator-x-discovery是 Apache Curator 库中的一个模块,用于简化与 Apache ZooKeeper 交互时的服务发现和注册功能

ServiceDiscovery 用于管理服务的注册和发现
ServiceInstance 描述一个服务的信息
CuratorFramework zk连接客户端