Spark数据写出

Gelsey ·
更新时间:2024-09-20
· 803 次阅读

Spark数据写出 将数据写出到HDFS sc.textFile("file:///root/data/word").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._1,true,1).saveAsSequenceFile("hdfs:///demo/results03")

因为saveAsxxx都是将计算结果写入到HDFS或者是本地文件系统中,因此如果需要将计算结果写出到第三方数据库,此时就需要借助于给我们提供的一个foreach算子写出。

foreach写出
场景一:频繁的打开和关闭连接池,写入效率很低。(可以运行成功)

sc.textFile("file:///root/data/word") .flatMap(_.split(" ")) .map((_,1)).reduceByKey(_+_) .sortBy(_._1,true,3) .foreach(tuple=>{ //数据库 //1.创建连接 //2.开始写入 //3.关闭连接 })

场景二:错误写法,因为链接池不可能被序列化。(运行失败)

//1.定义连接 sc.textFile("file:///root/data/word") .flatMap(_.split(" ")) .map((_,1)).reduceByKey(_+_) .sortBy(_._1,true,3) .foreach(tuple=>{ //数据库 //2.开始写入 }) //3.关闭连接

场景三:一个分区一个链接池foreachPartition (还不错,但是不是最优),有可能一个JVM运行多个分区,也就意味着一个JVM中创建了多个链接,造成资源的浪费。

sc.textFile("file:///root/data/word") .flatMap(_.split(" ")) .map((_,1)).reduceByKey(_+_) .sortBy(_._1,true,3) .foreachPartition(values=>{ //创建链接 //写入分区数据 //关闭链接 })

将创建链接的代码使用单例对象创建,这样如果一个计算节点拿到多个分区,通过JVM单例定义可以知道,在整个JVM中仅仅只会创建一次。

import org.apache.spark.{SparkConf, SparkContext} object TestSparkDataSink { def main(args: Array[String]): Unit = { //创建sc val conf = new SparkConf() .setMaster("local[*]") .setAppName("Testopr") val sc = new SparkContext(conf) sc.textFile("hdfs://train:9000/demo/word") .flatMap(_.split(" ")) .map((_,1)).reduceByKey(_+_) .sortBy(_._1,true,3) .foreachPartition(values=>{ HbaseSink.writeToHbase("baizhi:t_word",values.toList) }) sc.stop() } } import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{HConstants, TableName} import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put} import scala.collection.JavaConverters._ object HbaseSink { //定义链接参数 var conn:Connection=createConn() def createConn():Connection = { val hadoopConf = new Configuration() hadoopConf.set(HConstants.ZOOKEEPER_QUORUM,"train") return ConnectionFactory.createConnection(hadoopConf) } def writeToHbase(tableName:String,values:List[(String,Int)]): Unit ={ var tName:TableName = TableName.valueOf(tableName) val mutator = conn.getBufferedMutator(tName) var scalaList = values.map(t=>{ val put = new Put(t._1.getBytes()) put.addColumn("cf1".getBytes(),"count".getBytes(),(t._2+" ").getBytes()) put }) //批量写出 mutator.mutate(scalaList.asJava) mutator.flush() mutator.close() } //监控JVM退出,如果JVM退出系统回调该方法 Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { override def run(): Unit = { println("-----close-----") conn.close() } })) }
作者:丿沐染烟忱丶



spark

需要 登录 后方可回复, 如果你还没有账号请 注册新账号