linux 之父出席、干货分享、圆桌讨论,精彩尽在 opencloudos 社区开放日,报名戳
写点什么

第四范式openmldb: 拓展spark源码实现高性能join-金马国际

  • 2021 年 9 月 18 日
  • 本文字数:7872 字

    阅读完需:约 26 分钟

背景

spark 是目前最流行的分布式大数据批处理框架,使用 spark 可以轻易地实现上百 g 甚至 t 级别数据的 sql 运算,例如单行特征计算或者多表的 join 拼接。


第四范式 openmldb 是针对 ai 场景优化的机器学习开源数据库项目,实现了数据与计算一致性的离线 mpp 场景和在线 oltp 场景计算引擎。其实 mpp 引擎可基于 spark 实现,并通过拓展 spark 源码实现数倍性能提升。



spark 本身实现也非常高效,基于 antlr 实现的了标准 ansi sql 的词法解析、语法分析,还有在 catalyst 模块中实现大量 sql 静态优化,然后转成分布式 rdd 计算,底层数据结构是使用了 java unsafe api 来自定义内存分布的 unsaferow,还依赖 janino jit 编译器为计算方法动态生成优化后的 jvm bytecode。但在拓展性上仍有改进空间,尤其针对机器学习计算场景的需求虽能满足但不高效,本文以 lastjoin 为例介绍 openmldb 如何通过拓展 spark 源码来实现数倍甚至数十倍性能提升。

机器学习场景 lastjoin


lastjoin 是一种 ai 场景引入的特殊拼表类型,是 leftjoin 的变种,在满足 join 条件的前提下,左表的每一行只拼取右表符合一提交的最后一行。lastjoin 的语义特性,可以保证拼表后输出结果的行数与输入的左表一致。在机器学习场景中就是维持了输入的样本表数量一致,不会因为拼表等数据操作导致最终的样本数量增加或者减少,这种方式对在线金马国际的服务支持比较友好也更符合科学家建模需求。



包含 lastjoin 功能的 openmldb 项目代码以 apache 2.0 协议在 github 中开源(),所有用户都可放心使用。

基于 spark 的 lastjoin 实现


由于 lastjoin 类型并非 ansi sql 中的标准,因此在 sparksql 等主流计算平台中都没有实现,为了实现类似功能用户只能通过更底层的 dataframe 或 rdd 等算子来实现。基于 spark 算子实现 lastjoin 的思路是首先对左表添加索引列,然后使用标准 leftouterjoin,最后对拼接结果进行 reduce 和去掉索引行,虽然可以实现 lastjoin 语义但性能还是有很大瓶颈。


相比于兼容 sql 功能和语法,spark 的另一个特点是用户可以通过 map、reduce、groupby 等接口和自定义 udf 的方式来实现标准 sql 所不支持的数值计算逻辑。但 join 功能用户却无法通过 dataframe 或者 rdd api 来拓展实现,因为拼表的实现是在 spark catalyst 物理节点中实现的,涉及了 shuffle 后多个 internal row 的拼接,以及生成 java 源码字符串进行 jit 的过程,而且根据不同的输入表数据量,spark 内部会适时选择 brocasthashjoin、sortmergejoin 或 shufflehashjoin 来实现,普通用户无法用 rdd api 来拓展这些拼表实现算法。


在 openmldb 项目中可以查看完整的 spark lastjoin 实现,github 代码地址:。


第一步是对输入的左表进行索引列扩充,扩充方式有多种实现,只要添加的索引列每一行有 unique id 即可,下面是第一步的实现代码。


// add the index column for spark dataframe  def addindexcolumn(spark: sparksession, df: dataframe, indexcolname: string, method: string): dataframe = {    logger.info("add the indexcolname(%s) to spark dataframe(%s)".format(indexcolname, df.tostring()))     method.tolowercase() match {      case "zipwithuniqueid" | "zip_withunique_id" => addcolumnbyzipwithuniqueid(spark, df, indexcolname)      case "zipwithindex" | "zip_with_index" => addcolumnbyzipwithindex(spark, df, indexcolname)      case "monotonicallyincreasingid" | "monotonically_increasing_id" =>        addcolumnbymonotonicallyincreasingid(spark, df, indexcolname)      case _ => throw new hybridseexception("unsupported add index column method: "   method)    }   }   def addcolumnbyzipwithuniqueid(spark: sparksession, df: dataframe, indexcolname: string = null): dataframe = {    logger.info("use zipwithuniqueid to generate index column")    val indexedrdd = df.rdd.zipwithuniqueid().map {      case (row, id) => row.fromseq(row.toseq :  id)    }    spark.createdataframe(indexedrdd, df.schema.add(indexcolname, longtype))  }   def addcolumnbyzipwithindex(spark: sparksession, df: dataframe, indexcolname: string = null): dataframe = {    logger.info("use zipwithindex to generate index column")    val indexedrdd = df.rdd.zipwithindex().map {      case (row, id) => row.fromseq(row.toseq :  id)    }    spark.createdataframe(indexedrdd, df.schema.add(indexcolname, longtype))  }   def addcolumnbymonotonicallyincreasingid(spark: sparksession,                                           df: dataframe, indexcolname: string = null): dataframe = {    logger.info("use monotonicallyincreasingid to generate index column")    df.withcolumn(indexcolname, monotonically_increasing_id())  }
复制代码


第二步是进行标准的 leftouterjoin,由于 openmldb 底层是基于 c 实现,因此多个 join condition 的表达式都要转成 spark 表达式(封装成 spark column 对象),然后调用 spark dataframe 的 join 函数即可,拼接类型使用“left”或者“left_outer"。


val joined = leftdf.join(rightdf, joinconditions.reduce(_ && _),  "left")
复制代码


第三步是对拼接后的表进行 reduce,因为通过 leftouterjoin 有可能对输入数据进行扩充,也就是 1:n 的变换,而所有新增的行都拥有第一步进行索引列拓展的 unique id,因此针对 unique id 进行 reduce 即可,这里使用 spark dataframe 的 groupbykey 和 mapgroups 接口(注意 spark 2.0 以下不支持此 api),同时如果有额外的排序字段还可以取得每个组的最大值或最小值。


val distinct = joined  .groupbykey {    row => row.getlong(indexcolidx)  }  .mapgroups {    case (_, iter) =>      val timeextractor = sparkrowutil.createorderkeyextractor(        timeidxinjoined, timecoltype, nullable=false)       if (isasc) {        iter.maxby(row => {          if (row.isnullat(timeidxinjoined)) {            long.minvalue          } else {            timeextractor.apply(row)          }        })      } else {        iter.minby(row => {          if (row.isnullat(timeidxinjoined)) {            long.maxvalue          } else {            timeextractor.apply(row)          }        })      }  }(rowencoder(joined.schema))
复制代码


最后一步只是去掉索引列即可,通过预先指定的索引列名即可实现。


distinct.drop(indexname)
复制代码


总结一下基于 spark 算子实现的 lastjoin 方案,这是目前基于 spark 编程接口最高效的实现了,对于 spark 1.6 等低版本还需要使用 mappartition 等接口来实现类似 mapgroups 的功能。由于是基于 leftouterjoin 实现,因此 lastjoin 的这种实现比 leftouterjoin 还差,实际输出的数据量反而是更少的,对于左表与右表有大量拼接条件能满足的情况下,整体内存消耗量还是也是非常大的。因此下面介绍基于 spark 源码修改实现的原生 lastjoin,可以避免上述问题。

拓展 spark 源码的 lastjoin 实现


原生 lastjoin 实现,是指直接在 spark 源码上实现的 lastjoin 功能,而不是基于 spark dataframe 和 leftouterjoin 来实现,在性能和内存消耗上有巨大的优化。openmldb 使用了定制优化的 spark distribution,其中依赖的 spark 源码也在 github 中开源 (github - 4paradigm/spark at v3.0.0-openmldb) 。


要支持原生的 lastjoin,首先在 jointype 上就需要加上 last 语法,由于 spark 基于 antlr 实现的 sql 语法解析也会直接把 sql join 类型转成 jointype,因此只需要修改 jointype.scala 文件即可。


object jointype {  def apply(typ: string): jointype = typ.tolowercase(locale.root).replace("_", "") match {    case "inner" => inner    case "outer" | "full" | "fullouter" => fullouter    case "leftouter" | "left" => leftouter    // add by 4paradigm    case "last" => lastjointype    case "rightouter" | "right" => rightouter    case "leftsemi" | "semi" => leftsemi    case "leftanti" | "anti" => leftanti    case "cross" => cross    case _ =>      val supported = seq(        "inner",        "outer", "full", "fullouter", "full_outer",        "last", "leftouter", "left", "left_outer",        "rightouter", "right", "right_outer",        "leftsemi", "left_semi", "semi",        "leftanti", "left_anti", "anti",        "cross")       throw new illegalargumentexception(s"unsupported join type '$typ'. "          "supported join types include: "   supported.mkstring("'", "', '", "'")   ".")  }}
复制代码


其中 lastjointype 类型的实现如下


// add by 4paradigmcase object lastjointype extends jointype {  override def sql: string = "last"}
复制代码


在 spark 源码中,还有一些语法检查类和优化器类都会检查内部支持的 join type,因此在 analyzer.scala、optimizer.scala、basiclogicaloperators.scala、sparkstrategies.scala 这几个文件中都需要有简单都修改,scala switch case 支持都枚举类型中增加对新 join type 的支持,这里不一一赘述了,只要解析和运行时缺少对新枚举类型支持就加上即可。


// the output list looks like: join keys, columns from left, columns from rightval projectlist = jointype match {  case leftouter =>    leftkeys    luniqueoutput    runiqueoutput.map(_.withnullability(true))  // add by 4paradigm  case lastjointype =>    leftkeys    luniqueoutput    runiqueoutput.map(_.withnullability(true))  case leftexistence(_) =>    leftkeys    luniqueoutput  case rightouter =>    rightkeys    luniqueoutput.map(_.withnullability(true))    runiqueoutput  case fullouter =>    // in full outer join, joincols should be non-null if there is.    val joinedcols = joinpairs.map { case (l, r) => alias(coalesce(seq(l, r)), l.name)() }    joinedcols         luniqueoutput.map(_.withnullability(true))         runiqueoutput.map(_.withnullability(true))  case _ : innerlike =>    leftkeys    luniqueoutput    runiqueoutput  case _ =>    sys.error("unsupported natural join type "   jointype)}
复制代码


前面语法解析和数据结构支持新的 join type 后,重点就是来修改三种 spark join 物理算子的实现代码了。首先是右表比较小时 spark 会自动优化成 brocasthashjoin,这时右表通过 broadcast 拷贝到所有 executor 的内存里,遍历右表可以找到所有符合 join condiction 的行,如果右表没有符合条件则保留左表 internal row 并且右表字段值为 null,如果有一行或多行符合条件就合并两个 internal row 到输出 internal row 里,代码实现在 broadcasthashjoinexec.scala 中。因为新增了 join type 枚举类型,因此我们修改这两个方法来表示支持这种 join type,并且通过参数来区分和之前 join type 的实现。


  override def doconsume(ctx: codegencontext, input: seq[exprcode], row: exprcode): string = {    jointype match {      case _: innerlike => codegeninner(ctx, input)      case leftouter | rightouter => codegenouter(ctx, input)      // add by 4paradigm      case lastjointype => codegenouter(ctx, input, true)      case leftsemi => codegensemi(ctx, input)      case leftanti => codegenanti(ctx, input)      case j: existencejoin => codegenexistence(ctx, input)      case x =>        throw new illegalargumentexception(          s"broadcasthashjoin should not take $x as the jointype")    }  }
复制代码


brocasthashjoin 的核心实现代码也是使用 jit 来实现的,因此我们需要修改 codegen 成 java 代码字符串的逻辑,在 codegenouter 函数中,保留原来 leftouterjoin 的实现,并且使用前面的参数来区分是否使用新的 join type 实现。这里修改的逻辑也非常简单,因为新的 join type 只要保证右表有一行数据拼到后就返回,因此不需要通过 while 来遍历右表候选集。


   // add by 4paradigm  if (islastjoin) {    s"""       |// generate join key for stream side       |${keyev.code}       |// find matches from hashrelation       |$iteratorcls $matches = $anynull ? null : ($iteratorcls)$relationterm.get(${keyev.value});       |boolean $found = false;       |// the last iteration of this loop is to emit an empty row if there is no matched rows.       |if ($matches != null && $matches.hasnext() || !$found) {       |  unsaferow $matched = $matches != null && $matches.hasnext() ?       |    (unsaferow) $matches.next() : null;       |  ${checkcondition.trim}       |  if ($conditionpassed) {       |    $found = true;       |    $numoutput.add(1);       |    ${consume(ctx, resultvars)}       |  }       |}   """.stripmargin  }
复制代码


然后是修改 sortmergejoin 的实现来支持新的 join type,如果右表比较大不能直接 broacast 那么大概率会使用 sortmergejoin 实现,实现原理和前面的修改类似,不一样的是这里不是通过 jit 实现的,因此直接修改拼表的逻辑即可,保证只要有一行符合条件即可拼接并返回。


 private def buffermatchingrows(): unit = {    assert(streamedrowkey != null)    assert(!streamedrowkey.anynull)    assert(bufferedrowkey != null)    assert(!bufferedrowkey.anynull)    assert(keyordering.compare(streamedrowkey, bufferedrowkey) == 0)    // this join key may have been produced by a mutable projection, so we need to make a copy:    matchjoinkey = streamedrowkey.copy()    bufferedmatches.clear()     // add by 4paradigm    if (islastjoin) {      bufferedmatches.add(bufferedrow.asinstanceof[unsaferow])      advancedbufferedtorowwithnullfreejoinkey()    } else {      do {        bufferedmatches.add(bufferedrow.asinstanceof[unsaferow])        advancedbufferedtorowwithnullfreejoinkey()      } while (bufferedrow != null && keyordering.compare(streamedrowkey, bufferedrowkey) == 0)    }   }
复制代码


最后是 shufflehashjoin 的实现,对应的实现在子类 hashjoin.scala 中,原理与前面也类似,调用 outerjoin 函数遍历 stream table 的时候,修改核心的遍历逻辑,保证左表在拼不到时保留并添加 null,在拼到一行时立即返回即可。


private def outerjoin(      streamediter: iterator[internalrow],    hashedrelation: hashedrelation,    islastjoin: boolean = false): iterator[internalrow] = {    val joinedrow = new joinedrow()    val keygenerator = streamsidekeygenerator()    val nullrow = new genericinternalrow(buildplan.output.length)     streamediter.flatmap { currentrow =>      val rowkey = keygenerator(currentrow)      joinedrow.withleft(currentrow)      val builditer = hashedrelation.get(rowkey)      new rowiterator {        private var found = false        override def advancenext(): boolean = {           // add by 4paradigm to support last join          if (islastjoin && found) {            return false          }           // add by 4paradigm to support last join          if (islastjoin) {            if (builditer != null && builditer.hasnext) {              val nextbuildrow = builditer.next()              if (boundcondition(joinedrow.withright(nextbuildrow))) {                found = true                return true              }            }          } else {            while (builditer != null && builditer.hasnext) {              val nextbuildrow = builditer.next()              if (boundcondition(joinedrow.withright(nextbuildrow))) {                found = true                return true              }            }          }           if (!found) {            joinedrow.withright(nullrow)            found = true            return true          }          false        }        override def getrow: internalrow = joinedrow      }.toscala    }  }
复制代码


通过对前面 jointype 和三种 join 物理节点的修改,用户就可以像其他内置 join type 一样,使用 sql 或者 dataframe 接口来做新的拼表逻辑了,拼表后保证输出行数与左表一致,结果和最前面基于 leftouterjoin dropduplicated 的方案也是一样的。

lastjoin 实现性能对比


那么既然实现的新的 join 算法,我们就对比前面两种方案的性能吧,前面直接基于最新的 spark 3.0 开源版,不修改 spark 优化器的情况下对于小数据会使用 broadcast join 进行性能优化,后者直接使用修改 spark 源码编译后的版本,在小数据下 spark 也会优化成 broadcast join 实现。


首先是测试 join condiction 能拼接多行的情况,对于 leftouterjoin 由于能拼接多行,因此第一个阶段使用 leftouterjoin 输出的表会大很多,第二阶段 dropduplication 也会更耗时,而 lastjoin 因为在 shuffle 时拼接到单行就返回了,因此不会因为拼接多行导致性能下降。



从结果上看性能差异也很明显,由于右表数据量都比较小,因此这三组数据 spark 都会优化成 broadcast join 的实现,由于 leftouterjoin 会拼接多行,因此性能就比新的 lastjoin 慢很多,当数据量增大时 leftouterjoin 拼接的结果表数据量更加爆炸,性能成指数级下降,与 lastjoin 有数十倍到数百倍的差异,最后还可能因为 oom 导致失败,而 lastjoin 不会因为数据量增大有明显的性能下降。


右表能拼接多行对 leftouterjoin dropdupilicated 方案多少有些不公平,因此我们新增一个测试场景,拼接时保证左表只可能与右表的一行拼接成功,这样无论是 leftouterjoin 还是 lastjoin 结果都是一模一样的,这种场景下性能对比更有意义。



从结果上看性能差异已经没有那么明显了,但 lastjoin 还是会比前者方案快接近一倍,前面两组右表数据量比较小被 spark 优化成 broadcast join 实现,最后一组没有优化会使用 sorge merge join 实现。从 broadcasthashjoin 和 sortmergejoin 最终生成的代码可以看到,如果右表只有一行拼接成功的话,leftouterjoin 和 lastjoin 的实现逻辑基本是一模一样的,那么性能差异主要在于前者方案还需要进行一次 dropduplicated 计算,这个 stage 虽然计算复杂度不高但在小数据规模下耗时占比还是比较大,无论是哪种测试方案在这种特殊的拼表场景下修改 spark 源码还是性能最优的实现方案。

技术总结


最后简单总结下,openmldb 项目通过理解和修改 spark 源码,可以根据业务场景来实现新的拼表算法逻辑,从性能上看比使用原生 spark 接口实现性能可以有巨大的提升。spark 源码涉及 sql 语法解析、catalyst 逻辑计划优化、jit 代码动态编译等,拥有这些基础后可以对 spark 功能和性能进行更底层的拓展。

2021 年 9 月 18 日 17:213976
infoq记者

发布了 855 篇内容, 共 289.8 次阅读, 收获喜欢 1624 次。

关注

评论

发布
暂无评论
  • 2020 年 12 月 10 日

  • 本文介绍apache kylin 以及它的查询原理等。

  • 2020 年 11 月 26 日

  • 在遇到shuffle的时候,你是否也只会使用默认的广播变量?今天,我们学习优先选择broadcast joins的方法。

    2021 年 4 月 12 日

  • 本文基于apahce spark 3.1.1版本,讲述aqe自适应查询优化的原理,以及网易数帆在aqe实践中遇到的痛点和做出的思考。

  • 本文详细介绍了如何将hive里的数据快速稳定的写进hbase中。

  • 本次分享基于基础架构团队过往的工作成果,介绍字节跳动在提升基于 spark sql 的 etl 稳定性以及优化 ad-hoc 查询的性能方面的实践。

  • 为了进一步丰富计算引擎的功能以及适用更多业务场景,51信用卡开源了核心库https://github.com/51nb/marble

  • apache spark 3.0 增加了很多令人兴奋的新特性

  • 本文将深入讲解apache spark 2.0的三种api——rdd、dataframe和dataset,在什么情况下该选用哪一种以及为什么,并概述它们的性能和优化点,列举那些应该使用dataframe和dataset而不是rdd的场景。

  • spark sql提供dataframe和dataset,它们既有rdd的特性,又拥有类似关系型数据库的结构化信息。

    2019 年 5 月 22 日

  • 查询请求输入和查询计划完成后,整体查询任务的下一个阶段就是查询计划的执行,承担这部分工作的组件一般称为查询执行引擎。

    2020 年 9 月 25 日

  • 今天,我们继续分析第7~12讲的课后思考题,会涉及数据库索引、判等问题、数值计算、集合类、空值处理和异常处理6大知识点。

    2020 年 4 月 6 日

  • spark sql 可以说是 spark 中的精华部分了,那么到底该怎么学习呢?

  • 本次分享题目为基于spark的大规模推荐系统特征工程及优化,主要内容包括:大规模推荐系统;spark sql应用与fesql;基于llvm的spark优化。

  • 2016年12月腾讯宣布推出面向机器学习的第三代高性能计算平台——angel,并于2017年6月开源。本文以l-bfgs为例,分析spark在机器学习算法的实现上的问题,以及spark on angel是如何解决spark在机器学习任务中的遇到的瓶颈,使spark的机器学习能力更加强大。

  • apache spark第一版发布时隔两年后,databricks公布了基于上游分支2.0.0-preview的apache spark 2.0技术预览版。该预览版在稳定性和api方面均不适合用于生产环境,主要是为了在正式上市前收集来及社区的反馈。

  • apache kylin(麒麟)是由ebay贡献给开源社区的大数据分析引擎,支持在超大数据集上进行秒级别的sql及olap查询,目前是apache基金会的孵化项目。本文是一系列介绍快速数据立方体计算(fast cubing)的第一篇,将从概念上介绍新算法与旧算法的区别以及分析它的优劣。该算法目前正在内部进行测试和改进,将在apache kylin 后续版本中发布。源代码已经公开在kylin的git代码库中,感兴趣的读者可以克隆并切换到0.8分支查看。

  • apache spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的amplab开发,并于2010年成为apache的开源项目之一。apache spark社区刚刚发布了1.5版本,明略数据高级工程师梁堰波解析了该版本中的众多新特性,同时梁堰波也是qcon上海《基于大数据的机器学习技术》专题的讲师,他将分享《基于机器学习的银行卡消费数据预测与推荐》的专题演讲。

  • hadoop于2006年1月28日诞生,至今已有10年,它改变了企业对数据的存储、处理和分析的过程,加速了大数据的发展,形成了自己的极其火爆的技术生态圈,并受到非常广泛的应用。在2016年hadoop十岁生日之际,infoq策划了一个hadoop热点系列文章,为大家梳理hadoop这十年的变化,技术圈的生态状况,回顾以前,激励以后。本文介绍了达观数据分析平台架构和hadoop/hive实践。

发现更多内容

gpu容器虚拟化:用户态和内核态的技术和实践详解

gpu容器虚拟化:用户态和内核态的技术和实践详解

网站地图