11HDFS的读写流程NameNode、DataNode工作机制——好程序

Rae ·
更新时间:2024-09-21
· 677 次阅读

一、 HDFS前言

设计思想
       分而治之:将大文件、大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析;

在大数据系统中作用:
为各类分布式运算框架(如:mapreduce,spark,tez,……)提供数据存储服务

      重点概念:文件切块,副本存放,元数据

二、 HDFS的概念和特性

首先,它是一个文件系统,用于存储文件,通过统一的命名空间——目录树来定位文件

其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色;

重要特性如下:

HDFS中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M
HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
目录结构及文件分块信息(元数据)的管理由namenode节点承担   ——namenode是HDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(block的id,及所在的datanode服务器)
文件的各个block的存储管理由datanode节点承担    ---- datanode是HDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication)
HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改

三、 HDFS的shell(命令行客户端)操作

3.1 HDFS命令行客户端使用

HDFS提供shell命令行客户端,使用方法如下:

3.2 命令行客户端支持的命令参数:hdfs dfs       

 [-appendToFile ... ]         [-cat [-ignoreCrc] ...]         [-checksum ...]         [-chgrp [-R] GROUP PATH...]         [-chmod [-R] PATH...]         [-chown [-R] [OWNER][:[GROUP]] PATH...]         [-copyFromLocal [-f] [-p] ... ]         [-copyToLocal [-p] [-ignoreCrc] [-crc] ... ]         [-count [-q] ...]         [-cp [-f] [-p] ... ]         [-createSnapshot []]         [-deleteSnapshot ]         [-df [-h] [ ...]]         [-du [-s] [-h] ...]         [-expunge]         [-get [-p] [-ignoreCrc] [-crc] ... ]         [-getfacl [-R] ]         [-getmerge [-nl] ]         [-help [cmd ...]]         [-ls [-d] [-h] [-R] [ ...]]         [-mkdir [-p] ...]         [-moveFromLocal ... ]         [-moveToLocal ]         [-mv ... ]         [-put [-f] [-p] ... ]         [-renameSnapshot ]         [-rm [-f] [-r|-R] [-skipTrash] ...]         [-rmdir [--ignore-fail-on-non-empty] ...]         [-setfacl [-R] [{-b|-k} {-m|-x } ]|[--set ]]         [-setrep [-R] [-w] ...]         [-stat [format] ...]         [-tail [-f] ]         [-test -[defsz] ]         [-text [-ignoreCrc] ...]         [-touchz ...]         [-usage [cmd ...]]

3.2 常用命令参数介绍

-help             功能:输出这个命令参数手册 -ls                  功能:显示目录信息 示例: hadoop fs -ls hdfs://hadoop-server01:9000/ 备注:这些参数中,所有的hdfs路径都可以简写   ->   hadoop fs -ls /   等同于上一条命令的效果 -mkdir              功能:在hdfs上创建目录 示例:hadoop fs  -mkdir  -p  /aaa/bbb/cc/dd -moveFromLocal            功能:从本地剪切粘贴到hdfs 示例:hadoop  fs  - moveFromLocal  /home/hadoop/a.txt  /aaa/bbb/cc/dd -moveToLocal              功能:从hdfs剪切粘贴到本地 示例:hadoop  fs  - moveToLocal   /aaa/bbb/cc/dd  /home/hadoop/a.txt --appendToFile  功能:追加一个文件到已经存在的文件末尾 示例:hadoop  fs  -appendToFile  ./hello.txt  hdfs://hadoop-server01:9000/hello.txt 可以简写为: Hadoop  fs  -appendToFile  ./hello.txt  /hello.txt  -cat  功能:显示文件内容  示例:hadoop fs -cat  /hello.txt -tail                 功能:显示一个文件的末尾 示例:hadoop  fs  -tail  /weblog/access_log.1 -text                  功能:以字符形式打印一个文件的内容 示例:hadoop  fs  -text  /weblog/access_log.1 -chgrp -chmod -chown 功能:linux文件系统中的用法一样,对文件所属权限 示例: hadoop  fs  -chmod  666  /hello.txt hadoop  fs  -chown  someuser:somegrp   /hello.txt -copyFromLocal    功能:从本地文件系统中拷贝文件到hdfs路径去 示例:hadoop  fs  -copyFromLocal  ./jdk.tar.gz  /aaa/ -copyToLocal      功能:从hdfs拷贝到本地 示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz -cp              功能:从hdfs的一个路径拷贝hdfs的另一个路径 示例: hadoop  fs  -cp  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2  -mv                     功能:在hdfs目录中移动文件 示例: hadoop  fs  -mv  /aaa/jdk.tar.gz  / -get              功能:等同于copyToLocal,就是从hdfs下载文件到本地 示例:hadoop fs -get  /aaa/jdk.tar.gz -getmerge             功能:合并下载多个文件 示例:比如hdfs的目录 /aaa/下有多个文件:log.1, log.2,log.3,... hadoop fs -getmerge /aaa/log.* ./log.sum -put                功能:等同于copyFromLocal 示例:hadoop  fs  -put  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2 -rm                功能:删除文件或文件夹 示例:hadoop fs -rm -r /aaa/bbb/ -rmdir                 功能:删除空目录 示例:hadoop  fs  -rmdir   /aaa/bbb/ccc -df               功能:统计文件系统的可用空间信息 示例:hadoop  fs  -df  -h  / -du 功能:统计文件夹的大小信息 示例: hadoop  fs  -du  -s  -h /aaa/* -count         功能:统计一个指定目录下的文件节点数量 示例:hadoop fs -count /aaa/ -setrep                功能:设置hdfs中文件的副本数量 示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz

namenode的工作机制

dfs.namenode.checkpoint.check.period=60  #检查触发条件是否满足的频率,60秒 dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary #以上两个参数做checkpoint操作时,secondary namenode的本地工作目录 dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir} dfs.namenode.checkpoint.max-retries=3  #最大重试次数 dfs.namenode.checkpoint.period=3600  #两次checkpoint之间的时间间隔3600秒 dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间最大的操作记录

镜像是合并日记

  dataNode的工作机制

datanode的下线时长: 超时时长 dfs.heartbeat.interval 3 Determines datanode heartbeat interval in seconds. dfs.namenode.heartbeat.recheck-interval 300000 This time decides the interval to check for expired datanodes. With this value and dfs.heartbeat.interval, the interval of deciding the datanode is stale or not is also calculated. The unit of this configuration is millisecond.

1、引入依赖

    org.apache.hadoop     hadoop-client     2.6.1

7.4 HDFS客户端操作数据代码示例:

7.4.1 文件的增删改查 public class HdfsClient {      FileSystem fs = null;      @Before      public void init() throws Exception {            // 构造一个配置参数对象,设置一个参数:我们要访问的hdfs的URI            // 从而FileSystem.get()方法就知道应该是去构造一个访问hdfs文件系统的客户端,以及hdfs的访问地址            // new Configuration();的时候,它就会去加载jar包中的hdfs-default.xml            // 然后再加载classpath下的hdfs-site.xml            Configuration conf = new Configuration();            conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");            /**             * 参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置             */            conf.set("dfs.replication", "3");            // 获取一个hdfs的访问客户端,根据参数,这个实例应该是DistributedFileSystem的实例            // fs = FileSystem.get(conf);            // 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户            fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");      }      /**       * 往hdfs上传文件       *       * @throws Exception       */      @Test      public void testAddFileToHdfs() throws Exception {            // 要上传的文件所在的本地路径            Path src = new Path("g:/redis-recommend.zip");            // 要上传到hdfs的目标路径            Path dst = new Path("/aaa");            fs.copyFromLocalFile(src, dst);            fs.close();      }      /**       * 从hdfs中复制文件到本地文件系统       *       * @throws IOException       * @throws IllegalArgumentException       */      @Test      public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {            fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));            fs.close();      }      @Test      public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {            // 创建目录            fs.mkdirs(new Path("/a1/b1/c1"));            // 删除文件夹 ,如果是非空文件夹,参数2必须给值true            fs.delete(new Path("/aaa"), true);            // 重命名文件或文件夹            fs.rename(new Path("/a1"), new Path("/a2"));      }      /**       * 查看目录信息,只显示文件       *       * @throws IOException       * @throws IllegalArgumentException       * @throws FileNotFoundException       */      @Test      public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {            // 思考:为什么返回迭代器,而不是List之类的容器            RemoteIterator listFiles = fs.listFiles(new Path("/"), true);            while (listFiles.hasNext()) {                 LocatedFileStatus fileStatus = listFiles.next();                 System.out.println(fileStatus.getPath().getName());                 System.out.println(fileStatus.getBlockSize());                 System.out.println(fileStatus.getPermission());                 System.out.println(fileStatus.getLen());                 BlockLocation[] blockLocations = fileStatus.getBlockLocations();                 for (BlockLocation bl : blockLocations) {                      System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());                      String[] hosts = bl.getHosts();                      for (String host : hosts) {                            System.out.println(host);                      }                 }                 System.out.println("--------------为angelababy打印的分割线--------------");            }      }      /**       * 查看文件及文件夹信息       *       * @throws IOException       * @throws IllegalArgumentException       * @throws FileNotFoundException       */      @Test      public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {            FileStatus[] listStatus = fs.listStatus(new Path("/"));            String flag = "d--         ";            for (FileStatus fstatus : listStatus) {                 if (fstatus.isFile())  flag = "f--     ";                 System.out.println(flag + fstatus.getPath().getName());            }      } } 7.4.2 通过流的方式访问hdfs  /**  * 相对那些封装好的方法而言的更底层一些的操作方式  * 上层那些mapreduce   spark等运算框架,去hdfs中获取数据的时候,就是调的这种底层的api  * @author  *  */ public class StreamAccess {         FileSystem fs = null;     @Before     public void init() throws Exception {               Configuration conf = new Configuration();               fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");     }     @Test     public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{                              //先获取一个文件的输入流----针对hdfs上的               FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));               //再构造一个文件的输出流----针对本地的               FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));               //再将输入流中数据传输到输出流               IOUtils.copyBytes(in, out, 4096);                   }     /**      * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度      * 用于上层分布式运算框架并发处理数据      * @throws IllegalArgumentException      * @throws IOException      */     @Test     public void testRandomAccess() throws IllegalArgumentException, IOException{               //先获取一个文件的输入流----针对hdfs上的               FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));                                //可以将流的起始偏移量进行自定义               in.seek(22);                                 //再构造一个文件的输出流----针对本地的               FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));               IOUtils.copyBytes(in,out,19L,true);                       }                     /**      * 显示hdfs上文件的内容      * @throws IOException      * @throws IllegalArgumentException      */     @Test     public void testCat() throws IllegalArgumentException, IOException{                                 FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));               IOUtils.copyBytes(in, System.out, 1024);     } } 7.4.3 场景编程

在mapreduce 、spark等运算框架中,有一个核心思想就是将运算移往数据,或者说,就是要在并发计算中尽可能让运算本地化,这就需要获取数据所在位置的信息并进行相应范围读取

以下模拟实现:获取一个文件的所有block位置信息,然后读取指定block中的内容

@Test  public void testCat() throws IllegalArgumentException, IOException{    FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));    //拿到文件信息    FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));    //获取这个文件的所有block的信息    BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());    //第一个block的长度    long length = fileBlockLocations[0].getLength();    //第一个block的起始偏移量    long offset = fileBlockLocations[0].getOffset();    System.out.println(length);    System.out.println(offset);    //获取第一个block写入输出流 //       IOUtils.copyBytes(in, System.out, (int)length);    byte[] b = new byte[4096];    FileOutputStream os = new FileOutputStream(new File("d:/block0"));    while(in.read(offset, b, 0, 4096)!=-1){     os.write(b);     offset += 4096;     if(offset>=length) return;    };    os.flush();    os.close();    in.close();  } 八、案例1:开发shell采集脚本 8.1需求说明

点击流日志每天都10T,在业务应用服务器上,需要准实时上传至数据仓库(Hadoop HDFS)上

8.2需求分析

一般上传文件都是在凌晨24点操作,由于很多种类的业务数据都要在晚上进行传输,为了减轻服务器的压力,避开高峰期。

如果需要伪实时的上传,则采用定时上传的方式

8.3技术分析

          HDFS SHELL:  hadoop fs  –put   xxxx.tar  /data    还可以使用 Java Api

                          满足上传一个文件,不能满足定时、周期性传入。

          定时调度器:

                   Linux crontab

                   crontab -e

*/5 * * * * $home/bin/command.sh   //五分钟执行一次

系统会自动执行脚本,每5分钟一次,执行时判断文件是否符合上传规则,符合则上传

8.4实现流程 8.4.1日志产生程序

日志产生程序将日志生成后,产生一个一个的文件,使用滚动模式创建文件名。

日志生成的逻辑由业务系统决定,比如在log4j配置文件中配置生成规则,如:当xxxx.log 等于10G时,滚动生成新日志

log4j.logger.msg=info,msg log4j.appender.msg=cn.maoxiangyi.MyRollingFileAppender log4j.appender.msg.layout=org.apache.log4j.PatternLayout log4j.appender.msg.layout.ConversionPattern=%m%n log4j.appender.msg.datePattern='.'yyyy-MM-dd log4j.appender.msg.Threshold=info log4j.appender.msg.append=true log4j.appender.msg.encoding=UTF-8 log4j.appender.msg.MaxBackupIndex=100 log4j.appender.msg.MaxFileSize=10GB log4j.appender.msg.File=/home/hadoop/logs/log/access.log

如果日志文件后缀是1\2\3等数字,该文件满足需求可以上传的话。把该文件移动到准备上传的工作区间。
工作区间有文件之后,可以使用hadoop put命令将文件上传。
阶段问题:

待上传文件的工作区间的文件,在上传完成之后,是否需要删除掉。
8.4.2伪代码

         使用ls命令读取指定路径下的所有文件信息,

         ls  | while read  line

          //判断line这个文件名称是否符合规则

if       line=access.log.* (

            将文件移动到待上传的工作区间

  )

//批量上传工作区间的文件

hadoop fs  –put   xxx

脚本写完之后,配置linux定时任务,每5分钟运行一次。

8.5代码实现

代码第一版本,实现基本的上传功能和定时调度功能

代码第二版本:增强版V2(基本能用,还是不够健全)

8.6效果展示及操作步骤

1、日志收集文件收集数据,并将数据保存起来,效果如下:

2、上传程序通过crontab定时调度

3、程序运行时产生的临时文件

4、Hadoo hdfs上的效果


作者:人体健康与床位研究



程序 hdfs

需要 登录 后方可回复, 如果你还没有账号请 注册新账号
相关文章