流统计 data-straming 框架说明 1.3 Module

目录

源码篇

Module由 App 调用(调用Module.init()Module.handler()),是根据具体业务逻辑划分的功能模块,一个 App 中至少配置一个 Module 实例,其中每个 Module 都是并行执行的,它根据配置调用各种Handler实现类来完成数据的处理。对于实时数据统计案例,一般一个 Module 对应 Kafka 的一个 topic(也支持属同类数据的多个 topic),然后对于这个 topic 的数据做统计处理,当然也可支持将多个 Module 的统计数据进行汇合到某一个 Module 中,形成一张包含来自多个 topic 的多种维度和度量值的数据。

Module 是一个接口,它有 3 个方法:

trait Module {
    def init() : Unit
    def handler() : Unit
    def stop() : Unit
}

其中init()方法是执行一些初始化操作,比如根据配置创建对应的Handlerhandler()是关键流统计关键地方,负责创建 Spark RDD 流和在每个流批次里调用各种 Handler 完成对数据的处理;stop()方法是在系统关闭时候调用,做一些必要的关闭操作(目前没有实质性被调用到,暂可忽略)。

目前 Module 的默认实现类是com.mobikok.ssp.data.streaming.module.PluggableModule,开发者也可自定义实现类,然后在配置文件app.conf中配置(具体配置见下文)。

PluggableModule.init()中执行各种初始化操作,如:TransactionalClient实现类的初始化(如HiveClientkafkaClient)、尝试回滚操作、初始化对应 Handler 和将该 Module 的初始化信息记录到 MySQL 表module_running_status中(用于监控)。

override def init(): Unit = {
    try {
      ...
      // TransactionalClient实现类的初始化
      hiveClient.init()
      hbaseClient.init()
      kafkaClient.init()

      // 尝试回滚操作,如果上次系统关掉时有未提交的事务,则这次启动会回滚为最近的一次事务备份数据
      hiveRollbackedCleanable = hiveClient.rollback()
      hbaseRollbackedCleanable = hbaseClient.rollback()
      kafkaRollbackedCleanable = kafkaClient.rollback()
      // 清理上次遗留的事务状态标记,
      // 避免在多次失败的重启中(即this.init()方法成功,但是this.handler()方法有异常导致失败)每次都会先回滚,没必要
      transactionManager.deleteTransactionCookie(moduleName)

      //根据配置初始化对应Handler
      initDwiHandlers()
      initDwrHandlers()
      initDmHandlers()
      ...
      // 将该Module的初始化信息记录到MySQL表中
      if(mixModulesBatchController.isRunnable(moduleName)){
        mySqlJDBCClient.execute(
		s""" 
     		| insert into module_running_status(`
                ...

PluggableModule.handler()是处理逻辑执行中心,负责创建 Kafka 的 Spark RDD 流,并在每个批次中开始事务调用Handlers提交事务清理事务等这几个关键步骤。Handler 是功能组件接口,目前实现了常用的功能(比如数据去重、统计、写 Hive 和写 HBase 等),具体参考:Handler

在整个数据处理过程中,我们将数据分为 3 层,每一层都有实现一些对应的 Handler,执行总体流程是:

  1. 数据先通过 dwi 层 handler 处理;
  2. 然后传到 dwr 层的 handler 处理;
  3. 最后给 dm 层的 handler 处理。

具体每层含义:

  • dwi 层,明细层,即从 kafka 中读到的原始数据。对明细数据做处理的 Handler,比如明细写入 HBase 的HiveDWIPersistHandler
  • dwr 层,是统计数据层,即通过 Module 配置的统计规则生成的 dwr 统计数据,每个 moduel 只负责统计自身 kakfa topic 数据的维度和度量值,因此接下来还会将自身 Module 的 dwr 数据会混合到其中的某一个 Module 中(可以指定,在 app.conf 中 module 配置中设置 mastr=true),从而生成含多维度多度量值的 dwr 数据(由MixModulesBatchController.waitingForUnionAll()方法实现),最后由这个 module 将这个数据写入 dwr 统计宽表中(由HiveDWRPersistHandler实现)。
  • dm 层,一般是个 hive 视图,通过将 dwr 数据表 join 一些维表构建的视图,是最终面向数据接口,面向用户的。它通过 dm 层的相关 handler 将 hive 的 dm 视图数据上传到 clickhouse 或 bigquery 中(clickhouse 的由ClickHouseUploadByBTimeHandler实现),最后用户用 bi 平台调用数据接口查询这些数据。

PluggableModule.handler()中关键的代码按照顺序解释如下。
1)生成 kafka 的 Spark 流,等流启动后,每个批次都会执行下面stream.foreachRDD中的代码,首先根据配置 Schema 将读到的 RDD 转成 DataFreame:

def handler(): Unit = {
    // 通过指定kafka topic起始偏移来创建Kafka的Spark RDD流,
    // 每一个module的偏移都在每次批次结束时保存偏移到MySQL中MySQL的表`topics`中
    stream = kafkaClient.createDirectStream(globalConfig, ssc, kafkaClient.getCommitedOffset(topics: _*), moduleName)

    stream.foreachRDD { sourceRDD =>
      try {
        // 为每个批次产生一个事务的排队号,后续会用到,
        // 每个批次排队执行事务操作时,不能上一次批次还没执行完,这个批次又执行,
        // 就算时间到了也需要等待上一个批次(空跑除外),不然会数据错乱
        val order = transactionManager.generateTransactionOrder(moduleName)
       ...
	   
       // 判断是否可以空跑当前批次,需满足下面条件:
       // 1) 上一个批次还未结束(如果结束了isLastUncompletedTransaction()不会返回true了);
       // 2) 并且当前批次数据为空
       // 3) 并且是独立module, 即不需要汇合每个module自身的dwr数据到master module中
       if (transactionManager.isLastUncompletedTransaction(moduleName)  && dwiCount == 0 && !mixModulesBatchController.isMultipleModulesOperateSameShareDwrTable()) {
       ...
	   
       // 将从kafka读到的RDD数据(JSON格式的)根据定义的Schema转换成dwi层的DataFrame
       var dwi = hiveContext
         .read
         .schema(dwiStructType)
         .json(filteredRDD)
         ...

2)开始处理数据,首先要开始事务,获取事务父 IDparentTransactionId,后面每个事务性操作,都需要用这个 ID

...
var parentTransactionId: String = null
moduleTracer.trace("wait mix tx begin", {
  parentTransactionId = transactionManager.beginTransaction(moduleName, groupName, order)
}, false)
...

3)调用 dwi 层的 handlers:

...
var handledDwi: DataFrame = dwi
dwiHandlers.foreach{ h =>
  if(h.isAsynchronous){
        // 异步执行独自执行,各个handler不存在上下游handler的`DataFrame`值传递
	asyncWorker.run{
	  h.handle(handledDwi)
	  h.commit()
	}
  }else{
        // 同步Handler执行是链式的,即上一个handler执行完了后,返回值`DataFrame`作为下一个handler的输入
	handledDwi = h.handle(handledDwi)
	h.commit()
  }
}
...

dwiHandlers具体包含哪些 handler,是在PluggableModule.initDwiHandlers()中根据用户配置创建的。其中InitializedKafkaDwiHandler是必须包含的,它包含两个功能:

  • 将从 kafka 读到的原始数据转成成包含 b_time、b_date、l_time、repeats 等分区字段的 DataFrame
  • 明细数据去重功能

从上面的代码可看出,Handler 分异步和同步两种类型,同步 Handler 执行是链式的,即上一个 handler 执行完了后,返回值 DataFrame 作为下一个 handler 的输入;而异步 Handler(handler.isAsynchronous等于true)则是异步执行独自执行,各个 handler 不存在上下游 handler 的DataFrame值传递。开发人员可以根据实际需求情况,来配置为同步还是异步 handler。异步 handler 因为时并发的,所以多任务时效率更高。当一个 handler 不需要将返回值 DataFrame 传递给下一个 handler 时,则可将该 handler 设置为异步,如HiveDWIPersistHandler的功能是将明细数据保存到 Hive,不需要传递给下一个 handler,因此设为异步。

4)dwr 数据预处理,对 dwr 的来源数据 dwi 明细数据做预处理

...
var dwrDwi = handledDwi
  dwrHandlers.foreach { h =>
	dwrDwi = h.prepare(dwrDwi) // 同步和异步的handler都要执行预处理
  }
...

如需对 dwi 层数据做去重后,再传递给 dwr 层,这个就是NonRepeatedPrepareDwrHandler.prepare()实现的

5)生成该 module 的 dwr 统计数据,并 union 多个 module 的 dwr 数据:

...
var preparedDwr = dwrDwi
if (dwrHandlers.nonEmpty) {
  //生成该module的dwr统计数据
  preparedDwr = dwrDwi
	.withColumn("l_time", expr(dwrLTimeExpr))
	.withColumn("b_date", to_date(expr(businessTimeExtractBy)).cast("string"))
	.withColumn("b_time", expr(s"from_unixtime(unix_timestamp($businessTimeExtractBy), '$dwrBTimeFormat')").as("b_time"))
	/*.withColumn("b_version", expr("'0'")) // 默认0*/
	.groupBy(col("l_time") :: col("b_date") :: col("b_time") /*:: col("b_version")*/ :: dwrGroupByExprs: _*)
	.agg(aggExprs.head, aggExprs.tail: _*)

  moduleTracer.trace("wait mix dwr union all", {
        // 收集来自多个Module的dwr数据,并将他们`union`起来,形成一个的多度量值的dwr数据
	mixModulesBatchController.waitingForUnionAll(preparedDwr, isMaster, moduleName)
  }, mixModulesBatchController.isWaitingOtherMixModules(isMaster))
}
...

一般情况下,一个 kafka topic 对应一个 Module 实例,因此该 Module 只包含该 kafka topic 的 dwr 数据,而实际需求往往要混合多个 kafka topic 的数据,最终生成一个包含多个度量值的 DataFrame,这个功能由MixModulesBatchController实现,它收集来自多个 Module 的 dwr 数据,并将他们union起来,形成一个的多度量值的 dwr 数据。

为了进一步理解,举个例子,假如业务系统往 kakfka 的两个 topic 分别发了两种 json 数据:展示和点击数据。
展示数据,在 kafka topic2,有 2 条明细:

{user:A, shows:1}
{user:A, shows:1}

点击数据,kafka topic1,有 3 条明细:

{user:A, clicks:1}
{user:B, clicks:1}
{user:B, clicks:1}

现在需求是:将展示和点击数据统计到一张 dwr 表中。
那么解决方案是:针对展示数据配置一个 module,负责统计展示数,它的 dwr 统计结果:

{user:A, shows:2}

针对点击数据再配置一个 module,负责统计点击数,它的 dwr 统计结果:

{user:A, clicks:1}
{user:B, clicks:2}

又因为要统计到同一张 dwr 表中,所以要将两个 module 的 dwr 统计数据需要混合到一起,产生出既含展示数又含点击数的 dwr 数据,那么最终 dwr 数据是这样的:

{user:A, shows:2, clicks:1}
{user:B, shows:0, clicks:2}

可以看出最终的 dwr 是一张混合了 2 个度量值的 dwr 数据,符合当初的需求。这就是为什么要通过mixModulesBatchController.waitingForUnionAll()union 多个 module dwr 数据的用意了。

6)接下来将 union 后的 dwr 数据传递给 dwr 层,依次调用每个的 dwr 层的 Handler

if(isMaster) {
  dwrHandlers.foreach{ h =>
	if(h.isAsynchronous){
  	  //异步handler分配在线程池`AsyncWorker`中执行
	  asyncWorker.run{
		h.handle(mixModulesBatchController.get())
		h.commit()
	  }
	}else {
	  // 同步handler,按照顺序依次执行,上一个handler的返回结果,作为下一个handler输入数据
	  mixModulesBatchController.set({
		h.handle(mixModulesBatchController.get())
	  })
	  h.commit()
	}

  }
}

如果是异步的 dwr handler,那么分配在线程池AsyncWorker中执行。整体逻辑和 dwi handler 是相似的,只不过这里只是针对 dwr 层数据做处理,数据不一样,hander 实例不一样而已。

7)到这一步,就是最后一层,dm 层,它也是调用 dm 层相关的 handler 完成时数据的处理,如果是异步的,那就线程池中异步执行,和 dwi 和 dwr 层 handlers 类似。

dmHandlers.foreach{ h =>
  if(h.isAsynchronous){
	asyncWorker.run{
	  h.handle()
	}
  }else {
	h.handle()
  }
}

8)至此数据处理操作都做完了,接下来就是提交 kafka 偏移了到 mysql 定义的offset表中,这个表包含了,每个 Module 处理的 kafka topic 数据偏移。因为偏移是我们自己管理,没有交给 spark streaming 自动管理,所以我们要手动更新当前的偏移。

// 将当前批次的kafka偏移了记录到mysql定义的`offset`表中
val kafkaCookie = kafkaClient.setOffset(parentTransactionId, offsetRanges)
// 提交事务,只有提交了事务,上述setOffset才能生效,类似mysql 事务的概念,提交了事务,对数据库的操作才能生效
kafkaClient.commit(kafkaCookie)
// 收集事务cookie,改cookie记录了改事务操作的一些参数,比如事务备份表的表名,在后续清理事务备份数据时要用的
transactionManager.collectTransactionCookie(kafkaClient, kafkaCookie)

9)因为有些 handler 是异步的,交由 asyncWorker 异步执行了,所以需要等待所有的异步 handler 结束了,我们才继续往下走。

// 等待所有的异步handler结束了
asyncWorker.await()

10)当所有的 handler 都结束了,包括同步和异步的,那么此时就需要提交全局的事务了,即告诉系统,所有的 handler,所有的操作都执行完了,可以提交全局事务了,只有提交了全局事务,下次重启系统时,才不会回滚,换言之,只要上次系统运行时有一个事务性操作没有 commit,那么下次重启系统时就得回滚上次所有的事务性操作

moduleTracer.trace("wait mix tx commit", {
            // 提交全局的事务了,只有提交了全局事务,下次重启系统时,才不会回滚
            transactionManager.commitTransaction(isMaster, moduleName, {
              mixModulesBatchController.completeBatch(isMaster)
            })
          }, transactionManager.isWaitingOtherMixModulesCommitTransaction(isMaster))

11)我们的事务性功能的实现,就是先将需要修改的那部分分区数据做备份,万一后面出异常,就那以前备份的数据来原来,因此每一次事务性操作都有一次备份数据的生成,而我们只需要保留最新一次的备份数据就够了,所以需要清理以前的备份数据:

// 清理上一次用于事务的临时数据
cleanLastTransactionCookies(dwi, handledDwi, parentTransactionId)
// 清理上一次“启动流统计”遗留的用于事务的临时数据
cleanLastTransactionRollbackedCookies()

12)至此一个批次的数据就处理完了,最后我们把每个这个批次运行情况,特别是每个 Handler 的运行耗时记录下来,方便运维监控,及时发现性能问题并调优:

mySqlJDBCClient.execute(
            s"""
               |  insert into module_running_status(
               |    app_name,
               |    module_name,
			   ....

配置篇

data-streaming 流统计框架,它就核心功能就是统计,下面是一个简单的数据统计的配置案例,我们约定配置文件名为app.conf,关于 modules 的配置都在这个文件里,在流统计启动的时候会加载这个文件,下面是配置实例。
我们将根据需求来写配置,需求是这样的,有两份数据,一个是存在 kafka topic_a 中,另一个是 kafka topic_b中,需要统计这个

modules {

  module_a {
    class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
    b_time.by = "createTime"
    master = true
    dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"
    dwr.store = true
    dwr.groupby.fields = [
      {
        expr = "publisherId", as = "publisherId"
      },{
        expr = "subId",       as = "appId"
      }
    ]
    dwr.groupby.aggs = [
      {
        expr = "count(1)",          as ="requestCount",   union = "sum(requestCount)"
      },{
        expr = "0",                 as ="sendCount",      union = "sum(sendCount)"
      }
    ]
    dwr.table = "hello_dwr"
    kafka.consumer {
      partitions = [
        { topic = "topic_b"}
      ]
    }
  }

  module_b {
    class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
    b_time.by = "createTime"
    dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"
    dwr.store = true
    dwr.groupby.fields = [
      {
        expr = "publisherId", as = "publisherId"
      },{
        expr = "subId",       as = "appId"
      }
    ]
    dwr.groupby.aggs = [
      {
        expr = "0",          as ="requestCount",   union = "sum(requestCount)"
      },{
        expr = "count(1)",   as ="sendCount",      union = "sum(sendCount)"
      }
    ]
    dwr.table = "hello_dwr"
    kafka.consumer {
      partitions = [
        { topic = "topic_a"}
      ]
    }
  }

}