Conveniently Generating Globally Unique Auto-Incrementing IDs in Spark


Overall approach

Use the zipWithIndex() and zipWithUniqueId() methods from the Spark RDD API to generate the IDs. The two differ as follows.

zipWithIndex()

Orders first by partition index, then by each item's position within its partition. The first item in the first partition gets index 0, and the second partition continues from one past the maximum index of the first. IDs start at 0 and are consecutive within each partition. Calling it triggers a Spark job (to count the elements in each partition).
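zipWithIndex needs the element count of every earlier partition before it can assign a global index, which is why it runs an extra job. The assignment rule itself can be sketched in plain Scala; `globalIndex` is an illustrative name, not Spark API, and the partition sizes are assumed already known:

```scala
// Global index = total elements in all earlier partitions + position in own partition.
// Spark gathers the per-partition counts with a separate job before zipping.
def globalIndex(partitionSizes: Seq[Long], partition: Int, posInPartition: Long): Long =
  partitionSizes.take(partition).sum + posInPartition

// With four partitions of 3 elements each:
//   first element of partition 0 -> 0
//   first element of partition 1 -> 3  (continues from partition 0)
//   last  element of partition 3 -> 11
```

This also makes clear why the IDs are consecutive: each partition starts exactly where the previous one ended.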

zipWithUniqueId()

Each partition's IDs form an arithmetic sequence with common difference n (the number of partitions), and the first value in each partition is the partition index (starting from 0). In partition k, the element at position num gets ID num*n + k. IDs start at 0 but are not consecutive within a partition. No Spark job is triggered.
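No job is needed because the formula num*n + k depends only on the partition index and the position within that partition, both known locally. The formula can be checked with a few lines of plain Scala (`uniqueId` and its parameter names are illustrative, not Spark API):

```scala
// ID for the element at position posInPartition inside partition `partition`,
// when the RDD has numPartitions partitions: an arithmetic sequence per
// partition with common difference numPartitions.
def uniqueId(numPartitions: Int, partition: Int, posInPartition: Long): Long =
  posInPartition * numPartitions + partition

// With 4 partitions: partition 0 yields 0, 4, 8, ...; partition 1 yields 1, 5, 9, ...
```

Because every partition draws from a disjoint residue class modulo n, the IDs are globally unique without any coordination between partitions.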

Utility class

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkCommon {
  /**
    * Appends a unique auto-incrementing ID column without changing the
    * number of partitions and without a shuffle.
    *
    * @param offset                 starting value for the IDs (default 0)
    * @param isPartitionConsecutive whether IDs are consecutive within a partition
    *                               (when false they form an arithmetic sequence);
    *                               when true, a Spark job is triggered
    */
  def addUniqueIdColumn(sparkSession: SparkSession, df: DataFrame, uidKey: String,
                        offset: Long = 0, isPartitionConsecutive: Boolean = false): DataFrame = {
    // df.schema avoids the extra job (and the failure on an empty DataFrame)
    // that df.head().schema would cause
    val newSchema = df.schema.add(uidKey, LongType)
    val rdd = if (isPartitionConsecutive) df.rdd.zipWithIndex() else df.rdd.zipWithUniqueId()
    val result: RDD[Row] = rdd.map { case (row, id) => Row.merge(row, Row(id + offset)) }
    sparkSession.createDataFrame(result, newSchema)
  }
}

Test

Data

{"fid":1,"name":"A","score":"100","createtime":"2017-01-12"}
{"fid":2,"name":"B","score":"88","createtime":"2017-06-27"}
{"fid":3,"name":"C","score":"89","createtime":"2017-06-30"}
{"fid":4,"name":"D","score":"88","createtime":"2017-06-30"}
{"fid":5,"name":"A","score":"96","createtime":"2018-06-30"}
{"fid":6,"name":"B","score":"92","createtime":"2018-05-14"}
{"fid":7,"name":"C","score":"95","createtime":"2018-09-12"}
{"fid":8,"name":"D","score":"98","createtime":"2018-10-12"}
{"fid":9,"name":"A","score":"100","createtime":"2019-11-12"}
{"fid":10,"name":"B","score":"98","createtime":"2019-11-12"}
{"fid":11,"name":"C","score":"89","createtime":"2019-11-12"}
{"fid":12,"name":"D","score":"96","createtime":"2019-11-12"}

Local test

import org.junit.Test
import org.apache.spark.sql.{DataFrame, SparkSession}

class SparkCommonTest {

  @Test def testUid(): Unit = {
    val sparkSession = SparkSession.builder().master("local[2]").getOrCreate()
    var df: DataFrame = sparkSession.read.json("E:\\Data\\input\\JSON\\json4x12.txt")
    df = df.repartitionByRange(4, df("name"))
    SparkCommon.addUniqueIdColumn(sparkSession, df, "uid").sort("name").show()
    SparkCommon.addUniqueIdColumn(sparkSession, df, "pid", 12, isPartitionConsecutive = true).sort("name").show()
    sparkSession.stop()
  }
}

Test results

Globally unique auto-incrementing IDs

If the program must be run multiple times while keeping IDs strictly increasing across runs, maintain the offset in Redis and pass the corresponding value as the offset argument when calling addUniqueIdColumn.
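A minimal sketch of that bookkeeping, using an in-process AtomicLong as a stand-in for a Redis counter updated with INCRBY so the logic is self-contained and runnable. `OffsetStore` and `reserveOffset` are hypothetical names, not part of any library; a real deployment would issue the atomic increment against Redis instead:

```scala
import java.util.concurrent.atomic.AtomicLong

// Atomically reserves a block of `batchSize` IDs and returns the starting
// offset to pass to addUniqueIdColumn. With Redis, the same effect comes
// from INCRBY on a shared key (returning the value before/after the bump).
object OffsetStore {
  private val counter = new AtomicLong(0L)

  def reserveOffset(batchSize: Long): Long =
    counter.getAndAdd(batchSize)
}

// First run over 12 rows reserves offsets 0..11; the next run starts at 12.
```

Reserving the whole batch up front keeps concurrent runs from overlapping, since each caller gets a disjoint ID range in a single atomic operation.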
