netty-启动流程

Githa ·
更新时间:2024-11-14
· 550 次阅读

netty启动流程 启动实例: public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } 结构模式

在这里插入图片描述

启动步骤 创建bossGroup: EventLoopGroup bossGroup=new NioEventLoopGroup();//相当于mainReactor 创建workerGroup: EventLoopGroup workerGroup=new NioEventLoopGroup();//相当于subReactor 创建启动类: ServerBootstrap b=new ServerBootstrap(); 给启动类设置属性和业务处理Handler: b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); 绑定端口: ChannelFuture f = b.bind(PORT).sync(); 源码流程解析 启动bossGroup,每个EventLoopGroup中都有一组NioEventLoop,每个NioEventLoop都会启动一个Selector 1.创建 EventLoopGroup bossGroup = new NioEventLoopGroup(); 2.进入NioEventLoopGroup中 public NioEventLoopGroup() { this(0); } //将会根据提供的线程数来创建线程 public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } ..... 3. //最后调用了父类MultithreadEventLoopGroup,如果传入的线程数为0,则使用默认值DEFAULT_EVENT_LOOP_THREADS(默认为2*cpu核心数) protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } //创建NioEventLoop protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... 
args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } //如果 executor 是null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } //根据传入的线程数(默认为CPU核心数*2)创建一个EventExecutor(单线程执行器)数组。 children = new EventExecutor[nThreads]; //循环填充数组中的元素。如果异常,则关闭所有已创建的单线程执行器。 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //调用子类NioEventLoopGroup实现,创建NioEventLoop children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } //根据选择器工厂创建一个线程选择器:线程数是2的幂时用位运算(&)轮询,否则用取模(%)轮询。 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; //为每一个单线程执行器添加一个关闭监听器。 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } //将所有的单线程执行器添加到一个 LinkedHashSet 中,并包装成只读集合 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); } 4.进入NioEventLoop //创建NioEventLoop,初始化一系列参数 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), 
rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } if (strategy == null) { throw new NullPointerException("selectStrategy"); } provider = selectorProvider; //初始化创建Selector final SelectorTuple selectorTuple = openSelector(); selector = selectorTuple.selector; unwrappedSelector = selectorTuple.unwrappedSelector; selectStrategy = strategy; }

2.绑定监听端口
创建服务启动辅助类ServerBootstrap,调用该类的bind()方法,启动服务

//创建服务启动类 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); 调用ServerBootstrap的bind()方法进行绑定端口 //1.调用父类AbstractBootstrap#bind public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } .... public ChannelFuture bind(SocketAddress localAddress) { //2.校验EventLoopGroup parentGroup是否为空,parentGroup通过ServerBootstrap#group 设置 //3.判断channelFactory 是否为空,channelFactory为调用channel()方法时创建的ReflectiveChannelFactory实例 validate(); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } 进入到doBind操作,执行真正的端口绑定,doBind方法主要有以下操作:1.执行初始化和注册操作. 2.执行绑定操作 private ChannelFuture doBind(final SocketAddress localAddress) { //1.执行初始化和注册操作 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. 
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } 通道初始化和注册操作: final ChannelFuture initAndRegister() { Channel channel = null; try { //调用工厂方法创建通道,channelFactory的创建是通过ServerBootStrap#Channel(NioServerSocketChannel.class)时候创建,channelFactory的类为ReflectiveChannelFactory,通过ReflectiveChannelFactory的构造方法,反射创建Channel(NioServerSocketChannel.class)传递过来的NioServerSocketChannel 通道类 channel = channelFactory.newChannel(); //初始化通道 init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } //开始注册服务到selector ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } @Override void init(Channel channel) { //1.设置channelOption setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger); 
//2.设置attr setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0))); ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup;//workerGroup final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); //3.往channel的pipeline中添加了一个自定义的ChannelInitializer,该ChannelInitializer的initChannel方法会在channel注册完成之后调用,而initChannel做了一件比较重要的事情就是往channel(NioServerSocketChannel)的pipeline里面添加了一个ServerBootstrapAcceptor,在ServerBootstrapAcceptor重写了channelRead方法(对于Server端来说,该方法在有新连接建立的时候调用),而这里面设置了新连接(对于ServerBootstrap可以说是childChannel)的option、attr,同时调用了childGroup.register方法,在workerGroup中选出的eventLoop线程中进行注册流程 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } //ServerBootstrapAcceptor 这个handler负责在接受客户端连接、创建连接后,对连接做初始化工作 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } 初始化channel 的属性后,开始注册selector,回到initAndRegister方法,调用config().group().register(channel);也就是调用MultithreadEventLoopGroup#register(Channel channel)方法 @Override public ChannelFuture register(Channel channel) { //1.在这里next()会选择NioEventLoop;通过DefaultEventExecutorChooserFactory#newChooser() 创建eventLoop选择器PowerOfTwoEventExecutorChooser或GenericEventExecutorChooser。 //2.选择了一个NioEventLoop,每个NioEventLoop都会启动一个Selector,然后开始注册ServerSocketChannel return next().register(channel); } 调用NioEventLoop#register()注册,进入父类SingleThreadEventLoop方法 @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); 
//调用channel的内部类Unsafe去注册,这里的channel就是上文的NioServerSocketChannel promise.channel().unsafe().register(this, promise); return promise; } 进入NioServerSocketChannel。进入unsafe方法,调用流程为AbstractChannel构造方法调用newUnsafe(),最后实现为子类AbstractNioMessageChannel#newUnsafe()方法创建NioMessageUnsafe()实例,最后调用其父类AbstractUnsafe#register,接着流程走到调用AbstractNioChannel#doRegister()方法 @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //ops 为0:注意这里虽然把channel注册到了selector并拿到selectionKey,但没有注册任何感兴趣的事件。注册关心的事件有两种方式,一种是调用Channel的register方法时直接传入ops,第二种是之后设置SelectionKey的interestOps的值。Netty是用了第二种方式,通过设置SelectionKey的interestOps来注册Channel关心的事件,把实际的事件注册延迟了 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } 至此通道初始化和延迟注册就完成了,接下来就是绑定端口了:继续执行AbstractBootstrap#doBind0()开始进行端口绑定 private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. 
//开始异步执行注册绑定监听 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } 调用channel的bind方法进行绑定,最后进入AbstractChannel类的bind()方法 @Override public ChannelFuture bind(SocketAddress localAddress) { //调用pipeline的bind方法进行绑定地址。该pipeline为DefaultChannelPipeline return pipeline.bind(localAddress); } 最终会调用AbstractChannel的内部类AbstractUnsafe#bind()方法 @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. 
logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } //判断绑定前未激活、绑定后已激活(即本次绑定成功),成功后调用pipeline#fireChannelActive()方法 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } 调用子类NioServerSocketChannel#doBind()方法进行绑定端口,但是还没有注册监听事件,接下来往下走 protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } 绑定成功后调用pipeline#fireChannelActive() @Override public final ChannelPipeline fireChannelActive() { //执行ChannelHandler的channelActive方法,大家都知道ChannelPipeline 里面有调用链handler,分为链头和链尾,这里传入链头处理,该head为HeadContext的实例 AbstractChannelHandlerContext.invokeChannelActive(head); return this; } 进入AbstractChannelHandlerContext,执行ChannelHandler的channelActive方法,上面说过head为HeadContext实例,最后执行调用HeadContext#channelActive()方法 @Override public void channelActive(ChannelHandlerContext ctx) { //继续向后传播channelActive事件,最终会执行TailContext#channelActive 方法 ctx.fireChannelActive(); //在这里注册感兴趣的事件(服务端通道为接受连接事件,普通连接通道为读事件) readIfIsAutoRead(); } 进入readIfIsAutoRead()方法 private void readIfIsAutoRead() { //判断是否开启了自动读(autoRead) if (channel.config().isAutoRead()) { //执行channel的read方法 channel.read(); } } 进入NioServerSocketChannel 一直追溯到AbstractChannel#read @Override public Channel read() { pipeline.read(); return this; } 又回到了pipeline类,继续执行下去 @Override public final ChannelPipeline read() { tail.read(); return this; } 调用了tail.read方法,tail为TailContext类的实例 @Override public ChannelHandlerContext read() { final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ); EventExecutor executor = 
next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { Tasks tasks = next.invokeTasks; if (tasks == null) { next.invokeTasks = tasks = new Tasks(next); } executor.execute(tasks.invokeReadTask); } return this; } 最后执行到 HeadContext#read(ChannelHandlerContext ctx) @Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); } 终于走到unsafe.beginRead()方法了,unsafe为NioMessageUnsafe实例,其父类AbstractUnsafe#beginRead()最后实际调用了AbstractNioChannel#doBeginRead()方法 @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; //上文中开始注册时候使用的ops为0,这边判断是否已经注册了readInterestOp事件(对NioServerSocketChannel来说是SelectionKey.OP_ACCEPT),如果没有注册,就开始注册 final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } } readInterestOp是在NioServerSocketChannel的构造方法中设置的(传给父类的SelectionKey.OP_ACCEPT): public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }

至此,整个启动与注册流程完毕


作者:鲸落1024



netty 启动

需要 登录 后方可回复, 如果你还没有账号请 注册新账号
相关文章