val conf = new SparkConf().setMaster("local").setAppName("PairRDD")
val sc = new SparkContext(conf)
val lines = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
val lines_1 = sc.parallelize(List((3, 9)))
1、删掉RDD中与other RDD 中的键相同得元素
(1, 2), (3, 4), (3, 6)
private val subtractByKey: RDD[(Int, Int)] = lines.subtractByKey(lines_1)
println("1、删掉RDD中与other RDD 中的键相同得元素")
subtractByKey.foreach(println)
1、删掉RDD中与other RDD 中的键相同得元素
(1,2)
2、对两个RDD进行内连接
(1, 2), (3, 4), (3, 6)
private val join: RDD[(Int, (Int, Int))] = lines.join(lines_1)
println("2、对两个RDD进行内连接")
join.foreach(println)
2、对两个RDD进行内连接
(3,(4,9))
(3,(6,9))
3、对两个RDD进行连接操作,确保第一个RDD的必须存在(右外连接)
(1, 2), (3, 4), (3, 6)
private val rightOuterJoin: RDD[(Int, (Option[Int], Int))] = lines.rightOuterJoin(lines_1)
println("3、对两个RDD进行连接操作,确保第一个RDD的必须存在(右外连接)")
rightOuterJoin.foreach(println)
3、对两个RDD进行连接操作,确保第一个RDD的必须存在(右外连接)
(3,(Some(4),9))
(3,(Some(6),9))
4、对两个RDD进行连接操作,确保第二个RDD的必须存在(左外连接)
(1, 2), (3, 4), (3, 6)
private val leftOuterJoin: RDD[(Int, (Int, Option[Int]))] = lines.leftOuterJoin(lines_1)
println("4、对两个RDD进行连接操作,确保第二个RDD的必须存在(左外连接)")
leftOuterJoin.foreach(println)
4、对两个RDD进行连接操作,确保第二个RDD的必须存在(左外连接)
(1,(2,None))
(3,(4,Some(9)))
(3,(6,Some(9)))
5、将两个RDD中拥有相同键的数据分组到一起
(1, 2), (3, 4), (3, 6)
private val cogroup: RDD[(Int, (Iterable[Int], Iterable[Int]))] = lines.cogroup(lines_1)
println("5、将两个RDD中拥有相同键的数据分组到一起")
cogroup.foreach(println)
5、将两个RDD中拥有相同键的数据分组到一起
(1,(CompactBuffer(2),CompactBuffer()))
(3,(CompactBuffer(4, 6),CompactBuffer(9)))