/**
 * Minimal Netty server bootstrap example.
 *
 * <p>Binds a {@code NioServerSocketChannel} on {@link #PORT}, optionally wraps each accepted
 * connection in TLS (self-signed certificate) when {@link #SSL} is true, and installs a single
 * shared {@code EchoServerHandler} instance as the last handler of every child pipeline.
 */
public final class EchoServer {

    /** True when the {@code ssl} system property is present (its value is ignored). */
    static final boolean SSL = System.getProperty("ssl") != null;

    /** Listening port, taken from the {@code port} system property; defaults to 8007. */
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    /**
     * Starts the server and blocks until the server socket is closed.
     *
     * @param args ignored
     * @throws Exception if certificate generation, binding or waiting fails
     */
    public static void main(String[] args) throws Exception {
        // Configure SSL: a throw-away self-signed certificate is generated when enabled.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server: one group for the listening socket, one for accepted connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // NOTE(review): the same handler instance is added to every child pipeline, so
        // EchoServerHandler presumably has to be @Sharable - confirm against its definition.
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                // The type argument is required: with a raw ChannelInitializer the method below
                // would not override initChannel(C) and the anonymous class would not compile.
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        if (sslCtx != null) {
                            p.addLast(sslCtx.newHandler(ch.alloc()));
                        }
                        // Optional per-connection logging:
                        // p.addLast(new LoggingHandler(LogLevel.INFO));
                        p.addLast(serverHandler);
                    }
                });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
结构模式
启动步骤
创建bossGroup:
EventLoopGroup bossGroup=new NioEventLoopGroup();// plays the role of the mainReactor
创建workerGroup:
EventLoopGroup workerGroup = new NioEventLoopGroup(); // plays the role of the subReactor
创建启动类
ServerBootstrap sb=new ServerBootstrap()
给启动类设置属性和业务处理Handler:
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
绑定端口:
// Start the server: bind to PORT and wait for the bind operation to complete.
ChannelFuture f = b.bind(PORT).sync();
源码流程解析
启动bossGroup,每个EventLoopGroup中都有一组NioEventLoop,每个NioEventLoop都会启动一个Selector
1.创建
// Entry point of the walkthrough: the no-arg constructor traced in the following steps.
EventLoopGroup bossGroup = new NioEventLoopGroup();
2.进入NioEventLoopGroup中
// No-arg constructor: delegates with nThreads = 0 (replaced by a default further up the chain).
public NioEventLoopGroup() {
this(0);
}
// Creates the group with the requested number of threads; delegates with a null Executor.
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
.....
3. //最后调用了父类MultithreadEventLoopGroup,如果初始化的线程数为0,则初始化值为2*cpu核心数
// Substitutes DEFAULT_EVENT_LOOP_THREADS when the caller passed 0 threads, then delegates to the superclass.
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
/**
 * Creates the child executors (NioEventLoop instances when the subclass is NioEventLoopGroup).
 *
 * @param nThreads       number of child executors to create; must be positive
 * @param executor       executor used to run the children, or {@code null} for the default
 * @param chooserFactory factory for the chooser that selects among the children
 * @param args           extra arguments forwarded to {@code newChild(...)}
 * @throws IllegalArgumentException if {@code nThreads} is not positive
 * @throws IllegalStateException    if a child could not be created
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    // When no executor is supplied, fall back to a ThreadPerTaskExecutor built on the
    // default thread factory.
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // One slot per requested thread.
    children = new EventExecutor[nThreads];

    // Fill the array; if any child fails to be created, shut down the ones created so far.
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // Implemented by the subclass (NioEventLoopGroup creates a NioEventLoop here).
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // Build the chooser that selects among the children.
    chooser = chooserFactory.newChooser(children);

    // Completes terminationFuture once every child has reported termination.
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    // Attach the termination listener to every child.
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    // Expose the children as an unmodifiable, insertion-ordered set (LinkedHashSet).
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
4.进入NioEventLoop
// Creates a NioEventLoop: validates the arguments, stores them, and opens the loop's Selector.
// Throws NullPointerException when selectorProvider or strategy is null.
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// Open the Selector during construction; both the (possibly wrapped) and unwrapped selector are kept.
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
2.绑定监听端口
创建服务启动辅助类ServerBootstrap,调用该类的bind方法,启动服务
// Create the server bootstrap helper.
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 100)
    .handler(new LoggingHandler(LogLevel.INFO))
    // The <SocketChannel> type argument is required for initChannel(SocketChannel)
    // to be a valid override of ChannelInitializer#initChannel.
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc()));
            }
            // Optional per-connection logging:
            // p.addLast(new LoggingHandler(LogLevel.INFO));
            p.addLast(serverHandler);
        }
    });
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
调用ServerBootstrap的bind()方法进行绑定端口
// Step 1: the parent class AbstractBootstrap#bind(int) wraps the port in an InetSocketAddress and delegates.
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
....
// Validates the bootstrap configuration, then binds to the given (non-null) address via doBind.
public ChannelFuture bind(SocketAddress localAddress) {
// validate() is not shown here. According to the original notes it checks:
// Step 2: that the parent EventLoopGroup (set through ServerBootstrap#group) is not null;
// Step 3: that channelFactory is not null - it is the ReflectiveChannelFactory created when channel(...) was called.
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
进入到doBind操作,执行真正的端口绑定。doBind方法主要有以下操作:1.执行初始化和注册操作;2.执行绑定操作
/**
 * Initialises and registers the channel, then binds it to {@code localAddress}.
 *
 * @param localAddress address to bind to
 * @return the failed registration future, or a promise completed by the bind operation
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    // Step 1: create, initialise and register the channel.
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (!regFuture.isDone()) {
        // Registration future is almost always fulfilled already, but just in case it's not:
        // defer the bind until the registration outcome is known.
        final PendingRegistrationPromise pending = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable failure = future.cause();
                if (failure == null) {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    pending.registered();
                    doBind0(regFuture, channel, localAddress, pending);
                    return;
                }
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not
                // cause an IllegalStateException once we try to access the EventLoop of the Channel.
                pending.setFailure(failure);
            }
        });
        return pending;
    }

    // Step 2: registration already completed successfully, so bind right away.
    ChannelPromise bound = channel.newPromise();
    doBind0(regFuture, channel, localAddress, bound);
    return bound;
}
通道初始化和注册操作:
/**
 * Creates a channel through the channel factory, initialises it and registers it with the
 * configured event loop group.
 *
 * @return the registration future, or an already-failed promise if creation or init threw
 */
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // channelFactory was set up by channel(NioServerSocketChannel.class); per the original
        // notes it is a ReflectiveChannelFactory that instantiates that class reflectively.
        channel = channelFactory.newChannel();
        // Initialise the freshly created channel.
        init(channel);
    } catch (Throwable t) {
        // The channel is not registered yet, so the failed promise has to run on the
        // GlobalEventExecutor in both branches below.
        if (channel == null) {
            // newChannel itself crashed (eg SocketException("too many open files")).
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        channel.unsafe().closeForcibly();
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register the channel with an event loop of the group.
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() == null) {
        return regFuture;
    }

    // Registration failed: close the channel the appropriate way.
    if (channel.isRegistered()) {
        channel.close();
    } else {
        channel.unsafe().closeForcibly();
    }
    return regFuture;
}
/**
 * Initialises the server channel: applies options and attributes, then installs an initializer
 * that adds the configured handler and a ServerBootstrapAcceptor to its pipeline.
 *
 * @param channel the newly created server channel
 */
@Override
void init(Channel channel) {
    // 1. Apply the configured channel options.
    setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
    // 2. Apply the configured attributes.
    setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

    ChannelPipeline p = channel.pipeline();

    // Snapshot the child (accepted-connection) configuration for the acceptor.
    final EventLoopGroup currentChildGroup = childGroup; // the worker group, per the original note
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions =
            childOptions.entrySet().toArray(newOptionArray(0));
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs =
            childAttrs.entrySet().toArray(newAttrArray(0));

    // 3. Add a ChannelInitializer to the server channel's pipeline. Per the original notes,
    // initChannel runs once the channel is registered. It appends a ServerBootstrapAcceptor,
    // which (not shown here) handles newly accepted connections in channelRead: it applies the
    // child options/attributes and registers the child channel with the worker group.
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // The acceptor is added from a task submitted to the channel's event loop rather
            // than inline.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
初始化channel的属性后,开始注册到selector。回到initAndRegister方法,调用config().group().register(channel);也就是调用MultithreadEventLoopGroup#register(Channel channel)方法
// Registers the channel with one event loop of this group.
@Override
public ChannelFuture register(Channel channel) {
// 1. next() picks a NioEventLoop. Per the original notes the chooser comes from DefaultEventExecutorChooserFactory#newChooser(), which creates either a PowerOfTwoEventExecutorChooser or a GenericEventExecutorChooser.
// 2. Each NioEventLoop owns a Selector; the chosen loop then registers the ServerSocketChannel.
return next().register(channel);
}
调用NioEventLoop#register()注册,进入父类SingleThreadEventLoop的register方法
// Registers the promise's channel with this event loop and returns the same promise.
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// Delegate to the channel's Unsafe; in this walkthrough the channel is the NioServerSocketChannel created earlier.
promise.channel().unsafe().register(this, promise);
return promise;
}
进入NioServerSocketChannel。进入unsafe方法,调用流程为:AbstractChannel构造方法调用newUnsafe(),最终由子类AbstractNioMessageChannel#newUnsafe()创建NioMessageUnsafe实例;随后调用其父类AbstractUnsafe#register,接着流程走到AbstractNioChannel#doRegister()方法
// Registers the underlying Java channel with the event loop's selector, retrying once after a forced selectNow() if a cancelled key is still cached.
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// ops is 0: no event interest is registered here, the call only obtains the SelectionKey (with this channel as attachment). Interest can be expressed either through register(...) or by setting the key's interestOps later; Netty uses the latter, so the actual interest registration is deferred.
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
至此通道初始化和延迟注册就完成了,接下来就是绑定端口了:继续执行AbstractBootstrap#doBind0()开始进行端口绑定
/**
 * Schedules the actual bind on the channel's event loop.
 *
 * @param regFuture    the registration future whose outcome decides whether to bind
 * @param channel      the channel to bind
 * @param localAddress the address to bind to
 * @param promise      completed with the bind result, or failed with the registration cause
 */
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance
    // to set up the pipeline in its channelRegistered() implementation.
    // The bind therefore runs asynchronously as a task on the channel's event loop.
    final Runnable bindTask = new Runnable() {
        @Override
        public void run() {
            if (!regFuture.isSuccess()) {
                promise.setFailure(regFuture.cause());
                return;
            }
            channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    };
    channel.eventLoop().execute(bindTask);
}
调用channel的bind方法进行绑定,最后进入AbstractChannel类的bind()方法
// Binds by delegating to the channel's pipeline.
@Override
public ChannelFuture bind(SocketAddress localAddress) {
// Delegate to the pipeline's bind; per the original notes the pipeline is a DefaultChannelPipeline.
return pipeline.bind(localAddress);
}
最终会调用AbstractChannel.AbstractUnsafe#bind()方法
// Performs the bind on the event loop: calls doBind(localAddress), fails the promise if it throws,
// and schedules fireChannelActive() when the channel went from inactive to active.
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// Remember the state before binding so the inactive -> active transition can be detected.
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// If the bind made the channel active, fire pipeline#fireChannelActive() in a later task.
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
调用子类NioServerSocketChannel#doBind()方法进行绑定端口,但是还没有注册监听事件,接下来往下走
/**
 * Binds the underlying Java server channel to {@code localAddress} using the configured backlog.
 *
 * @param localAddress address to bind to
 * @throws Exception if the underlying bind fails
 */
protected void doBind(SocketAddress localAddress) throws Exception {
    final boolean legacyJava = PlatformDependent.javaVersion() < 7;
    if (legacyJava) {
        // Before Java 7 the bind goes through the channel's socket.
        javaChannel().socket().bind(localAddress, config.getBacklog());
        return;
    }
    javaChannel().bind(localAddress, config.getBacklog());
}
判断是否已经绑定,绑定后调用pipeline#fireChannelActive()
// Starts channelActive propagation at the head of the pipeline and returns this pipeline.
@Override
public final ChannelPipeline fireChannelActive() {
// Invoke the handlers' channelActive starting from the head context; per the original notes, head is a HeadContext instance (the pipeline's handler chain has a head and a tail).
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
进入AbstractChannelHandlerContext,执行ChannelHandler的channelActive方法,上面说过head为HeadContext实例,最后执行调用HeadContext#channelActive()方法
// HeadContext's channelActive: forwards the event, then triggers a read when auto-read is on.
@Override
public void channelActive(ChannelHandlerContext ctx) {
// Propagate channelActive down the pipeline; per the original notes this eventually reaches TailContext#channelActive.
ctx.fireChannelActive();
// This call is what eventually leads to the accept/read interest being set on the selection key.
readIfIsAutoRead();
}
进入readIfIsAutoRead()方法
/** Issues a {@code channel.read()} when auto-read is enabled in the channel's config. */
private void readIfIsAutoRead() {
    // Nothing to do unless auto-read is enabled.
    if (!channel.config().isAutoRead()) {
        return;
    }
    // Kick off the channel's read operation.
    channel.read();
}
进入NioServerSocketChannel 一直追溯到AbstractChannel#read
// Requests a read by delegating to the pipeline; returns this channel for chaining.
@Override
public Channel read() {
pipeline.read();
return this;
}
又回到了pipeline(DefaultChannelPipeline),继续执行下去
// Starts the read request at the tail of the pipeline; returns this pipeline for chaining.
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
调用了tail.read方法,tail为TailContext类的实例
/**
 * Passes the read request to the next context found by {@code findContextOutbound(MASK_READ)},
 * running it inline when already on that context's executor and as a task otherwise.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext target = findContextOutbound(MASK_READ);
    final EventExecutor targetExecutor = target.executor();
    if (!targetExecutor.inEventLoop()) {
        // Off the executor's thread: lazily create and cache the Tasks holder, then submit.
        Tasks cached = target.invokeTasks;
        if (cached == null) {
            cached = new Tasks(target);
            target.invokeTasks = cached;
        }
        targetExecutor.execute(cached.invokeReadTask);
    } else {
        target.invokeRead();
    }
    return this;
}
最后执行到 HeadContext#read(ChannelHandlerContext ctx)
// HeadContext's read: hands the request over to the channel's unsafe.
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
终于走到unsafe.beginRead()方法了,unsafe为NioMessageUnsafe实例,beginRead()最后实际调用了AbstractNioChannel#doBeginRead()方法
/**
 * Adds {@code readInterestOp} to the selection key's interest set if it is not already there.
 * Does nothing when the key is no longer valid.
 *
 * @throws Exception declared by the overridden method
 */
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey key = this.selectionKey;
    if (!key.isValid()) {
        return;
    }

    readPending = true;

    // doRegister() registered the channel with interest ops 0, so the read interest is added
    // here unless it is already part of the interest set. For the server channel
    // readInterestOp is presumably the OP_ACCEPT passed by the NioServerSocketChannel constructor.
    final int currentOps = key.interestOps();
    final boolean alreadyInterested = (currentOps & readInterestOp) != 0;
    if (alreadyInterested) {
        return;
    }
    key.interestOps(currentOps | readInterestOp);
}
readInterestOp是在NioServerSocketChannel的构造方法中设置的(传入SelectionKey.OP_ACCEPT):
// Wraps an existing ServerSocketChannel; passes SelectionKey.OP_ACCEPT to the superclass as the operation this channel is interested in.
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
// Channel config is built around the underlying server socket.
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
至此整个流程注册完毕