Flume for Beginners

Jenny · Updated: 2024-09-21 · 880 views

Flume

I. Download

[pxj@pxj /opt]$ sudo wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.16.2.tar.gz
[sudo] password for pxj:
--2020-02-13 01:21:32-- http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.16.2.tar.gz
Resolving archive.cloudera.com (archive.cloudera.com)... 151.101.228.167
Connecting to archive.cloudera.com (archive.cloudera.com)|151.101.228.167|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 81326682 (78M) [binary/octet-stream]
Saving to: "flume-ng-1.6.0-cdh5.16.2.tar.gz"

Extract

[pxj@pxj /opt]$ tar -zxvf flume-ng-1.6.0-cdh5.16.2.tar.gz -C ~/app

Edit the configuration file (copy the template first, then edit the copy)

[pxj@pxj /home/pxj/app/apache-flume-1.6.0-cdh5.16.2-bin/conf]$ cp flume-env.sh.template flume-env.sh
[pxj@pxj /home/pxj/app/apache-flume-1.6.0-cdh5.16.2-bin/conf]$ vim flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_121

Set the environment variables

export FLUME_HOME=/home/pxj/app/apache-flume-1.6.0-cdh5.16.2-bin
export PATH=$FLUME_HOME/bin:$PATH

II. Introduction to Flume

Data collection
RDBMS: Sqoop
Logs ==> Hadoop/HDFS

hadoop fs -put … …
crontab

ng: one folder per hour; the put job runs once an hour
Monitoring
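The cron-driven approach in the notes above can be sketched roughly as follows. This is only an illustration: the class name `HourlyPut`, the one-folder-per-hour naming (`yyyyMMddHH`), and the paths `/data/logs` and `/logs` are assumptions, not something the original notes specify.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Sketch of the hourly batch upload: build the put command for the previous hour's folder. */
class HourlyPut {
    // Assumed layout: one local folder per hour, named like 2020021500.
    private static final DateTimeFormatter HOUR_DIR = DateTimeFormatter.ofPattern("yyyyMMddHH");

    /** Returns the `hadoop fs -put` command for the hour that ended just before `now`. */
    static String buildCommand(String localRoot, String hdfsRoot, LocalDateTime now) {
        String hour = now.minusHours(1).format(HOUR_DIR);
        return "hadoop fs -put " + localRoot + "/" + hour + " " + hdfsRoot + "/" + hour;
    }

    public static void main(String[] args) {
        // Hypothetical roots; a crontab entry would run this once an hour.
        System.out.println(buildCommand("/data/logs", "/logs", LocalDateTime.now()));
    }
}
```

With this style of job, data reaches HDFS up to an hour late, and monitoring has to be built separately, which is presumably why the notes move on to Flume.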

Flume: large amounts of log data
collecting,
aggregating,
moving

ng ==> Flume ==> HDFS

Flume is a framework for collecting and aggregating log data.
It moves logs from place A to place B.

Agent: one Flume instance, made up of
Source
Channel
Sink
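To make the Source, Channel, Sink relationship concrete, here is a toy model in plain Java. It is not Flume code: `MiniAgent`, the sentinel value, and the use of a bounded queue as the "channel" are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy model of an agent: a source thread feeds a bounded channel, a sink thread drains it. */
class MiniAgent {
    private static final String EOF = "\u0000EOF"; // sentinel telling the sink to stop

    static List<String> run(List<String> input, int channelCapacity) {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(channelCapacity);
        List<String> delivered = new ArrayList<>();

        Thread source = new Thread(() -> {
            try {
                for (String event : input) {
                    channel.put(event); // blocks while the channel is full
                }
                channel.put(EOF);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread sink = new Thread(() -> {
            try {
                String event;
                while (!(event = channel.take()).equals(EOF)) {
                    delivered.add(event); // a real sink would write to HDFS, the console, etc.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        source.start();
        sink.start();
        try {
            source.join();
            sink.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the agent", e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("xuejie", "jinlig", "ccj"), 2));
    }
}
```

The channel decouples the two sides: the source can keep accepting data while the sink is slow, up to the channel's capacity.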

Flume
OG 0.9
NG 1.x

Logstash ELK
Flume Hadoop
Kafka MQ

Flume
There is really not much code that needs to be written.
Custom development on top of Flume usually targets:
Source
Sink
Interceptor
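As a rough idea of what an interceptor-style extension does, the sketch below tags each event with a header and drops empty ones. It deliberately uses a stand-in `SimpleEvent` class rather than Flume's own types so that it runs without any dependency; the class names and the `level` header are assumptions. In Flume itself, a custom interceptor would implement `org.apache.flume.interceptor.Interceptor`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Dependency-free sketch of interceptor-style processing (not Flume's real classes). */
class HeaderTaggingSketch {

    /** Stand-in for an event: string headers plus a byte-array body. */
    static final class SimpleEvent {
        final Map<String, String> headers = new HashMap<>();
        final byte[] body;

        SimpleEvent(String text) {
            this.body = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Tags the event with a "level" header, or returns null to drop a blank event. */
    static SimpleEvent intercept(SimpleEvent event) {
        String text = new String(event.body, StandardCharsets.UTF_8).trim();
        if (text.isEmpty()) {
            return null; // drop events that carry no data
        }
        event.headers.put("level", text.contains("ERROR") ? "error" : "info");
        return event;
    }

    public static void main(String[] args) {
        System.out.println(intercept(new SimpleEvent("ERROR disk full")).headers);
    }
}
```

Headers set this way are what routing decisions downstream can key on, while the body stays untouched.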
From a usage standpoint, Flume is mostly about writing configuration files, that is, configuring the Agent.

Source, Channel, and Sink configuration ***** There is no need to memorize it; knowing where to look it up is enough.

III. Flume Components

Source: where data is collected from
avro serialization
exec command line
spooling directory
taildir *****
kafka

Channel: where data is buffered
memory
kafka
file

Sink: where data is written to
hdfs Hadoop
logger console
avro
kafka

Agent: Source Channel Sink ==> JVM

IV. Examples

1. netcat

example.conf <== name of the Flume configuration file
Data arriving on port 44444 is collected by Flume and printed to the console.
a1 <== name of the agent

# Name the components on this agent
# r1 is the source name, k1 the sink name, c1 the channel name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source (defines the source)
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Use a channel which buffers events in memory (defines the channel)
a1.channels.c1.type = memory

# Describe the sink (defines the sink)
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
# connection between the source and the channel
a1.sources.r1.channels = c1
# connection between the sink and the channel
a1.sinks.k1.channel = c1

Once the agent's three components
and the two connections are defined
==> the agent configuration is complete
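The "three components plus two connections" rule can be checked mechanically. The sketch below loads an agent configuration with `java.util.Properties` and reports any source or sink that points at a channel the agent does not declare. `AgentConfigCheck` and its messages are assumptions for illustration; this is not a tool that ships with Flume.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/** Sketch: verify that every source and sink is wired to a declared channel. */
class AgentConfigCheck {

    static List<String> check(String agent, String configText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();
        Set<String> channels = names(p, agent + ".channels");

        // A source may feed several channels (space-separated list).
        for (String source : names(p, agent + ".sources")) {
            Set<String> used = names(p, agent + ".sources." + source + ".channels");
            if (used.isEmpty()) {
                problems.add("source " + source + " has no channels");
            }
            for (String c : used) {
                if (!channels.contains(c)) {
                    problems.add("source " + source + " uses undeclared channel " + c);
                }
            }
        }
        // A sink reads from exactly one channel.
        for (String sink : names(p, agent + ".sinks")) {
            String c = p.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (!channels.contains(c)) {
                problems.add("sink " + sink + " uses undeclared channel '" + c + "'");
            }
        }
        return problems;
    }

    /** Splits a space-separated property value into names, ignoring blanks. */
    private static Set<String> names(Properties p, String key) {
        Set<String> out = new LinkedHashSet<>();
        for (String s : p.getProperty(key, "").trim().split("\\s+")) {
            if (!s.isEmpty()) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String conf = "a1.sources = r1\na1.sinks = k1\na1.channels = c1\n"
                + "a1.sources.r1.channels = c1\na1.sinks.k1.channel = c1\n";
        System.out.println(check("a1", conf)); // an empty list means the wiring is consistent
    }
}
```

Note the asymmetry the check relies on: the source property is `channels` (plural), while the sink property is `channel` (singular).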
Start command

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/netcat.conf \
-Dflume.root.logger=INFO,console

[root@pxj /root]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
xuejie
OK
jinlig
OK
ccj
OK

2020-02-15 01:11:06,569 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 78 75 65 6A 69 65 0D xuejie. }
2020-02-15 01:11:15,574 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6A 69 6E 6C 69 67 0D jinlig. }
2020-02-15 01:11:16,513 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 63 63 6A 0D ccj. }

2. HDFS

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/pxj/datas/1.log
a1.sources.r1.shell = /bin/bash -c

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://pxj:9000/pxj/test/flume/tail
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/hdfs.conf \
-Dflume.root.logger=INFO,console

spooldir

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/pxj/datas/

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://pxj:9000/pxj/test/flume/tail/%Y%m%d%H%M
a1.sinks.k1.hdfs.batchSize = 10
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = pxj-

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/spooldir.conf \
-Dflume.root.logger=INFO,console

If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

tailDIR --home

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/pxj/datas/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/pxj/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /home/pxj/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/script/taildir.conf \
-Dflume.root.logger=INFO,console

avro

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Code that sends data to port 44444

package com.ccj.pxj.flume;

import org.apache.log4j.Logger;

public class LoggerData {
    private static Logger logger = Logger.getLogger(LoggerData.class.getName());

    public static void main(String[] args) throws Exception {
        int i = 0;
        // Log one message per second; the log4j "flume" appender forwards it to the avro source.
        while (true) {
            Thread.sleep(1000);
            logger.info("pxj:" + i++);
        }
    }
}

log4j configuration

log4j.rootCategory=INFO, console, flume
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = pxj
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout

pom

<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ccj.pxj.flume</groupId>
    <artifactId>flume</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.6.0</version>
        </dependency>
    </dependencies>
</project>
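Returning to the spooldir example, its hdfs.path ends in %Y%m%d%H%M, and with hdfs.useLocalTimeStamp=true those escapes are filled in from the agent's local clock. The sketch below mimics that expansion for just these five escapes to show what directory name results. `HdfsPathEscapes` is a hypothetical helper written for this illustration; it is not how the HDFS sink is implemented.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Sketch: expand the %Y %m %d %H %M escapes used in the spooldir example's hdfs.path. */
class HdfsPathEscapes {

    static String expand(String path, LocalDateTime t) {
        return path
                .replace("%Y", t.format(DateTimeFormatter.ofPattern("yyyy"))) // 4-digit year
                .replace("%m", t.format(DateTimeFormatter.ofPattern("MM")))   // month 01-12
                .replace("%d", t.format(DateTimeFormatter.ofPattern("dd")))   // day 01-31
                .replace("%H", t.format(DateTimeFormatter.ofPattern("HH")))   // hour 00-23
                .replace("%M", t.format(DateTimeFormatter.ofPattern("mm")));  // minute 00-59
    }

    public static void main(String[] args) {
        String path = "hdfs://pxj:9000/pxj/test/flume/tail/%Y%m%d%H%M";
        System.out.println(expand(path, LocalDateTime.now()));
    }
}
```

Because the minute is part of the path, events written in different minutes land in different directories, so a coarser pattern may be preferable when many small files are a concern.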

Author: pxj (潘陈)
Date: 2020-02-16, 1:08:32 AM
If you are well, it is a sunny day.


Author: pxjwfy



