Official documentation: http://flume.apache.org/
Example: WebServer --> Agent [Source -> Channel -> Sink] --> HDFS
[ruoze@rzdata001 ~]$ cd software/
[ruoze@rzdata001 software]$ wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.16.2.tar.gz
Extract the archive and create a symlink:
[ruoze@rzdata001 software]$ tar -zxvf flume-ng-1.6.0-cdh5.16.2.tar.gz -C ~/app/
[ruoze@rzdata001 software]$ cd ~/app/
[ruoze@rzdata001 app]$ ln -s /home/ruoze/app/apache-flume-1.6.0-cdh5.16.2-bin /home/ruoze/app/flume
Edit the configuration files:
[ruoze@rzdata001 ~]$ cd /home/ruoze/app/flume/conf
[ruoze@rzdata001 conf]$ cp flume-env.sh.template flume-env.sh
[ruoze@rzdata001 conf]$ cp flume-conf.properties.template flume-conf.properties
[ruoze@rzdata001 conf]$ vim flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_121
Add the environment variables:
[ruoze@rzdata001 ~]$ vim .bash_profile
# Flume env
export FLUME_HOME=/home/ruoze/app/flume
export PATH=$FLUME_HOME/bin:$PATH
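Reload the profile so the new variables take effect in the current shell:
[ruoze@rzdata001 ~]$ source ~/.bash_profile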
Check the Flume version:
[ruoze@rzdata001 bin]$ flume-ng version
Flume 1.6.0-cdh5.16.2
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: df92badde3691ee3eb6074a177f0e96682345381
Compiled by jenkins on Mon Jun 3 03:49:33 PDT 2019
From source with checksum 9336bfa3ff8cfb5e20cd9d700135a2c1
[ruoze@rzdata001 bin]$
3. Job configuration
A Flume agent is configured through a local configuration file, a plain-text file that follows the Java properties file format.
One or more agents can be defined in the same file, along with the configuration of each source, sink, and channel and how they are wired together into a data flow.
Goal: learn how to build configurations from the official documentation.
In the configuration reference for each source, channel, and sink type, the parameters shown in bold are required.
Example 1: 1_netcat_memory_logger.conf
Write the configuration file:
# conf file name: 1_netcat_memory_logger.conf
# Source type : netcat source
# Channel type: memory channel
# Sink type : logger sink
# Name the components on this agent <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source <== define Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
# Use a channel which buffers events in memory <== define Channel
a1.channels.c1.type = memory
# Describe the sink <== define Sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
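Save the file as $FLUME_HOME/myconf/1_netcat_memory_logger.conf; myconf is a directory created here (an assumption about layout) to keep our own configs apart from the shipped templates in conf/:
[ruoze@rzdata001 ~]$ mkdir -p $FLUME_HOME/myconf
[ruoze@rzdata001 ~]$ vim $FLUME_HOME/myconf/1_netcat_memory_logger.conf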
Start the agent:
$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/1_netcat_memory_logger.conf \
-Dflume.root.logger=INFO,console
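This command runs the agent in the foreground. To keep it running while working in the same window, a background variant (a sketch; the log path ~/flume-a1.log is an arbitrary choice):
nohup $FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/1_netcat_memory_logger.conf \
-Dflume.root.logger=INFO,console > ~/flume-a1.log 2>&1 &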
Check for the new process with jps:
[ruoze@rzdata001 ~]$ jps
7040 SecondaryNameNode
31123 Jps
20724 NameNode
21144 ResourceManager
7897 RunJar
30939 Application <== the newly started Flume agent
20860 DataNode
21260 NodeManager
19215 StandaloneSessionClusterEntrypoint
[ruoze@rzdata001 ~]$
View the details of the running process:
[ruoze@rzdata001 ~]$ ps -ef | grep 30939 | grep -v grep
ruoze 30939 31334 0 20:49 pts/0 00:00:01 /usr/java/jdk1.8.0_121/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp /home/ruoze/app/flume/conf:/home/ruoze/app/flume/lib/*:/home/ruoze/app/hadoop/etc/hadoop:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/common/lib/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/common/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/hdfs:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/hdfs/lib/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/hdfs/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/yarn/lib/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/yarn/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/mapreduce/lib/*:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/share/hadoop/mapreduce/*:/home/ruoze/app/hadoop/contrib/capacity-scheduler/*.jar:/home/ruoze/app/hive/lib/* -Djava.library.path=:/home/ruoze/app/hadoop-2.6.0-cdh5.16.2/lib/native org.apache.flume.node.Application --name a1 --conf-file /home/ruoze/app/flume/myconf/1_netcat_memory_logger.conf
[ruoze@rzdata001 ~]$
Once the agent is running, send data to port 44444. Note: the agent must be started before connecting.
[ruoze@rzdata001 ~]$ telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
123456789
OK
abcdefg
OK
1234567890987654321
OK
Three messages were sent in total. The first two were printed in full on the console, but the last was truncated to 1234567890987654, because the logger sink's maxBytesToLog parameter was not set and the default truncation length is 16 bytes.
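If longer bodies need to be printed, the limit can be raised on the sink (a sketch; 256 is an arbitrary value):
a1.sinks.k1.maxBytesToLog = 256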
Look at the console output:
Event: { headers:{} body: 31 32 33 34 35 36 37 38 39 0D 123456789. }
Event: the basic unit of data transfer in Flume. It consists of two parts:
the header is optional, and its contents are stored as key-value pairs;
the body is a byte array.
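The hex digits in the body are simply the ASCII bytes of the telnet input (0x31 = '1', and the trailing 0D is the carriage return telnet appends). They can be decoded back on the shell:
[ruoze@rzdata001 ~]$ echo '31 32 33 34 35 36 37 38 39 0d' | xxd -r -p
123456789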
Example 2: 2_exec_memory_hdfs.conf
Write the configuration file:
# conf file name: 2_exec_memory_hdfs.conf
# Source type : exec source
# Channel type: memory channel
# Sink type : hdfs sink
# Name the components on this agent <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source <== define Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/ruoze/data/flume/data.log
a1.sources.r1.shell = /bin/sh -c
# Use a channel which buffers events in memory <== define Channel
a1.channels.c1.type = memory
# Describe the sink <== define Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://rzdata001:9000/ruozedata/flume/tail/events/%y-%m-%d/%H%M
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1024000
a1.sinks.k1.hdfs.rollCount = 2000
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/2_exec_memory_hdfs.conf \
-Dflume.root.logger=INFO,console
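The exec source tails /home/ruoze/data/flume/data.log, so something must be appending to that file. A throwaway generator loop (a sketch; the line content and rate are arbitrary):
for i in $(seq 1 100000); do
    echo "ruozedata_flume_test_${i}" >> /home/ruoze/data/flume/data.log
    sleep 0.1
done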
Verify the data:
[ruoze@rzdata001 ~]$ hadoop fs -ls /ruozedata/flume/tail/events/*/*
20/02/13 22:33:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r-- 1 ruoze supergroup 215039 2020-02-13 22:26 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956479.txt
-rw-r--r-- 1 ruoze supergroup 216644 2020-02-13 22:26 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956480.txt
-rw-r--r-- 1 ruoze supergroup 217824 2020-02-13 22:26 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956481.txt
-rw-r--r-- 1 ruoze supergroup 215789 2020-02-13 22:26 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956482.txt
-rw-r--r-- 1 ruoze supergroup 75874 2020-02-13 22:31 /ruozedata/flume/tail/events/20-02-13/2220/events.1581603956483.txt
Found 1 items
-rw-r--r-- 1 ruoze supergroup 104360 2020-02-13 22:31 /ruozedata/flume/tail/events/20-02-13/2230/events.1581604278540.txt.tmp
[ruoze@rzdata001 ~]$
Note that the channel's buffer size and the sink's three roll parameters (rollInterval, rollSize, rollCount) must be tuned to the actual workload; see the sketch below.
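Setting any of the three roll triggers to 0 disables it; for example, a size-only policy that rolls near the HDFS block size would look like this (a sketch; the 128 MB value is an assumption, not taken from this exercise):
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0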
Example 3: 3_spooldir_memory_hdfs.conf
Write the configuration file:
# conf file name: 3_spooldir_memory_hdfs.conf
# Source type : spooling directory source
# Channel type: memory channel
# Sink type : hdfs sink
# Name the components on this agent <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source <== define Source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/ruoze/data/flume/spool
a1.sources.r1.fileSuffix = .COMPLETED
# Use a channel which buffers events in memory <== define Channel
a1.channels.c1.type = memory
# Describe the sink <== define Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://rzdata001:9000/ruozedata/flume/spool/%Y%m%d%H%M
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1024000
a1.sinks.k1.hdfs.rollCount = 2000
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/3_spooldir_memory_hdfs.conf \
-Dflume.root.logger=INFO,console
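The spooling directory source expects complete, immutable files: a file must not be written to after it is dropped into the spool directory, and each fully collected file is renamed with the .COMPLETED suffix. To feed the source, copy a finished file in (the source file name here is an assumption):
[ruoze@rzdata001 ~]$ cp ~/data/access.log /home/ruoze/data/flume/spool/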
Verify the data:
[ruoze@rzdata001 ~]$ hadoop fs -ls /ruozedata/flume/spool/*/*
20/02/14 15:58:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-rw-r--r-- 1 ruoze supergroup 224236 2020-02-14 13:39 /ruozedata/flume/spool/202002141330/events.1581658686779.txt
-rw-r--r-- 1 ruoze supergroup 216625 2020-02-14 13:39 /ruozedata/flume/spool/202002141330/events.1581658686780.txt
-rw-r--r-- 1 ruoze supergroup 136405 2020-02-14 13:49 /ruozedata/flume/spool/202002141330/events.1581658686781.txt
-rw-r--r-- 1 ruoze supergroup 139660 2020-02-14 13:40 /ruozedata/flume/spool/202002141340/events.1581658800114.txt
-rw-r--r-- 1 ruoze supergroup 216048 2020-02-14 13:40 /ruozedata/flume/spool/202002141340/events.1581658800115.txt
-rw-r--r-- 1 ruoze supergroup 216894 2020-02-14 13:41 /ruozedata/flume/spool/202002141340/events.1581658800132.txt
-rw-r--r-- 1 ruoze supergroup 214981 2020-02-14 13:41 /ruozedata/flume/spool/202002141340/events.1581658800133.txt
-rw-r--r-- 1 ruoze supergroup 56282 2020-02-14 13:51 /ruozedata/flume/spool/202002141340/events.1581658800134.txt
[ruoze@rzdata001 ~]$
Example 4: 4_taildir_memory_hdfs.conf
The taildir source records the read offset (position) of each file it tails, so no data is lost across agent restarts. It can monitor not only individual files but also whole directories of files matched by regex; the offsets are kept in a JSON position file, shown below.
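A sample of the position file's shape, one JSON entry per tracked file (the values here are illustrative):
[{"inode":1054321,"pos":200000,"file":"/home/ruoze/data/flume/taildir/dir1/hive.log"}]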
Write the configuration file:
# conf file name: 4_taildir_memory_hdfs.conf
# Source type : TAILDIR source
# Channel type: memory channel
# Sink type : hdfs sink
# Name the components on this agent <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source <== define Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/ruoze/data/flume/taildir/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/ruoze/data/flume/taildir/dir1/hive.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /home/ruoze/data/flume/taildir/dir2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
# Use a channel which buffers events in memory <== define Channel
a1.channels.c1.type = memory
# Describe the sink <== define Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://rzdata001:9000/ruozedata/flume/taildir/%Y%m%d%H
a1.sinks.k1.hdfs.filePrefix = hive
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 500
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/4_taildir_memory_hdfs.conf \
-Dflume.root.logger=INFO,console
Verify the taildir source's resume-from-offset behavior.
# 1. Prepare the data
[ruoze@rzdata001 taildir]$ pwd
/home/ruoze/data/flume/taildir
[ruoze@rzdata001 taildir]$
[ruoze@rzdata001 taildir]$ ll ./*/*
-rw-rw-r-- 1 ruoze ruoze 200000 Feb 14 23:04 ./dir1/hive.log
-rw-rw-r-- 1 ruoze ruoze 120000 Feb 14 22:54 ./dir2/hive.log.1
-rw-rw-r-- 1 ruoze ruoze 20000 Feb 14 23:00 ./dir2/hive.txt
-rw-rw-r-- 1 ruoze ruoze 70000 Feb 14 22:57 ./dir9/hive.log.2
[ruoze@rzdata001 taildir]$
# 2. Open 4 terminal windows to verify:
# Step 1: start the Flume process in window 1.
# Step 2: manipulate the local data files in window 2.
# Step 3: in window 3, watch the data the sink writes to HDFS: hadoop fs -ls /ruozedata/flume/taildir/*/*
# Step 4: in window 4, monitor the position file: tail -F /home/ruoze/data/flume/taildir/taildir_position.json
# Step 5: note that only two files appear in taildir_position.json; hive.txt is not collected because it does not match the filegroup patterns. Verify that each pos value matches the corresponding file size.
# Step 6: stop the process in window 1; in window 2, copy ./dir9/hive.log.2 into ./dir2, then run: cat ./dir2/hive.txt >> ./dir1/hive.log
# Step 7: restart the Flume process in window 1 and watch taildir_position.json: the pos for hive.log advances accordingly, and a new pos entry appears for hive.log.2.
# Note: while the Flume agent is running, taildir_position.json is regenerated if deleted.
Example 5: 5_avro_memory_hdfs.conf
http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#log4j-appender
Write the configuration file:
# conf file name: 5_avro_memory_hdfs.conf
# Source type : avro source
# Channel type: memory channel
# Sink type : hdfs sink
# Name the components on this agent <== define agent
# a1 <== agent name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source <== define Source
a1.sources.r1.type = avro
a1.sources.r1.bind = rzdata001
a1.sources.r1.port = 44445
# Use a channel which buffers events in memory <== define Channel
a1.channels.c1.type = memory
# Describe the sink <== define Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://rzdata001:9000/ruozedata/flume/avro/%Y%m%d%H
a1.sinks.k1.hdfs.filePrefix = hive
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 500
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel <== connect source, channel and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent:
$FLUME_HOME/bin/flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/myconf/5_avro_memory_hdfs.conf \
-Dflume.root.logger=INFO,console
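Before wiring up a Java application, the avro source can be smoke-tested with the avro-client mode built into flume-ng (the file to send is an assumption):
$FLUME_HOME/bin/flume-ng avro-client \
--host rzdata001 \
--port 44445 \
--filename /home/ruoze/data/flume/data.log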
Send logs from a Java application to the avro source's port:
# 1) Add the Maven dependency
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0-cdh5.16.2</version>
</dependency>
# 2) Configure log4j.properties
# (the Console and File appender definitions referenced below are omitted here)
log4j.rootLogger = INFO,Console,File,Flume
### Send log output to the avro source, to be collected by the Flume agent on the host ###
log4j.appender.Flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.Flume.Hostname=rzdata001
log4j.appender.Flume.Port=44445
# UnsafeMode=true: the appender does not throw an exception if the avro source is unreachable
log4j.appender.Flume.UnsafeMode=true
log4j.appender.Flume.layout=org.apache.log4j.PatternLayout
log4j.appender.Flume.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS}|%t|[%m]
# 3) Code that generates the logs
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class SendFlumeAvro {
    private static final Logger logger = LogManager.getLogger(SendFlumeAvro.class.getName());

    public static void main(String[] args) {
        logger.trace("this is a trace message");  // not emitted: rootLogger level is INFO
        logger.info("this is info message 1");
        logger.error("this is error message 1");
        logger.info("this is info message 4");
        logger.error("this is error message 2");
        logger.info("this is info message 6");
        logger.error("this is error message 3");
        System.out.println("exit");
    }
}
# 4) Run the program to send the logs
# 5) Check the data the sink wrote to HDFS
[ruoze@rzdata001 ~]$ hadoop fs -text /ruozedata/flume/avro/2020021512/hive.1581740388601.log
20/02/15 12:27:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020/02/15 12:19:46.987|main|[this is info message 1]
2020/02/15 12:19:47.087|main|[this is error message 1]
2020/02/15 12:19:47.100|main|[this is info message 4]
2020/02/15 12:19:47.125|main|[this is error message 2]
2020/02/15 12:19:47.137|main|[this is info message 6]
2020/02/15 12:19:47.165|main|[this is error message 3]
[ruoze@rzdata001 ~]$