Spark RDD Operations

To check whether an RDD is recomputed on every action, create a data file in HDFS, run an action while the file exists, remove the file, and then read it again. If the second read reports an error, the data was loaded again rather than reused.

// First load of the file
scala> var rdd=sc.textFile("hdfs:///demo/word").collect
rdd: Array[String] = Array("this is demo ", haha haha, hello hello yes, good good study, day day up)

// Remove the file
[root@train ~]# hdfs dfs -rm -r -f /demo/word
Deleted /demo/word

// Run it again
scala> var rdd=sc.textFile("hdfs:///demo/word").collect
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs:/demo/word
  at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
  ... 49 elided


Cache-based

scala> var rdd=sc.textFile("hdfs:///demo/word")
rdd: org.apache.spark.rdd.RDD[String] = hdfs:///demo/word MapPartitionsRDD[5] at textFile at <console>:24

// Mark the transformed RDD to be stored in the cache
scala> rdd.cache
res0: org.apache.spark.rdd.RDD[String] = hdfs:///demo/word MapPartitionsRDD[5] at textFile at <console>:24

// First run
scala> rdd.collect
res1: Array[String] = Array("this is demo ", haha haha, hello hello yes, good good study, day day up)

// Delete the file
[root@train ~]# hdfs dfs -rm -r -f /demo/word
Deleted /demo/word

// Run again: no error is reported
scala> rdd.collect
res2: Array[String] = Array("this is demo ", haha haha, hello hello yes, good good study, day day up)
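The two experiments above can be modelled without a cluster. The sketch below is a plain-Scala stand-in, not Spark's implementation: `LazyData`, `CacheModelDemo`, `textFile`, and the in-memory `files` map are hypothetical names introduced for illustration, and the model assumes a cached result is never evicted, which Spark does not guarantee.

```scala
import scala.collection.mutable
import scala.util.Try

object CacheModelDemo {
  // Hypothetical stand-in for an RDD: `compute` re-runs the whole lineage on every action.
  final class LazyData[T](compute: () => Seq[T]) {
    private var cacheRequested = false
    private var cached: Option[Seq[T]] = None

    // Like rdd.cache: only marks the data for caching; nothing is computed yet.
    def cache(): this.type = { cacheRequested = true; this }

    // Like rdd.collect: reuse the cached result if present, otherwise recompute.
    def collect(): Seq[T] = cached.getOrElse {
      val data = compute()
      if (cacheRequested) cached = Some(data)
      data
    }
  }

  // Hypothetical in-memory "file system" standing in for HDFS.
  val files: mutable.Map[String, Seq[String]] =
    mutable.Map("/demo/word" -> Seq("this is demo", "haha haha"))

  // Reading is lazy: the path is only looked up when an action runs.
  def textFile(path: String): LazyData[String] =
    new LazyData(() =>
      files.getOrElse(path, throw new IllegalStateException(s"Input path does not exist: $path")))

  def main(args: Array[String]): Unit = {
    val plain = textFile("/demo/word")
    val cachedData = textFile("/demo/word").cache()

    cachedData.collect()        // first action fills the cache
    files.remove("/demo/word")  // delete the "file"

    println(cachedData.collect())          // still succeeds: served from the cache
    println(Try(plain.collect()).isFailure) // true: the uncached data must be recomputed
  }
}
```

The uncached value fails only when an action runs, which mirrors the fact that transformations are lazy and the missing input is noticed at `collect`.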



Transformations

map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.


scala> var rdd=sc.makeRDD(List("a","b","c","a"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at makeRDD at <console>:24

scala> rdd.map(w => (w,1))
res3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at <console>:26

filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.


scala> var rdd=sc.makeRDD(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at makeRDD at <console>:24

scala> rdd.filter(num=> num % 2 == 0 )
res4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[11] at filter at <console>:26

scala> rdd.filter(num=> num % 2 == 0 ).collect
res5: Array[Int] = Array(2, 4)

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

Like map, flatMap converts an RDD[U] into an RDD[T], but the user must supply a function func: U => Seq[T].
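The difference between the two signatures is easiest to see on ordinary Scala collections, whose `map` and `flatMap` follow the same contract. This is a minimal sketch with local lists standing in for the RDD; `FlatMapDemo` and its values are illustrative names, and no Spark API is involved.

```scala
object FlatMapDemo {
  val lines = List("this is", "good good")

  // map keeps one output element per input element, so each line becomes one list of words.
  val mapped: List[List[String]] = lines.map(_.split(" ").toList)

  // flatMap concatenates the per-line sequences into a single flat collection.
  val flattened: List[String] = lines.flatMap(_.split(" ").toList)

  // The function may return an empty sequence, which drops the input element entirely.
  val nonEmptyOnly: List[String] =
    List("a b", "", "c").flatMap(l => if (l.isEmpty) Nil else l.split(" ").toList)

  def main(args: Array[String]): Unit = {
    println(mapped)       // List(List(this, is), List(good, good))
    println(flattened)    // List(this, is, good, good)
    println(nonEmptyOnly) // List(a, b, c)
  }
}
```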

scala> var rdd=sc.makeRDD(List("this is","good good"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at makeRDD at <console>:24

scala> rdd.flatMap(line=> line.split(" ").map((_,1))).collect
res8: Array[(String, Int)] = Array((this,1), (is,1), (good,1), (good,1))

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.

Like map, but the function receives all the data of one partition as its input, so the user must supply a partition-level transformation func: Iterator[T] => Iterator[U].
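One way to picture the per-partition contract is to model an RDD as a sequence of partitions and call the function once for each. The sketch below is a plain-Scala model under that assumption; `MapPartitionsModel` and the `Partitions` alias are hypothetical and are not part of Spark.

```scala
object MapPartitionsModel {
  // Hypothetical model: an "RDD" is a sequence of partitions, each a sequence of elements.
  type Partitions[T] = Seq[Seq[T]]

  // Apply func once per partition, handing it an iterator over that partition's elements.
  def mapPartitions[T, U](data: Partitions[T])(func: Iterator[T] => Iterator[U]): Partitions[U] =
    data.map(part => func(part.iterator).toList)

  def main(args: Array[String]): Unit = {
    val data: Partitions[Int] = Seq(Seq(1, 2), Seq(3, 4, 5))
    var setups = 0 // counts how often the per-partition setup code runs

    val result = mapPartitions(data) { values =>
      setups += 1                      // runs once per partition, not once per element
      values.map(n => (n, n % 2 == 0)) // then transforms every element of the partition
    }

    println(result.flatten) // List((1,false), (2,true), (3,false), (4,true), (5,false))
    println(setups)         // 2
  }
}
```

This is why mapPartitions suits work with an expensive setup step: the setup cost is paid per partition rather than per element.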

scala> var rdd=sc.makeRDD(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at <console>:24

scala> rdd.mapPartitions(values => values.map(n=>(n,n%2==0)),true)
res9: org.apache.spark.rdd.RDD[(Int, Boolean)] = MapPartitionsRDD[18] at mapPartitions at <console>:26

scala> rdd.mapPartitions(values => values.map(n=>(n,n%2==0)),true).collect
res10: Array[(Int, Boolean)] = Array((1,false), (2,true), (3,false), (4,true), (5,false))

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.

Like mapPartitions, but the function is also given the index of the partition that holds the elements, so func: (Int, Iterator[T]) => Iterator[U].
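The partition indices in the session that follows depend on how a local collection is cut into partitions. The sketch below is a plain-Scala model, not Spark code: it assumes an even positional split in which partition i holds positions [i*n/numSlices, (i+1)*n/numSlices). That boundary rule is an assumption that happens to reproduce the indices shown for two partitions and for six partitions; `SliceModel`, `slice`, and the model's `mapPartitionsWithIndex` are illustrative names.

```scala
object SliceModel {
  // Assumed boundary rule: partition i holds positions [i*n/numSlices, (i+1)*n/numSlices).
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = (i * n / numSlices).toInt
      val end = ((i + 1) * n / numSlices).toInt
      data.slice(start, end)
    }
  }

  // Model of mapPartitionsWithIndex: func receives the partition index and an iterator.
  def mapPartitionsWithIndex[T, U](parts: Seq[Seq[T]])(func: (Int, Iterator[T]) => Iterator[U]): Seq[U] =
    parts.zipWithIndex.flatMap { case (part, idx) => func(idx, part.iterator).toList }

  def main(args: Array[String]): Unit = {
    val data = List(1, 2, 3, 4, 5)
    // Two partitions: positions 0-1 go to partition 0, positions 2-4 to partition 1.
    println(mapPartitionsWithIndex(slice(data, 2))((p, values) => values.map(n => (n, p))))
    // Six partitions for five elements: partition 0 stays empty under this rule.
    println(mapPartitionsWithIndex(slice(data, 6))((p, values) => values.map(n => (n, p))))
  }
}
```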

scala> rdd.mapPartitionsWithIndex((p,values) => values.map(n=>(n,p)))
res12: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[21] at mapPartitionsWithIndex at <console>:26

scala> rdd.mapPartitionsWithIndex((p,values) => values.map(n=>(n,p))).collect
res13: Array[(Int, Int)] = Array((1,1), (2,2), (3,3), (4,4), (5,5))

scala> var rdd=sc.makeRDD(List(1,2,3,4,5),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at <console>:24

scala> rdd.mapPartitionsWithIndex((p,values) => values.map(n=>(n,p))).collect
res14: Array[(Int, Int)] = Array((1,0), (2,0), (3,1), (4,1), (5,1))

sample(withReplacement, fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
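The role of the seed can be sketched with a simple model of sampling without replacement, in which each element is kept independently with probability `fraction`. This is an illustrative plain-Scala model: `SampleModel` and `sampleWithoutReplacement` are hypothetical names, and the model is not expected to reproduce the exact elements that Spark's own sampler returns.

```scala
import scala.util.Random

object SampleModel {
  // Keep each element independently with probability `fraction`.
  def sampleWithoutReplacement[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed) // the same seed yields the same sequence of random draws
    data.filter(_ => rng.nextDouble() < fraction)
  }

  def main(args: Array[String]): Unit = {
    val data = 1 to 100
    val a = sampleWithoutReplacement(data, 0.5, 1L)
    val b = sampleWithoutReplacement(data, 0.5, 1L)

    println(a == b) // true: identical seeds give identical samples
    println(a.size) // close to 50, but not necessarily exactly 50
  }
}
```

Under this model the fraction sets the expected sample size rather than an exact count, which is consistent with the session returning two of five elements for a fraction of 0.5.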


// Loop from 0 to 10
import scala.util.Random

object TestRandom {
  def main(args: Array[String]): Unit = {
    val random = new Random(1L)
    for(i<- 0 to 10){
      println(random.nextDouble())
    }
  }
}

0.7308781907032909
0.41008081149220166
0.20771484130971707
0.3327170559595112
0.9677559094241207
0.006117182265761301
0.9637047970232077
0.9398653887819098
0.9471949176631939
0.9370821488959696
0.3971743421847056

// Loop from 0 to 5: the same seed produces the same leading values
import scala.util.Random

object TestRandom {
  def main(args: Array[String]): Unit = {
    val random = new Random(1L)
    for(i<- 0 to 5){
      println(random.nextDouble())
    }
  }
}

0.7308781907032909
0.41008081149220166
0.20771484130971707
0.3327170559595112
0.9677559094241207
0.006117182265761301

scala> var rdd=sc.makeRDD(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at makeRDD at <console>:24

scala> rdd.sample(true,0.5,1L)
res15: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[26] at sample at <console>:26

scala> rdd.sample(true,0.5,1L).collect
res16: Array[Int] = Array(2, 2)

scala> rdd.sample(true,0.5,1L).collect
res17: Array[Int] = Array(2, 2)

scala> rdd.sample(false,0.5,1L).collect
res18: Array[Int] = Array(4, 5)

scala> rdd.sample(false,0.5,1L).collect
res19: Array[Int] = Array(4, 5)

scala> rdd.sample(false,0.5,2L).collect
res20: Array[Int] = Array(1, 2)

scala> rdd.sample(false,0.5,2L).collect
res21: Array[Int] = Array(1, 2)



union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.


scala> var rdd=sc.makeRDD(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> var rdd1=sc.makeRDD(List(6,7))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd.union(rdd1).collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 6, 7)

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.
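union and intersection treat duplicates differently: union keeps them, while intersection returns each common element once. The following plain-Scala model makes that contrast concrete. `SetOpsModel` and its two functions are illustrative names rather than Spark APIs, and the element order of a real RDD result is not guaranteed to match this model.

```scala
object SetOpsModel {
  // union keeps duplicates: it simply concatenates both datasets.
  def union[T](a: Seq[T], b: Seq[T]): Seq[T] = a ++ b

  // intersection returns each common element once, even if the inputs contain repeats.
  def intersection[T](a: Seq[T], b: Seq[T]): Seq[T] = {
    val inB = b.toSet
    a.distinct.filter(inB)
  }

  def main(args: Array[String]): Unit = {
    val a = List(1, 2, 3, 4, 5, 6)
    val b = List(6, 6, 7)

    println(union(a, b))          // List(1, 2, 3, 4, 5, 6, 6, 6, 7)
    println(union(a, b).distinct) // List(1, 2, 3, 4, 5, 6, 7): chain distinct for a set-style union
    println(intersection(a, b))   // List(6)
  }
}
```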


scala> rdd.intersection(rdd1).collect
res1: Array[Int] = Array(6)

distinct([numPartitions])

Return a new dataset that contains the distinct elements of the source dataset.


scala> var rdd=sc.makeRDD(List(1,2,3,4,5,6,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at makeRDD at <console>:24

scala> rdd.distinct.collect
res2: Array[Int] = Array(6, 1, 2, 3, 4, 5)

scala> rdd.distinct(3).getNumPartitions
res3: Int = 3

join(otherDataset, [numPartitions])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

When join is called on an RDD[(K,V)] and an RDD[(K,W)], it returns a new RDD[(K,(V,W))]. The default is an inner join; leftOuterJoin, rightOuterJoin, and fullOuterJoin are also supported.
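The difference between the inner join and a left outer join can be sketched on local pair sequences. This is a plain-Scala model of the semantics described above, not Spark's implementation; `JoinModel`, `users`, and `orders` are names made up for the example.

```scala
object JoinModel {
  // Inner join: one (k, (v, w)) pair for every matching combination of values.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    for ((k, v) <- left; (k2, w) <- right if k == k2) yield (k, (v, w))

  // Left outer join: every left record survives; keys without a match get None.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    left.flatMap { case (k, v) =>
      val matches = right.collect { case (k2, w) if k2 == k => w }
      if (matches.isEmpty) Seq((k, (v, Option.empty[W])))
      else matches.map(w => (k, (v, Some(w): Option[W])))
    }

  val users = Seq((1, "zhangsan"), (2, "lisi"))
  val orders = Seq((1, "apple"), (1, "pear"))

  def main(args: Array[String]): Unit = {
    println(join(users, orders))          // key 2 is dropped: it has no order
    println(leftOuterJoin(users, orders)) // key 2 is kept with None
  }
}
```

A right outer join is the mirror image (every right record survives), and a full outer join wraps both sides in Option.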

scala> var userRDD=sc.makeRDD(List((1,"zhangsan"),(2,"lisi")))
userRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at makeRDD at <console>:24

scala> case class OrderItem(name:String,price:Double,count:Int)
defined class OrderItem

scala> var orderItemRDD=sc.makeRDD(List((1,OrderItem("apple",4.5,2))))
orderItemRDD: org.apache.spark.rdd.RDD[(Int, OrderItem)] = ParallelCollectionRDD[18] at makeRDD at <console>:26

scala> userRDD.join(orderItemRDD)
res4: org.apache.spark.rdd.RDD[(Int, (String, OrderItem))] = MapPartitionsRDD[21] at join at <console>:28

scala> userRDD.join(orderItemRDD).collect
res5: Array[(Int, (String, OrderItem))] = Array((1,(zhangsan,OrderItem(apple,4.5,2))))

scala> userRDD.leftOuterJoin(orderItemRDD).collect
res6: Array[(Int, (String, Option[OrderItem]))] = Array((1,(zhangsan,Some(OrderItem(apple,4.5,2)))), (2,(lisi,None)))

cogroup(otherDataset, [numPartitions])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. This operation is also called groupWith.
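Unlike join, cogroup keeps every key that appears on either side and groups the values instead of pairing them. A plain-Scala model of that behaviour, with `CogroupModel` as an illustrative name and local sequences standing in for the two datasets, might look like this:

```scala
object CogroupModel {
  // For every key present in either dataset, collect that key's values from both sides.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val vs = left.collect { case (k2, v) if k2 == k => v }
      val ws = right.collect { case (k2, w) if k2 == k => w }
      (k, (vs, ws))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val users = Seq((1, "zhangsan"), (2, "lisi"))
    val orders = Seq((1, "apple"), (1, "pear"))

    // Key 2 has no orders, so its right-hand group is empty rather than missing.
    println(cogroup(users, orders))
  }
}
```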

scala> var userRDD=sc.makeRDD(List((1,"zhangsan"),(2,"lisi")))
userRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at makeRDD at <console>:24

scala> var orderItemRDD=sc.makeRDD(List((1,OrderItem("apple",4.5,2)),(1,OrderItem("peer",1.5,2))))
orderItemRDD: org.apache.spark.rdd.RDD[(Int, OrderItem)] = ParallelCollectionRDD[28] at makeRDD at <console>:26

scala> userRDD.cogroup(orderItemRDD).collect
res15: Array[(Int, (Iterable[String], Iterable[OrderItem]))] = Array((1,(CompactBuffer(zhangsan),CompactBuffer(OrderItem(apple,4.5,2), OrderItem(peer,1.5,2)))), (2,(CompactBuffer(lisi),CompactBuffer())))

scala> userRDD.groupWith(orderItemRDD).collect
res16: Array[(Int, (Iterable[String], Iterable[OrderItem]))] = Array((1,(CompactBuffer(zhangsan),CompactBuffer(OrderItem(apple,4.5,2), OrderItem(peer,1.5,2)))), (2,(CompactBuffer(lisi),CompactBuffer())))

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).


scala> var rdd1=sc.makeRDD(List(1,2,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at makeRDD at <console>:24

scala> var rdd2=sc.makeRDD(List("a","b","c"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[30] at makeRDD at <console>:24

scala> rdd1.cartesian(rdd2).collect
res7: Array[(Int, String)] = Array((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (4,a), (4,b), (4,c))

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.


scala> var rdd1=sc.makeRDD(0 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:24

scala> rdd1.getNumPartitions
res8: Int = 6

scala> rdd1.filter(n=> n%2==0).collect
res9: Array[Int] = Array(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100)

scala> rdd1.filter(n=> n%2==0).getNumPartitions
res10: Int = 6

scala> rdd1.filter(n=> n%2==0).coalesce(3).getNumPartitions
res11: Int = 3

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.


scala> rdd1.filter(n=> n%2==0).repartition(12).getNumPartitions
res12: Int = 12

scala> rdd1.filter(n=> n%2==0).repartition(3).getNumPartitions
res13: Int = 3

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
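The two steps named above, routing by partitioner and then sorting inside each partition, can be modelled on a local sequence of pairs. The sketch is plain Scala and only illustrates the resulting layout, not the shuffle itself; `RepartitionSortModel`, `partitionOf`, and `repartitionAndSort` are hypothetical names. It also shows one way to derive a non-negative partition index from a hash code, where the parentheses matter because `%` binds tighter than `&`.

```scala
object RepartitionSortModel {
  // Non-negative partition index from a hash code. Without the parentheses,
  // `h & Int.MaxValue % n` would parse as `h & (Int.MaxValue % n)`.
  def partitionOf(key: Any, numPartitions: Int): Int =
    (key.hashCode() & Int.MaxValue) % numPartitions

  // Model: route each record to its partition, then sort every partition by key.
  def repartitionAndSort[K: Ordering, V](data: Seq[(K, V)], numPartitions: Int): Seq[Seq[(K, V)]] = {
    val byPartition = data.groupBy { case (k, _) => partitionOf(k, numPartitions) }
    (0 until numPartitions).map { p =>
      byPartition.getOrElse(p, Seq.empty[(K, V)]).sortBy(_._1)
    }
  }

  def main(args: Array[String]): Unit = {
    val emps = Seq((3, "c"), (1, "zhangsan"), (2, "lisi"), (1, "wangwu"), (5, "e"))
    repartitionAndSort(emps, 2).zipWithIndex.foreach { case (part, p) =>
      println(s"$p\t${part.mkString("|")}")
    }
  }
}
```

Each partition is sorted on its own; the model makes no claim about a global order across partitions.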


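import org.apache.spark.{Partitioner, SparkConf, SparkContext}

case class User(name:String,deptNo:Int)

object Testopr {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Testopr")
    val sc = new SparkContext(conf)

    val empRDD = sc.parallelize(List(User("张三",1),User("lisi",2),User("wangwu",1)))
    // Key each user by department number. Using t.name as the value is an
    // assumption: the original expression is truncated at this point.
    val empRDD2 = empRDD.map(t => (t.deptNo, t.name))

    empRDD2.repartitionAndSortWithinPartitions(new Partitioner {
      override def numPartitions: Int = 4
      override def getPartition(key: Any): Int = {
        // Parentheses are required because % binds tighter than &
        (key.hashCode() & Integer.MAX_VALUE) % numPartitions
      }
    }).mapPartitionsWithIndex((p,values)=>{
      // Materialize the iterator so that printing does not exhaust it
      val records = values.toList
      println(p+"\t"+records.mkString("|"))
      records.iterator
    }).collect()

    sc.stop()
  }
}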






groupByKey([numPartitions])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs.

Similar to the MapReduce computation model: it converts an RDD[(K,V)] into an RDD[(K, Iterable[V])].

scala> var lines=sc.parallelize(List("this is good good"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[54] at parallelize at <console>:24

scala> lines.flatMap(_.split(" ")).map((_,1))
res17: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[56] at map at <console>:26

scala> lines.flatMap(_.split(" ")).map((_,1)).collect
res18: Array[(String, Int)] = Array((this,1), (is,1), (good,1), (good,1))

scala> lines.flatMap(_.split(" ")).map((_,1)).groupByKey.collect
res19: Array[(String, Iterable[Int])] = Array((this,CompactBuffer(1)), (is,CompactBuffer(1)), (good,CompactBuffer(1, 1)))

scala> lines.flatMap(_.split(" ")).map((_,1)).groupByKey.map(t => (t._1,t._2.sum)).collect
res20: Array[(String, Int)] = Array((this,1), (is,1), (good,2))

groupBy(f:(k,v)=>T)

scala> lines.flatMap(_.split(" ")).map((_,1)).groupBy(t=>t._1).collect
res21: Array[(String, Iterable[(String, Int)])] = Array((this,CompactBuffer((this,1))), (is,CompactBuffer((is,1))), (good,CompactBuffer((good,1), (good,1))))

scala> lines.flatMap(_.split(" ")).map((_,1)).groupBy(t=>t._1).map(t=>(t._1,t._2.size)).collect
res22: Array[(String, Int)] = Array((this,1), (is,1), (good,2))

reduceByKey(func, [numPartitions])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
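A useful way to think about the reduce function is as a two-phase computation: values are first combined inside each partition, and the partial results are then merged across partitions. The sketch below models this on local collections under that assumption; `ReduceByKeyModel` and `reducePartition` are illustrative names, not Spark APIs. Because the grouping of the partial results is arbitrary, func should be associative and commutative.

```scala
object ReduceByKeyModel {
  // Reduce the values of each key inside a single partition.
  def reducePartition[K, V](part: Seq[(K, V)])(func: (V, V) => V): Map[K, V] =
    part.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(func)) }

  // Model of reduceByKey: combine within each partition first, then merge the partial results.
  def reduceByKey[K, V](parts: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] = {
    val partials: Seq[(K, V)] = parts.flatMap(p => reducePartition(p)(func).toSeq)
    reducePartition(partials)(func)
  }

  def main(args: Array[String]): Unit = {
    // Two partitions holding the word pairs of "this is good good".
    val parts = Seq(
      Seq(("this", 1), ("good", 1)),
      Seq(("is", 1), ("good", 1))
    )
    println(reduceByKey(parts)(_ + _)) // counts: this -> 1, is -> 1, good -> 2 (map order may vary)
  }
}
```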

scala> lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res23: Array[(String, Int)] = Array((this,1), (is,1), (good,2))

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
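To illustrate an aggregated type that differs from the input type, the sketch below computes a per-key average by accumulating (sum, count) pairs from Int values. It is a plain-Scala model of the two functions' roles, with seqOp folding values inside a partition and combOp merging accumulators across partitions; `AggregateByKeyModel` and the sample scores are made up for the example and no Spark API is called.

```scala
object AggregateByKeyModel {
  def aggregateByKey[K, V, U](parts: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): Map[K, U] = {
    // Phase 1: inside each partition, fold every key's values starting from zeroValue.
    val partials: Seq[(K, U)] = parts.flatMap { part =>
      part.groupBy(_._1).toSeq.map { case (k, kvs) =>
        (k, kvs.map(_._2).foldLeft(zeroValue)(seqOp))
      }
    }
    // Phase 2: merge the per-partition accumulators of each key with combOp.
    partials.groupBy(_._1).map { case (k, kus) => (k, kus.map(_._2).reduce(combOp)) }
  }

  def main(args: Array[String]): Unit = {
    val scores = Seq(
      Seq(("a", 90), ("b", 60)),
      Seq(("a", 70), ("a", 80))
    )
    // The accumulator type (sum, count) differs from the Int input values.
    val sumCount = aggregateByKey(scores, (0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (x, y) => (x._1 + y._1, x._2 + y._2)
    )
    val averages = sumCount.map { case (k, (sum, count)) => (k, sum.toDouble / count) }
    println(averages) // a -> 80.0, b -> 60.0 (map order may vary)
  }
}
```

In this model zeroValue is applied once per key in every partition that contains the key, which is why it needs to be neutral for seqOp.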

scala> lines.flatMap(_.split(" ")).map((_,1)).aggregateByKey(0)(_+_,_+_).collect
res24: Array[(String, Int)] = Array((this,1), (is,1), (good,2))

sortByKey([ascending], [numPartitions])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

scala> lines.flatMap(_.split(" ")).map((_,1)).aggregateByKey(0)(_+_,_+_).sortByKey(true).collect
res26: Array[(String, Int)] = Array((good,2), (is,1), (this,1))

scala> lines.flatMap(_.split(" ")).map((_,1)).aggregateByKey(0)(_+_,_+_).sortByKey(false).collect
res27: Array[(String, Int)] = Array((this,1), (is,1), (good,2))

sortBy(T=>U,ascending,numPartitions)

scala> lines.flatMap(_.split(" ")).map((_,1)).aggregateByKey(0)(_+_,_+_).sortBy(_._2,true).collect
res28: Array[(String, Int)] = Array((is,1), (this,1), (good,2))

scala> lines.flatMap(_.split(" ")).map((_,1)).aggregateByKey(0)(_+_,_+_).sortBy(_._2,false).collect
res29: Array[(String, Int)] = Array((good,2), (this,1), (is,1))

