上一篇说完了NioEventLoop完成的三件事,1.轮询感兴趣事件 2.处理IO事件 3.处理任务队列
流程走到了启动好reactor线程后,ServerSocketChannel注册到selector上,但是感兴趣事件填的0,我们继续跟流程,走到这里,initAndRegister方法完成,继续跟bind方法,我们看bind方法中的doBind0方法
/**
*
* @param regFuture
* @param channel nioserversocketchannel
* @param localAddress 本地地址
* @param promise 标明通道是否被注册
*/
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 在触发channelregister()之前调用此方法。给用户处理程序设置的机会
// 管道在其channelRegistered()实现中。
//这里也是封装成任务,这些任务最终被事件轮询线程同步调用
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
System.out.println("bind0");
if (regFuture.isSuccess()) {
//bind方法,添加关闭的监听事件
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
这里也是把bind方法封装成任务,添加到任务队列中,再让executor处理任务
查看bind方法,最终会通过pipeline走到headContext,我们查看headContext的invokeBind方法
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
这里有调用到了unsafe的doBind方法,
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
//如果没有开启reactor线程,直接抛异常
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
//处理bind方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
/*
isActive判断通道是否就绪
*/
if (!wasActive && isActive()) {
//线程执行调用channelActive方法
invokeLater(new Runnable() {
@Override
public void run() {
//这里就是用来为注册上的channel添加感兴趣事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
里面调用了bind方法,也就是我们nio原生的绑定方法
//调用nio api绑定ip端口
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
//根据jdk版本不同进行绑定方法,到这里绑定完成
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
到这里,绑定方法就完成了,现在距离我们的serverSocketChannel的正常使用还差一步,就是为注册到selector的channel添加感兴趣事件,其实添加感兴趣事件也在unsafe的bind方法中,在完成bind方法后,会将pipeline.fireChannelActive();方法添加到任务队列中,根据handler调用链,会先调用到headContext的channelActive方法
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
我们主要研究readIfIsAutoRead方法,这里通过debug,最终会走到headContext的invokeRead方法中
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
调用到unsafe的beginRead方法,最终调用到AbstaractNioChannel的doBeginRead方法
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
//添加感兴趣事件
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
还记得我们刚刚创建NioServerSocketChannel的代码吗,里面调用了
super(null, channel, SelectionKey.OP_ACCEPT);即传入了accept的感兴趣事件,在AbstaractNioChannel中保存,this.readInterestOp = readInterestOp;在这里用到。
到这里我们的NioServerSocketChannel就可以正常工作了
下一篇会讲客户端的接入流程,其实netty中代码复用非常多,理解了NioServerSocketChanel的创建和使用,再去理解NioSocketChannel就会非常简单