Flume使用详解(三)

Zarah ·
更新时间:2024-11-13
· 616 次阅读

在Flume使用中配置相当重要,但也非常简单。
在conf目录下,创建一个配置文件,比如:template.conf(名字可以不固定,后缀也可以不固定)
相关配置:

#配置Agent a1 的组件
a1.sources=r1
a1.channels=c1 (可以配置多个,以空格隔开,名字自己定)
a1.sinks=s1 (可以配置多个,以空格隔开,名字自己定)
#描述/配置a1的r1
a1.sources.r1.type=netcat (netcat表示通过指定端口来访问)
a1.sources.r1.bind=0.0.0.0 (表示本机)
a1.sources.r1.port=44444 (指定的端口,此端口不固定,但是不要起冲突)
#描述a1的s1
a1.sinks.s1.type=logger (表示数据汇聚点的类型是logger日志)
#描述a1的c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#位channel 绑定 source和sink
a1.sources.r1.channels=c1 (一个source是可以对应多个通道的)
a1.sinks.s1.channel=c1 (一个sink,只能对应一个通道)

进入flume的bin目录执行:

./flume-ng agent -n a1 -c …/conf -f …/conf/template.conf -Dflume.root.logger=INFO,console

在这里插入图片描述

Flume的Source Avro :
Avro类型的Source 监听Avro 端口来接收外部avro客户端的事件流。和netcat不同的是,avro-source接收到的是经过avro序列化后的数据,然后反序列化数据继续传输。所以,如果是avro-source的话,源数据必须是经过avro序列化后的数据。而netcat接收的是字符串格式。
利用Avro source可以实现多级流动、扇出流、扇入流等效果。 另外,也可以接收通过flume提供的avro客户端发送的日志信息。
在这里插入图片描述 Exec
可以将命令产生的输出作为源
在这里插入图片描述 Spooling Directory
将指定的文件加入到“自动搜集”目录中。flume会持续监听这个目录,把文件当做source来处理。注意:一旦文件被放到“自动收集”目录中后,便不能修改,如果修改,flume会报错。此外,也不能有重名的文件,如果有,flume也会报错。
在这里插入图片描述

创建相关的文件夹 ,根据指定的配置文件,启动flume,向指定的文件目录下传送一个日志文件,发现flume的控制台打印相关的信息,此外,会发现被处理的文件,会追加一个后缀:completed,表示已处理完。 (重名文件包括已加后缀的文件)

NetCat Source
一个NetCat Source用来监听一个指定端口,并接收监听到的数据。
在这里插入图片描述 Sequence Generator Source
一个简单的序列发生器,不断的产生事件,值是从0开始每次递增1。主要用来测试。
在这里插入图片描述 HTTP source
此Source接受HTTP的GET和POST请求作为Flume的事件。其中GET方式应该只用于试验。
如果想让flume正确解析Http协议信息,比如解析出请求头、请求体等信息,需要提供一个可插拔的"处理器"来将 请求转换为事件对象,这个处理器必须实现HTTPSourceHandler接口。
这个处理器接受一个 HttpServletRequest对象,并返回一个Flume Envent对象集合。
flume提供了一些常用的Handler(处理器): JSONHandler 可以处理JSON格式的数据,并支持UTF-8 UTF-16 UTF-32字符集 该handler接受Evnet数组,并根据请求头中指定的编码将其转换为Flume Event 如果没有指定编码,默认编码为UTF-8.
JSON格式如下:
[{
“headers” : {
“timestamp” : “434324343”,
“host” : “random_host.example.com”
},
“body” : “random_body”
},
{
“headers” : {
“namenode” : “namenode.example.com”,
“datanode” : “random_datanode.example.com” },
“body” : “really_random_body”
}]
BlobHandler BlobHandler是一种将请求中上传文件信息转化为event的处理器。 参数说明,加!为必须属性:
!handler – The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
在这里插入图片描述
执行curl 命令,模拟一次http的Post请求:
curl -X POST -d ‘[{“headers”:{“a”:“a1”},“body”:“hello 123”}]’ http://0.0.0.0:8888 flume的Sink Logger Sink
记录指定级别(比如INFO,DEBUG,ERROR等)的日志,通常用于调试
在这里插入图片描述 File Roll Sink
在本地系统中存储事件。 每隔指定时长生成文件保存这段时间内收集到的日志信息。
在这里插入图片描述

创建指定的文件目录/home/work/rolldata
启动测试

…/bin/flume-ng agent -c ./ -f ./template.conf -n a1

Avro Sink
是实现多级流动、扇出流(1到多) 扇入流(多到1) 的基础。

案例一

让01机的flume通过netcat source源接收数据,然后通过avro sink 发给02机—>02机的flume利用 avro source源收数据,然后通过avro sink 传给03机—>03机通过avro source源收数据,通过 logger sink 输出到控制台上
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
案例二
01机发出的数据,让02,03来接收
在这里插入图片描述
在这里插入图片描述
案例三
02,03机收到的数据都发往01
在这里插入图片描述

在这里插入图片描述

HDFS Sink
此Sink将事件写入到Hadoop分布式文件系统HDFS中。 目前它支持创建文本文件和序列化文件。 对这两种格式都支持压缩。 这些文件可以分卷,按照指定的时间或数据量或事件的数量为基础。 它还通过类似时间戳或机器属性对数据进行 buckets/partitions 操作
在这里插入图片描述
运行需要导入相关包,flume缺少相关hadoop的依赖jar包,找到以下的jar包,放到flume的lib目录下即可。 commons-configuration-1.6.jar
hadoop-auth-2.5.2.jar
hadoop-common-2.5.2.jar
hadoop-hdfs-2.5.2.jar
hadoop-mapreduce-client-core-2.5.2.jar

但是一个一个找特别麻烦,所以解决办法是将hadoop的jar包都拷贝到flume的lib目录下: 进入到hadoop安装目录的share目录下的hadoop目录 执行:scp common/* common/lib/* hdfs/* hdfs/lib/* mapreduce/* mapreduce/lib/* tools/lib/* 192.168.234.163:/home/software/flume/lib/

Flume的channel

memory channel
事件将被存储在内存中(指定大小的队列里) 非常适合那些需要高吞吐量且允许数据丢失的场景下.

JDBC Channel
事件会被持久化(存储)到可靠的数据库里,目前支持嵌入式Derby数据库。即source—> channel—>sink。在传输的过程中,会先把事件存到关系型数据库里。但是Derby数据库不太好用,所以JDBC Channel目前仅用于测试,不能用于生产环境。

FileChannel
好处:数据不丢失
坏处:极大的降低flume的吞吐量,因为要频繁的发生磁盘I/O 性能比较低,但是即使程序出错数据不会丢失 性能会比较低下,但是即使程序出错数据不会丢失.
在这里插入图片描述


作者:大哥惯过谁



flume

需要 登录 后方可回复, 如果你还没有账号请 注册新账号