package com.day_215
import org.apache.spark.{SparkConf, SparkContext}
object IPLocation {
//***************************** convert a dotted-quad IP string to its decimal (Long) value
/**
 * Converts an IPv4 address such as "192.168.1.1" into a single Long by
 * folding each dot-separated fragment into the accumulator 8 bits at a time
 * (equivalent to the original `var`/`for` loop, expressed as a fold).
 *
 * @param ip a dotted-decimal IPv4 string; every fragment must parse as a Long
 * @return the address as an unsigned 32-bit value held in a Long
 */
def ip2Long(ip: String): Long =
  ip.split("[.]").foldLeft(0L)((acc, fragment) => (acc << 8) | fragment.toLong)
//***************************** binary search over the sorted ip-rules table
/**
 * Finds the index of the rule whose [start, end] range contains `ip`.
 *
 * NOTE(review): the original body was truncated in the source; this is a
 * reconstruction from the surviving fragments (the range test
 * `(ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong)` and
 * `return middle`), rewritten without `return` as a tail-recursive loop.
 *
 * @param lines rules sorted ascending by start, as (startNum, endNum, province)
 *              tuples whose first two fields are decimal strings
 * @param ip    the IP address as a decimal Long (see ip2Long)
 * @return the index of the matching rule, or -1 if no range contains `ip`
 */
def binarySearch(lines: Array[(String, String, String)], ip: Long): Int = {
  @annotation.tailrec
  def loop(low: Int, high: Int): Int =
    if (low > high) -1 // search space exhausted: no rule covers this ip
    else {
      val middle = (low + high) >>> 1 // unsigned shift avoids Int overflow on huge tables
      val start = lines(middle)._1.toLong
      val end = lines(middle)._2.toLong
      if (ip >= start && ip <= end) middle
      else if (ip < start) loop(low, middle - 1)
      else loop(middle + 1, high)
    }
  loop(0, lines.length - 1)
}
/**
 * Entry point: joins access-log client IPs against a broadcast ip-rules table
 * and prints the three provinces with the most hits.
 *
 * NOTE(review): the method header, SparkConf/SparkContext setup and the
 * rules-file path were lost to source corruption; the path and master below
 * are placeholders reconstructed from context — confirm against the original.
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("IPLocation").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Load the ip-rules dictionary: '|'-separated lines; keep
  // (startNum, endNum, province) = fields 2, 3 and 6.
  val ipRDD = sc.textFile("F:\\ip.txt" /* TODO(review): confirm rules-file path */).map(line => {
    val fields = line.split("[|]")
    (fields(2), fields(3), fields(6))
  })
  val ipRulesArr = ipRDD.collect()           // gather the scattered dictionary onto the driver
  val ipBroadcast = sc.broadcast(ipRulesArr) // broadcast it once to every executor
  // Load the access log; field 1 of each '|'-separated line is the client IP.
  val ip = sc.textFile("F:\\vm\\老師錄屏\\stage4 大数据流式数据分析scala和spark和kafka和sparkstream\\线上教学的资料及录屏\\线上第六天 广播变量完整版\\20090121000132.394251.http.format").map(line => {
    val fields = line.split("[|]")
    fields(1)
  })
  val res = ip.map(addr => {
    val ipNum = ip2Long(addr) // convert dotted IP to its decimal value
    // BUGFIX: use the broadcast copy inside the closure (the original captured
    // the driver-side ipRulesArr, re-serializing the whole table into every
    // task and defeating the broadcast).
    val index = binarySearch(ipBroadcast.value, ipNum)
    // Guard the miss case: binarySearch returns -1 when no rule matches,
    // which would otherwise throw ArrayIndexOutOfBoundsException.
    if (index >= 0) ipBroadcast.value(index)._3 else "Unknown"
  })
  // Top three provinces by hit count.
  val wordcountip = res.map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false).take(3)
  println(wordcountip.toBuffer)
  sc.stop()
}
}