本文共 7305 字,大约阅读时间需要 24 分钟。
RDD是啥
Resilient Distributed Dataset (RDD),弹性分布式数据集,是对不可修改,分区的数据集合的抽象。
RDD is characterized by five main properties:
A list of partitions
A function for computing each split A list of dependencies on other RDDs Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)org.spark.rdd.RDD类方法
RDD是一个抽象类,定义如下
abstract class RDD[T] extends Serializable with Logging
RDD类的public方法大约有80多个(包括不同参数重载的),均在下面列出。
值得注意的是,RDD类中并没有定义xxxByKey形式的方法,这类方法其实是在PairRDDFunctions中定义的,通过隐式转换,键值对形式的RDD(即RDD[(K, V))可以调用PairRDDFunctions中定义的方法。
键值转换操作filter(f: (T) ⇒ Boolean): RDD[T]
过滤数据,仅留下使得f返回true的元素。 map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U] 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。 flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U] 第一步和map一样,最后将所有的输出分区合并成一个。 使用flatMap时候需要注意: flatMap会将字符串看成是一个字符数组。 mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U] 该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。 比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。 参数preservesPartitioning表示是否保留父RDD的partitioner分区信息。 mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U] 函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。 keyBy[K](f: (T) ⇒ K): RDD[(K, T)] 通过f函数为每个元素生成一个KEY sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 通过给定的函数对元素排序 zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)] 与另一个RDD组合成(k,v)对。 zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) ⇒ Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V] zipWithIndex(): RDD[(T, Long)] zipWithUniqueId(): RDD[(T, Long)]聚合相关
aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。 treeAggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U, depth: Int = 2)(implicit arg0: ClassTag[U]): U 多层级聚合 reduce(f: (T, T) ⇒ T): T 根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。 treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T 多级reduce归并聚合 fold(zeroValue: T)(op: (T, T) ⇒ T): T fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。 count(): Long count返回RDD中的元素数量。 countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] 近似count countApproxDistinct(relativeSD: Double = 0.05): Long countApproxDistinct(p: Int, sp: Int): Long 近似distinct count countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] 计算每个值出现次数 countByValueApprox(timeout: Long, confidence: Double = 0.95)(implicit ord: Ordering[T] = null): 计算每个值出现次数近似值 distinct(): RDD[T] distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 返回元素去重后的RDD groupBy[K](f: (T) ⇒ K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] groupBy[K](f: (T) ⇒ K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] groupBy[K](f: (T) ⇒ K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] 按指定函数生成key,并按key分组。 注意:性能比较差,推荐用PairRDDFunctions.reduceByKey or PairRDDFunctions.aggregateByKey. 因为reduceByKey会先在分区内做聚合,再进行数据交换(shuffle)。 glom(): RDD[Array[T]] 该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。 max()(implicit ord: Ordering[T]): T 最大的元素 min()(implicit ord: Ordering[T]): T 最小的元素遍历元素
foreach(f: (T) ⇒ Unit): Unit
foreach用于遍历RDD,将函数f应用于每一个元素。 但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。 比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。 foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit foreachPartition和foreach类似,只不过是对每一个分区使用f。取元素相关
collect(): Array[T]
collect用于将一个RDD转换成数组。 first(): T first返回RDD中的第一个元素,不排序。 take(num: Int): Array[T] take用于获取RDD中从0到num-1下标的元素,不排序。 top(num: Int)(implicit ord: Ordering[T]): Array[T] top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。 takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] takeOrdered和top类似,只不过以和top相反的顺序返回元素 takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] 取样本元素集合间运算
++(other: RDD[T]): RDD[T]
与另一个RDD union。 intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] intersection(other: RDD[T], numPartitions: Int): RDD[T] intersection(other: RDD[T]): RDD[T] 取交集 subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] subtract(other: RDD[T], numPartitions: Int): RDD[T] subtract(other: RDD[T]): RDD[T] 求差集 union(other: RDD[T]): RDD[T] 与另一个RDD合并,类似union all,不会去重。其他
persist(): RDD.this.type
persist(newLevel: StorageLevel): RDD.this.type 缓存数据,可设置缓存级别(如果尚未设置过,才可以设置,本地checkpoint除外) unpersist(blocking: Boolean = true): RDD.this.type Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. cache(): RDD.this.type MEMORY_ONLY级别缓存数据 cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]: 计算两个RDD的迪卡尔积 checkpoint(): Unit 标记将该RDD进行checkpoint处理? coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T] 分区合并(只能减少分区),使用HashPartitioner。 第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false; repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 调整分区数,会导致shuffle,如果是减少分区,可以使用coalesce,避免shuffle。 toDebugString: String 返回RDD依赖树/血统图 getCheckpointFile: Option[String] 获取checkpoint文件夹名称 localCheckpoint(): RDD.this.type 标记为使用本地checkpoint isEmpty(): Boolean 是否含0个元素 iterator(split: Partition, context: TaskContext): Iterator[T] 返回迭代器,不应直接调用,而是给RDD的子类用的。 toLocalIterator: Iterator[T] 返回元素的本地迭代器 pipe(command: String): RDD[String] pipe(command: String, env: Map[String, String]): RDD[String] pipe(command: Seq[String], env: Map[String, String] = Map(), printPipeContext: ((String) ⇒ Unit) ⇒ Unit = null, printRDDElement: (T, (String) ⇒ Unit) ⇒ Unit = null, separateWorkingDir: Boolean = false, bufferSize: Int = 8192, encoding: String = Codec.defaultCharsetCodec.name): RDD[String] 调用外部进程处理RDD,如通过标准输入传给shell脚本。 preferredLocations(split: Partition): Seq[String] Get the preferred locations of a partition, taking into account whether the RDD is checkpointed. randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] 按权随机将元素分组 sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 取样本/子集 setName(_name: String): RDD.this.type 设置RDD名字保存
saveAsObjectFile(path: String): Unit
保存为SequenceFile saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit saveAsTextFile(path: String): Unit 保存为文本文件变量
context: SparkContext
创建RDD的SparkContext sparkContext: SparkContext 创建RDD的SparkContext dependencies: Seq[Dependency[_]] RDD的依赖列表 getNumPartitions: Int 获取RDD的分区数 getStorageLevel: StorageLevel 获取存储等级,如果设置为none,则返回StorageLevel.NONE 。 id: Int 该RDD的unique ID isCheckpointed: Boolean 是否checkpointed and materialized, either reliably or locally. name: String RDD的名字 partitioner: Option[Partitioner] 分区器 partitions: Array[Partition] 各个分区转载地址:http://dykni.baihongyu.com/