spark中的aggregate函数

学习到spark的RDD行动操作时,有一个函数可让我废了半天脑筋,就是aggregate函数,aggregate的意思是聚合

我们首先来看一下spark官方文档对这一函数的说明:

1
def aggregate[U](zeroValue: U)(seqOp: (U, T)  U, combOp: (U, U)  U)(implicit arg0: ClassTag[U]): U

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

zeroValue: the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for the combine results from different partitions for the combOp operator - this will typically be the neutral element (e.g. Nil for list concatenation or 0 for summation)

seqOp: an operator used to accumulate results within a partition

combOp: an associative operator used to combine results from different partitions

aggregate聚合每个分区的元素,然后使用给定的combine函数和一个初始值“zeroValue”来将所有分区的结果再次聚合。这个函数可以返回一个与RDD类型T不同的一个结果类型U。因此,我们需要有一个将T合并到U,并且还要有一个能够合并两个U的操作,就像 scala中的TraversableOnce一样。这两个函数都被允许修改和返回他们的第一个参数,而不是创建一个新的U来避免内存分配。

zeroValue:seqOp,每个分区累加结果的初始值;combOp,不同分区的combine结果的初始值

seqOp:用于分区累加结果

combOp:用于合并来自不同分区的结果

字面描述不太直观,我们举个例子,计算数据的平均数:

1
2
3
4
5
6
val input = sc.parallelize(List(1,2,3,4))
val result = input.aggregate((0,0))(
		(acc,number) => (acc._1+number, acc._2+1),
		(part1,part2) => (part1._1+part2._1, part1._2+part2._2)
		)
val avg = result._1 / result._2.toDouble

首先我们定义了一个RDD叫input,里面数据是1~4这4个数字,然后result记录input进行aggregate的结果。

aggregate初始值为(0,0)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
计算过程为:
acc一开始为初始值(0,0),所以acc._1和acc._2都是0,而number是1
则第一步为:0+1,0+1 ---->(1,1)   此时acc变成了(1,1),number下一步就是2了
2.  1+2,1+1 ---->(3,2)  接上,类推
3.  3+3,2+1 ---->(6,3)
4.  6+4,3+1 ---->(10,4)

这里我们只有一个分区,当然我们也可以假设成两个分区的情况.
比如:List(1,2,3,4)被分成(1,2)和(3,4),那么刚才的步骤得到两个结果part1(3,2),part2(7,2)
所以这个时候会进行combOp操作,将两个分区再聚合,(part1,part2) => (part1._1+part2._1, part1._2+part2._2)   ---> (3+7,2+2)也就等于(10,4)
最后10/4得到平均数2.5