添加
DefaultChannelPipeline.addLast(ChannelHandler... handlers)
checkMultiplicity(handler)重复添加验证
isSharable()
newCtx = newContext(group, filterName(name, handler), handler)
filterName(name, handler)
checkDuplicateName(name)
context0(name)
newContext(EventExecutorGroup group, String name, ChannelHandler handler)
new DefaultChannelHandlerContext(this, childExecutor(group), name, handler)
isInbound(handler)
isOutbound(handler)
在跟到其父类AbstractChannelHandlerContext的构造方法中
addLast0(newCtx)
callHandlerAdded0(newCtx)
ChannelHandlerAdapter.(ChannelHandlerContext ctx)
删除
添加ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
分析pipeline.addLast(new MyServerHandler())
中的addLast
。
首先通过channel
拿到当前的pipline
, 拿到pipeline
之后再为其添加handler
, 因为channel
初始化默认创建的是DefualtChannelPipeline
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
这里的handlers
只有一个
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//判断handler是否被重复添加(1)
checkMultiplicity(handler);
//创建一个HandlerContext并添加到列表(2)
newCtx = newContext(group, filterName(name, handler), handler);
//添加HandlerContext(3)
addLast0(newCtx);
//是否已注册
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
//回调用户事件
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//回调添加事件(4)
callHandlerAdded0(newCtx);
return this;
}
分为四个步骤:
重复添加验证
创建一个HandlerContext
并添加到列表
添加context
回调添加事件
checkMultiplicity(handler)重复添加验证private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
//满足条件设置为true, 代表已添加
h.added = true;
}
}
首先判断是不是ChannelHandlerAdapter
类型, 因为我们自定义的handler
通常会直接或者间接的继承该接口, 所以这里为true
拿到handler
之后转换成ChannelHandlerAdapter
类型。
然后进行条件判断 if (!h.isSharable() && h.added)
代表如果不是共享的handler
, 并且是未添加状态, 则抛出异常。
public boolean isSharable() {
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
//如果这个类注解了Sharable.class, 说明这个类会被多个channel共享
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
首先拿到当前handler
的class
对象。
然后再从netty
自定义的一个ThreadLocalMap
对象中获取一个盛放handler
的class
对象的map
, 并获取其value
。
如果value
值为空, 则会判断是否被Sharable
注解, 并将自身handler
的class
对象和判断结果存入map
对象中, 最后返回判断结果。
这说明了被Sharable
注解的handler
是一个共享handler
。
从这个逻辑我们可以判断, 共享对象是可以重复添加的。
回到DefaultChannelPipeline.addLast
,如果是共享对象或者没有被添加, 则将ChannelHandlerAdapter
的added
设置为true
, 代表已添加分析完了重复添加验证, 回到addLast
方法中, 我们看第二步, 创建一个HandlerContext
并添加到列表
newCtx = newContext(group, filterName(name, handler), handler)
首先看filterName(name, handler)方法, 这个方法是判断添加handler的name是否重复
filterName(name, handler)首先看filterName(name, handler)
方法, 这个方法是判断添加handler
的name
是否重复
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
//没有名字创建默认名字
return generateName(handler);
}
//检查名字是否重复
checkDuplicateName(name);
return name;
}
因为我们添加handler
时候, 不一定会给handler
命名, 所以这一步name
有可能是null
, 如果是null
, 则创建一个默认的名字, 这里创建名字的方法就不分析了
private void checkDuplicateName(String name) {
//不为空
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
继续跟进分析context0(name)
方法
private AbstractChannelHandlerContext context0(String name) {
//遍历pipeline
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
//发现name相同, 说明存在handler
if (context.name().equals(name)) {
//返回
return context;
}
context = context.next;
}
return null;
}
这里的逻辑就是将pipeline
中, 从head
节点往下遍历HandlerContext
, 一直遍历到tail
, 如果发现名字相同则会认为重复并返回HandlerContext
对象。
继续跟到newContext(group, filterName(name, handler), handler)
方法中。
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
可以看到创建了一个DefaultChannelHandlerContext
对象, 构造方法的参数中, 第一个this
代表当前的pipeline
对象, group
为null
, 所以childExecutor(group)
也会返回null
, name
为handler
的名字, handler
为新添加的handler
对象
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
首先调用了父类的构造方法, 之后将handler
赋值为自身handler
的成员变量, HandlerConext
和handler
关系在此也展现了出来, 是一种组合关系
父类的构造方法, 有这么两个参数:isInbound(handler)
, isOutbound(handler)
, 这两个参数意思是判断需要添加的handler
是inboundHandler
还是outBoundHandler
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
这里通过是否实现ChannelInboundHandler
接口来判断是否为inboundhandler
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
通过判断是否实现ChannelOutboundHandler
接口判断是否为outboundhandler
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
之前tail
节点和head
节点创建的时候也执行到了这里,初始化了name
, pipeline
, 以及标识添加的handler
是inboundhanlder
还是outboundhandler
。
回到DefaultChannelPipeline.addLast
,分析完了创建HandlerContext
的相关逻辑, 我们继续跟第三步, 添加HandlerContext
private void addLast0(AbstractChannelHandlerContext newCtx) {
//拿到tail节点的前置节点
AbstractChannelHandlerContext prev = tail.prev;
//当前节点的前置节点赋值为tail节点的前置节点
newCtx.prev = prev;
//当前节点的下一个节点赋值为tail节点
newCtx.next = tail;
//tail前置节点的下一个节点赋值为当前节点
prev.next = newCtx;
//tail节点的前一个节点赋值为当前节点
tail.prev = newCtx;
}
做了一个指针的指向操作, 将新添加的handlerConext
放在tail
节点之前, 之前tail
节点的上一个节点之后, 如果是第一次添加handler
, 那么添加后的结构入下图所示
添加完handler
之后, 这里会判断当前channel
是否已经注册, 这部分逻辑之后再进行分析,先接着继续执行。
之后会判断当前线程线程是否为eventLoop
线程, 如果不是eventLoop
线程, 就将添加回调事件封装成task
交给eventLoop
线程执行, 否则, 直接执行添加回调事件callHandlerAdded0(newCtx)
。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
/**
* 省略
* */
}
}
分析ctx.handler().handlerAdded(ctx)
,其中ctx
是我们新创建的HandlerContext
, 通过handler()
方法拿到绑定的handler
, 也就是新添加的handler
, 然后执行handlerAdded(ctx)
方法, 如果我们没有重写这个方法, 则会执行父类的该方法。
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
没做任何操作, 也就是如果我们没有重写该方法时, 如果添加handler
之后将不会做任何操作, 这里如果我们需要做一些业务逻辑, 可以通过重写该方法进行实现
删除的逻辑和添加的逻辑相同,区别删除是将pipeline
的双向链表的节点去掉。这里就不详细的分析。
以上就是netty中pipelinehandler添加删除分析的详细内容,更多关于netty pipeline handler的资料请关注软件开发网其它相关文章!