Flume Deployment and Usage



Official documentation: http://flume.apache.org/
Example data flow: WebServer --> Agent [Source --> Channel --> Sink] --> HDFS

1. Introduction

Flume is a distributed, reliable framework for efficiently collecting, aggregating, and moving large amounts of log data from many different sources to a centralized destination for storage. Flume OG refers to versions 0.9 and earlier; Flume NG refers to the 1.x releases. A similar framework is Logstash: in the broader Hadoop ecosystem Flume is the more common choice, while Logstash is lighter-weight and is mostly used together with the ELK stack.

Using Flume rarely requires writing much code. From a usage point of view it comes down to writing configuration files: knowing how to configure the common Source, Channel, and Sink types and wiring them into an Agent. This part is covered by the Flume User Guide: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html. Writing code is only needed for custom development on top of Flume, i.e. custom Sources, Sinks, and Channels, which is covered by the Flume Developer Guide: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html (a minimal custom Sink sketch is shown at the end of the installation section below).

Common Source types (where data is collected from):
- avro (serialized events)
- exec (command output)
- spooling directory
- taildir (important)
- kafka

Common Channel types (where data is buffered):
- memory
- kafka
- file

Common Sink types (where data is written to):
- hdfs (Hadoop)
- logger (console)
- avro
- kafka

2. Installation

Download:

[ruoze@rzdata001 ~]$ cd software/
[ruoze@rzdata001 software]$ wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.16.2.tar.gz

Extract and create a symlink:

[ruoze@rzdata001 software]$ tar -zxvf flume-ng-1.6.0-cdh5.16.2.tar.gz -C ~/app/
[ruoze@rzdata001 software]$ cd ~/app/
[ruoze@rzdata001 app]$ ln -s /home/ruoze/app/apache-flume-1.6.0-cdh5.16.2-bin /home/ruoze/app/flume

Set up the configuration files:

[ruoze@rzdata001 ~]$ cd /home/ruoze/app/flume/conf
[ruoze@rzdata001 conf]$ cp flume-env.sh.template flume-env.sh
[ruoze@rzdata001 conf]$ cp flume-conf.properties.template flume-conf.properties
[ruoze@rzdata001 conf]$ vim flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_121

Add the environment variables (and source the file afterwards):

[ruoze@rzdata001 ~]$ vim .bash_profile
# Flume env
export FLUME_HOME=/home/ruoze/app/flume
export PATH=$FLUME_HOME/bin:$PATH
[ruoze@rzdata001 ~]$ source .bash_profile

Check the Flume version:

[ruoze@rzdata001 bin]$ flume-ng version
Flume 1.6.0-cdh5.16.2
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: df92badde3691ee3eb6074a177f0e96682345381
Compiled by jenkins on Mon Jun 3 03:49:33 PDT 2019
From source with checksum 9336bfa3ff8cfb5e20cd9d700135a2c1
[ruoze@rzdata001 bin]$
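Going back to the custom-development point in the introduction: writing your own component mostly means implementing one class against the Flume SDK. Below is a minimal custom Sink sketch, assuming flume-ng-core is on the compile classpath; the package, class name, and the prefix property are made up for illustration and are not part of the original setup.

package com.example.flume; // hypothetical package, for illustration only

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

// Hypothetical custom sink: takes one event per call and prints its body with a prefix.
public class MyLoggingSink extends AbstractSink implements Configurable {

    private String prefix;

    @Override
    public void configure(Context context) {
        // Reads "a1.sinks.k1.prefix = ..." from the agent file ("prefix" is a made-up property).
        prefix = context.getString("prefix", "flume");
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event == null) {
                // Channel is empty right now: commit and ask the sink runner to back off.
                txn.commit();
                return Status.BACKOFF;
            }
            System.out.println(prefix + ": " + new String(event.getBody()));
            txn.commit();
            return Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            throw new EventDeliveryException("Failed to deliver event", t);
        } finally {
            txn.close();
        }
    }
}

To try such a sink, package it as a jar, drop the jar into $FLUME_HOME/lib, and point the agent file at the fully qualified class name, e.g. a1.sinks.k1.type = com.example.flume.MyLoggingSink.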

3. Task Configuration

A Flume agent is configured through a local configuration file, a text file that follows the Java properties format. A single file can define one or more agents, and for each agent it specifies the sources, sinks, and channels and how they are wired together into a data flow.

Goal: be able to write these configurations by following the official documentation.
In the documentation for each source, channel, and sink type, the parameters shown in bold are required.

Example 1: 1_netcat_memory_logger.conf

Write the configuration file:

# conf file name : 1_netcat_memory_logger.conf
# Source type    : netcat source
# Channel type   : memory channel
# Sink type      : logger sink

# Name the components on this agent   <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source   <== define Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Use a channel which buffers events in memory   <== define Channel
a1.channels.c1.type = memory

# Describe the sink   <== define Sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel   <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/1_netcat_memory_logger.conf \
-Dflume.root.logger=INFO,console

Use jps to check the new process:

[ruoze@rzdata001 ~]$ jps
7040 SecondaryNameNode
31123 Jps
20724 NameNode
21144 ResourceManager
7897 RunJar
30939 Application   <== the new Flume agent process
20860 DataNode
21260 NodeManager
19215 StandaloneSessionClusterEntrypoint
[ruoze@rzdata001 ~]$

Inspect the details of the running process:

[ruoze@rzdata001 ~]$ ps -ef | grep 30939 | grep -v grep
ruoze 30939 31334 0 20:49 pts/0 00:00:01 /usr/java/jdk1.8.0_121/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp /home/ruoze/app/flume/conf:/home/ruoze/app/flume/lib/*:/home/ruoze/app/hadoop/etc/hadoop:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/common/lib/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/common/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/hdfs:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/hdfs/lib/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/hdfs/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/yarn/lib/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/yarn/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/mapreduce/lib/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/mapreduce/*:/home/ruoze/app/hadoop/contrib/capacity-scheduler/*.jar:/home/ruoze/app/hive/lib/* -Djava.library.path=:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/lib/native org.apache.flume.node.Application --name a1 --conf-file /home/ruoze/app/flume/myconf/1_netcat_memory_logger.conf
[ruoze@rzdata001 ~]$
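Note the -Xmx20m in the command line above: by default the agent JVM gets a very small heap. If an agent needs more memory (for example a larger memory channel), the heap can be raised via JAVA_OPTS in flume-env.sh; the numbers below are only an illustrative sketch, not a recommendation:

# $FLUME_HOME/conf/flume-env.sh
export JAVA_OPTS="-Xms100m -Xmx512m"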

Once the agent is up, send data to port 44444. Note that the agent must be started first.

[ruoze@rzdata001 ~]$ telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
123456789
OK
abcdefg
OK
1234567890987654321
OK

Three lines were sent in total. The first two are printed in full on the console, but the last one shows only 1234567890987654: the logger sink's maxBytesToLog parameter is not set, and by default the body is truncated at 16 bytes.
Look at the console output:

Event: { headers:{} body: 31 32 33 34 35 36 37 38 39 0D 123456789. }

An Event is the basic unit of data transfer in Flume. It consists of two parts:
- the header is optional and stores its content as key-value pairs;
- the body is a byte array.
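These same two parts are what custom components work with through the Flume SDK: an optional map of string headers plus a byte-array body. A small illustrative sketch (the class name and header values are made up here):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventDemo {
    public static void main(String[] args) {
        // Optional headers: plain string key-value pairs.
        Map<String, String> headers = new HashMap<>();
        headers.put("hostname", "rzdata001");

        // The body is just a byte array.
        Event event = EventBuilder.withBody("123456789".getBytes(StandardCharsets.UTF_8), headers);

        System.out.println(event.getHeaders());           // {hostname=rzdata001}
        System.out.println(new String(event.getBody()));  // 123456789
    }
}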

Example 2: 2_exec_memory_hdfs.conf

Write the configuration file:

# conf file name : 2_exec_memory_hdfs.conf
# Source type    : exec source
# Channel type   : memory channel
# Sink type      : hdfs sink

# Name the components on this agent   <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source   <== define Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/ruoze/data/flume/data.log
a1.sources.r1.shell = /bin/sh -c

# Use a channel which buffers events in memory   <== define Channel
a1.channels.c1.type = memory

# Describe the sink   <== define Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://rzdata001:9000/ruozedata/flume/tail/events/%y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1024000
a1.sinks.k1.hdfs.rollCount = 2000
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel   <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/2_exec_memory_hdfs.conf \
-Dflume.root.logger=INFO,console
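The exec source tails /home/ruoze/data/flume/data.log, so something has to be appending to that file. For a quick test the data can be generated by hand, for example with a throwaway loop like this (not part of the original setup):

[ruoze@rzdata001 ~]$ for i in $(seq 1 10000); do echo "record ${i} $(date +%s)" >> /home/ruoze/data/flume/data.log; sleep 0.1; done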

Verify the data:

[ruoze@rzdata001 ~]$ hadoop fs -ls /ruozedata/flume/tail/events/*/*
20/02/13 22:33:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r--   1 ruoze supergroup     215039 2020-02-13 22:26 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956479.txt
-rw-r--r--   1 ruoze supergroup     216644 2020-02-13 22:26 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956480.txt
-rw-r--r--   1 ruoze supergroup     217824 2020-02-13 22:26 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956481.txt
-rw-r--r--   1 ruoze supergroup     215789 2020-02-13 22:26 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956482.txt
-rw-r--r--   1 ruoze supergroup      75874 2020-02-13 22:31 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956483.txt
Found 1 items
-rw-r--r--   1 ruoze supergroup     104360 2020-02-13 22:31 /ruozedata/flume/tail/events/20-02-13/2230/events.1581604278540.txt.tmp
[ruoze@rzdata001 ~]$

Note that the channel buffer size and the three roll parameters need to be tuned for the actual workload, as illustrated in the sketch below.
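For concreteness, these are the knobs involved; the numbers below are placeholders showing the shape of the tuning, not recommendations (a roll parameter set to 0 disables that particular roll trigger):

# memory channel buffering (both default to 100)
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# roll a new HDFS file every 10 minutes or at ~128 MB, whichever comes first;
# never roll based on event count
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0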

Example 3: 3_spooldir_memory_hdfs.conf

Write the configuration file:

# conf file name : 3_spooldir_memory_hdfs.conf
# Source type    : spooling directory source
# Channel type   : memory channel
# Sink type      : hdfs sink

# Name the components on this agent   <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source   <== define Source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/ruoze/data/flume/spool
a1.sources.r1.fileSuffix = .COMPLETED

# Use a channel which buffers events in memory   <== define Channel
a1.channels.c1.type = memory

# Describe the sink   <== define Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://rzdata001:9000/ruozedata/flume/spool/%Y%m%d%H%M
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1024000
a1.sinks.k1.hdfs.rollCount = 2000
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel   <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/3_spooldir_memory_hdfs.conf \
-Dflume.root.logger=INFO,console
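The spooling directory source expects complete, immutable files to be dropped into /home/ruoze/data/flume/spool; once a file has been ingested Flume renames it with the .COMPLETED suffix, and a file must not be modified after being placed there. A quick way to feed it (the file name here is just a placeholder):

[ruoze@rzdata001 ~]$ mkdir -p /home/ruoze/data/flume/spool
[ruoze@rzdata001 ~]$ cp ~/data/access_2020-02-14.log /home/ruoze/data/flume/spool/
# after ingestion:
[ruoze@rzdata001 ~]$ ls /home/ruoze/data/flume/spool/
access_2020-02-14.log.COMPLETED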

Verify the data:

[ruoze@rzdata001 ~]$ hadoop fs -ls /ruozedata/flume/spool/*/*
20/02/14 15:58:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r--   1 ruoze supergroup     224236 2020-02-14 13:39 /ruozedata/flume/spool/202002141330/events.1581658686779.txt
-rw-r--r--   1 ruoze supergroup     216625 2020-02-14 13:39 /ruozedata/flume/spool/202002141330/events.1581658686780.txt
-rw-r--r--   1 ruoze supergroup     136405 2020-02-14 13:49 /ruozedata/flume/spool/202002141330/events.1581658686781.txt
-rw-r--r--   1 ruoze supergroup     139660 2020-02-14 13:40 /ruozedata/flume/spool/202002141340/events.1581658800114.txt
-rw-r--r--   1 ruoze supergroup     216048 2020-02-14 13:40 /ruozedata/flume/spool/202002141340/events.1581658800115.txt
-rw-r--r--   1 ruoze supergroup     216894 2020-02-14 13:41 /ruozedata/flume/spool/202002141340/events.1581658800132.txt
-rw-r--r--   1 ruoze supergroup     214981 2020-02-14 13:41 /ruozedata/flume/spool/202002141340/events.1581658800133.txt
-rw-r--r--   1 ruoze supergroup      56282 2020-02-14 13:51 /ruozedata/flume/spool/202002141340/events.1581658800134.txt
[ruoze@rzdata001 ~]$

Example 4: 4_taildir_memory_hdfs.conf

The taildir source records the read offset for each file while collecting, so data is not lost if the agent restarts.
It can watch not just individual files but also groups of files matched by pattern inside directories.
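The offsets live in the positionFile configured below (taildir_position.json). Its content is roughly a JSON array with one entry per tracked file, along these lines (the inode and pos values are invented for illustration):

[{"inode":1053831,"pos":200000,"file":"/home/ruoze/data/flume/taildir/dir1/hive.log"},{"inode":1053842,"pos":120000,"file":"/home/ruoze/data/flume/taildir/dir2/hive.log.1"}]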
Write the configuration file:

# conf file name : 4_taildir_memory_hdfs.conf
# Source type    : TAILDIR source
# Channel type   : memory channel
# Sink type      : hdfs sink

# Name the components on this agent   <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source   <== define Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/ruoze/data/flume/taildir/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/ruoze/data/flume/taildir/dir1/hive.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /home/ruoze/data/flume/taildir/dir2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2

# Use a channel which buffers events in memory   <== define Channel
a1.channels.c1.type = memory

# Describe the sink   <== define Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://rzdata001:9000/ruozedata/flume/taildir/%Y%m%d%H
a1.sinks.k1.hdfs.filePrefix = hive
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 500
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel   <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/4_taildir_memory_hdfs.conf \
-Dflume.root.logger=INFO,console

Verify taildir's checkpoint/resume behaviour:

# 1. Prepare the data
[ruoze@rzdata001 taildir]$ pwd
/home/ruoze/data/flume/taildir
[ruoze@rzdata001 taildir]$ ll ./*/*
-rw-rw-r-- 1 ruoze ruoze 200000 Feb 14 23:04 ./dir1/hive.log
-rw-rw-r-- 1 ruoze ruoze 120000 Feb 14 22:54 ./dir2/hive.log.1
-rw-rw-r-- 1 ruoze ruoze  20000 Feb 14 23:00 ./dir2/hive.txt
-rw-rw-r-- 1 ruoze ruoze  70000 Feb 14 22:57 ./dir9/hive.log.2
[ruoze@rzdata001 taildir]$

# 2. Open four terminal windows and verify:
# Step 1: window 1 starts the Flume agent.
# Step 2: window 2 manipulates the local data.
# Step 3: window 3 watches the data sunk to HDFS: hadoop fs -ls /ruozedata/flume/taildir/*/*
# Step 4: window 4 monitors the position file: tail -F /home/ruoze/data/flume/taildir/taildir_position.json
# Step 5: taildir_position.json shows only two files being read; hive.txt is skipped because it does not match the pattern. Check that each pos value matches the corresponding file size.
# Step 6: stop the agent in window 1; in window 2 copy ./dir9/hive.log.2 into ./dir2 and run: cat ./dir2/hive.txt >> ./dir1/hive.log
# Step 7: restart the agent in window 1 and watch taildir_position.json change: the entry for hive.log is updated and a new pos entry appears for hive.log.2.

# Other notes: if taildir_position.json is deleted while the agent is running, it is simply regenerated.

Example 5: 5_avro_memory_hdfs.conf

http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#log4j-appender

Write the configuration file:

# conf file name : 5_avro_memory_hdfs.conf
# Source type    : avro source
# Channel type   : memory channel
# Sink type      : hdfs sink

# Name the components on this agent   <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source   <== define Source
a1.sources.r1.type = avro
a1.sources.r1.bind = rzdata001
a1.sources.r1.port = 44445

# Use a channel which buffers events in memory   <== define Channel
a1.channels.c1.type = memory

# Describe the sink   <== define Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://rzdata001:9000/ruozedata/flume/avro/%Y%m%d%H
a1.sinks.k1.hdfs.filePrefix = hive
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 500
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel   <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/5_avro_memory_hdfs.conf \
-Dflume.root.logger=INFO,console
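Before wiring up an application, the Avro source can be smoke-tested with the avro-client bundled with Flume, which reads a local file and sends its lines as events to the given host and port (the file path is just an example):

$FLUME_HOME/bin/flume-ng avro-client \
--host rzdata001 \
--port 44445 \
--filename /home/ruoze/data/flume/data.log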

Sending logs from a Java application to the Avro source's port:

# 1) Add the Maven dependency
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0-cdh5.16.2</version>
</dependency>

# 2) Configure log4j.properties
log4j.rootLogger = INFO,Console,File,Flume

#### Send logs to the Avro source, where the flume avro source on the host collects them ###
log4j.appender.Flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.Flume.Hostname=rzdata001
log4j.appender.Flume.Port=44445
log4j.appender.Flume.UnsafeMode=true
log4j.appender.Flume.layout=org.apache.log4j.PatternLayout
log4j.appender.Flume.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS}|%t|[%m]

# 3) Code that produces the logs
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class SendFlumeAvro {
    private static final Logger logger = LogManager.getLogger(SendFlumeAvro.class.getName());

    public static void main(String[] args) {
        logger.trace("这是一条 trace 信息");
        logger.info("这是一条 info 信息1");
        logger.error("这是一条 error 信息1");
        logger.info("这是一条 info 信息4");
        logger.error("这是一条 error 信息2");
        logger.info("这是一条 info 信息6");
        logger.error("这是一条 error 信息3");
        System.out.println("exit");
    }
}

# 4) Run the program to send the logs

# 5) Check the data the sink wrote to HDFS
[ruoze@rzdata001 ~]$ hadoop fs -text /ruozedata/flume/avro/2020021512/hive.1581740388601.log
20/02/15 12:27:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020/02/15 12:19:46.987|main|[这是一条 info 信息1]
2020/02/15 12:19:47.087|main|[这是一条 error 信息1]
2020/02/15 12:19:47.100|main|[这是一条 info 信息4]
2020/02/15 12:19:47.125|main|[这是一条 error 信息2]
2020/02/15 12:19:47.137|main|[这是一条 info 信息6]
2020/02/15 12:19:47.165|main|[这是一条 error 信息3]
[ruoze@rzdata001 ~]$
Author: 你拓哥


