Spark二次排序笔记

引言

让我们先思考一下什么是二次排序,这是一个很典型的数据处理算法。首先我们有一个数据,这个数据的key之间是有序的,而且每个key对应的value也是有序的。比如以下这种经典二次排序示例数据。

1
2
3
4
5
6
7
8
2000,12,10
2000,11,30
2013,01,70
2013,01,-10
2000,11,-40
2013,01,80
---
字段分别为年,月,值

我们的key是“年-月”,value就是值了,很明显,key是有序的,而且value也是有序的,最终要得到的结果就是这种

1
2
3
2000-11:-40,30
2000-12:10
2013-01:-10,70,80

思路很简单,每行数据都是由逗号分隔,所以我们将一行数据按逗号分割成一个字符串数组,那么这个数组里有三个元素,将第一个和第二个元素“年”“月”拼接成key,然后第三个元素就是value,这样的话,结果就是 (key,value)了,然后对key进行groupByKey即可得到最终的数据结构 (key,List(value)),但是后面的值还是无序的,所以需要再进行value内部排序。

示例

关于示例,我就不用”年月值“数据了,采用一个name,time,value 数据,要求以时间大小来进行二次排序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
p,4,40
p,6,20
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,1,10
p,3,60

最终结果为

1
2
3
4
(z=>,4,8,7,0)
(p=>,10,60,40,20)
(x=>,3,9,6)
(y=>,7,5,1)

实现思路

二次排序,算法思路都是一致的,此例数据以逗号为分隔符,得到字符串数组,第一个元素name作为分组的key,然后第二个元素time作为排序的key,value呢就是第三个元素,然后map返回值为 (groupKey,(sortKey,value)),再利用groupByKey得到数据结构为(groupKey,List((sortKey,value))),然后内部排序使用sortBy,参数就是sortKey。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.hjj.SecondarySort

import org.apache.spark.{SparkConf, SparkContext}

object TimeSort {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\Code\\javaCode\\data\\input\\time_series.txt"
    val outputPath = "D:\\Code\\javaCode\\data\\outputV2"

    val conf = new SparkConf().setMaster("local").setAppName("TimeSort")
    val sc = new SparkContext(conf)
    val data = sc.textFile(inputPath)

    // 源数据格式为  p,4,10 => name,time,value
    /**
     * 我们按照时间大小来对值进行二次排序,比如:
     * p,4,10
     * p,1,30
     * p,6,40
     * 这种数据经过二次排序后就是
     * p=>(30,10,40)
     */
    val result = data.map(line => {
      val arr = line.split(",")
      val groupKey = arr(0)
      val sortKey = arr(1)
      val value = arr(2)
      (groupKey,(sortKey,value)) // map返回一个key-value对,而这个key-value对的value也是一个key-value对,不过后者的key是用来后面进行内部排序
    })
      .groupByKey()  // 利用groupKey进行分组
      .map(line => (line._1+"=>",line._2.toList.sortBy(_._1).map(_._2).mkString(","))) // 内部排序
      //.sortByKey()

    result.foreach(println)
    // result.saveAsTextFile(outputPath)
  }
}

打印的结果如图

image-20200909135445416