0%

个人博客

https://zuofw.github.io/
本项目详细介绍和实现:
手写rpc | QingQiu’Blog (zuofw.github.io)
源码地址:Zuofw/zuofw-rpc: 一个废物的手写rpc (github.com)

项目介绍

zuofw-rpc是一款基于Java、Netty、Zookeeper实现的RPC通信框架,它具有以下核心特性:

  1. 使用”微内核+可插拔”架构,通过自定义SPI加载机制,支持缓存,动态替代扩展点组件
  2. 灵活使用设计模式来提高系统可扩展性,如单例模式、工厂模式、建造者模式
  3. 实现服务调用负载均衡机制,支持轮询、随机、一致性哈希算法,优化调用体验
  4. 通过自定义通信协议、支持多种序列化方式,同时实现Gzip压缩,提高网络传输效率。
  5. 实现自定义request - response模型,在异步通信条件下确保消息的请求和响应成功
  6. 基于自定义starter实现,优化SpringBoot环境下的使用。

基本架构

  1. 注册中心,用于服务注册和获取
  2. 服务端:提供服务的一方Provider
  3. 客户端:调用服务的一方Consumer
    基本流程:
  4. 服务端把服务信息注册到注册中心上,一般包括服务端地址、接口类和方法
  5. 客户端从注册中心获取对应的服务信息
  6. 客户端根据服务的信息,通过网络调用服务端的接口
阅读全文 »

前言

Bean的加载过程算是面试中的老生常谈了,今天我们就来从源码层面深入去了解一下Spring中是如何进行Bean的加载的

Spring

先看示例代码:

public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:/spring/text.xml");
        System.out.println(context.getBean("stringutil"));
    }

我们就从这个getBean去探索Spring如何进行类的加载。
首先我们先看看ClassPathXmlApplicationContex的继承关系:

阅读全文 »

为何写这一篇文章

🐭🐭在面试的时候被问到NULL值是否会走索引的时候,感到有点不理解,于是事后就有了这篇文章
问题:
为name建立索引,name可以为空
select * from user where name is null是否会使用索引?
生活会拷打每一个做事不认真的人😭

索引的结构

详细的可以参照我的上一篇文章深入浅出MySQL,里面有关于索引的详细介绍
在InnoDB引擎中,索引分为聚簇索引和二级索引,对于二级索引,在这个场景下我们要考虑的就是是否会为NULL建立索引和如果列中存在NULL值,是否会走索引去查找这个NULL

访问方法

访问方法是MySQL来实际访问数据的执行方法
大致分为:

  1. 全表扫描
  2. 使用索引扫

测试表

CREATE TABLE user (  
                          `id` int(11) NOT NULL AUTO_INCREMENT,  
                          `name` varchar(20) NOT NULL,  
                          `age` int(11) DEFAULT NULL,  
                          `sex` varchar(20) DEFAULT NULL,  
                          PRIMARY KEY (`id`)  
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;  
  
INSERT INTO user (`id`, `name`, `age`, `sex`) VALUES ('1', 'Bob', '20', '男');  
INSERT INTO user (`id`, `name`, `age`, `sex`) VALUES ('2', 'Jack', '20', '男');  
INSERT INTO user (`id`, `name`, `age`, `sex`) VALUES ('3', 'Tony', '20', '男');  
INSERT INTO user (`id`, `name`, `age`, `sex`) VALUES ('4', 'Alan', '20', '男');  
  
CREATE  UNIQUE  INDEX indexName ON user(name(20));  
# 为age建立索引  
CREATE INDEX indexAge ON user(age);

const

通过主键或者唯一二级索引列来定位一条记录的访问方法
explain select * from user where id = 1;
解决如下:
|1150
通过type我们可以看见访问方法是const

ref

如果二级索引列不是唯一的,那么就使用二级索引的值去匹配,之后再回表

explain select * from user where age = 20;

如图使用的是ref方法
二级索引列值为NULL时
二级索引列对NULL值的数量时不限制的,所以key is NULL最多使用的是ref,而不是const

ref_or_null

有时候我们需要找出二级索引等于常数和为NULL的记录一同找出
explain select * from user where age = 20 or age is null ;

执行的流程:
如图,NULL是放在每一层中最左侧的,并且是连在一起的

range

使用索引进行范围访问,可以是聚簇索引,也可以是二级索引。
explain select * from user where age > 11 and age <= 20;

index

遍历二级索引记录的执行方式,常常出现在查询列和条件都包含在索引中,不需要回表,所以直接遍历即可

all

全表扫描

NULL在二级索引中的位置

通过查询资料,发现如果索引列允许NULL值,那么NULL在二级索引中是被当作最小值放在树的每一层的最左侧的,也就是NULL值会被当成索引列的数据使用的,所以NULL值匹配是可能会走索引的

  1. 如果在索引列上使用IS NULL或IS NOT NULL,MySQL通常会走索引
    // type为ref,所以走索引了
    explain select * from user where age is null;  
    // type为range,所以走索引了
    explain select * from user where age is not null;
  2. 符合索引,如果签到列不为NULL,后续的列也是可以走索引的

文章推荐

Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析 (javadoop.com)

源码分析

HashMap

基本属性(阈值&系数&容量)

public class HashMap<K,V> extends AbstractMap<K,V>
    implements Map<K,V>, Cloneable, Serializable {
	// 序列号
    @java.io.Serial
    private static final long serialVersionUID = 362498820763181265L;

	// 默认容量,也就是 16
	static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; 

	/**
	 * 最大容量,如果通过构造函数隐式指定了更高的值,则使用此值。
	 * 必须是2的幂且 <= 1<<30。
	 * 也就是最大容量是 2^30
	 */
	static final int MAXIMUM_CAPACITY = 1 << 30;

	/**
	 * 构造函数未指定时使用的负载因子。
	 */
	static final float DEFAULT_LOAD_FACTOR = 0.75f;

	// 当桶上的节点数>=这个值时会转化为红黑树
	static final int TREEIFY_THRESHOLD = 8;

	// 当桶上的节点数小于这个值时会转化为链表
	static final int UNTREEIFY_THRESHOLD = 6;

	// 转化为红黑树对应的所需要的最小的数组容量
	static final int MIN_TREEIFY_CAPACITY = 64;
	// 存储元素的数组,容量总是2的幂次倍
	transient Node<K,V>[] table;
	// transient用于标识这个字段不应该被序列化
	transient Set<Map.Entry<K,V>> entrySet;
	// 存放元素的个数,
	transient int size;
	// 每次扩容和更改map结构的计数器
	/*
	* 1.检测并发修改:在迭代过程中,如果 modCount 发生变化,迭代器会抛出 ConcurrentModificationException,以防止并发修改导致的不一致性。
	2.维护内部状态:在某些操作(如插入、删除、扩容)中,modCount 会增加,以确保 HashMap 的内部状态保持一致
	*/
	transient int modCount;
	// 阈值(容量*负载因子) 当实际大小超过阈值时,会进行扩容
	int threshold;
	// 负载因子
	final float loadFactor;
}

loadFactor负载因子

负载因子是控制数组存放数据的疏密程度,loadFactor越接近1,那么数组中存放的数据(entry)也就越多,否则就越少。
太大查找元素的效率低,太小利用率太低
所以默认 16 * 0.75 = 12,超过这个数据量时,就会进行扩容

threshold

threshold = capacity * loadFacotry,是数组扩容的标准
因为hashmap中没有capacity这个属a性,所以即使指定了初始化的capacity,也会被扩容到2的最接近这个大小的幂次方

Node 节点类源码

// 继承Map.Entry<K,V>
static class Node<K,V> implements Map.Entry<K,V> {  
	// hash值,存放元素到hashmap时用来与其他元素进行比较
    final int hash;  
    final K key;  
    V value;  
	// 指向下一个节点
    Node<K,V> next;  
  
    Node(int hash, K key, V value, Node<K,V> next) {  
        this.hash = hash;  
        this.key = key;  
        this.value = value;  
        this.next = next;  
    }  
    public final K getKey()        { return key; }  
    public final V getValue()      { return value; }  
    public final String toString() { return key + "=" + value; }  
  
    public final int hashCode() {  
        return Objects.hashCode(key) ^ Objects.hashCode(value);  
    }  
    public final V setValue(V newValue) {  
        V oldValue = value;  
        value = newValue;  
        return oldValue;  
    }  
    public final boolean equals(Object o) {  
        if (o == this)  
            return true;  
  
        return o instanceof Map.Entry<?, ?> e  
                && Objects.equals(key, e.getKey())  
                && Objects.equals(value, e.getValue());  
    }}

树节点源码

// 仅截取部分源码
static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {  
    TreeNode<K,V> parent;  // 父
    TreeNode<K,V> left;    // 左
    TreeNode<K,V> right;   // 右 
    TreeNode<K,V> prev;    // needed to unlink next upon deletion  
    boolean red;
}

hash方法

put

  1. 判断数组,若发现数组为空,则进行首次扩容。
  2. 判断头节点,若发现头节点为空,则新建链表节点,存入数组。
  3. 判断头节点,若发现头节点非空,则将元素插入槽内。
  4. 若元素的key与头节点一致,则直接覆盖头节点。
  5. 若元素为树型节点,则将元素追加到树中。
  6. 若元素为链表节点,则将元素追加到链表中。(追加后,需要判断链表长度以决定是否转为红黑树。若链表长度达到8、数组容量未达到64,则扩容。若链表长度达到8、数组容量达到64,则转为红黑树。)
  7. 插入元素后,判断元素的个数,若发现超过阈值则再次扩容。
    put方法实际就是调用putVal,并且putVal用户不可直接使用
    public V put(K key, V value) {  
        return putVal(hash(key), key, value, false, true);  
    }
    
    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        // 如果表为空或长度为0,则进行初始化或扩容
        // 将属性中的table赋值,并且判断是否为空或者长度为0,如果是就进行初始化或者扩容
        if ((tab = table) == null || (n = tab.length) == 0)
    	    // resize()函数就是进行扩容或者初始化
            n = (tab = resize()).length;
        // 计算索引位置,如果当前位置为空,则直接插入新节点
        // (n - 1) & hash 确定元素放在哪个桶里,桶为空,新生成节点放入数组中
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            // 如果当前位置的节点的哈希值和键都相等,则直接覆盖
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            // 如果是树节点,则调用树节点的插入方法
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            else {
                // 遍历链表,查找插入位置或相同键的节点
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                        // 如果链表长度达到阈值,则转换为树结构
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    // 如果找到相同键的节点,则跳出循环
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            // 如果找到相同键的节点,则更新值
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        // 修改计数器
        ++modCount;
        // 如果大小超过阈值,则进行扩容
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }
    基本逻辑:
  8. 如果定位到的数组位置没有元素,则直接插入
  9. 如果有元素,就要与插入的key进行比较,如果相同,就覆盖。如果不同就判断p是否是树节点,如果不是就直接遍历链表插入,使用尾插法,否则调用putTreeVal函数

get

public V get(Object key) {
    Node<K,V> e;
    return (e = getNode(key)) == null ? null : e.value;
}
final Node<K,V> getNode(Object key) {
    Node<K,V>[] tab; Node<K,V> first, e; int n, hash; K k;
    // 检查表是否为空且长度大于0,并计算哈希值和索引
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (first = tab[(n - 1) & (hash = hash(key))]) != null) {
        // 检查第一个节点是否匹配
        if (first.hash == hash && // 始终检查第一个节点
            ((k = first.key) == key || (key != null && key.equals(k))))
            return first;
        // 遍历链表或树节点
        if ((e = first.next) != null) {
            if (first instanceof TreeNode)
                return ((TreeNode<K,V>)first).getTreeNode(hash, key);
            do {
                // 检查链表中的每个节点是否匹配
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    return e;
            } while ((e = e.next) != null);
        }
    }
    // 如果没有找到匹配的节点,返回null
    return null;
}

resize

final Node<K,V>[] resize() {
    Node<K,V>[] oldTab = table; // 旧的哈希表
    int oldCap = (oldTab == null) ? 0 : oldTab.length; // 旧的容量
    int oldThr = threshold; // 旧的阈值
    int newCap, newThr = 0; // 新的容量和阈值
    if (oldCap > 0) { // 如果旧的容量大于0
        if (oldCap >= MAXIMUM_CAPACITY) { // 如果旧的容量已经达到最大容量
            threshold = Integer.MAX_VALUE; // 将阈值设为最大整数值
            return oldTab; // 返回旧的哈希表
        }
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY)
            newThr = oldThr << 1; // 将容量和阈值都翻倍
    }
    else if (oldThr > 0) // 如果旧的阈值大于0
        newCap = oldThr; // 将新的容量设为旧的阈值
    else { // 如果旧的阈值为0,使用默认值
        newCap = DEFAULT_INITIAL_CAPACITY; // 默认初始容量
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); // 默认阈值
    }
    if (newThr == 0) { // 如果新的阈值为0
        float ft = (float)newCap * loadFactor; // 计算新的阈值
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE);
    }
    threshold = newThr; // 更新阈值
    @SuppressWarnings({"rawtypes","unchecked"})
    Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; // 创建新的哈希表
    table = newTab; // 更新哈希表引用
    if (oldTab != null) { // 如果旧的哈希表不为空
        for (int j = 0; j < oldCap; ++j) { // 遍历旧的哈希表
            Node<K,V> e;
            if ((e = oldTab[j]) != null) { // 如果旧表中的桶不为空
                oldTab[j] = null; // 清空旧表中的桶
                if (e.next == null) // 如果桶中只有一个节点
                    newTab[e.hash & (newCap - 1)] = e; // 直接放入新表
                else if (e instanceof TreeNode) // 如果桶中是树节点
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); // 拆分树节点
                else { // 如果桶中是链表节点
                    Node<K,V> loHead = null, loTail = null; // 低位链表的头和尾
                    Node<K,V> hiHead = null, hiTail = null; // 高位链表的头和尾
                    Node<K,V> next;
                    do {
                        next = e.next;
                        if ((e.hash & oldCap) == 0) { // 低位链表
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        else { // 高位链表
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead; // 放入低位链表
                    }
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead; // 放入高位链表
                    }
                }
            }
        }
    }
    return newTab; // 返回新的哈希表
}

ConcurrentHashMap

JDK1.7

JDK1.7中,ConcurrentHashMap是由多个Segment组合,而每一个Segment都是一个类似于HashMap的结构,可以内部进行扩容,但是Segment的个数一旦初始化就不能改变了。默认支持16个,也就是默认支持16个线程并发Segment 通过继承 ReentrantLock 来进行加锁

初始化

无参构造中调用了有参构造,传入三个默认值

/**
 * 默认初始化容量
 */
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
 * 默认负载因子
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
 * 默认并发级别
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

初始化逻辑

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    // 参数校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 校验并发级别大小,大于 1<<16,重置为 65536
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // 2的多少次方
    int sshift = 0;
    int ssize = 1;
    // 这个循环可以找到 concurrencyLevel 之上最近的 2的次方值
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 记录段偏移量
    this.segmentShift = 32 - sshift;
    // 记录段掩码
    this.segmentMask = ssize - 1;
    // 设置容量
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // c = 容量 / ssize ,默认 16 / 16 = 1,这里是计算每个 Segment 中的类似于 HashMap 的容量
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    //Segment 中的类似于 HashMap 的容量至少是2或者2的倍数
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    // 创建 Segment 数组,设置 segments[0]
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
  1. 校验并发等级大小,如果大于最大值,就重置为最大值
  2. 初始化容量为大于并发等级最近的2的幂次方,默认16
  3. 记录segmentShift偏移量,2^n 中的n,默认为32 - sshift = 28
  4. 记录segmentMask,默认是ssize - 1 = 16 - 1 = 15
  5. 初始化segment[0],默认大小为2,扩容阈值为 2 * 0.75 = 1.5。插入第二个值时才会进行扩容

put

/**
 * Maps the specified key to the specified value in this table.
 * Neither the key nor the value can be null.
 *
 * <p> The value can be retrieved by calling the <tt>get</tt> method
 * with a key that is equal to the original key.
 *
 * @param key key with which the specified value is to be associated
 * @param value value to be associated with the specified key
 * @return the previous value associated with <tt>key</tt>, or
 *         <tt>null</tt> if there was no mapping for <tt>key</tt>
 * @throws NullPointerException if the specified key or value is null
 */
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算
    // 其实也就是把高4位与segmentMask(1111)做与运算
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 如果查找到的 Segment 为空,初始化
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

/**
 * Returns the segment for the given index, creating it and
 * recording in segment table (via CAS) if not already present.
 *
 * @param k the index
 * @return the segment
 */
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // 判断 u 位置的 Segment 是否为null
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        // 获取0号 segment 里的 HashEntry<K,V> 初始化长度
        int cap = proto.table.length;
        // 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的
        float lf = proto.loadFactor;
        // 计算扩容阀值
        int threshold = (int)(cap * lf);
        // 创建一个 cap 容量的 HashEntry 数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            // 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 自旋检查 u 位置的 Segment 是否为null
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                // 使用CAS 赋值,只会成功一次
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}
  1. 计算key的位置,获取指定位置的Segment
  2. 如果指定位置的Segment为空,则初始化这个Segment
    • 如果为null,使用Segment[0]的容量和负载因子创建一个HashEntry数组
    • 再次检查是否为null
    • 使用HashEntry初始化这个Segment
    • 自旋计算得到的指定位置是否为null,使用CAS在这个位置赋值Segment
  3. Segment.put插入key,value
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。
        HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            // 计算要put的数据位置
            int index = (tab.length - 1) & hash;
            // CAS 获取 index 坐标的值
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    // 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 value
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    // first 有值没说明 index 位置已经有值了,有冲突,链表头插法。
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    // 容量大于扩容阀值,小于最大容量,进行扩容
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        // index 位置赋值 node,node 可能是一个元素,也可能是一个链表的表头
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
  • tryLock() 获取锁,获取不到使用 scanAndLockForPut 方法继续获取。

  • 计算 put 的数据要放入的 index 位置,然后获取这个位置上的 HashEntry

  • 遍历 put 新元素,为什么要遍历?因为这里获取的 HashEntry 可能是一个空元素,也可能是链表已存在,所以要区别对待。

    如果这个位置上的 HashEntry 不存在

    1. 如果当前容量大于扩容阀值,小于最大容量,进行扩容
    2. 直接头插法插入。

    如果这个位置上的 HashEntry 存在

    1. 判断链表当前元素 key 和 hash 值是否和要 put 的 key 和 hash 值一致。一致则替换值
    2. 不一致,获取链表下一个节点,直到发现相同进行值替换,或者链表表里完毕没有相同的。
      1. 如果当前容量大于扩容阀值,小于最大容量,进行扩容
      2. 直接链表头插法插入。
  • 如果要插入的位置之前已经存在,替换后返回旧值,否则返回 null.

resize

扩容使用的是头插法

JDK1.8


由原先的Segment数组 + HashEntry转化为Node数组 + 链表/红黑树

重要属性

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>  
    implements ConcurrentMap<K,V>, Serializable {
    transient volatile Node<K,V>[] table; // 哈希表数组,存储键值对

/**
 * 下一个哈希表数组,仅在扩容时使用,不为空
 */
private transient volatile Node<K,V>[] nextTable;

/**
 * 基础计数器值,主要在没有竞争时使用,也作为表初始化竞争期间的回退。通过CAS更新
 */
private transient volatile long baseCount;

/**
 * 表初始化和扩容控制。当为负数时,表示表正在初始化或扩容:-1表示初始化,否则为-(1 + 活动扩容线程数)。
 * 否则,当表为空时,保存创建时使用的初始表大小,默认为0。初始化后,保存下一个用于扩容的元素计数值
 */
private transient volatile int sizeCtl;

/**
 * 扩容时要拆分的下一个表索引(加一)
 */
private transient volatile int transferIndex;

/**
 * 扩容和/或创建CounterCells时使用的自旋锁(通过CAS锁定)
 */
private transient volatile int cellsBusy;

/**
 * 计数单元表。当不为空时,大小为2的幂
 */
private transient volatile CounterCell[] counterCells;

// views
private transient KeySetView<K,V> keySet; // 键集合视图
private transient ValuesView<K,V> values; // 值集合视图
private transient EntrySetView<K,V> entrySet; // 键值对集合视图
}

Node

基本与HashMap中的相同,所以略过

initTable

初始化

private final Node<K,V>[] initTable() {
       Node<K,V>[] tab; int sc;
       while ((tab = table) == null || tab.length == 0) {
        // sizeCtl < 0 说明其他线程CAS成功,正在初始化或
           if ((sc = sizeCtl) < 0)
			// 让出cpu
               Thread.yield(); 
           else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
               try {
                   if ((tab = table) == null || tab.length == 0) {
                       int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                       @SuppressWarnings("unchecked")
                       Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                       table = tab = nt;
                       sc = n - (n >>> 2);
                   }
               } finally {
                   sizeCtl = sc;
               }
               break;
           }
       }
       return tab;
   }
  1. 通过自旋 + CAS操作来完成初始化
  2. sizeCtl 的值
    • -1 : 正在初始化
      • N:正在扩容高16位位扩容的标识戳,低16位-1位正在扩容的线程数
    • 0 :table初始化大小,如果table没有初始化
    • 0 : 扩容阈值

put

public V put(K key, V value) {  
    return putVal(key, value, false);  
}  
  
/** Implementation for put and putIfAbsent */  
final V putVal(K key, V value, boolean onlyIfAbsent) {  
    if (key == null || value == null) throw new NullPointerException();  
    // 计算hash
    int hash = spread(key.hashCode());  
    // 计数器,检测链表长度,统计节点个数,控制循环
    int binCount = 0;  
    for (Node<K,V>[] tab = table;;) {  
	    // f是目标元素的位置, fh是后面存放目标位置元素的hash值
        Node<K,V> f; int n, i, fh;  
        if (tab == null || (n = tab.length) == 0)  
		    // 数据桶为空,就先初始化
            tab = initTable();  
        // 定位到这个位置之后发现桶中没有数据,就直接放入,不加锁,然后跳出即可
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {  
            if (casTabAt(tab, i, null,  
                         new Node<K,V>(hash, key, value, null)))  
                break;                   // no lock when adding to empty bin  
        }  
        // 检查当前桶中的第一个节点是否是一个 ForwardingNode。ForwardingNode 是在哈希表扩容时使用的一种特殊节点类型,用于指示该桶的内容已经被移动到新的哈希表中。 
        // 如果是就需要helpTranfer
        else if ((fh = f.hash) == MOVED)  
            tab = helpTransfer(tab, f);  
        else {  
            V oldVal = null;  
            // 对节点f加锁
            synchronized (f) {  
                if (tabAt(tab, i) == f) {  
	                // 发现是链表
                    if (fh >= 0) {  
                        binCount = 1;  
                        // 循环加入新的节点,或者覆盖
                        for (Node<K,V> e = f;; ++binCount) {  
                            K ek;  
                            if (e.hash == hash &&  
                                ((ek = e.key) == key ||  
                                 (ek != null && key.equals(ek)))) {  
                                oldVal = e.val;  
                                if (!onlyIfAbsent)  
                                    e.val = value;  
                                break;  
                            }                            Node<K,V> pred = e;  
                            if ((e = e.next) == null) {  
                                pred.next = new Node<K,V>(hash, key,  
                                                          value, null);  
                                break;  
                            }                        }                    }                    else if (f instanceof TreeBin) {  
                            //发现是红黑树
                        Node<K,V> p;  
                        binCount = 2;  
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,  
                                                       value)) != null) {  
                            oldVal = p.val;  
                            if (!onlyIfAbsent)  
                                p.val = value;  
                        }                    }                }            }            if (binCount != 0) {  
                if (binCount >= TREEIFY_THRESHOLD)  
                    treeifyBin(tab, i);  
                if (oldVal != null)  
                    return oldVal;  
                break;  
            }        }    }    addCount(1L, binCount);  
    return null;  
}
  1. 通过key计算hashcode
  2. 判断是否需要初始化
  3. 定位到Node,如果为空直接用CAS写入,失败就自旋保证成功
  4. 如果hashcode == MOVED == - 1需要进行扩容
  5. 如果都不满足就使用synchronized锁写入数据
  6. 如果数量大于TREEIFY_THRESHOLD调用treeifyBin进行转化
    值得一提的是synchronized在这里是锁住了这个一个桶,也是这一个Node,而不是一整个表

get

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // key 所在的 hash 位置
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果指定位置元素存在,头结点hash值相同
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                // key hash 值相等,key值相同,直接返回元素 value
                return e.val;
        }
        else if (eh < 0)
            // 头结点hash值小于0,说明正在扩容或者是红黑树,find查找
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            // 是链表,遍历查找
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

常见八股

HashMap的底层实现是什么

JDK1.8之前是数组 + 链表,数组是HashMap的主体,链表是用来解决哈希冲突。使用拉链法来解决。
JDK1.8之后变为数组+链表/红黑树,当链表长度大于等于阈值 8 时,会转化,如果当前数组长度小于64就学会先进行数组扩容,否则就转化为红黑树,默认大小为16,每次扩容2倍。

拉链法到底是什么

拉链法就是使用一个链表数组,数组中的每一格都是一个链表,如果遇见哈希冲突,则将冲突的值加到链表中。
JDK1.8之后会先判断链表的长度是否大于阈值8,然后去根据数据来判断是否转化为红黑树。
|450

为什么使用2作为底数

  1. 使用2作为底数可以使用位运算而不是取模去计算索引,比较方便,提高性能
  2. 同时容量是2的幂次时,哈希值的低位和高位都可以参与索引的计算,减少哈希冲突(capacity 是 2 的幂次,因此 capacity - 1 的二进制表示全是 1,这样 hash & (capacity - 1) 就能高效地计算出索引。)
  3. 扩容更更简单,重新计算哈希只需要检查哈希值的一个额外位

讲解一下put的过程

  1. 判断map是否为空,如果为空或者长度位0就及逆行初始化或者扩容
  2. 之后计算hash值,去匹配Node数组,如果定位到的数组没有位置为空,则直接插入到Node数组中
  3. 如果不为空,判断是否是树节点,如果是就调用putTreeVal,否则就去遍历这个数组所对应的链表,如果出现哈希值和key完全一致就覆盖数据,否则就遍历到最后使用尾插法,插入数据
  4. 插入或者覆盖数据之后判断是否需要扩容或者转化为树形结构

为什么JDK1.8使用尾插法而不是和JDK1.7一样使用头插法

因为多线程的情况头插法有可能会出现环

HashMap何时会扩容

当链表的长度大于TREEIFY_THRESHOLD(默认为8)时会开始转化,同时要判断Node数组的个数是否大于MIN_TREEIFY_CAPACITY(默认64),如果小于就会先进行扩容,而不是转化,如果大于就直接转化为红黑树

这是你的新仓库

写点笔记,[[创建链接]],或者试一试导入器插件!

当你准备好了,就将该笔记文件删除,使这个仓库为你所用。

前言:我的算法就是屎,赶点想想办法吧

2025.02.17 b站一面(项目和实习拷打,第二天晚上oc)

拷打项目

  1. 数据同步的耗时有没有要求?
  2. 什么场景需要需要百万数据? 从上流数据库批处理工具
  3. 数据的一致性、数据的稳定性、系统的稳定性、效率都是如何考虑的?会不会有丢的情况,丢的话如何处理的?(好难)
  4. Redis锁如果超时了,有没有自动释放机制(注意,不能剥夺,不要说强行剥夺,因为不合常理)
  5. SQL查询是如何做到的?单表还是多表?数据量?大概有多少列?
  6. B+树的结构、节点是怎么组织的,存的是什么,为什么效率比较高?和B树的区别?
  7. B+树一个数据页如果满了,之后会发生什么?
  8. 有没有用过NoSQL型数据库?tidb有了解过吗?
  9. 有没有分表分库?使用了什么中间件?
  10. 项目的灵感来源?为什么要做这个项目?为什么websocket的通信会比较慢?
  11. rpc:负载均衡、轮询、一致性哈希如何实现的
  12. 一致性哈希数据迁移的过程是如何的?
  13. 自定义通信协议是怎么做的?和传统的通信协议有什么区别?
  14. TCP为什么三次握手而不是四次握手?
  15. TCP每次连接时序列号都一样吗?为什么不一样,有什么作用?
  16. time_wait为什么是2MSL?MSL是什么?(报文最大生存时间)
  17. TCP滑动窗口的算法?(不会,我换成拥塞控制算法了,拥塞控制算法也要看一看)
  18. 简单介绍一下限流算法?
  19. 为什么要做ZClub?参考了小红书哪些模块?(跟上设计思路)
  20. 分片上传是怎么想到的?怎么设计的?
  21. 断点重传是如何设计的?
  22. 秒传是如何设计的?(有点扯淡的设计)
  23. Netty相关?零拷贝技术介绍一下?
  24. 多路复用?select、poll、epoll介绍一些
  25. 手撕重排链表
  26. 反问:表现如何(前面还可以,基础扎实,但是算法有点可惜,要加强一下,项目也还可以,结果会在一周内给出)

数据的一致性、数据的稳定性、系统的稳定性、效率都是如何考虑的?会不会有丢的情况,丢的话如何处理的?

  1. 使用事务,保证数据的组装、校验、导入等操作的原子性,如果中间环节失败,自动回滚。
  2. 增量更新使用日期,以日为单位进行更新,防止重复更新
  3. 同时对于同一业务实体的数据,通过RocketMQ顺序消息发送,确保消费者按照顺序处理,避免并发导致的脏数据。
  4. 对于异常情况,记录日志,进行补偿。RocketMQ开启手动ACK模式,仅在数据成功落库之后提交Offset,多次重试之后转存到死信队列人工介入。

系统稳定性如何保证的

使用Redis记录最后处理的批次ID,故障后从断点处理

效率上如何优化的

使用CF对多数据源进行拉取,异步并行处理

Redis分布式 锁如何实现的,有没有兜底策略啥的?

加锁的时候设置过期时间,解锁的时候使用lua脚本,确保判断锁存在,然后删除锁是一个原子操作。
兜底策略:合理的过期时间,使用看门狗,后台线程定期检测业务是否能够完成,延长锁的时间,私信队列/日志记录锁的异常行为,人工排查后强制释放
看门狗机制,每隔一段时间自动续期一次,保证锁不会过期。如果看门狗没有在指定时间内收到心跳恢复,会认为锁的写线程已经发生了故障,会尽量的处理死锁(强制解锁或者触发一个回调方法来处理死锁)

B+树的结构、节点是怎么组织的,存的是什么,为什么效率比较高?和B树的区别?

  1. 非叶子节点存储索引(键值),不存储数据,支持更多分支,降低树的高度,叶子节点存储数据,并且通过双向链表连接,支持范围查询
  2. 树低可以减少磁盘I/O次数
  3. B树非叶子节点也存数据,并且每一层的分支更少,树更高,且数据之间没有形成链表,范围查询时需要进行回溯。

B+树一个数据页如果满了,之后会发生什么?

会分裂为两个页,中间键提升到父节点
流程:

  1. 创建新页:右半部分移到新页,提取中间键(如原表最后一条数据)插入父节点。
  2. 更新父节点索引:父节点未满,直接插入中间键
  3. 若父节点满,则递归分裂:父节点已满,递归触发父节点分裂
  4. 分裂之后维护双向链表以保证顺序访问能够正常

分库分表

使用ShardingSphere或者MyCat,比如可以将帖子按照时间分片

为什么使用WebSocket通信,效率会比较慢

握手阶段需要HTTP升级,头部较大,并且会长期占用资源,高并发时会影响性能

RPC负载均衡的设计

轮询、一致性哈希(相同请求命中固定节点,减少数据迁移)

一致性哈希数据迁移

一致性哈希通过虚拟节点和局部迁移减少数据移动。例如,新增节点时,仅影响相邻区间的数据,

  1. 新增节点:
    • 哈希环上插入新节点的多个虚拟节点
    • 仅迁移新节点与顺时针相邻节点之间的数据
  2. 删除节点:
    1. 移除目标节点的所有虚拟节点
    2. 将数据迁移到顺时针下一个节点
  3. 客户端进行切换,逐步将请求路由到新节点,避免瞬时负载过高。

自定义通信协议设计,对比传统协议如何?

协议的组成:

  1. Header
    • 魔数:1字节
    • 版本号:1字节
    • 序列化器:1字节
    • 消息类型:1字节
    • 消息状态:1字节
    • 请求id:8个字节
    • 压缩方式:1字节
    • 消息体长度:4字节
  2. Body(任意内容,字节)
    优势:支持多种序列化与压缩(如Gzip),使用更灵活,无HTTP头部冗余

为什么TCP每次建立连接时序列号不同?

为了防止重放攻击

time_wait为什么是2MSL?MSL是什么?(报文最大生存时间)

MSL要大于等于TTL消耗为0的时间
MSL是报文的最大生存时间,网络中可能存在来自发送方的数据被接收方处理之后,又会发送响应,一来一回所以需要等待二倍的时间。

拥塞算法

  • 慢启动:初始拥塞窗口指数增长。
  • 拥塞避免:窗口线性增长。
  • 快重传:收到3个重复ACK立即重传。
  • 快恢复:窗口减半后进入拥塞避免。

限流算法有哪些?

  • 固定窗口:单位时间限请求数,简单但临界突发。
  • 滑动窗口:细分时间块,平滑限流。
  • 令牌桶:匀速生成令牌,允许突发。
  • 漏桶:恒定速率处理,平滑流量。

分片上传、断点重传、秒传设计的细节

  • 分片上传
    1. 前端将文件切分为固定大小(如5MB)。
    2. 服务端为每个分片生成唯一ID,并行上传。
    3. 所有分片上传完成后合并。
  • 断点续传
    • 记录已上传分片ID至Redis。
    • 续传时跳过已传分片。
  • 秒传
    • 计算文件MD5作为唯一标识
    • 上传前查询服务器是否存在相同哈希
    • 如果存在直接返回地址,无需重复上传

TCP滑动窗口算法

  1. 接收方维护一个可接受的数据量大小,也就是滑动窗口大小,并且通过ACK报文来通告对方大小,发送方据此来调整发送速率
  2. 动态滑动:每收到一个ACK窗口向前滑动允许发送新的数据
  3. 流量控制:接收方通过减小窗口,抑制发送方速率,防止缓冲区溢出

四次挥手是否可以合并为三次

如果被动关闭方无数据发送,可将FIN和ACK合并,依赖延迟确认机制。
可能会导致主动关闭方每收到最终的ACK,被动方已经关闭

简单介绍一下PageCache

PageCache通过内存缓存磁盘数据,加速文件访问。例如,读取文件时,优先从PageCache获取,未命中再访问磁盘,结合预读和写回优化整体性能。

将频繁访问的文件数据缓存在内存中,从而减少磁盘IO,同时可以根据访问模式预加载后续数据,提升读取效率,写回时可以延迟磁盘写入,合并多次写操作,降低磁盘IO

2025.02.19 二面 (20多分钟 ,问了一点实习,还有一点计网,然后写了个LRU 秒了,怀疑是kpi)

  1. 自我介绍
  2. 实习时间,何时到岗(画饼说如果有hc的可以转正,感觉纯画饼)
  3. 学校安排,如何处理考试啥的
  4. 到岗时间
  5. 实习介绍
  6. 手撕LRU
  7. 反问岗位:直播结算?整体基础不错,但是实习经历比较平淡

后续:挂了

mysql一次性查询200w数据,msyql会出现问题,我该如何处理,如何优化,业务是数据批处理和同步到其他数据库中,如何保证这200w条数据的一致性、准确性、稳定性、容错呢,怎么样保证这个查询不会影响其他查询的性能呢?

  1. 使用基于有序主键的范围查询WHERE id > last_max_id LIMIT batich_size
  2. 流式查询:使用JDBC/连接器的游标,或者服务端游标,减少内存压力
  3. 索引覆盖:确保查询仅能通过所以即可完成避免回表
  4. 资源隔离:通过MySQL资源组和中间件限制查询的CPU/IO资源
    数据一致性如何保证:
  5. 事务与批次提交:使用事务保证单批次操作的原子性,分批提交(每1000条提交一次)
  6. 幂等设计:通过唯一键、版本号避免重复处理
  7. 分布式锁:同步时标记正在处理的记录,防止并发冲突
  8. 补偿机制:记录同步日志,通过定时任务修复不一致数据
    稳定性设计:
  9. 熔断降级:服务中引入熔断机制,异常时暂停任务并且告警
  10. 重试策略:网络抖动使用指数避退重试
  11. 异步化处理:将数据同步任务提交到线程池或者消息队列,解耦试试性要求

为什么使用RocketMQ和Kafka对比,适合场景,RocketMQ底层是如何做的顺序保障的

Kafka:

  1. 高吞吐日志流处理,适合大数据场景
  2. 顺序消息只能保证分区内顺序,需要业务自行管理分区键
    RocketMQ:
  3. 金融级事务消息、顺序消息、延迟消息支持更好
  4. 通过MessageQueue锁机制严格保证同一队列的顺序性
  5. 提供两阶段提交(版消息机制),Kafka需要自行实现。

RocketMQ顺序性的实现原理

  1. 队列分配:生产者通过MessageQueueSelector将同一业务ID的消息发送到固定的队列中
  2. 消费端:消费者对队列加锁,单线程顺序消费
  3. 失败重试:若某条消息消费失败,会阻塞后续消息直到处理成功

MongoDB了解和MySQL的区别

看门狗的底层

浏览器键入URL会发生什么,路由转发过程,每一层如何寻址,如何解数据包和包装

前言

之前我们分析了一波ThreadLocal的源码,我们知道ThreadLoca是为了为每一个线程保存一个本线程的变量,但是某些场景下,我们可能需要在线程之间共享一些变量,比如异步操作和使用线程池进行任务等,接下来我们来分析一下有哪些可以满足我们的需求,具体的使用方式就不贴上来了,基本和ThreadLcoal一样的,这里重点分析他们的实现

InheritableThreadLocal

InheritThreadLocal是ThreadLocal的一个子类,他的代码基本就是一些定义,和包装好供我们使用的,主要是为了给ThreadLoca中的inheritableThreadLocals进行赋值等操作。精髓还是在ThreadLocal中
首先先上结论:
InheritableThreadLocal只能在父子线程之间传递变量,而不适用于线程池的场景。

源码解读

首先看一段代码:


public class ThreadLocalTest implements Runnable{
    private static final InheritableThreadLocal<String> MAIN_THREAD_LOCAL = new InheritableThreadLocal<>();
    @SneakyThrows
    @Override
    public void run() {
        System.out.println("threadlocal 默认值:"+ThreadLocalTest.MAIN_THREAD_LOCAL.get());
        MAIN_THREAD_LOCAL.set("child thread value :"+Thread.currentThread().getName());
        System.out.println("threadlocal 设置子线程值之后:"+ThreadLocalTest.MAIN_THREAD_LOCAL.get());
    }
    public String get(){
        return MAIN_THREAD_LOCAL.get();
    }
    public void clean(){
        MAIN_THREAD_LOCAL.remove();
    }
    public static void main(String[] args) {
        ThreadLocalTest threadLocalTest = new ThreadLocalTest();
        MAIN_THREAD_LOCAL.set("父线程的值 set 111");
        System.out.println("启动:"+threadLocalTest.get());
        for (int i = 0; i < 3; i++) {
            new Thread(threadLocalTest).start();
//            ThreadUtil.execAsync(threadLocalTest);
        }
        System.out.println("结束:"+threadLocalTest.get());
    }
}

这是一段启动子线程的代码,我们顺着 new Thread进去Thread中来到这段代码:

private void init(ThreadGroup g, Runnable target, String name,
                     long stackSize, AccessControlContext acc,
                     boolean inheritThreadLocals) {}

那么我们就去找inheritThreadLocals这个变量,最后在400多行的一个函数中找到了这个变量的位置(具体位置每个版本的行数不一样,直接搜索inheritThreadLocals就行了)
他会使用ThreadLocal.createInheriteaMap去进行赋值,我们就去看这个方法是如何做的

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
    }

接着进去看

private ThreadLocalMap(ThreadLocalMap parentMap) {
           Entry[] parentTable = parentMap.table;
           int len = parentTable.length;
           setThreshold(len);
           table = new Entry[len];

           for (Entry e : parentTable) {
               if (e != null) {
                   @SuppressWarnings("unchecked")
                   ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                   if (key != null) {
                       Object value = key.childValue(e.value);
                       Entry c = new Entry(key, value);
                       int h = key.threadLocalHashCode & (len - 1);
                       while (table[h] != null)
                           h = nextIndex(h, len);
                       table[h] = c;
                       size++;
                   }
               }
           }
       }

到这里,我们其实一目了然了,原来就是新建了一个ThreadLocalMap,然后将传进来的父ThreadLocalMap中的内容一个一个复制过去。。。可以看出,实际上高大上的技术其实往往都是很简单的实现,所以源码该看的还是得看。

为什么线程池中无法共享

通过之前我们对线程池源码的分析,我们可以发现,线程池也使用的new Thread创建子线程的,也就是,实际上给是可以实现线程池之间的共享的,但是问题是,线程池中的线程是重用的,也就是我们没办法确认哪几个线程之间共享了哪个InheritedLocal

TransmittableThreadLocal

这个是来自阿里巴巴开源的一个库中的类,简称TTL,这个是可以满足线程池之间传递变量的。
先给出使用实例:

public class LoginUserContextHolder {

    // 初始化一个 ThreadLocal 变量
    private static final ThreadLocal<Map<String, Object>> LOGIN_USER_CONTEXT_THREAD_LOCAL
            = TransmittableThreadLocal.withInitial(HashMap::new);

    /**
     * 设置用户 ID
     *
     * @param value
     */
    public static void setUserId(Object value) {
        LOGIN_USER_CONTEXT_THREAD_LOCAL.get().put(GlobalConstants.USER_ID, value);
    }

    /**
     * 获取用户 ID
     *
     * @return
     */
    public static Long getUserId() {
        Object value = LOGIN_USER_CONTEXT_THREAD_LOCAL.get().get(GlobalConstants.USER_ID);
        if (Objects.isNull(value)) {
            return null;
        }
        new Thread(() -> {
            System.out.println("子线程获取到的用户ID:" + value);
        }).start();
        return Long.valueOf(value.toString());
    }

    /**
     * 删除 ThreadLocal
     */
    public static void remove() {
        LOGIN_USER_CONTEXT_THREAD_LOCAL.remove();
    }

}

源码分析

首先看定义
可以看出TTL是InheritableThreadLocal的子类,进行了一些补充。
我们去看上面示例中的withInitial方法
插个题外话,我们先关注一下这个Supplier:

Supplier 接口在 Java 中的作用是提供一个结果的供应者。它是一个函数式接口,意味着它可以用作 lambda 表达式或方法引用的目标。函数式接口的意义在于它们使代码更加简洁和可读,特别是在使用 Java 8 引入的流式 API 时

源代码比较长,(有点懒得写) 可以看这位大佬的分析,这里就做一些简单的总结 :

  1. 定义了一个静态变量:holder(InheritableThreadLocal来存储WeakHashMap,父子线程共享),负责保存所有的TTL示例
  2. get、set、remove方法会将当前实例从holder中添加或者移除
  3. withInitial 和 withInitialAndCopier提供用于创建具有初始值和复制器的TTL变量
  4. 核心 Transmitter 内部类提供了 capture(将当前线程中的holder里的TTL捕获出来设置到HashMap中)、replay(将捕获的HashMap设置到新线程的TTL实例中)、clear 和 restore(将之前捕获的HashMap恢复到当前线程的TTL中) 方法,用于捕获和恢复线程本地变量的值。

TTL做的实际上就是将原本与Thread绑定的线程变量,缓存一份到TtlRunnable对象中,在执行子线程任务前,将对象中缓存的变量值设置到子线程的ThreadLocal中以供run()方法的代码使用,然后执行完后,又恢复现场,保证不会对复用线程产生影响。

前言

本人在尝试使用Netty来手写RPC时,学习到了很多Netty知识,在此进行一些记录

示例

以下时服务端的简单启动示例

 public void start(int port) {
        // 1是指定一个线程用于处理连接,0表示不处理连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 里面的参数是线程数,这里是处理消息的线程数,false是指定线程工厂是否是守护线程
        DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
                RuntimeUtil.getProcessorCount() * 2,
                ThreadUtil.newNamedThreadFactory("service-handler-group", false)
        );
        try {
            /**
             * boss线程组用于处理连接工作,worker线程组用于数据处理
             * 依次的结构是 group -> channel -> childHandler -> handler
             * group 用于处理连接,channel 用于处理数据,childHandler 用于处理连接的数据,handler 用于处理数据的
             * 所属关系:一个 group 可以有多个 channel,一个 channel 可以有多个 childHandler,一个 childHandler 可以有多个 handler
             * 一个 channel 只能有一个 childHandler,一个 childHandler 可以有多个 handler
             */
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.TRACE))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
//                                    .addLast(new HttpServerCodec())
//                                    .addLast(new HttpObjectAggregator(65536))
//                                    .addLast(new ChunkedWriteHandler())
                                    // 30之内没有收到客户端请求,就会触发IdleStateHandler的userEventTriggered方法
                                    .addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS))
                                    .addLast(new ProtocolEncoder())
                                    .addLast(new ProtocolDecoder())
                                    .addLast(serviceHandlerGroup, new NettyServerHandler());
//                                    .addLast(new TestNettyHandler());
                            // todo 接收消息,将消息先编码,然后解码成ZMessage格式,最后交由NettyHttpServerHandler处理
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            System.out.println("Server is now listening on port " + port);
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

本文就基于这一段代码来进行深入探究Netty是如何实现的

EventLoopGroup

我们进入NioEventLoopGroup的源码中

public NioEventLoopGroup() {
       this(0);
   }

   public NioEventLoopGroup(int nThreads) {
       this(nThreads, (Executor)null);
   }

我们沿着this不断往下查找,发现

public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider, SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()});
    }

然后进入super之后,发现nThreads这个参数影响的是这里

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));  
  
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {  
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);  
}

最终发现:
一切的根源是MultithreadEventExecutorGroup这个类

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    // 初始化 terminatedChildren 和 terminationFuture
    this.terminatedChildren = new AtomicInteger();
    this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    // 检查 nThreads 是否为正数
    ObjectUtil.checkPositive(nThreads, "nThreads");

    // 如果 executor 为空,则使用默认的线程工厂创建一个新的 ThreadPerTaskExecutor
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
    }

    // 初始化 children 数组
    this.children = new EventExecutor[nThreads];

    // 创建并初始化每个 EventExecutor
    for (int i = 0; i < nThreads; ++i) {
        boolean success = false;
        boolean var18 = false;

        try {
            var18 = true;
            // 创建新的子 EventExecutor
            this.children[i] = this.newChild((Executor) executor, args);
            success = true;
            var18 = false;
        } catch (Exception var19) {
            // 如果创建失败,抛出异常
            throw new IllegalStateException("failed to create a child event loop", var19);
        } finally {
            if (var18) {
                if (!success) {
                    // 如果创建失败,关闭已创建的 EventExecutor
                    for (int j = 0; j < i; ++j) {
                        this.children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; ++j) {
                        EventExecutor e = this.children[j];

                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException var20) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        if (!success) {
            // 如果创建失败,关闭已创建的 EventExecutor
            for (int j = 0; j < i; ++j) {
                this.children[j].shutdownGracefully();
            }

            for (int j = 0; j < i; ++j) {
                EventExecutor e = this.children[j];

                try {
                    while (!e.isTerminated()) {
                        e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException var22) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    // 创建 EventExecutorChooser
    this.chooser = chooserFactory.newChooser(this.children);

    // 创建并添加 terminationListener
    FutureListener<Object> terminationListener = new FutureListener<Object>() {
        public void operationComplete(Future<Object> future) throws Exception {
            if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
                MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object) null);
            }
        }
    };

    // 为每个 EventExecutor 添加 terminationListener
    for (EventExecutor e : this.children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // 创建只读的 children 集合
    Set<EventExecutor> childrenSet = new LinkedHashSet<>(this.children.length);
    Collections.addAll(childrenSet, this.children);
    this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

我们debug走进去,到这个地方

发现在这里使用了默认的线程工厂创建了一个ThreadPerTaskExecutor,接着往下走
发现从this.children = new EventExecutor[nThreads];这一行开始,在创建并初始化每一个Executor
接着往下看

this.chooser = chooserFactory.newChooser(this.children)

这里创建了一个 EventExecutorChooser对象用于在多个 EventExecutor 之间选择一个执行任务
之后往下看,可以发现,为每一个children数组中的对象添加了TerminationListener这个监听器
TerminationListener的作用是,监听每一个EventExecutor的终止事件,当所有的EventExecutor都终止之后,会将TerminationFuture设置为成功状态,表示整个MultithreadEventExecutorGroup 已经完全终止
TerminationListener是个匿名内部类

FutureListener<Object> terminationListener = new FutureListener<Object>() {  
    
    public void operationComplete(Future<Object> future) throws Exception {  
        if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {  
            MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);  
        }  
    }
};

方法的最后,将所有的children中的EventExecutor转化为一个不可修改的集合,从而保证readonlyChildren集合中的元素不可修改,从而保证线程安全和数据的完整性
到这里第一部分的源码就分析完毕,接下来让我们关注到ServerBootstrap这个类

ServerBootstrap

先回顾一波示例代码

b.group(bossGroup, workerGroup)  
                    .channel(NioServerSocketChannel.class)  
                    .handler(new LoggingHandler(LogLevel.TRACE))  
                    .childHandler(new ChannelInitializer<SocketChannel>() {  
                        @Override  
                        protected void initChannel(SocketChannel socketChannel) throws Exception {  
                            socketChannel.pipeline()  
//                                    .addLast(new HttpServerCodec())  
//                                    .addLast(new HttpObjectAggregator(65536))  
//                                    .addLast(new ChunkedWriteHandler())  
                                    // 30之内没有收到客户端请求,就会触发IdleStateHandler的userEventTriggered方法  
                                    .addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS))  
                                    .addLast(new ProtocolEncoder())  
                                    .addLast(new ProtocolDecoder())  
                                    .addLast(serviceHandlerGroup, new NettyServerHandler());  
//                                    .addLast(new TestNettyHandler());  
                            // todo 接收消息,将消息先编码,然后解码成ZMessage格式,最后交由NettyHttpServerHandler处理  
                        }  
                    })  
                    .option(ChannelOption.SO_BACKLOG, 128)  
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

group


让我们点进去查看:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {  
	// 将parentGroup设置为父组
    super.group(parentGroup);  
    if (this.childGroup != null) {  
        throw new IllegalStateException("childGroup set already");  
    } else {  
        this.childGroup = (EventLoopGroup)ObjectUtil.checkNotNull(childGroup, "childGroup");  
        return this;  
    }
}

在这里我们可以发现几个有趣的地方:

  1. 链式调用实际上就是在方法结束的时候返回this本身,这个指针
  2. parentGroup需要去看父类
    先让我们聚焦到ServerBootstrap这个类

基本属性

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    // 用于记录日志的静态常量
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    // 存储子通道选项的 Map
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<>();

    // 存储子通道属性的 Map
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<>();

    // ServerBootstrap 的配置对象
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

    // 子事件循环组,使用 volatile 修饰以保证线程安全
    private volatile EventLoopGroup childGroup;

    // 子通道处理器,使用 volatile 修饰以保证线程安全
    private volatile ChannelHandler childHandler;
}

在这里我们就会发现,构造函数中的那些属性全都是在这里的
让我们继续溯源到AbstractBootstrap

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

	// 空的 ChannelOption 数组常量
private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];

// 空的 AttributeKey 数组常量
private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];

// 事件循环组,使用 volatile 修饰以保证线程安全
volatile EventLoopGroup group;

// 通道工厂,使用 volatile 修饰以保证线程安全
private volatile ChannelFactory<? extends C> channelFactory;

// 本地地址,使用 volatile 修饰以保证线程安全
private volatile SocketAddress localAddress;

// 存储通道选项的 Map
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();

// 存储通道属性的 Map
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<>();

// 通道处理器,使用 volatile 修饰以保证线程安全
private volatile ChannelHandler handler;

// 扩展类加载器,使用 volatile 修饰以保证线程安全
private volatile ClassLoader extensionsClassLoader;
}

这里有个很有意思的继承关系
这个定义是一个抽象类 AbstractBootstrap,它有两个泛型参数 BC,并且实现了 Cloneable 接口。

  • B extends AbstractBootstrap<B, C>:表示泛型参数 B 必须是 AbstractBootstrap 类的子类,并且具有相同的泛型参数 BC。这种定义方式通常用于实现流式 API,使方法可以返回当前对象的类型。
  • C extends Channel:表示泛型参数 C 必须是 Channel 类的子类。
    public B group(EventLoopGroup group) {  
        ObjectUtil.checkNotNull(group, "group");  
        if (this.group != null) {  
            throw new IllegalStateException("group set already");  
        } else {  
            this.group = group;  
            return this.self();  
        }
    }
    从这里可以看到

    这里设置的group实际上就是在给从AbstractBoostrap中集成到的group进行赋值,而childGroup则是ServerBoostarap自己的新增的属性赋值
    接着关注到.handler进入之后可以看到将handler赋值给属性了,这段源码比较简单,就不贴出来了
    接着关注childHandler

    在这里我们可以给连接后产生的SocketChannel配置一些东西,比如通过.addLast来添加连接处理器,handler是给Boss初0始化的,而childHandler则是给Worker进行初始化
    最后
    .option 和 .childOption 是 Netty 中 ServerBootstrap 类的方法,用于配置服务器通道和子通道的选项。
    其中.option是AbstractBoostrap中的属性。
    以上都不是很难的部分,真正核心的在下面👇

bind

ChannelFuture f = b.bind(port).sync();

这一段代码是将我们的ChannelFuture通过Boostrap启动类初始化之后,来绑定结果,ChannelFuture就是一个类似于Future的一个东西,提供了一些与Channel相关的方法,后文会展开,这里先跳过。
然我们跟着bind的调用链去揭开谜底!


到这里我们才意识到,一切的根源都在doBind这个方法中。
先贴源码

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化并注册Channel
    final ChannelFuture regFuture = this.initAndRegister();
    final Channel channel = regFuture.channel();
    
    // 如果注册过程中出现异常,直接返回注册的Future
    if (regFuture.cause() != null) {
        return regFuture;
    } 
    // 如果注册已经完成,创建一个新的ChannelPromise并执行绑定操作
    else if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } 
    // 如果注册未完成,创建一个PendingRegistrationPromise并添加监听器
    else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                // 如果注册过程中出现异常,设置Promise为失败状态
                if (cause != null) {
                    promise.setFailure(cause);
                } 
                // 如果注册成功,标记Promise为已注册并执行绑定操作
                else {
                    promise.registered();
                    AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

开始分析

让我们关注到initAndRegister这个方法

final ChannelFuture initAndRegister() {
    Channel channel = null;

    try {
        // 创建新的Channel实例
        channel = this.channelFactory.newChannel();
        // 初始化Channel
        this.init(channel);
    } catch (Throwable var3) {
        // 如果初始化过程中出现异常,关闭Channel并返回失败的Promise
        // promise 是一种用于表示异步操作结果的对象。它允许你在异步操作完成后执行某些操作,而不需要阻塞当前线程
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }
        // 如果Channel为null,返回一个失败的Promise
        return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }

    // 注册Channel到EventLoopGroup
    ChannelFuture regFuture = this.config().group().register(channel);
    // 如果注册过程中出现异常,关闭Channel
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // 返回注册的Future
    return regFuture;
}

对于channel = this.channelFactory.newChannel();这一行,我们debug进去,
最终在找到了如何初始化的,

Constructor 是 Java 反射机制中的一个类,用于表示类的构造方法。通过 Constructor 类,你可以动态地创建类的实例、获取构造方法的参数类型、访问修饰符等信息。


这里通过反射获得了io.netty.channel.ReflectiveChannelFactory

这里可以找到,实际上是通过DelegatingConstructorAccessorImpl来实现的,不断深入,最后在这里找到答案
奥,原来是调用了一个本地方法来实现的服务注册!

ChannelFuture

补充一下ChannelFuture的定义

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package io.netty.channel;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

public interface ChannelFuture extends Future<Void> {
    /**
     * 返回与此Future相关联的Channel。
     * @return 相关联的Channel
     */
    Channel channel();

    /**
     * 添加一个监听器,当操作完成时会通知该监听器。
     * @param listener 要添加的监听器
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    /**
     * 添加多个监听器,当操作完成时会通知这些监听器。
     * @param listeners 要添加的监听器数组
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    /**
     * 移除一个监听器。
     * @param listener 要移除的监听器
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    /**
     * 移除多个监听器。
     * @param listeners 要移除的监听器数组
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    /**
     * 等待操作完成并同步返回结果,如果操作被中断则抛出InterruptedException。
     * @return 当前的ChannelFuture实例
     * @throws InterruptedException 如果操作被中断
     */
    ChannelFuture sync() throws InterruptedException;

    /**
     * 等待操作完成并同步返回结果,不会被中断。
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture syncUninterruptibly();

    /**
     * 等待操作完成。
     * @return 当前的ChannelFuture实例
     * @throws InterruptedException 如果操作被中断
     */
    ChannelFuture await() throws InterruptedException;

    /**
     * 等待操作完成,不会被中断。
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture awaitUninterruptibly();

    /**
     * 检查此Future是否是Void类型。
     * @return 如果是Void类型则返回true,否则返回false
     */
    boolean isVoid();
}

ChannelFuture就是定义了一系列的监听Channel行为的监听器

doBind

接着顺着doBind往下看

在这里如果出现各种异常行为,会给promise设置异常之后退出,否则则调用doBind0

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    // 在 channel 的事件循环中执行绑定操作
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            // 检查注册操作是否成功
            if (regFuture.isSuccess()) {
                // 如果注册成功,绑定到本地地址,并在失败时关闭 channel
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                // 如果注册失败,设置 promise 的失败原因
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

通过EventLoop的类关系图可以知道,EventLoop实际上就是一个封装的Executor,所以实际上是使用的Executor来跑一个线程来进行绑定
至此,ServerBootstrap的初始化我们就已经完整的探究了一遍