RDD操作

Oriel ·
更新时间:2024-09-21
· 727 次阅读

概述:针对RDD的操作分为两种,一种是Transform变换操作,一种是Action执行操作。
Transform操作是懒操作(简称:算子),懒操作不会真正的触发RDD的处理计算;
Action操作会触发计算。
一、Transform操作
1.map(fun): 参数是函数,函数会作用于RDD的每一个元素,并会返回一个RDD

val rdd = sc.makeRDD(List(1,3,5,7,9)) rdd.map(_*10)

结果是 10,30,50,70,90
2.flatmap(fun) : 扁平化map,对RDD每个元素转换, 然后再扁平化处理

val rdd = sc.makeRDD(List("hello world","hello count","world spark"),2) rdd.map(_.split{" "})//Array(Array(hello, world), Array(hello, count), Array(world, spark)) rdd.flatMap(_.split{" "})//Array[String] = Array(hello, world, hello, count, world, spark) //Array[String] = Array(hello, world, hello, count, world, spark)

3.filter(fun) : 过滤器操作

val rdd = sc.makeRDD(List(1,3,5,7,9)); rdd.filter(_<5);// 结果:1,3

4.mapPartitions(fun) : 该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。

var rdd1 = sc.makeRDD(1 to 5,2) val rdd3 = rdd1.mapPartitions{ x => { val result = List[Int]() var i = 0 while(x.hasNext){ i += x.next() //累加 } result.::(i).iterator }} // 结果: 3 , 12

5.union(otherDataset) : 并集

val rdd1 = sc.makeRDD(List(1,3,5)); val rdd2 = sc.makeRDD(List(2,4,6,8)); val rdd = rdd1.union(rdd2); val rdd = rdd1 ++ rdd2;

6.intersection(otherDataset) : 交集

val rdd1 = sc.makeRDD(List(1,3,5,7)); val rdd2 = sc.makeRDD(List(5,7,9,11)); val rdd = rdd1.intersection(rdd2);

7.subtract:差集

val rdd1 = sc.makeRDD(List(1,3,5,7,9)); val rdd2 = sc.makeRDD(List(5,7,9,11,13)); val rdd = rdd1.subtract(rdd2);

8.distinct([numTasks])):去重

val rdd = sc.makeRDD(List(1,3,5,7,9,3,7,10,23,7)); rdd.distinct

9.groupByKey([numTasks]):分组

val rdd = sc.parallelize(List(("cat",2), ("dog",5),("cat",4),("dog",3),("cat",6),("dog",3),("cat",9),("dog",1)),2); rdd.groupByKey()

10.reduceByKey(func, [numTasks]):reduce操作,根据相同key值对value操作

var rdd = sc.makeRDD( List( ("hello",1),("spark",1),("hello",1),("world",1) ) ) rdd.reduceByKey(_+_);

11.sortByKey([ascending], [numTasks]):排序

val d2 = sc.parallelize(Array(("cc",32),("bb",32),("cc",22),("aa",18),("bb",6),("dd",16),("ee",104),("cc",1),("ff",13),("gg",68),("bb",44)))

在这里插入图片描述
二、Action操作
1.reduce(func) : 并行整合所有RDD数据,例如求和操作;
2.collect():返回RDD所有元素,将rdd分布式存储在集群中不同分区的数据 获取到一起组成一个数组返回。注意:这个方法将会把所有数据收集到一个机器内,容易造成内存的溢出 在生产环境下千万慎用;
3.count():统计RDD里元素个数;
4.first():返回RDD的第一个元素;
5.take(n):返回RDD的前n个元素;
6.takeOrdered(n, [ordering]) : 先将RDD中的元素升序排序,在取前n个;
7.Top(n):先降序排,再取前n个;
8.saveAsTextFile(path):saveAsTextFile 按照文本方式保存分区数据

val rdd = sc.makeRDD(List(1,2,3,4,5),2); rdd.saveAsTextFile("/root/work/aaa")

可能不太全,请大家见谅!


作者:憨大牛



rdd

需要 登录 后方可回复, 如果你还没有账号请 注册新账号