sc.textFile("file:///root/data/word").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._1,true,1).saveAsSequenceFile("hdfs:///demo/results03")
因为saveAsxxx都是将计算结果写入到HDFS或者是本地文件系统中,因此如果需要将计算结果写出到第三方数据库,此时就需要借助于给我们提供的一个foreach
算子写出。
foreach写出
场景一:频繁的打开和关闭连接池,写入效率很低。(可以运行成功)
sc.textFile("file:///root/data/word")
.flatMap(_.split(" "))
.map((_,1)).reduceByKey(_+_)
.sortBy(_._1,true,3)
.foreach(tuple=>{ //数据库
//1.创建连接
//2.开始写入
//3.关闭连接
})
场景二:错误写法,因为链接池不可能被序列化。(运行失败)
//1.定义连接
sc.textFile("file:///root/data/word")
.flatMap(_.split(" "))
.map((_,1)).reduceByKey(_+_)
.sortBy(_._1,true,3)
.foreach(tuple=>{ //数据库
//2.开始写入
})
//3.关闭连接
场景三:一个分区一个链接池foreachPartition (还不错,但是不是最优),有可能一个JVM运行多个分区,也就意味着一个JVM中创建了多个链接,造成资源的浪费。
sc.textFile("file:///root/data/word")
.flatMap(_.split(" "))
.map((_,1)).reduceByKey(_+_)
.sortBy(_._1,true,3)
.foreachPartition(values=>{
//创建链接
//写入分区数据
//关闭链接
})
将创建链接的代码使用单例对象创建,这样如果一个计算节点拿到多个分区,通过JVM单例定义可以知道,在整个JVM中仅仅只会创建一次。
import org.apache.spark.{SparkConf, SparkContext}
object TestSparkDataSink {
def main(args: Array[String]): Unit = {
//创建sc
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Testopr")
val sc = new SparkContext(conf)
sc.textFile("hdfs://train:9000/demo/word")
.flatMap(_.split(" "))
.map((_,1)).reduceByKey(_+_)
.sortBy(_._1,true,3)
.foreachPartition(values=>{
HbaseSink.writeToHbase("baizhi:t_word",values.toList)
})
sc.stop()
}
}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import scala.collection.JavaConverters._
object HbaseSink {
//定义链接参数
var conn:Connection=createConn()
def createConn():Connection = {
val hadoopConf = new Configuration()
hadoopConf.set(HConstants.ZOOKEEPER_QUORUM,"train")
return ConnectionFactory.createConnection(hadoopConf)
}
def writeToHbase(tableName:String,values:List[(String,Int)]): Unit ={
var tName:TableName = TableName.valueOf(tableName)
val mutator = conn.getBufferedMutator(tName)
var scalaList = values.map(t=>{
val put = new Put(t._1.getBytes())
put.addColumn("cf1".getBytes(),"count".getBytes(),(t._2+" ").getBytes())
put
})
//批量写出
mutator.mutate(scalaList.asJava)
mutator.flush()
mutator.close()
}
//监控JVM退出,如果JVM退出系统回调该方法
Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
override def run(): Unit = {
println("-----close-----")
conn.close()
}
}))
}