Environment:
Linux: CentOS 7; JDK: 1.8; Hadoop: 2.8.5 (a working cluster is assumed); Nginx: 1.14.0 (deployed standalone); Flume: 1.8.0 (deployed alongside Nginx)
I. Compile and Install Nginx
1. Download the .tar.gz package from the official site and upload it to the Linux server
http://nginx.org/en/download.html (the Stable version is recommended)
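If the server has direct internet access, the tarball can also be fetched in place; a minimal sketch, assuming version 1.14.0:
wget http://nginx.org/download/nginx-1.14.0.tar.gz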
2. Extract nginx
tar -zxvf nginx-1.14.0.tar.gz -C /usr/local/src/
3. Enter the nginx source directory
cd /usr/local/src/nginx-1.14.0/
4. Run configure (this first attempt fails because the build dependencies are missing)
./configure
5. Install gcc and the other required dependencies
yum -y install gcc pcre-devel openssl openssl-devel
6. Run configure again
./configure
7. Compile and install nginx
make && make install
8. Start nginx (make install puts it under /usr/local/nginx by default)
cd /usr/local/nginx
sbin/nginx
9. Verify that the process is running
ps -ef | grep nginx
netstat -anpt | grep nginx
Open a browser and enter the Linux server's IP address; the Nginx welcome page should appear.
Done!
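The same check can be done from the shell; a minimal sketch, assuming nginx listens on the default port 80 (replace the placeholder with the server's real IP):
curl -I http://<nginx-server-ip>/
An HTTP 200 response with a Server: nginx header confirms the install.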
II. Install and Configure Flume
1. Download the .tar.gz package from the official site
Recommended mirror: https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
2. Extract flume (create the target directory first if it does not exist)
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /flume-1.8.0/
3. Create the nginx-to-HDFS agent configuration file
vi nginx-hdfs.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#define the source: the custom TailFileSource built in part III
a1.sources.r1.type = cn.edu360.flume.source.TailFileSource
#the file to tail
a1.sources.r1.filePath = /usr/local/nginx/logs/access.log
#the file that records the read offset, so the source can resume after a restart
a1.sources.r1.offsetFile = /opt/module/flume-1.8.0/offset.txt
#polling interval in milliseconds
a1.sources.r1.interval = 1000
#charset used when reading the file
a1.sources.r1.charset = UTF-8
#use a memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = hdfs
#directories are generated per date, which requires a timestamp header; useLocalTimeStamp supplies it
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://192.168.139.111:9000/flume/access-logs/%y-%m-%d
a1.sinks.k1.hdfs.filePrefix = event-
#roll a new file every 30 seconds
a1.sinks.k1.hdfs.rollInterval = 30
#do not roll files by event count
a1.sinks.k1.hdfs.rollCount = 0
#roll files at 100 MB
a1.sinks.k1.hdfs.rollSize = 104857600
#write events to HDFS as plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
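The hdfs.path above must match the cluster's fs.defaultFS; a quick check from any node with the Hadoop client configured (pre-creating the target directory does no harm):
hdfs getconf -confKey fs.defaultFS
hdfs dfs -mkdir -p /flume/access-logs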
III. Ship Nginx Logs to HDFS with Flume
1. Create a Maven project in IDEA and add the dependencies
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Write the Java program (a custom Flume source that tails the Nginx log) and package it as a jar:
import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * A source that tails a single file. Features:
 * it watches the file in real time and collects any newly appended content,
 * and it records the read offset (how far it has read), so that if Flume
 * goes down, it resumes from the last offset after a restart.
 */
public class TailFileSource extends AbstractSource implements Configurable, EventDrivenSource {
    private static Logger logger = LoggerFactory.getLogger(TailFileSource.class);
    private String filePath;
    private String offsetFile;
    private String charset;
    private Long interval;
    private ExecutorService pool;
    private FileRunner runner;

    /**
     * configure is called first, and only once
     * @param context
     */
    @Override
    public void configure(Context context) {
        filePath = context.getString("filePath");
        offsetFile = context.getString("offsetFile");
        charset = context.getString("charset", "UTF-8");
        interval = context.getLong("interval", 1000L);
    }

    /**
     * start is called once, after configure
     */
    @Override
    public synchronized void start() {
        //create a thread pool holding a single thread
        pool = Executors.newSingleThreadExecutor();
        //build the Runnable that does the tailing;
        //the ChannelProcessor comes from the parent class
        ChannelProcessor channelProcessor = getChannelProcessor();
        runner = new FileRunner(filePath, offsetFile, charset, interval, channelProcessor);
        //submit it to the pool
        pool.submit(runner);
        //invoke the parent's start method
        super.start();
    }

    @Override
    public synchronized void stop() {
        //signal the polling thread to stop
        runner.setFlag(false);
        //shut down the thread pool
        pool.shutdown();
        super.stop();
    }

    private static class FileRunner implements Runnable {
        private String filePath;
        private String offsetFile;
        private String charset;
        private Long interval;
        private ChannelProcessor channelProcessor;
        private File offFile;
        private long offset = 0L;
        private RandomAccessFile raf;
        private boolean flag = true;

        public FileRunner(String filePath, String offsetFile, String charset, Long interval, ChannelProcessor channelProcessor) {
            this.filePath = filePath;
            this.offsetFile = offsetFile;
            this.charset = charset;
            this.interval = interval;
            this.channelProcessor = channelProcessor;
            //check whether the offset file already exists
            offFile = new File(offsetFile);
            if (!offFile.exists()) {
                //create the offset file
                try {
                    offFile.createNewFile();
                } catch (IOException e) {
                    logger.error("create offset file error", e);
                }
            }
            //if an offset was recorded, resume reading from it
            try {
                String offsetStr = FileUtils.readFileToString(offFile);
                if (offsetStr != null && !"".equals(offsetStr)) {
                    //parse it into a Long
                    offset = Long.parseLong(offsetStr);
                }
            } catch (IOException e) {
                logger.error("read offset file error", e);
            }
            //use RandomAccessFile's seek to jump to the recorded offset
            try {
                raf = new RandomAccessFile(filePath, "r");
                //position at the offset
                raf.seek(offset);
            } catch (FileNotFoundException e) {
                logger.error("create log file error", e);
            } catch (IOException e) {
                logger.error("raf seek error", e);
            }
        }

        @Override
        public void run() {
            //keep watching the file until flag is cleared
            while (flag) {
                //wrap whatever was read into an Event
                try {
                    String line = raf.readLine();
                    //a line was read
                    if (line != null) {
                        //re-decode to fix garbled multi-byte characters:
                        //readLine decodes bytes as ISO-8859-1, so reverse that
                        line = new String(line.getBytes("ISO-8859-1"), charset);
                        //wrap the line into an Event, encoded with the configured charset
                        Event event = EventBuilder.withBody(line, Charset.forName(charset));
                        //hand the event to the Channel via the ChannelProcessor
                        channelProcessor.processEvent(event);
                        //take the latest offset and persist it
                        offset = raf.getFilePointer();
                        //write it to the offset file
                        FileUtils.writeStringToFile(offFile, offset + "");
                    } else {
                        //nothing new to read, so sleep for the interval
                        Thread.sleep(interval);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        //clear or set the run flag
        public void setFlag(boolean flag) {
            this.flag = flag;
        }
    }
}
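With the class written, build the shaded jar; a minimal sketch, assuming a standard Maven layout (the jar ends up under target/):
mvn clean package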
Upload the packaged jar to /flume-1.8.0/apache-flume-1.8.0-bin/lib on the Flume server, and that is all it takes.
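The upload can be scripted with scp; a sketch with a hypothetical jar name:
scp target/tailfile-source-1.0.jar root@nginx:/flume-1.8.0/apache-flume-1.8.0-bin/lib/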
2. Start Flume
[root@nginx flume-1.8.0]# ./apache-flume-1.8.0-bin/bin/flume-ng agent -n a1 -c ./apache-flume-1.8.0-bin/conf/ -f ./nginx-hdfs.conf -Dflume.root.logger=INFO,console
Mind the file paths!
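To give Flume something to ship, generate a few fresh access-log lines on the Nginx host; a quick sketch:
curl http://localhost/
tail -1 /usr/local/nginx/logs/access.log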
3. On the server where Hadoop is deployed, check whether the files have arrived
hdfs dfs -ls /flume/access-logs
The log files under this directory can be viewed with hdfs dfs -cat.
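For example (the date directory below is hypothetical; real names follow the %y-%m-%d pattern from the sink config):
hdfs dfs -cat /flume/access-logs/18-11-01/event-*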