Flume使用详解(四)

Bliss ·
更新时间:2024-09-21
· 518 次阅读

flume的Interceptors

概述:
Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。
拦截器需要实现org.apache.flume.interceptor.Interceptor接口。
拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。
拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。
一个拦截器返回的事件列表被传递给链中的下一个拦截器。
如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。
如果要删除所有事件,只需返回一个空列表。

Timestamp Interceptor
这个拦截器在事件头中插入以毫秒为单位的当前处理时间。
头的名字为timestamp,值为当前处理的时间戳。
如果在之前已经有这个时间戳,则保留原有的时间戳。
配置示例:
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

Host Interceptor
这个拦截器插入当前处理Agent的主机名或ip
头的名字为host或配置的名称
值是主机名或ip地址,基于配置。
配置示例:
a1.sources.r1.interceptors=i2
a1.sources.r1.interceptors.i2.type=host

Static Interceptor
此拦截器允许用户增加静态头信息使用静态的值到所有事件。
目前的实现中不允许一次指定多个头。
如果需要增加多个静态头可以指定多个Static interceptors
在这里插入图片描述

UUID Interceptor
这个拦截器在所有事件头中增加一个全局一致性标志。
其实就是UUID。
在这里插入图片描述

Search and Replace Interceptor
这个拦截器提供了简单的基于字符串的正则搜索和替换功能。
在这里插入图片描述

Regex Filtering Interceptor
此拦截器通过解析事件体去匹配给定正则表达式来筛选事件。
所提供的正则表达式即可以用来包含或刨除事件。
在这里插入图片描述

Regex Extractor Interceptor
使用指定正则表达式匹配事件,并将匹配到的组作为头加入到事件中。
它也支持插件化的序列化器用来格式化匹配到的组在加入他们作为头之前。

Flume的Selector

Selector的复制模式
选择器可以工作在复制、多路复用(路由) 模式下 Selector 默认是复制模式(replicating),即把source复制,然后分发给多个sink
a1.sources = r1
a1.channels = c1 c2 c3
a1.source.r1.selector.type = replicating(这个是默认的)
a1.source.r1.channels = c1 c2 c3
a1.source.r1.selector.optional = c3

多路复用模式
在这种模式下,用户可以指定转发的规则。selector根据规则进行数据的分发
selector.type multiplexing 表示路由模式
selector.header 指定要监测的头的名称
selector.mapping.* 匹配规则
selector.default 如果未满足匹配规则,则默认发往指定的通道
案例:
检测的头信息是state关键字,如果state=CN,进c1通道。如果state=US,进c2通道。如果不满足以上的情况,进c2通道.
配置示例:
a1.sources=r1
a1.sinks=s1 s2
a1.channels=c1 c2
#描述/配置a1的source1
a1.sources.r1.type=http
a1.sources.r1.port=8888
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=state
a1.sources.r1.selector.mapping.cn=c1
a1.sources.r1.selector.mapping.us=c2
a1.sources.r1.selector.default=c2
#描述sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.234.212
a1.sinks.s1.port=9999
a1.sinks.s2.type=avro
a1.sinks.s2.hostname=192.168.234.213
a1.sinks.s2.port=9999
#描述内存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
#位channel 绑定 source和sink
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2


作者:大哥惯过谁



flume

需要 登录 后方可回复, 如果你还没有账号请 注册新账号