0%

Netty源码初探

前言

本人在尝试使用Netty来手写RPC时,学习到了很多Netty知识,在此进行一些记录

示例

以下时服务端的简单启动示例

 public void start(int port) {
        // 1是指定一个线程用于处理连接,0表示不处理连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 里面的参数是线程数,这里是处理消息的线程数,false是指定线程工厂是否是守护线程
        DefaultEventExecutorGroup serviceHandlerGroup = new DefaultEventExecutorGroup(
                RuntimeUtil.getProcessorCount() * 2,
                ThreadUtil.newNamedThreadFactory("service-handler-group", false)
        );
        try {
            /**
             * boss线程组用于处理连接工作,worker线程组用于数据处理
             * 依次的结构是 group -> channel -> childHandler -> handler
             * group 用于处理连接,channel 用于处理数据,childHandler 用于处理连接的数据,handler 用于处理数据的
             * 所属关系:一个 group 可以有多个 channel,一个 channel 可以有多个 childHandler,一个 childHandler 可以有多个 handler
             * 一个 channel 只能有一个 childHandler,一个 childHandler 可以有多个 handler
             */
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.TRACE))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
//                                    .addLast(new HttpServerCodec())
//                                    .addLast(new HttpObjectAggregator(65536))
//                                    .addLast(new ChunkedWriteHandler())
                                    // 30之内没有收到客户端请求,就会触发IdleStateHandler的userEventTriggered方法
                                    .addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS))
                                    .addLast(new ProtocolEncoder())
                                    .addLast(new ProtocolDecoder())
                                    .addLast(serviceHandlerGroup, new NettyServerHandler());
//                                    .addLast(new TestNettyHandler());
                            // todo 接收消息,将消息先编码,然后解码成ZMessage格式,最后交由NettyHttpServerHandler处理
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            System.out.println("Server is now listening on port " + port);
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

本文就基于这一段代码来进行深入探究Netty是如何实现的

EventLoopGroup

我们进入NioEventLoopGroup的源码中

public NioEventLoopGroup() {
       this(0);
   }

   public NioEventLoopGroup(int nThreads) {
       this(nThreads, (Executor)null);
   }

我们沿着this不断往下查找,发现

public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider selectorProvider, SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()});
    }

然后进入super之后,发现nThreads这个参数影响的是这里

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));  
  
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {  
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);  
}

最终发现:
一切的根源是MultithreadEventExecutorGroup这个类

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    // 初始化 terminatedChildren 和 terminationFuture
    this.terminatedChildren = new AtomicInteger();
    this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    // 检查 nThreads 是否为正数
    ObjectUtil.checkPositive(nThreads, "nThreads");

    // 如果 executor 为空,则使用默认的线程工厂创建一个新的 ThreadPerTaskExecutor
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
    }

    // 初始化 children 数组
    this.children = new EventExecutor[nThreads];

    // 创建并初始化每个 EventExecutor
    for (int i = 0; i < nThreads; ++i) {
        boolean success = false;
        boolean var18 = false;

        try {
            var18 = true;
            // 创建新的子 EventExecutor
            this.children[i] = this.newChild((Executor) executor, args);
            success = true;
            var18 = false;
        } catch (Exception var19) {
            // 如果创建失败,抛出异常
            throw new IllegalStateException("failed to create a child event loop", var19);
        } finally {
            if (var18) {
                if (!success) {
                    // 如果创建失败,关闭已创建的 EventExecutor
                    for (int j = 0; j < i; ++j) {
                        this.children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; ++j) {
                        EventExecutor e = this.children[j];

                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException var20) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        if (!success) {
            // 如果创建失败,关闭已创建的 EventExecutor
            for (int j = 0; j < i; ++j) {
                this.children[j].shutdownGracefully();
            }

            for (int j = 0; j < i; ++j) {
                EventExecutor e = this.children[j];

                try {
                    while (!e.isTerminated()) {
                        e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException var22) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    // 创建 EventExecutorChooser
    this.chooser = chooserFactory.newChooser(this.children);

    // 创建并添加 terminationListener
    FutureListener<Object> terminationListener = new FutureListener<Object>() {
        public void operationComplete(Future<Object> future) throws Exception {
            if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
                MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object) null);
            }
        }
    };

    // 为每个 EventExecutor 添加 terminationListener
    for (EventExecutor e : this.children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // 创建只读的 children 集合
    Set<EventExecutor> childrenSet = new LinkedHashSet<>(this.children.length);
    Collections.addAll(childrenSet, this.children);
    this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

我们debug走进去,到这个地方

发现在这里使用了默认的线程工厂创建了一个ThreadPerTaskExecutor,接着往下走
发现从this.children = new EventExecutor[nThreads];这一行开始,在创建并初始化每一个Executor
接着往下看

this.chooser = chooserFactory.newChooser(this.children)

这里创建了一个 EventExecutorChooser对象用于在多个 EventExecutor 之间选择一个执行任务
之后往下看,可以发现,为每一个children数组中的对象添加了TerminationListener这个监听器
TerminationListener的作用是,监听每一个EventExecutor的终止事件,当所有的EventExecutor都终止之后,会将TerminationFuture设置为成功状态,表示整个MultithreadEventExecutorGroup 已经完全终止
TerminationListener是个匿名内部类

FutureListener<Object> terminationListener = new FutureListener<Object>() {  
    
    public void operationComplete(Future<Object> future) throws Exception {  
        if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {  
            MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);  
        }  
    }
};

方法的最后,将所有的children中的EventExecutor转化为一个不可修改的集合,从而保证readonlyChildren集合中的元素不可修改,从而保证线程安全和数据的完整性
到这里第一部分的源码就分析完毕,接下来让我们关注到ServerBootstrap这个类

ServerBootstrap

先回顾一波示例代码

b.group(bossGroup, workerGroup)  
                    .channel(NioServerSocketChannel.class)  
                    .handler(new LoggingHandler(LogLevel.TRACE))  
                    .childHandler(new ChannelInitializer<SocketChannel>() {  
                        @Override  
                        protected void initChannel(SocketChannel socketChannel) throws Exception {  
                            socketChannel.pipeline()  
//                                    .addLast(new HttpServerCodec())  
//                                    .addLast(new HttpObjectAggregator(65536))  
//                                    .addLast(new ChunkedWriteHandler())  
                                    // 30之内没有收到客户端请求,就会触发IdleStateHandler的userEventTriggered方法  
                                    .addLast(new IdleStateHandler(30,0,0, TimeUnit.SECONDS))  
                                    .addLast(new ProtocolEncoder())  
                                    .addLast(new ProtocolDecoder())  
                                    .addLast(serviceHandlerGroup, new NettyServerHandler());  
//                                    .addLast(new TestNettyHandler());  
                            // todo 接收消息,将消息先编码,然后解码成ZMessage格式,最后交由NettyHttpServerHandler处理  
                        }  
                    })  
                    .option(ChannelOption.SO_BACKLOG, 128)  
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

group


让我们点进去查看:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {  
	// 将parentGroup设置为父组
    super.group(parentGroup);  
    if (this.childGroup != null) {  
        throw new IllegalStateException("childGroup set already");  
    } else {  
        this.childGroup = (EventLoopGroup)ObjectUtil.checkNotNull(childGroup, "childGroup");  
        return this;  
    }
}

在这里我们可以发现几个有趣的地方:

  1. 链式调用实际上就是在方法结束的时候返回this本身,这个指针
  2. parentGroup需要去看父类
    先让我们聚焦到ServerBootstrap这个类

基本属性

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    // 用于记录日志的静态常量
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    // 存储子通道选项的 Map
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<>();

    // 存储子通道属性的 Map
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<>();

    // ServerBootstrap 的配置对象
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

    // 子事件循环组,使用 volatile 修饰以保证线程安全
    private volatile EventLoopGroup childGroup;

    // 子通道处理器,使用 volatile 修饰以保证线程安全
    private volatile ChannelHandler childHandler;
}

在这里我们就会发现,构造函数中的那些属性全都是在这里的
让我们继续溯源到AbstractBootstrap

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

	// 空的 ChannelOption 数组常量
private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];

// 空的 AttributeKey 数组常量
private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];

// 事件循环组,使用 volatile 修饰以保证线程安全
volatile EventLoopGroup group;

// 通道工厂,使用 volatile 修饰以保证线程安全
private volatile ChannelFactory<? extends C> channelFactory;

// 本地地址,使用 volatile 修饰以保证线程安全
private volatile SocketAddress localAddress;

// 存储通道选项的 Map
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();

// 存储通道属性的 Map
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<>();

// 通道处理器,使用 volatile 修饰以保证线程安全
private volatile ChannelHandler handler;

// 扩展类加载器,使用 volatile 修饰以保证线程安全
private volatile ClassLoader extensionsClassLoader;
}

这里有个很有意思的继承关系
这个定义是一个抽象类 AbstractBootstrap,它有两个泛型参数 BC,并且实现了 Cloneable 接口。

  • B extends AbstractBootstrap<B, C>:表示泛型参数 B 必须是 AbstractBootstrap 类的子类,并且具有相同的泛型参数 BC。这种定义方式通常用于实现流式 API,使方法可以返回当前对象的类型。
  • C extends Channel:表示泛型参数 C 必须是 Channel 类的子类。
    public B group(EventLoopGroup group) {  
        ObjectUtil.checkNotNull(group, "group");  
        if (this.group != null) {  
            throw new IllegalStateException("group set already");  
        } else {  
            this.group = group;  
            return this.self();  
        }
    }
    从这里可以看到

    这里设置的group实际上就是在给从AbstractBoostrap中集成到的group进行赋值,而childGroup则是ServerBoostarap自己的新增的属性赋值
    接着关注到.handler进入之后可以看到将handler赋值给属性了,这段源码比较简单,就不贴出来了
    接着关注childHandler

    在这里我们可以给连接后产生的SocketChannel配置一些东西,比如通过.addLast来添加连接处理器,handler是给Boss初0始化的,而childHandler则是给Worker进行初始化
    最后
    .option 和 .childOption 是 Netty 中 ServerBootstrap 类的方法,用于配置服务器通道和子通道的选项。
    其中.option是AbstractBoostrap中的属性。
    以上都不是很难的部分,真正核心的在下面👇

bind

ChannelFuture f = b.bind(port).sync();

这一段代码是将我们的ChannelFuture通过Boostrap启动类初始化之后,来绑定结果,ChannelFuture就是一个类似于Future的一个东西,提供了一些与Channel相关的方法,后文会展开,这里先跳过。
然我们跟着bind的调用链去揭开谜底!


到这里我们才意识到,一切的根源都在doBind这个方法中。
先贴源码

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化并注册Channel
    final ChannelFuture regFuture = this.initAndRegister();
    final Channel channel = regFuture.channel();
    
    // 如果注册过程中出现异常,直接返回注册的Future
    if (regFuture.cause() != null) {
        return regFuture;
    } 
    // 如果注册已经完成,创建一个新的ChannelPromise并执行绑定操作
    else if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } 
    // 如果注册未完成,创建一个PendingRegistrationPromise并添加监听器
    else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                // 如果注册过程中出现异常,设置Promise为失败状态
                if (cause != null) {
                    promise.setFailure(cause);
                } 
                // 如果注册成功,标记Promise为已注册并执行绑定操作
                else {
                    promise.registered();
                    AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

开始分析

让我们关注到initAndRegister这个方法

final ChannelFuture initAndRegister() {
    Channel channel = null;

    try {
        // 创建新的Channel实例
        channel = this.channelFactory.newChannel();
        // 初始化Channel
        this.init(channel);
    } catch (Throwable var3) {
        // 如果初始化过程中出现异常,关闭Channel并返回失败的Promise
        // promise 是一种用于表示异步操作结果的对象。它允许你在异步操作完成后执行某些操作,而不需要阻塞当前线程
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
        }
        // 如果Channel为null,返回一个失败的Promise
        return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
    }

    // 注册Channel到EventLoopGroup
    ChannelFuture regFuture = this.config().group().register(channel);
    // 如果注册过程中出现异常,关闭Channel
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // 返回注册的Future
    return regFuture;
}

对于channel = this.channelFactory.newChannel();这一行,我们debug进去,
最终在找到了如何初始化的,

Constructor 是 Java 反射机制中的一个类,用于表示类的构造方法。通过 Constructor 类,你可以动态地创建类的实例、获取构造方法的参数类型、访问修饰符等信息。


这里通过反射获得了io.netty.channel.ReflectiveChannelFactory

这里可以找到,实际上是通过DelegatingConstructorAccessorImpl来实现的,不断深入,最后在这里找到答案
奥,原来是调用了一个本地方法来实现的服务注册!

ChannelFuture

补充一下ChannelFuture的定义

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package io.netty.channel;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

public interface ChannelFuture extends Future<Void> {
    /**
     * 返回与此Future相关联的Channel。
     * @return 相关联的Channel
     */
    Channel channel();

    /**
     * 添加一个监听器,当操作完成时会通知该监听器。
     * @param listener 要添加的监听器
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    /**
     * 添加多个监听器,当操作完成时会通知这些监听器。
     * @param listeners 要添加的监听器数组
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    /**
     * 移除一个监听器。
     * @param listener 要移除的监听器
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    /**
     * 移除多个监听器。
     * @param listeners 要移除的监听器数组
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    /**
     * 等待操作完成并同步返回结果,如果操作被中断则抛出InterruptedException。
     * @return 当前的ChannelFuture实例
     * @throws InterruptedException 如果操作被中断
     */
    ChannelFuture sync() throws InterruptedException;

    /**
     * 等待操作完成并同步返回结果,不会被中断。
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture syncUninterruptibly();

    /**
     * 等待操作完成。
     * @return 当前的ChannelFuture实例
     * @throws InterruptedException 如果操作被中断
     */
    ChannelFuture await() throws InterruptedException;

    /**
     * 等待操作完成,不会被中断。
     * @return 当前的ChannelFuture实例
     */
    ChannelFuture awaitUninterruptibly();

    /**
     * 检查此Future是否是Void类型。
     * @return 如果是Void类型则返回true,否则返回false
     */
    boolean isVoid();
}

ChannelFuture就是定义了一系列的监听Channel行为的监听器

doBind

接着顺着doBind往下看

在这里如果出现各种异常行为,会给promise设置异常之后退出,否则则调用doBind0

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    // 在 channel 的事件循环中执行绑定操作
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            // 检查注册操作是否成功
            if (regFuture.isSuccess()) {
                // 如果注册成功,绑定到本地地址,并在失败时关闭 channel
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                // 如果注册失败,设置 promise 的失败原因
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

通过EventLoop的类关系图可以知道,EventLoop实际上就是一个封装的Executor,所以实际上是使用的Executor来跑一个线程来进行绑定
至此,ServerBootstrap的初始化我们就已经完整的探究了一遍