Shipping Nginx Logs to HDFS with Flume


Environment

Linux: CentOS 7
JDK: 1.8
Hadoop: 2.8.5 (a working cluster is assumed to be in place)
Nginx: 1.14.0 (deployed on its own server)
Flume: 1.8.0 (deployed alongside Nginx)

I. Compiling and Installing Nginx

1. Download the .tar.gz archive from the official site and upload it to the Linux server:
http://nginx.org/en/download.html (the Stable version is recommended)
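
The archive can also be fetched directly with wget; the 1.14.0 version number below is an assumption, so use whichever stable release you picked:

wget http://nginx.org/download/nginx-1.14.0.tar.gz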
2. Extract nginx:

tar -zxvf nginx-1.14.0.tar.gz -C /usr/local/src/

3. Change into the nginx source directory:

cd /usr/local/src/nginx-1.14.0/

4. Run the configure script (this fails at first):

./configure
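
On a minimal CentOS 7 install this step typically aborts with an error along the lines of:

./configure: error: C compiler cc is not found

which the next step fixes by installing the build dependencies.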

5. Install gcc and the other required dependencies:

yum -y install gcc pcre-devel openssl openssl-devel

6. Run configure again:

./configure

7. Compile and install nginx:

make && make install

8. Start nginx:

sbin/nginx
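
A quick sanity check from the shell, assuming nginx listens on the default port 80:

curl -I http://localhost/
# the response headers should start with HTTP/1.1 200 OK and name Server: nginx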

9. Check the process:

ps -ef | grep nginx
netstat -anpt | grep nginx

Open a browser and enter the Linux server's IP address; the nginx welcome page should appear.
Done!

II. Installing and Configuring Flume

1. Download the .tar.gz package from the official site. A fast mirror:
https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
2. Extract Flume:

tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /flume-1.8.0/
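
If the flume-ng launcher cannot locate Java on its own, point it at the JDK in flume-env.sh; the JAVA_HOME value below is an assumption, so adjust it to your install:

cd /flume-1.8.0/apache-flume-1.8.0-bin
cp conf/flume-env.sh.template conf/flume-env.sh
echo 'export JAVA_HOME=/usr/local/jdk1.8.0' >> conf/flume-env.sh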

3. Create the nginx-hdfs.conf agent configuration:

vi nginx-hdfs.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Define the source (the custom class written in Part III)
a1.sources.r1.type = cn.edu360.flume.source.TailFileSource
# The file to watch
a1.sources.r1.filePath = /usr/local/nginx/logs/access.log
# Where to record the offset of how far the file has been read
a1.sources.r1.offsetFile = /opt/module/flume-1.8.0/offset.txt
# Polling interval in milliseconds
a1.sources.r1.interval = 1000
# Charset used when reading the file
a1.sources.r1.charset = UTF-8

# Use a memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = hdfs
# Date-based directories need a timestamp header; use the local time
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://192.168.139.111:9000/flume/access-logs/%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = event-
# Roll a new file every 30 seconds
a1.sinks.k1.hdfs.rollInterval = 30
# Never roll based on event count
a1.sinks.k1.hdfs.rollCount = 0
# Roll when a file reaches 100 MB
a1.sinks.k1.hdfs.rollSize = 104857600
# Store the events as plain text on HDFS
a1.sinks.k1.hdfs.fileType = DataStream

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
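
Before starting the agent, it is worth confirming the HDFS side matches the sink settings: the host and port in hdfs.path must agree with fs.defaultFS in core-site.xml, and the target directory can be created up front. On the Hadoop node:

hdfs dfs -mkdir -p /flume/access-logs
hdfs dfs -ls /flume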

III. Shipping Nginx Logs to HDFS with Flume

1. Create a Maven project in IDEA and add the dependencies to the pom:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Write the Java source class that Flume will use to read the Nginx log, and package it as a jar:

package cn.edu360.flume.source;

import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * A source that watches a single file. It:
 * - tails the file in real time, collecting any newly appended content;
 * - records an offset (how far it has read), so that if Flume goes down
 *   and is restarted, it resumes reading from the last recorded offset.
 */
public class TailFileSource extends AbstractSource implements Configurable, EventDrivenSource {

    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

    private String filePath;
    private String offsetFile;
    private String charset;
    private Long interval;
    private ExecutorService pool;
    private FileRunner runner;

    /**
     * Called first, and only once.
     */
    @Override
    public void configure(Context context) {
        filePath = context.getString("filePath");
        offsetFile = context.getString("offsetFile");
        charset = context.getString("charset", "UTF-8");
        interval = context.getLong("interval", 1000L);
    }

    /**
     * Called once, after configure().
     */
    @Override
    public synchronized void start() {
        // Create a thread pool with a single thread
        pool = Executors.newSingleThreadExecutor();
        // Get the ChannelProcessor from the parent class
        ChannelProcessor channelProcessor = getChannelProcessor();
        runner = new FileRunner(filePath, offsetFile, charset, interval, channelProcessor);
        // Submit the runnable to the pool
        pool.submit(runner);
        // Run the parent start() method
        super.start();
    }

    @Override
    public synchronized void stop() {
        // Stop the worker thread
        runner.setFlag(false);
        // Release the thread pool
        pool.shutdown();
        super.stop();
    }

    private static class FileRunner implements Runnable {

        private String filePath;
        private String offsetFile;
        private String charset;
        private Long interval;
        private ChannelProcessor channelProcessor;
        private File offFile;
        private long offset = 0L;
        private RandomAccessFile raf;
        // volatile so that the stop flag set by another thread is seen promptly
        private volatile boolean flag = true;

        public FileRunner(String filePath, String offsetFile, String charset,
                          Long interval, ChannelProcessor channelProcessor) {
            this.filePath = filePath;
            this.offsetFile = offsetFile;
            this.charset = charset;
            this.interval = interval;
            this.channelProcessor = channelProcessor;
            // Create the offset file if it does not exist yet
            offFile = new File(offsetFile);
            if (!offFile.exists()) {
                try {
                    offFile.createNewFile();
                } catch (IOException e) {
                    logger.error("create offset file error", e);
                }
            }
            // If an offset was recorded, resume from it
            try {
                String offsetStr = FileUtils.readFileToString(offFile);
                if (offsetStr != null && !"".equals(offsetStr)) {
                    // Parse the recorded offset as a Long
                    offset = Long.parseLong(offsetStr.trim());
                }
            } catch (IOException e) {
                logger.error("read offset file error", e);
            }
            // Use RandomAccessFile.seek() to jump to the recorded offset
            try {
                raf = new RandomAccessFile(filePath, "r");
                raf.seek(offset);
            } catch (FileNotFoundException e) {
                logger.error("open log file error", e);
            } catch (IOException e) {
                logger.error("raf seek error", e);
            }
        }

        @Override
        public void run() {
            // Watch the file continuously
            while (flag) {
                try {
                    String line = raf.readLine();
                    if (line != null) {
                        // readLine() decodes bytes as ISO-8859-1; re-encode to the
                        // configured charset to avoid garbled multi-byte characters
                        line = new String(line.getBytes("ISO-8859-1"), charset);
                        // Wrap the line in an Event
                        Event event = EventBuilder.withBody(line.getBytes(charset));
                        // Hand the event to the channel via the ChannelProcessor
                        channelProcessor.processEvent(event);
                        // Read the new offset and persist it to the offset file
                        offset = raf.getFilePointer();
                        FileUtils.writeStringToFile(offFile, offset + "");
                    } else {
                        // No new content yet; sleep for the polling interval
                        Thread.sleep(interval);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        public void setFlag(boolean flag) {
            this.flag = flag;
        }
    }
}
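
A sketch of the build step, assuming the pom above and a standard Maven project layout; the shade goal runs during the package phase and writes the final jar to target/:

mvn clean package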

Upload the packaged jar to /flume-1.8.0/apache-flume-1.8.0-bin/lib on the server.
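
To confirm that the class the agent config references is actually inside the uploaded jar (the jar name below is a placeholder for whatever Maven produced):

jar tf /flume-1.8.0/apache-flume-1.8.0-bin/lib/your-artifact.jar | grep TailFileSource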

2. Start Flume:

[root@nginx flume-1.8.0]# ./apache-flume-1.8.0-bin/bin/flume-ng agent -n a1 -c ./apache-flume-1.8.0-bin/conf/ -f ./nginx-hdfs.conf -Dflume.root.logger=INFO,console
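
With the agent up, a few requests against nginx should append lines to access.log and, within about 30 seconds (the rollInterval above), produce a file on HDFS. A quick way to generate traffic on the Nginx host:

for i in $(seq 1 10); do curl -s http://localhost/ > /dev/null; done
tail -n 3 /usr/local/nginx/logs/access.log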

Mind the file paths!

3. On the server running Hadoop, check whether the files are arriving:

hdfs dfs -ls /flume/access-logs

The log files under this directory can be inspected with -cat.
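
For example, using a glob over the date directories (named %y-%m-%d per the sink config):

hdfs dfs -cat '/flume/access-logs/*/event-*'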

