Spark Actions


Actions

In Spark, every job is triggered by exactly one action operator. Actions either write the data in an RDD out to an external system or return it to the Driver program.
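To make the distinction concrete: transformations are lazy and only record the RDD lineage; nothing runs on the cluster until an action is invoked. A minimal sketch, assuming the same sample file used in the examples below:

// Transformations are lazy: this only builds the lineage, no job runs yet.
val lengths = sc.textFile("file:///root/data/word").map(_.length)
// Calling an action (here, count) submits the job and returns the result to the Driver.
val n = lengths.count()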

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

This operator aggregates the distributed data remotely and then returns the final result to the Driver.
Counting the characters in a file:
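Because each partition is reduced independently and the partial results are merged in no guaranteed order, func must be commutative and associative. A minimal sketch using parallelize (the values are illustrative only):

// Addition is commutative and associative, so the result is deterministic: 55.
sc.parallelize(1 to 10).reduce(_ + _)

// Subtraction is neither, so the result can change with partitioning -- avoid functions like this.
sc.parallelize(1 to 10, 4).reduce(_ - _)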

scala> sc.textFile("file:///root/data/word").collect
res0: Array[String] = Array("this is demo ", haha haha, hello hello yes, good good study, day day up)

scala> sc.textFile("file:///root/data/word").map(_.length).reduce(_+_)
res1: Int = 62

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

Transfers the data of a remote RDD to the Driver. collect should only be used in a test environment or when the RDD holds very little data; otherwise the Driver may throw an out-of-memory error because the data is too large.
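As the documentation above suggests, collect is usually safe only after a filter or similar operation has shrunk the data to a small subset. A minimal sketch (the predicate is just an example):

// Shrink the data on the cluster first, then bring the small remainder to the Driver.
val hits = sc.textFile("file:///root/data/word")
             .filter(_.contains("hello"))
             .collect()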

scala> sc.textFile("file:///root/data/word").collect
res3: Array[String] = Array("this is demo ", haha haha, hello hello yes, good good study, day day up)

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.

Runs the function func on each element of the dataset. This is usually done for side effects such as updating an accumulator or interacting with an external storage system.
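Note that func runs on the executors, so println output goes to the executor logs and changes to ordinary Driver-side variables are not visible back on the Driver. Accumulators are the supported way to collect such a side effect; a minimal sketch, assuming Spark 2.x or later:

// Count non-empty lines as a side effect via an accumulator.
val nonEmpty = sc.longAccumulator("nonEmptyLines")
sc.textFile("file:///root/data/word").foreach { line =>
  if (line.trim.nonEmpty) nonEmpty.add(1)
}
println(nonEmpty.value)   // read the accumulated value back on the Driver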

scala> sc.textFile("file:///root/data/word").foreach(line=>println(line))

count()

Return the number of elements in the dataset.

Returns the total number of elements in the dataset as a Long.

scala> sc.textFile("file:///root/data/word").count()
res5: Long = 5

first()|take(n)

Return the first element of the dataset (similar to take(1)).

Returns the first element of the dataset (similar to take(1), except that it returns the element itself rather than an array).

Return an array with the first n elements of the dataset.

Returns the first n elements of the dataset as an array.

scala> sc.textFile("file:///root/data/word").first
res6: String = "this is demo "

scala> sc.textFile("file:///root/data/word").take(1)
res7: Array[String] = Array("this is demo ")

scala> sc.textFile("file:///root/data/word").take(2)
res8: Array[String] = Array("this is demo ", haha haha)

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

Randomly samples num elements from the RDD and returns them to the Driver program, which makes it quite different from the sample transformation.
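The difference is that sample is a lazy transformation that yields another RDD whose size is only approximate (driven by a fraction), while takeSample is an action that immediately returns exactly num elements (or fewer, if the RDD is smaller) as an array on the Driver. A minimal sketch:

// sample: transformation, returns an RDD, size is approximate, nothing runs yet.
val sampledRdd = sc.textFile("file:///root/data/word").sample(false, 0.5)

// takeSample: action, runs a job and returns exactly 2 elements to the Driver.
val sampledArr = sc.textFile("file:///root/data/word").takeSample(false, 2)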

scala> sc.textFile("file:///root/data/word").takeSample(false,2)
res9: Array[String] = Array(hello hello yes, "this is demo ")

scala> sc.textFile("file:///root/data/word").takeSample(false,2)
res10: Array[String] = Array(haha haha, hello hello yes)

scala> sc.textFile("file:///root/data/word").takeSample(false,2,1)
res11: Array[String] = Array(day day up, hello hello yes)

scala> sc.textFile("file:///root/data/word").takeSample(false,2,1)
res12: Array[String] = Array(day day up, hello hello yes)

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

Returns the first n elements of the RDD, ordered either by their natural ordering or by a custom Ordering supplied as the second argument (or found implicitly).
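The transcript below uses a User type that was defined earlier; for reference, a case class of roughly this shape would work (the field names here are inferred from the example and may differ from the original):

// Hypothetical definition matching the fields used below (name, deptNo, salary).
case class User(name: String, deptNo: Int, salary: Double)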

scala> var userRDD=sc.parallelize(List(User("zhangsan",1,1000.0),User("lisi",2,1500.0),User("wangwu",2,1000.0)))
userRDD: org.apache.spark.rdd.RDD[User] = ParallelCollectionRDD[29] at parallelize at <console>:26

// There is no implicit Ordering value for User yet
scala> userRDD.takeOrdered(3)
<console>:26: error: No implicit Ordering defined for User.
       userRDD.takeOrdered(3)

scala> implicit var userOrder=new Ordering[User]{
     |   override def compare(x:User,y:User):Int={
     |     if(x.deptNo!=y.deptNo){
     |       x.deptNo.compareTo(y.deptNo)
     |     }else{
     |       x.salary.compareTo(y.salary)* -1
     |     }
     |   }
     | }
userOrder: Ordering[User] = $anon$1@5965eb71

scala> userRDD.takeOrdered(3)(userOrder)
res14: Array[User] = Array(User(zhangsan,1,1000.0), User(lisi,2,1500.0), User(wangwu,2,1000.0))

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

Spark calls toString on each element of the RDD and writes each element to the file as a line of text.
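Because the output is whatever toString produces, a pair RDD would be written as lines such as (day,2); that is why the example below maps each tuple to a tab-separated string before saving. A minimal sketch of the difference (the HDFS paths are placeholders):

val counts = sc.textFile("file:///root/data/word")
               .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

// Written via Tuple2.toString: each line looks like "(day,2)".
counts.saveAsTextFile("hdfs:///demo/raw")

// Mapped to "word<TAB>count" first, which is usually easier to consume downstream.
counts.map(t => t._1 + "\t" + t._2).saveAsTextFile("hdfs:///demo/tsv")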

scala> sc.textFile("file:///root/data/word").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._1,true,1).map(t=>t._1+"\t"+t._2).saveAsTextFile("hdfs:///demo/results02")

saveAsSequenceFile(path)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

This method can only be used on RDDs of key-value pairs, RDD[(K, V)], where both K and V implement the Writable interface. When programming in Scala, Spark provides implicit conversions so that basic types such as Int, Double and String are converted to Writable automatically.

scala> sc.textFile("file:///root/data/word").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._1,true,1).saveAsSequenceFile("hdfs:///demo/results03")

scala> sc.sequenceFile[String,Int]("hdfs:///demo/results03").collect
res17: Array[(String, Int)] = Array((day,2), (demo,1), (good,2), (haha,2), (hello,2), (is,1), (study,1), (this,1), (up,1), (yes,1))