Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey

Can anyone explain the difference between reduceByKey, groupByKey, aggregateByKey, and combineByKey? I have read the documentation on this, but couldn't understand the exact differences.

An explanation with examples would be great.


ReduceByKey - reduceByKey(func, [numTasks])

Data is combined at each partition first, so that at most one value per key leaves each partition. Only then does the shuffle happen, and the combined data is sent over the network to a particular executor for the final action, such as the reduce.
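A minimal sketch of that behaviour (assuming a SparkContext named sc; the input path is only a placeholder):

// reduceByKey: the summing function runs per partition (map-side combine)
// and again after the shuffle to merge the partial sums.
val counts = sc.textFile("hdfs://...")   // placeholder path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)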

GroupByKey - groupByKey([numTasks])

It doesn't merge the values for a key before the shuffle; the shuffle happens directly, and a lot of data gets sent to each partition, almost the same amount as the initial data.

The merging of values for each key is done only after the shuffle. A lot of data has to be held on the final worker nodes, which can result in out-of-memory issues.
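The equivalent sketch with groupByKey (same assumed sc and placeholder path); every (word, 1) pair crosses the network before any summing happens:

// groupByKey: all pairs are shuffled first; summing happens only afterwards.
val counts = sc.textFile("hdfs://...")   // placeholder path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .groupByKey()
  .mapValues(_.sum)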

AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

It is similar to reduceByKey, but you can provide an initial value when performing the aggregation.
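A minimal sketch, assuming an RDD[(String, Int)] named pairs; with a zero value of 0 and addition for both functions, this behaves like reduceByKey(_ + _):

// aggregateByKey: an explicit initial value, folded in once per key per partition.
val sums = pairs.aggregateByKey(0)(
  (acc, v) => acc + v,  // seqOp: merges a value into the accumulator within a partition
  (a, b)   => a + b     // combOp: merges accumulators across partitions
)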

Use of reduceByKey

  • reduceByKey can be used when we run on a large dataset.

  • Prefer reduceByKey over aggregateByKey when the input and output value types are of the same type.

Moreover, it is recommended not to use groupByKey and to prefer reduceByKey. For details you can refer here.

You can also refer to this question to understand in more detail how reduceByKey and aggregateByKey work.

  • groupByKey() just groups your dataset based on a key. It will result in data shuffling when the RDD is not already partitioned.
  • reduceByKey() is something like grouping + aggregation. We can say reduceByKey() is equivalent to dataset.group(...).reduce(...). It will shuffle less data, unlike groupByKey().
  • aggregateByKey() is logically the same as reduceByKey(), but it lets you return the result in a different type. In other words, it lets you have an input of type x and an aggregate result of type y. For example, (1,2),(1,4) as input and (1,"six") as output. It also takes a zero value that is applied at the beginning of each key (see the sketch after this list).
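A minimal sketch of that type change, assuming a SparkContext named sc; here the input values are Ints and the aggregated result is a String, so the value type changes during the aggregation itself:

// Input values are Int; the aggregated result is String.
val input = sc.parallelize(Seq((1, 2), (1, 4)))
val result = input.aggregateByKey("")(                          // zero value: empty String
  (acc, v) => if (acc.isEmpty) v.toString else acc + "+" + v,   // fold an Int into the String
  (a, b)   => if (a.isEmpty) b else if (b.isEmpty) a else a + "+" + b
)
result.collect().foreach(println)                               // e.g. (1,2+4)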

Note: One similarity is that they are all wide transformations.

While both reduceByKey and groupByKey will produce the same answer, reduceByKey works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data.

On the other hand, when calling groupByKey, all the key-value pairs are shuffled around. This is a lot of unnecessary data being transferred over the network.

For more detail, check the link below:

https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

Although both of them will fetch the same results, there is a significant difference in the performance of the two functions. reduceByKey() works better on larger datasets than groupByKey().

In reduceByKey(), pairs on the same machine with the same key are combined (using the function passed to reduceByKey()) before the data is shuffled. Then the function is called again to reduce all the values from each partition and produce one final result.

In groupByKey(), all the key-value pairs are shuffled around. This is a lot of unnecessary data being transferred over the network.

groupByKey:

Syntax:

sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .groupByKey()
  .map { case (x, y) => (x, y.sum) }  // sum the grouped counts for each word

groupByKey can cause out-of-disk problems, as data is sent over the network and collected on the reducer workers.

reduceByKey:

Syntax:

sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)

Data is combined at each partition, so only one output per key per partition is sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
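A minimal illustration of that type constraint, assuming an RDD[(String, Int)] named pairs: the function passed to reduceByKey must be (Int, Int) => Int; to produce a value of a different type, you would reach for aggregateByKey or combineByKey instead.

// reduceByKey: the merge function must be (V, V) => V - same type in and out.
val sums: org.apache.spark.rdd.RDD[(String, Int)] =
  pairs.reduceByKey((x, y) => x + y)  // Int in, Int out: compiles
// A merge function like (Int, Int) => String would not type-check here.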

aggregateByKey:

The same as reduceByKey, except that it additionally takes an initial value.

3 parameters as input:

  1. Initial (zero) value
  2. Sequence op logic (merges a value into the accumulator within a partition)
  3. Combiner logic (merges accumulators across partitions)

Example:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Create key-value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0
val addToCounts = (n: Int, v: String) => n + 1          // seqOp: count one value
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2  // combOp: merge partition counts
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)

Output: Aggregate By Key sum results: bar -> 3, foo -> 5

combineByKey:

3 parameters as input:

  1. Initial value: unlike aggregateByKey, you need not always pass a constant; you can pass a function that returns a new value (createCombiner).
  2. Merging function (mergeValue)
  3. Combine function (mergeCombiners)

Example:

// Per-key average, assuming rdd is an RDD[(String, Int)]
val result = rdd.combineByKey(
  (v: Int) => (v, 1),                                     // createCombiner: first value seen for a key
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),  // mergeValue: fold a value into (sum, count)
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  // mergeCombiners
).map { case (k, v) => (k, v._1 / v._2.toDouble) }        // turn (sum, count) into an average
result.collect.foreach(println)
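For instance, with rdd = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 5))), this would print (a,2.0) and (b,5.0) (in some order): the combiner builds a (sum, count) pair per key, and the final map turns it into an average.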

reduceByKey, aggregateByKey, and combineByKey are preferred over groupByKey.

Reference: Avoid groupByKey

Then, apart from these four, we have

foldByKey, which is the same as reduceByKey but with a user-defined zero value (see the sketch below).
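A minimal sketch, assuming an RDD[(String, Int)] named pairs:

// foldByKey: like reduceByKey, but with an explicit zero value (here 0).
val sums = pairs.foldByKey(0)((x, y) => x + y)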

AggregateByKey takes 3 parameters as input and uses 2 functions for merging (one for merging values within the same partition and another for merging values across partitions); the first parameter is the zero value,

whereas

ReduceByKey takes only 1 parameter, which is a function for merging.

CombineByKey takes 3 parameters, and all 3 are functions. It is similar to aggregateByKey, except that it can take a function for the zero value (createCombiner).

GroupByKey takes no parameters and just groups everything. Also, it incurs an overhead for data transfer across partitions (see the combined sketch below).
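A side-by-side sketch of those signatures, assuming an RDD[(String, Int)] named pairs; every line computes the same per-key sum:

// The same per-key sum expressed with each operation.
val r1 = pairs.reduceByKey(_ + _)                        // 1 merge function
val r2 = pairs.foldByKey(0)(_ + _)                       // zero value + merge function
val r3 = pairs.aggregateByKey(0)(_ + _, _ + _)           // zero value + seqOp + combOp
val r4 = pairs.combineByKey((v: Int) => v,
                            (acc: Int, v: Int) => acc + v,
                            (a: Int, b: Int) => a + b)   // 3 functions
val r5 = pairs.groupByKey().mapValues(_.sum)             // groups first, sums after the shuffle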