流统计 data-straming 框架说明 1.2 App

目录

源码走读

App 的默认实现是com.mobikok.ssp.data.streaming.App,它包含 main() 方法,是程序启动的入口,负责读取配置文件、创建 Spark Streaming 并实例化配置的 Module 和启动流统计这三步:

def main (args: Array[String]): Unit = {
  ...
  loadConfigFile(args)
  // 初始化流统计配置与实例化相关Module
  initSparkStreaming()
  startSparkStreaming()	  
  ...

App.main (args: Array[String])方法中 args 参数会传入配置文件名,配置文件内容是 Typesafe Config 格式的。loadConfigFile()方法不仅仅读取了配置文件,还读取启动参数的配置载入ArgsConfig中,启动参数是配置文件的补充和覆盖,即有些配置是配置文件中不支持的,需要通过启动参数来配置,如 kill=true,有些启动参数是对配置文件中配置的覆盖,如 rate=10000(更多参考App 启动参数)。

initSparkStreaming方法是关键地方,它结合配置文件和启动参数来重整 Module 配置、初始化自研消息队列、创建 Spark StreamingContext 和将配置文件中的 Module 实例化:

 def initSparkStreaming() = {
   initLoggerLevel()
   //结合配置文件和启动参数来重整 Module 配置
   initModulesConfig()
   //初始化自研消息队列和Yarn Client
   initClient()
   initRDBConfig()
   //创建Spark StreamingContext
   initStreamingContext()
   //将配置文件中的Module实例化
   initAllModulesInstances()
  }

initAllModulesInstances()方法会遍历配置文件中所有 Module 配置并实例化,读取配置文件 (app.conf) 里指定 Module 实现类的完整路径(目前最新版的是com.mobikok.ssp.data.streaming.module.PluggableModule),通过反射newInstance()实例化:

var cer: MixModulesBatchController = generateMixMoudlesBatchController(new HiveContext(ssc.sparkContext), shufflePartitions, moduleName, runnableModuleNames)

val m: Module = Class
        .forName(moduleClass)
        .getConstructor(classOf[Config], argsConfig.getClass,  cer.getClass, moduleName.getClass, runnableModuleNames.getClass, ssc.getClass)
        .newInstance(allModulesConfig, argsConfig, cer, moduleName, runnableModuleNames, ssc).asInstanceOf[Module]

其中MixModulesBatchController是多 Module 控制器,用于收集来自多个 Module 的 dwr 层统计数据并混合在一起形成新的 dwr 统计数据。

接着调用 Module 的init()handler()方法:

if(isInitable){
  m.init
}

if(isRunnable) {
  m.handler
}

init()中最重要的操作是调用各种TransactionalClient实现类的rollback()尝试回滚操作:

class PluggableModule {
  override def init(): Unit = {
	try {
	  ...
	  hiveRollbackedCleanable = hiveClient.rollback()
	  hbaseRollbackedCleanable = hbaseClient.rollback()
	  kafkaRollbackedCleanable = kafkaClient.rollback()

handler()中创建Kafka DirectStream并处理每个微批次的 RDD(就是调用各种 Handler 功能组件):

class PluggableModule {
  override def handler(): Unit = {
	stream = kafkaClient.createDirectStream(globalConfig, ssc, kafkaClient.getCommitedOffset(topics: _*), moduleName)

	stream.foreachRDD { source =>
	  // 根据配置调用各种Handler组件
	}
	...

更多参见:Module

运行

通过 spark-submit 提交运行 App 在 yarn 上跑,app.conf 中包含了至少一个 module 的配置 ( 配置样例参考:app.conf 配置样例),data-streaming.jar 就是本流统计框架编译后的 jar 包:

spark-submit --name <自定义App名字> \
--class com.mobikok.ssp.data.streaming.App \
--master yarn-cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 1 \
--queue default \
--jars 各种依赖包 \
--verbose \
--files /apps/data-streaming/<自定义App名字>/app.conf \
/apps/data-streaming/<自定义App名字>/data-streaming.jar \
app.conf