/**
* 处理真实数据
* 1.创建spark环境
* 2.导入数据
* 3.处理数据:我们只用到一个applog的日志数据,web和wx_App的数据也是一样的数据处理!
* a.解析json
* b.生成tuple
* 4.保存结果
* 5.关流
*/
object idmapping_tabay {
def main(args: Array[String]): Unit = {
//1.1.创建spark环境
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master("local[*]")
.getOrCreate()
//将rdd变成df
import spark.implicits._
//2.导入数据
val applog: Dataset[String] = spark.read.textFile("F:\\yiee_logs\\2020-01-11\\app\\doit.mall.access.log.8")
//3.处理数据
//构造一个点集合
val data: RDD[Array[String]] = applog.rdd.map(line => {
//将每行数据解析成json对象
val jsonObj = JSON.parseObject(line)
// 从json对象中取user对象
val userObj = jsonObj.getJSONObject("user")
val uid = userObj.getString("uid")
// 从user对象中取phone对象
val phoneObj = userObj.getJSONObject("phone")
val imei = phoneObj.getString("imei")
val mac = phoneObj.getString("mac")
val imsi = phoneObj.getString("imsi")
val androidId = phoneObj.getString("androidId")
val deviceId = phoneObj.getString("deviceId")
val uuid = phoneObj.getString("uuid")
Array(uid, imei, mac, imsi, androidId, deviceId, uuid).filter(StringUtils.isNotBlank(_))
})
//构造一个点集合
// 将每一个元素和他的hashcode组合成对偶元组
val vertices: RDD[(Long, String)] = data.flatMap(arr => {
for (biaoshi {
//用双重for循环的方法让数组中所有的两两组合成边
for (i <- 0 to arr.length - 2; j (edge, 1)).reduceByKey(_ + _)
//过滤将重复次数 tp._2 > 2)
.map(x => x._1)
edges
//用 点集合 和 边集合 构造一张图 使用Graph算法
val graph = Graph(vertices,edges)
//并调用最大连同子图算法VertexRDD[VertexId] ==>rdd 里面装的元组(Long值,组中最小值)
val res: VertexRDD[VertexId] = graph.connectedComponents().vertices
//最终我们要输出的文件是parquet的文件
res.toDF("biaoshi_hashcode","guid").write.parquet("data/dict/idmapping_dict/2020-02-15")
spark.stop()
}
}