流统计插件化 module 的说明

1. PluggableModule

1.1 执行流程

![PluggableModule 执行流程图](2019/01/module-5640bf74.png)

1.2 使用方法

配置的基础模板为:

// Relational database connection (MySQL, per the JDBC URL scheme).
rdb {
  url = "jdbc:mysql://node17:3306/sight?autoReconnect=true&failOverReadOnly=false"
  user = "root"
  // NOTE(review): plaintext credential inside a documentation template; consider a placeholder instead.
  password = "root_root"
  // presumably the table holding consumed Kafka offsets -- TODO confirm
  kafka.offset.table = "offset"
  // presumably the table the transaction manager records commits in -- TODO confirm
  // ("commited" is part of the configured table name; do not correct it here)
  transaction.manager.table="sight.transaction_commited_table"
}
// Hive JDBC endpoint (jdbc:hive2 scheme, i.e. HiveServer2).
hive {
  jdbc.url = "jdbc:hive2://node17:10000/default"
}
// Base URLs for the message client and for the Kylin REST API client.
message.client.url="http://node14:5555/"
kylin.client.url="http://node14:7070/kylin/api/"
// Kafka producer settings. Entries under `set` use standard Kafka producer property names.
kafka.producer {
  // presumably selects asynchronous vs. synchronous sending -- TODO confirm against the producer wrapper
  is.async=false
  set {
    bootstrap.servers="node30:6667,node31:6667,node32:6667"
    client.id="niubility_producer"
    // acks=-1 is the numeric form of acks=all
    acks=-1
    key.serializer="org.apache.kafka.common.serialization.StringSerializer"
    value.serializer="org.apache.kafka.common.serialization.StringSerializer"
  }
}
// Kafka consumer settings shared by all modules. Entries under `set` use standard
// Kafka consumer property names; per-module topic/partition lists live under
// modules.<name>.kafka.consumer.
kafka.consumer {
  set {
    bootstrap.servers = "node30:6667,node31:6667,node32:6667"
    key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    auto.offset.reset = "latest"
    // Auto commit is off; presumably offsets are tracked via rdb.kafka.offset.table -- TODO confirm
    enable.auto.commit = "false"
    // NOTE(review): Kafka's documentation recommends heartbeat.interval.ms be no more than
    // one third of session.timeout.ms; here it is 1000 vs 1500 -- confirm this is intended.
    request.timeout.ms = 2000
    session.timeout.ms = 1500
    heartbeat.interval.ms = 1000
  }
}

// HBase client settings.
hbase {
  // Tables listed as transactional; how they are treated is not shown in this document.
  transactional {
    tables = ["uuid.stat"]
  }
  set {
    // ZooKeeper quorum hosts and client port
    hbase.zookeeper.quorum = "node106,node107,node108"
    hbase.zookeeper.property.clientPort = "2181"
    // NOTE(review): a Spark property inside the HBase section (also set under spark.conf.set); confirm it is needed here.
    spark.serializer = org.apache.spark.serializer.KryoSerializer
  }
}

// Spark job settings. Entries under `set` mix Spark, Hive and MapReduce property names.
spark.conf {
  // NOTE(review): "buration" looks like a typo for "duration", but the overall example
  // uses the same spelling; confirm which key the application reads before renaming.
  // The time unit is not stated in this document.
  streaming.batch.buration = 300
  set {
    // {appName} is a placeholder to be substituted
    spark.app.name = "{appName}"
    mapreduce.job.queuename = queueA
    mapreduce.job.priority = HIGH
    hive.exec.dynamic.partition.mode = nonstrict
    spark.streaming.kafka.maxRatePerPartition = 5000
    spark.serializer = org.apache.spark.serializer.KryoSerializer
    spark.default.parallelism = 3
    hive.merge.mapfiles = true
    hive.merge.mapredfiles = true
    hive.merge.smallfiles.avgsize=1024000000
    spark.sql.shuffle.partitions = 3
    spark.kryoserializer.buffer.max=512
    spark.streaming.concurrentJobs = 16
    spark.scheduler.mode=FAIR
    spark.sql.broadcastTimeout=3000
  }
}

// Module definitions, one entry per module. {moduleName}, {dwiTableName}, {topicName}
// and {partition} are placeholders to be substituted before use.
modules= {

  {moduleName} {
    class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
    // NOTE(review): the overall example in section 2 uses `business.time.extract.by`;
    // confirm which of the two keys the module actually reads.
    business.date.extract.by = "createTime"
    commit.batch.size =   0
    commit.time.interval = 0
    // DWI settings
    dwi.enable = false
    dwi.table = "{dwiTableName}"
    dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"
    // DWR settings
    dwr.enable = false
    dwr.acc.day.enable = false
    dwr.acc.day.overwrite = []
    dwr.acc.month.enable= false
    dwr.acc.month.overwrite = []

    // Each element takes `expr` / `as` (aggs additionally take `union`); see the overall example.
    dwr.groupby.fields = [{}]
    dwr.groupby.aggs = [{}]
    fast.polling.enable=true
    // NOTE(review): the handler key list following this template names `dm.handler.offline`, and the
    // overall example uses `dm.handler.online.enable` / `dm.handler.online`; confirm that
    // `dm.offline.handler.enable` / `dm.offline.handlers` are the keys actually read.
    dm.offline.handler.enable = true
    dm.offline.handlers = [{
      class = "com.mobikok.ssp.data.streaming.handler.dm.offline.ClickHouseQueryByBTimeHandler"
      items = [{
        view = "ssp_report_overall_dm"
        message.consumer = "cktest"
        message.topics = [
          "ssp_report_overall_dwr",
          "update_ssp_report_overall2",
          "PublisherThirdIncomeDMReflush",
          "ck_report_overall"
        ]
      }]
    }]

    kafka.consumer {
      // "partitoins" (sic) is spelled this way in every example in this document;
      // do not correct it without checking the key the code reads.
      partitoins = [{
        topic = "{topicName}"
        partition = {partition}
      }]
    }
  }
}

核心 handler 位于 com.mobikok.ssp.data.streaming.handler.dwi.core 与 com.mobikok.ssp.data.streaming.handler.dwr.core 包中,直接通过相应配置项是否为 true 来选择是否启用;非核心 handler 则通过配置

modules.${moduleName}.dwi.handler
modules.${moduleName}.dwr.handler
modules.${moduleName}.dm.handler.online
modules.${moduleName}.dm.handler.offline

来选择是否使用相应的 handler。所有 handler 都继承自 com.mobikok.ssp.data.streaming.handler.Handler 类,该类用于决定 handler 是否以异步方式执行。

2. overall 的配置

// Relational database connection (MySQL, per the JDBC URL scheme).
rdb {
  url = "jdbc:mysql://node17:3306/sight"
  user = "root"
  // NOTE(review): plaintext credential in a documentation sample; consider masking it.
  password = "root_root"
  // presumably the table holding consumed Kafka offsets -- TODO confirm
  kafka.offset.table = "offset"
  // presumably the table the transaction manager records commits in -- TODO confirm
  transaction.manager.table = "sight.transaction_commited_table"
}
// Hive JDBC endpoint (jdbc:hive2 scheme, i.e. HiveServer2).
hive {
  jdbc.url = "jdbc:hive2://node17:10000/default"
}
// Base URLs for the message client and for the Kylin REST API client.
message.client.url = "http://node14:5555/"
kylin.client.url = "http://node14:7070/kylin/api/"
// Kafka producer settings. Entries under `set` use standard Kafka producer property names.
kafka.producer {
  // presumably selects asynchronous vs. synchronous sending -- TODO confirm against the producer wrapper
  is.async = false
  set {
	bootstrap.servers = "node30:6667,node31:6667,node32:6667"
	client.id = "niubility_producer"
	// acks = -1 is the numeric form of acks=all
	acks = -1
	key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
	value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
  }
}
// Kafka consumer settings shared by all modules. Entries under `set` use standard
// Kafka consumer property names; each module lists its own topic partitions.
kafka.consumer {
  set {
	bootstrap.servers = "node30:6667,node31:6667,node32:6667"
	key.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
	value.deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
	// Alternative kept for reference: start from the earliest available offset.
	//    auto.offset.reset = "earliest"
	auto.offset.reset = "latest"
	// Auto commit is off; presumably offsets are tracked via rdb.kafka.offset.table -- TODO confirm
	enable.auto.commit = "false"
	// NOTE(review): Kafka's documentation recommends heartbeat.interval.ms be no more than
	// one third of session.timeout.ms; here it is 1000 vs 1500 -- confirm this is intended.
	request.timeout.ms = 2000
	session.timeout.ms = 1500
	heartbeat.interval.ms = 1000
  }
}
// HBase client settings.
hbase {
  set {
	//hbase.client.retries.number = 1
	hbase.rpc.timeout = 300000
	// ZooKeeper quorum hosts and client port
	hbase.zookeeper.quorum = "node106,node107,node108"
	hbase.zookeeper.property.clientPort = "2181"
	// NOTE(review): a Spark property inside the HBase section (also set under spark.conf.set); confirm it is needed here.
	spark.serializer = org.apache.spark.serializer.KryoSerializer
  }
}
// ClickHouse host list.
clickhouse {
  hosts = [
	"node111",
	"node110",
	"node15",
	"node16"
  ]
}
// Spark job settings for the "test__overall" application. Entries under `set`
// mix Spark, Hive and MapReduce property names.
spark.conf {
  app.name = "test__overall"
  // NOTE(review): "buration" looks like a typo for "duration", but the template uses the
  // same spelling; confirm which key the application reads before renaming.
  streaming.batch.buration = 300
  set {
	mapreduce.job.queuename = queueA
	mapreduce.job.priority = HIGH
	hive.exec.dynamic.partition.mode = nonstrict
	// Previous rate limit kept for reference.
	//    spark.streaming.kafka.maxRatePerPartition = 500
	spark.streaming.kafka.maxRatePerPartition = 1250
	#spark.streaming.receiver.maxRate=1000
	spark.serializer = org.apache.spark.serializer.KryoSerializer
	spark.default.parallelism = 20
	hive.merge.mapfiles = true
	hive.merge.mapredfiles = true
	hive.merge.smallfiles.avgsize = 1024000000
	spark.sql.shuffle.partitions = 20
	spark.kryoserializer.buffer.max = 256
	spark.scheduler.mode = FAIR
	//    spark.streaming.concurrentJobs = 6
  }
}
modules {

  // Module "test__overall_fill": reads topic_ad_fill_new, partitions 0-19.
  // DWI is disabled; DWR is enabled with dwr.table = test__ssp_report_overall_dwr.
  // Every record is counted as requestCount; all other metrics are emitted as 0.
  test__overall_fill {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//    class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	business.time.extract.by = "createTime"
	commit.batch.size = 1
	commit.time.interval = 1800
	master = true

	dwi.enable = false
	dwi.table = "test__ssp_overall_fill_dwi"
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"
	dwr.enable = true
	dwr.acc.day.enable = false
	dwr.acc.day.overwrite = []
	dwr.acc.month.enable = false
	dwr.acc.month.overwrite = []

	// Group-by dimensions: `expr` is the source expression, `as` the output name.
	dwr.groupby.fields = [
	  {
		expr = "publisherId", as = "publisherId"
	  }, {
		expr = "subId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "adType", as = "adType"
	  }, {
		expr = "campaignId", as = "campaignId"
	  }, {
		expr = "offerId", as = "offerId"
	  }, {
		expr = "imageId", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "domain", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "respStatus", as = "respStatus"
	  }, {
		expr = "test", as = "test"
	  }, {
		expr = "ruleId", as = "ruleId"
	  }, {
		expr = "smartId", as = "smartId"
	  }, {
		expr = "eventName", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Aggregated metrics. `union` is presumably the expression used when merging
	// already-aggregated rows -- TODO confirm.
	dwr.groupby.aggs = [
	  {
		expr = "count(1)", as = "requestCount", union = "sum(requestCount)"
	  }, {
		expr = "0", as = "sendCount", union = "sum(sendCount)"
	  }, {
		expr = "0", as = "showCount", union = "sum(showCount)"
	  }, {
		expr = "0", as = "clickCount", union = "sum(clickCount)"
	  }, {
		expr = "0", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	  }, {
		expr = "0", as = "feeSendCount", union = "sum(feeSendCount)"    // displayed billed record count
	  }, {
		expr = "0", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	  }, {
		expr = "0", as = "feeSendPrice", union = "sum(feeSendPrice)"    // displayed billed amount (revenue)
	  }, {
		expr = "0", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	  }, {
		expr = "0", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	  }, {
		expr = "0", as = "conversion", union = "sum(conversion)"      // conversions; for now must not include those produced by shows and clicks
	  }, {
		expr = "0", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by shows and clicks
	  }, {
		expr = "0", as = "revenue", union = "sum(revenue)"         // revenue
	  }, {
		expr = "0", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	  }, {
		expr = "0", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion count
	  }, {
		expr = "0", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion count
	  }, {
		expr = "0", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion count
	  }, {
		expr = "0", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion count
	  }, {
		expr = "0", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	  }, {
		expr = "0", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	  }, {
		expr = "0", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	  }, {
		expr = "0", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	  }, {
		expr = "0", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	  }, {
		expr = "0", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	  }, {
		expr = "0", as = "winPrice", union = "sum(winPrice)"     // winning bid price
	  }, {
		expr = "0", as = "winNotices", union = "sum(winNotices)"     // number of wins
	  }, {
		expr = "0", as = "newCount", union = "sum(newCount)"
	  }, {
		expr = "0", as = "activeCount", union = "sum(activeCount)"
	  }
	]
	dwr.include.repeated = true
	dwr.table = "test__ssp_report_overall_dwr"
	kafka.consumer {
	  // "partitoins" (sic): spelled this way throughout this document; do not correct without checking the code.
	  partitoins = [
		{topic = "topic_ad_fill_new", partition = 0},
		{topic = "topic_ad_fill_new", partition = 1},
		{topic = "topic_ad_fill_new", partition = 2},
		{topic = "topic_ad_fill_new", partition = 3},
		{topic = "topic_ad_fill_new", partition = 4},
		{topic = "topic_ad_fill_new", partition = 5},
		{topic = "topic_ad_fill_new", partition = 6},
		{topic = "topic_ad_fill_new", partition = 7},
		{topic = "topic_ad_fill_new", partition = 8},
		{topic = "topic_ad_fill_new", partition = 9},
		{topic = "topic_ad_fill_new", partition = 10},
		{topic = "topic_ad_fill_new", partition = 11},
		{topic = "topic_ad_fill_new", partition = 12},
		{topic = "topic_ad_fill_new", partition = 13},
		{topic = "topic_ad_fill_new", partition = 14},
		{topic = "topic_ad_fill_new", partition = 15},
		{topic = "topic_ad_fill_new", partition = 16},
		{topic = "topic_ad_fill_new", partition = 17},
		{topic = "topic_ad_fill_new", partition = 18},
		{topic = "topic_ad_fill_new", partition = 19}
	  ]
	}
  }

  // Module "test__overall_send": reads topic_ad_send_new, partitions 0-19.
  // DWI and DWR are both enabled; dwr.table = test__ssp_report_overall_dwr.
  // Every record is counted as sendCount; all other metrics are emitted as 0.
  test__overall_send {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//    class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	business.time.extract.by = "createTime"
	commit.batch.size = 1
	commit.time.interval = 1800
	dwi.enable = true
	dwi.table = "test__ssp_overall_send_dwi"
	//    dwi.partition.fields = ["repeated", "l_time", "b_date", "b_time"]
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"

	dwr.enable = true
	// Group-by dimensions: `expr` is the source expression, `as` the output name.
	dwr.groupby.fields = [
	  {
		expr = "publisherId", as = "publisherId"
	  }, {
		expr = "subId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "adType", as = "adType"
	  }, {
		expr = "campaignId", as = "campaignId"
	  }, {
		expr = "offerId", as = "offerId"
	  }, {
		expr = "imageId", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "domain", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "respStatus", as = "respStatus"
	  }, {
		expr = "test", as = "test"
	  }, {
		expr = "ruleId", as = "ruleId"
	  }, {
		expr = "smartId", as = "smartId"
	  }, {
		expr = "eventName", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Aggregated metrics. `union` is presumably the expression used when merging
	// already-aggregated rows -- TODO confirm.
	dwr.groupby.aggs = [
	  {
		expr = "0", as = "requestCount", union = "sum(requestCount)"
	  }, {
		expr = "count(1)", as = "sendCount", union = "sum(sendCount)"
	  }, {
		expr = "0", as = "showCount", union = "sum(showCount)"
	  }, {
		expr = "0", as = "clickCount", union = "sum(clickCount)"
	  }, {
		expr = "0", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	  }, {
		expr = "0", as = "feeSendCount", union = "sum(feeSendCount)"    // displayed billed record count
	  }, {
		expr = "0", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	  }, {
		expr = "0", as = "feeSendPrice", union = "sum(feeSendPrice)"    // displayed billed amount (revenue)
	  }, {
		expr = "0", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	  }, {
		expr = "0", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	  }, {
		expr = "0", as = "conversion", union = "sum(conversion)"      // conversions; for now must not include those produced by shows and clicks
	  }, {
		expr = "0", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by shows and clicks
	  }, {
		expr = "0", as = "revenue", union = "sum(revenue)"         // revenue
	  }, {
		expr = "0", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	  }, {
		expr = "0", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion count
	  }, {
		expr = "0", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion count
	  }, {
		expr = "0", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion count
	  }, {
		expr = "0", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion count
	  }, {
		expr = "0", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	  }, {
		expr = "0", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	  }, {
		expr = "0", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	  }, {
		expr = "0", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	  }, {
		expr = "0", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	  }, {
		expr = "0", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	  }, {
		expr = "0", as = "winPrice", union = "sum(winPrice)"     // winning bid price
	  }, {
		expr = "0", as = "winNotices", union = "sum(winNotices)"     // number of wins
	  }, {
		expr = "0", as = "newCount", union = "sum(newCount)"
	  }, {
		expr = "0", as = "activeCount", union = "sum(activeCount)"
	  }
	]
	dwr.include.repeated = true
	dwr.table = "test__ssp_report_overall_dwr"
	kafka.consumer {
	  // "partitoins" (sic): spelled this way throughout this document; do not correct without checking the code.
	  partitoins = [
		{topic = "topic_ad_send_new", partition = 0},
		{topic = "topic_ad_send_new", partition = 1},
		{topic = "topic_ad_send_new", partition = 2},
		{topic = "topic_ad_send_new", partition = 3},
		{topic = "topic_ad_send_new", partition = 4},
		{topic = "topic_ad_send_new", partition = 5},
		{topic = "topic_ad_send_new", partition = 6},
		{topic = "topic_ad_send_new", partition = 7}
		,
		{topic = "topic_ad_send_new", partition = 8},
		{topic = "topic_ad_send_new", partition = 9},
		{topic = "topic_ad_send_new", partition = 10},
		{topic = "topic_ad_send_new", partition = 11},
		{topic = "topic_ad_send_new", partition = 12},
		{topic = "topic_ad_send_new", partition = 13},
		{topic = "topic_ad_send_new", partition = 14},
		{topic = "topic_ad_send_new", partition = 15},
		{topic = "topic_ad_send_new", partition = 16},
		{topic = "topic_ad_send_new", partition = 17},
		{topic = "topic_ad_send_new", partition = 18},
		{topic = "topic_ad_send_new", partition = 19}
	  ]
	}
  }

  // Module "test__overall_show": reads topic_ad_show; business time comes from showTime.
  // DWI and DWR are both enabled; dwr.table = test__ssp_report_overall_dwr.
  // Every record is counted as showCount; rows with priceMethod = 2 additionally feed
  // cpmBidPrice, allConversion, revenue and realRevenue (priceMethod = 2 is the value
  // test__overall_fee counts as feeCpmTimes).
  test__overall_show {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//    class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	business.time.extract.by = "showTime"
	commit.batch.size = 1
	commit.time.interval = 1800
	dwi.uuid.enable = false
	dwi.uuid.fields = ["clickId"]
	dwi.enable = true
	dwi.table = "test__ssp_overall_show_dwi"
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"
	dwr.enable = true
	// Group-by dimensions: `expr` is the source expression, `as` the output name.
	dwr.groupby.fields = [
	  {
		expr = "publisherId", as = "publisherId"
	  }, {
		expr = "subId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "adType", as = "adType"
	  }, {
		expr = "campaignId", as = "campaignId"
	  }, {
		expr = "offerId", as = "offerId"
	  }, {
		expr = "imageId", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "domain", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "respStatus", as = "respStatus"
	  }, {
		expr = "test", as = "test"
	  }, {
		expr = "ruleId", as = "ruleId"
	  }, {
		expr = "smartId", as = "smartId"
	  }, {
		expr = "eventName", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Aggregated metrics. `union` is presumably the expression used when merging
	// already-aggregated rows -- TODO confirm.
	dwr.groupby.aggs = [
	  {
		expr = "0", as = "requestCount", union = "sum(requestCount)"
	  }, {
		expr = "0", as = "sendCount", union = "sum(sendCount)"
	  }, {
		expr = "count(1)", as = "showCount", union = "sum(showCount)"
	  }, {
		expr = "0", as = "clickCount", union = "sum(clickCount)"
	  }, {
		expr = "0", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	  }, {
		expr = "0", as = "feeSendCount", union = "sum(feeSendCount)"    // displayed billed record count
	  }, {
		expr = "0", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	  }, {
		expr = "0", as = "feeSendPrice", union = "sum(feeSendPrice)"    // displayed billed amount (revenue)
	  }, {
		expr = "0", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	  }, {
		expr = "sum( cast( if( priceMethod = 2,  bidPrice, 0) as decimal(19,10) ) ) / 1000", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	  }, {
		expr = "0", as = "conversion", union = "sum(conversion)"      // conversions; for now must not include those produced by shows and clicks
	  }, {
		expr = "count( if( priceMethod = 2, 1, null) )", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by shows and clicks
	  }, {
		expr = "sum( cast( if( priceMethod = 2,  sendPrice, 0) as decimal(19,10) ) )", as = "revenue", union = "sum(revenue)"         // revenue
	  }, {
		expr = "sum( cast( if( priceMethod = 2,  bidPrice, 0) as decimal(19,10) ) ) / 1000", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	  }, {
		expr = "0", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion count
	  }, {
		expr = "0", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion count
	  }, {
		expr = "0", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion count
	  }, {
		expr = "0", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion count
	  }, {
		expr = "0", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	  }, {
		expr = "0", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	  }, {
		expr = "0", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	  }, {
		expr = "0", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	  }, {
		expr = "0", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	  }, {
		expr = "0", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	  }, {
		expr = "0", as = "winPrice", union = "sum(winPrice)"     // winning bid price
	  }, {
		expr = "0", as = "winNotices", union = "sum(winNotices)"     // number of wins
	  }, {
		expr = "0", as = "newCount", union = "sum(newCount)"
	  }, {
		expr = "0", as = "activeCount", union = "sum(activeCount)"
	  }
	]
	dwr.include.repeated = true
	dwr.table = "test__ssp_report_overall_dwr"
	kafka.consumer {
	  // NOTE(review): only partition 0 of topic_ad_show is listed, while the other
	  // overall modules list partitions 0-19 of their topics; confirm this is intended.
	  partitoins = [
		{
		  topic = "topic_ad_show"
		  partition = 0
		}
	  ]
	}
  }

  // Module "test__overall_click": reads topic_ad_click, partitions 0-19; business time
  // comes from clickTime. DWI is disabled; DWR is enabled with
  // dwr.table = test__ssp_report_overall_dwr.
  // Every record is counted as clickCount; rows with priceMethod = 1 additionally feed
  // cpcBidPrice, allConversion, revenue and realRevenue (priceMethod = 1 is the value
  // test__overall_fee counts as feeCpcTimes).
  test__overall_click {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//    class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	business.time.extract.by = "clickTime"
	commit.batch.size = 1
	commit.time.interval = 1800
	dwi.uuid.enable = false
	dwi.uuid.fields = ["clickId"]
	dwi.enable = false
	dwi.table = "test__ssp_overall_click_dwi"
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"
	dwr.enable = true
	// Group-by dimensions: `expr` is the source expression, `as` the output name.
	dwr.groupby.fields = [
	  {
		expr = "publisherId", as = "publisherId"
	  }, {
		expr = "subId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "adType", as = "adType"
	  }, {
		expr = "campaignId", as = "campaignId"
	  }, {
		expr = "offerId", as = "offerId"
	  }, {
		expr = "imageId", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "domain", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "respStatus", as = "respStatus"
	  }, {
		expr = "test", as = "test"
	  }, {
		expr = "ruleId", as = "ruleId"
	  }, {
		expr = "smartId", as = "smartId"
	  }, {
		expr = "eventName", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Aggregated metrics. `union` is presumably the expression used when merging
	// already-aggregated rows -- TODO confirm.
	dwr.groupby.aggs = [{
	  expr = "0", as = "requestCount", union = "sum(requestCount)"
	}, {
	  expr = "0", as = "sendCount", union = "sum(sendCount)"
	}, {
	  expr = "0", as = "showCount", union = "sum(showCount)"
	}, {
	  expr = "count(1)", as = "clickCount", union = "sum(clickCount)"
	}, {
	  expr = "0", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	}, {
	  expr = "0", as = "feeSendCount", union = "sum(feeSendCount)"    // displayed billed record count
	}, {
	  expr = "0", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	}, {
	  expr = "0", as = "feeSendPrice", union = "sum(feeSendPrice)"    // displayed billed amount (revenue)
	}, {
	  expr = "sum( cast( if( priceMethod = 1,  bidPrice, 0) as decimal(19,10) ) )", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	}, {
	  expr = "0", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	}, {
	  expr = "0", as = "conversion", union = "sum(conversion)"      // conversions; for now must not include those produced by shows and clicks
	}, {
	  expr = "count(if( priceMethod = 1,  1, null))", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by shows and clicks
	}, {
	  expr = "sum( cast( if( priceMethod = 1,  sendPrice, 0) as decimal(19,10) ) )", as = "revenue", union = "sum(revenue)"         // revenue
	}, {
	  expr = "sum( cast( if( priceMethod = 1,  bidPrice, 0) as decimal(19,10) ) )", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	}, {
	  expr = "0", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion count
	}, {
	  expr = "0", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion count
	}, {
	  expr = "0", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion count
	}, {
	  expr = "0", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion count
	}, {
	  expr = "0", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	}, {
	  expr = "0", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	}, {
	  expr = "0", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	}, {
	  expr = "0", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	}, {
	  expr = "0", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	}, {
	  expr = "0", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	}, {
	  expr = "0", as = "winPrice", union = "sum(winPrice)"     // winning bid price
	}, {
	  expr = "0", as = "winNotices", union = "sum(winNotices)"     // number of wins
	}, {
	  expr = "0", as = "newCount", union = "sum(newCount)"
	}, {
	  expr = "0", as = "activeCount", union = "sum(activeCount)"
	}
	]
	dwr.include.repeated = true
	dwr.table = "test__ssp_report_overall_dwr"
	kafka.consumer {
	  // "partitoins" (sic): spelled this way throughout this document; do not correct without checking the code.
	  partitoins = [
		{topic = "topic_ad_click", partition = 0},
		{topic = "topic_ad_click", partition = 1},
		{topic = "topic_ad_click", partition = 2},
		{topic = "topic_ad_click", partition = 3},
		{topic = "topic_ad_click", partition = 4},
		{topic = "topic_ad_click", partition = 5},
		{topic = "topic_ad_click", partition = 6},
		{topic = "topic_ad_click", partition = 7},
		{topic = "topic_ad_click", partition = 8},
		{topic = "topic_ad_click", partition = 9},
		{topic = "topic_ad_click", partition = 10},
		{topic = "topic_ad_click", partition = 11},
		{topic = "topic_ad_click", partition = 12},
		{topic = "topic_ad_click", partition = 13},
		{topic = "topic_ad_click", partition = 14},
		{topic = "topic_ad_click", partition = 15},
		{topic = "topic_ad_click", partition = 16},
		{topic = "topic_ad_click", partition = 17},
		{topic = "topic_ad_click", partition = 18},
		{topic = "topic_ad_click", partition = 19}
	  ]
	}
  }

  test__overall_fee {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//      class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	business.time.extract.by = "reportTime"
	//    commit.batch.size = 1
	//    commit.time.interval = 1800

	dwi.uuid.enable = true
	dwi.uuid.fields = ["clickId"]
	uuid.filter.class = "com.mobikok.ssp.data.streaming.module.support.uuid.NativeUuidFilter"
	dwi.enable = true
	dwi.table = "test__ssp_overall_fee_dwi"
	dwi.partition.fields = ["repeated", "l_time", "b_date", "b_time"]
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"

	//    dwi.handler=[{
	//      class = "com.mobikok.ssp.data.streaming.handler.dm.online.ClickHouseDMPersistHandler"
	//    }]

	master = true

	dwr.enable = true
	dwr.partition.fields = ["l_time", "b_date", "b_time"]
	dwr.acc.day.enable = true
	dwr.acc.day.overwrite = [{expr = "null",    as = "operatingSystem"}]
	dwr.acc.month.enable = true
	dwr.acc.month.overwrite = [{expr = "null",    as = "operatingSystem"}]

	dwr.groupby.fields = [
	  {
		expr = "publisherId", as = "publisherId"
	  }, {
		expr = "subId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "adType", as = "adType"
	  }, {
		expr = "campaignId", as = "campaignId"
	  }, {
		expr = "offerId", as = "offerId"
	  }, {
		expr = "imageId", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "domain", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "respStatus", as = "respStatus"
	  }, {
		expr = "test", as = "test"
	  }, {
		expr = "ruleId", as = "ruleId"
	  }, {
		expr = "smartId", as = "smartId"
	  }, {
		expr = "eventName", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Aggregated metrics for the fee module: request/send/show/click counts are 0 here,
	// while the fee, conversion and revenue metrics are computed from reportPrice,
	// sendPrice, isSend and priceMethod. Per the inline comments, priceMethod 1/2/3
	// correspond to CPC/CPM/CPA. `union` is presumably the expression used when merging
	// already-aggregated rows -- TODO confirm.
	dwr.groupby.aggs = [
	  {
		expr = "0", as = "requestCount", union = "sum(requestCount)"
	  }, {
		expr = "0", as = "sendCount", union = "sum(sendCount)"
	  }, {
		expr = "0", as = "showCount", union = "sum(showCount)"
	  }, {
		expr = "0", as = "clickCount", union = "sum(clickCount)"
	  }, {
		expr = "count(1)", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	  }, {
		expr = "count(if(isSend = 1, 1, null))", as = "feeSendCount", union = "sum(feeSendCount)"    // displayed billed record count
	  }, {
		expr = "sum( cast(reportPrice as decimal(19,10)) )", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	  }, {
		expr = "sum( cast(sendPrice as decimal(19,10)) )", as = "feeSendPrice", union = "sum(feeSendPrice)"    // displayed billed amount (revenue)
	  }, {
		expr = "0", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	  }, {
		expr = "0", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	  }, {
		expr = "count(1)", as = "conversion", union = "sum(conversion)"      // conversions; for now must not include those produced by shows and clicks
	  }, {
		expr = "count(1)", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by shows and clicks
	  }, {
		expr = "sum( cast(sendPrice as decimal(19,10)) )", as = "revenue", union = "sum(revenue)"         // revenue
	  }, {
		expr = "sum( cast(reportPrice as decimal(19,10)) )", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	  }, {
		expr = "count(if( priceMethod = 1,  1, null))", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion count
	  }, {
		expr = "count(if( priceMethod = 2,  1, null))", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion count
	  }, {
		expr = "count(if( priceMethod = 3,  1, null))", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion count
	  }, {
		expr = "count(if( priceMethod = 3 and isSend = 1,  1, null))", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion count
	  }, {
		expr = "sum( cast( if( priceMethod = 1,  reportPrice, 0) as decimal(19,10) ) )", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	  }, {
		expr = "sum( cast( if( priceMethod = 2,  reportPrice, 0) as decimal(19,10) ) )", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	  }, {
		expr = "sum( cast( if( priceMethod = 3,  reportPrice, 0) as decimal(19,10) ) )", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	  }, {
		expr = "sum( cast( if( priceMethod = 1,  sendPrice, 0) as decimal(19,10) ) )", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	  }, {
		expr = "sum( cast( if( priceMethod = 2,  sendPrice, 0) as decimal(19,10) ) )", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	  }, {
		expr = "sum( cast( if( priceMethod = 3 and isSend = 1,  sendPrice, 0) as decimal(19,10) ) )", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	  }, {
		expr = "0", as = "winPrice", union = "sum(winPrice)"     // winning bid price
	  }, {
		expr = "0", as = "winNotices", union = "sum(winNotices)"     // number of wins
	  }, {
		expr = "0", as = "newCount", union = "sum(newCount)"
	  }, {
		expr = "0", as = "activeCount", union = "sum(activeCount)"
	  }
	]

	//dwr.include.repeated = false
	dwr.table = "test__ssp_report_overall_dwr"
	dm.handler.online.enable = true
	// Online DM handlers. Each entry gives a handler class, a Hive source table
	// (hive.table), a ClickHouse target table (clickhouse.table) and the SQL that
	// shapes the rows. All three entries use test__ssp_report_overall_dwr as
	// hive.table and share the same joins; they differ only in the handler class,
	// the target table, and how b_time / l_time / b_date are projected.
	// NOTE(review): {className} in the FROM clause is presumably substituted by the
	// framework before the SQL runs -- confirm against the handler implementation.
	// NOTE(review): "left join version_control v" combined with "where v.id is not null"
	// drops every row whose versionName has no match in version_control, so that
	// join effectively behaves like an inner join. Confirm this is intended.
	dm.handler.online = [{
	  // Entry 1: b_time, l_time and b_date are selected unchanged.
	  class = "com.mobikok.ssp.data.streaming.handler.dm.online.ClickHouseDMPersistHandler"
	  hive.table = "test__ssp_report_overall_dwr"
	  clickhouse.table = "test__ssp_report_overall_dm"
	  sql = """
		select
		  coalesce(dwr.publisherId, a.publisherId) as publisherid,
		  dwr.appId         as appid,
		  dwr.countryId     as countryid,
		  dwr.carrierId     as carrierid,
		  dwr.adType        as adtype,
		  dwr.campaignId    as campaignid,
		  dwr.offerId       as offerid,
		  dwr.imageId       as imageid,
		  dwr.affSub        as affsub,
		  dwr.requestCount  as requestcount,
		  dwr.sendCount     as sendcount,
		  dwr.showCount     as showcount,
		  dwr.clickCount    as clickcount,
		  dwr.feeReportCount as feereportcount,
		  dwr.feeSendCount   as feesendcount,
		  dwr.feeReportPrice as feereportprice,
		  dwr.feeSendPrice  as feesendprice,
		  dwr.cpcBidPrice   as cpcbidprice,
		  dwr.cpmBidPrice   as cpmbidprice,
		  dwr.conversion    as conversion,
		  dwr.allConversion as allconversion,
		  dwr.revenue       as revenue,
		  dwr.realRevenue   as realrevenue,
		  dwr.b_time,
		  dwr.l_time,
		  dwr.b_date,
		  nvl(p.amId, a_p.amId)      as publisheramid,
		  nvl(p_am.name, ap_am.name) as publisheramname,
		  ad.amId           as advertiseramid,
		  a_am.name         as advertiseramname,
		  a.mode            as appmodeid,
		  m.name            as appmodename,
		  cam.adCategory1   as adcategory1id,
		  adc.name          as adcategory1name,
		  cam.name          as campaignname,
		  cam.adverId       as adverid,
		  ad.name           as advername,
		  o.optStatus       as offeroptstatus,
		  o.name            as offername,
		  nvl(p.name, a_p.name ) as publishername,
		  a.name            as appname,
		  i.iab1name        as iab1name,
		  i.iab2name        as iab2name,
		  c.name            as countryname,
		  ca.name           as carriername,
		  dwr.adType        as adtypeid,
		  adt.name          as adtypename,
		  v.id              as versionid,
		  dwr.versionName   as versionname,
		  p_am.proxyId      as publisherproxyid,
		  cast(null as string)    as data_type,
		  feeCpcTimes             as feecpctimes,
		  feeCpmTimes             as feecpmtimes,
		  feeCpaTimes             as feecpatimes,
		  feeCpaSendTimes         as feecpasendtimes,
		  feeCpcReportPrice       as feecpcreportprice,
		  feeCpmReportPrice       as feecpmreportprice,
		  feeCpaReportPrice       as feecpareportprice,
		  feeCpcSendPrice         as feecpcsendprice,
		  feeCpmSendPrice         as feecpmsendprice,
		  feeCpaSendPrice         as feecpasendprice,
		  c.alpha2_code           as countrycode,
		  dwr.respStatus          as respstatus,
		  dwr.winPrice            as winprice,
		  dwr.winNotices          as winnotices,
		  a.isSecondHighPriceWin  as issecondhighpricewin,
		  co.id                   as companyid,
		  co.name                 as companyname,
		  dwr.test,
		  dwr.ruleId        as ruleid,
		  dwr.smartId       as smartid,
		  pro.id            as proxyid,
		  s.name            as smartname,
		  sr.name           as rulename,
		  co.id             as appcompanyid,
		  co_o.id           as offercompanyid,
		  newCount          as newcount,
		  activeCount       as activecount,
		  cam.adCategory2   as adcategory2id,
		  adc2.name         as adcategory2name,
		  coalesce(p.ampaId, a_p.ampaId)    as publisherampaid,
		  coalesce(p_amp.name, ap_amp.name) as publisherampaname,
		  ad.amaaId                         as advertiseramaaid,
		  a_ama.name                        as advertiseramaaname,
		  dwr.eventName                     as eventname,
		  dwr.recommender                   as recommender,
		  dwr.ratertype                     as ratertype,
		  dwr.raterid                       as raterid
		from {className}_dwr dwr
		  left join campaign cam      on cam.id = dwr.campaignId
		  left join advertiser ad     on ad.id = cam.adverId
		  left join employee a_am     on a_am.id = ad.amid
		  left join offer o           on o.id = dwr.offerId
		  left join publisher p       on p.id = dwr.publisherId
		  left join employee p_am     on p_am.id = p.amid
		  left join app a             on a.id = dwr.appId
		  left join iab i             on i.iab1 = a.iab1 and i.iab2 = a.iab2
		  left join app_mode m        on m.id = a.mode
		  left join country c         on c.id = dwr.countryId
		  left join carrier ca        on ca.id = dwr.carrierId
		  left join ad_type adt       on adt.id = dwr.adType
		  left join version_control v on v.version = dwr.versionName
		  left join ad_category1 adc  on adc.id =  cam.adCategory1
		  left join ad_category2 adc2 on adc2.id =  cam.adCategory2
		  left join publisher a_p     on  a_p.id = a.publisherId
		  left join employee ap_am    on  ap_am.id  = a_p.amId
		  left join proxy pro         on  pro.id  = ap_am.proxyId
		  left join company co        on  co.id = pro.companyId
		  left join other_smart_link s  on s.ID = dwr.smartId
		  left join smartlink_rules sr  on sr.ID = dwr.ruleId
		  left join campaign cam_o    on cam_o.id = o.campaignId
		  left join advertiser ad_o   on ad_o.id = cam_o.adverId
		  left join employee em_o     on em_o.id = ad_o.amId
		  left join company co_o      on co_o.id = em_o.companyId
		  left join employee p_amp    on p_amp.id = p.ampaId
		  left join employee ap_amp   on  ap_amp.id  = a_p.ampaId
		  left join employee a_ama    on a_ama.id = ad.amaaId
		where v.id is not null and b_date > "2018-01-15"
		"""
	}, {
	  // Entry 2 (day table): b_time and l_time are formatted as 'yyyy-MM-dd 00:00:00'
	  // and b_date as 'yyyy-MM-dd', i.e. the time-of-day part is zeroed.
	  class = "com.mobikok.ssp.data.streaming.handler.dm.online.ClickHouseDMPersistDayHandler"
	  hive.table = "test__ssp_report_overall_dwr"
	  clickhouse.table = "test__ssp_report_overall_dm_day"
	  sql = """
		select
		  coalesce(dwr.publisherId, a.publisherId) as publisherid,
		  dwr.appId         as appid,
		  dwr.countryId     as countryid,
		  dwr.carrierId     as carrierid,
		  dwr.adType        as adtype,
		  dwr.campaignId    as campaignid,
		  dwr.offerId       as offerid,
		  dwr.imageId       as imageid,
		  dwr.affSub        as affsub,
		  dwr.requestCount  as requestcount,
		  dwr.sendCount     as sendcount,
		  dwr.showCount     as showcount,
		  dwr.clickCount    as clickcount,
		  dwr.feeReportCount as feereportcount,
		  dwr.feeSendCount   as feesendcount,
		  dwr.feeReportPrice as feereportprice,
		  dwr.feeSendPrice  as feesendprice,
		  dwr.cpcBidPrice   as cpcbidprice,
		  dwr.cpmBidPrice   as cpmbidprice,
		  dwr.conversion    as conversion,
		  dwr.allConversion as allconversion,
		  dwr.revenue       as revenue,
		  dwr.realRevenue   as realrevenue,
		  date_format(dwr.b_time, 'yyyy-MM-dd 00:00:00') as b_time,
		  date_format(dwr.l_time, 'yyyy-MM-dd 00:00:00') as l_time,
		  date_format(dwr.b_date, 'yyyy-MM-dd')          as b_date,
		  nvl(p.amId, a_p.amId)      as publisheramid,
		  nvl(p_am.name, ap_am.name) as publisheramname,
		  ad.amId           as advertiseramid,
		  a_am.name         as advertiseramname,
		  a.mode            as appmodeid,
		  m.name            as appmodename,
		  cam.adCategory1   as adcategory1id,
		  adc.name          as adcategory1name,
		  cam.name          as campaignname,
		  cam.adverId       as adverid,
		  ad.name           as advername,
		  o.optStatus       as offeroptstatus,
		  o.name            as offername,
		  nvl(p.name, a_p.name ) as publishername,
		  a.name            as appname,
		  i.iab1name        as iab1name,
		  i.iab2name        as iab2name,
		  c.name            as countryname,
		  ca.name           as carriername,
		  dwr.adType        as adtypeid,
		  adt.name          as adtypename,
		  v.id              as versionid,
		  dwr.versionName   as versionname,
		  p_am.proxyId      as publisherproxyid,
		  cast(null as string)    as data_type,
		  feeCpcTimes             as feecpctimes,
		  feeCpmTimes             as feecpmtimes,
		  feeCpaTimes             as feecpatimes,
		  feeCpaSendTimes         as feecpasendtimes,
		  feeCpcReportPrice       as feecpcreportprice,
		  feeCpmReportPrice       as feecpmreportprice,
		  feeCpaReportPrice       as feecpareportprice,
		  feeCpcSendPrice         as feecpcsendprice,
		  feeCpmSendPrice         as feecpmsendprice,
		  feeCpaSendPrice         as feecpasendprice,
		  c.alpha2_code           as countrycode,
		  dwr.respStatus          as respstatus,
		  dwr.winPrice            as winprice,
		  dwr.winNotices          as winnotices,
		  a.isSecondHighPriceWin  as issecondhighpricewin,
		  co.id                   as companyid,
		  co.name                 as companyname,
		  dwr.test,
		  dwr.ruleId        as ruleid,
		  dwr.smartId       as smartid,
		  pro.id            as proxyid,
		  s.name            as smartname,
		  sr.name           as rulename,
		  co.id             as appcompanyid,
		  co_o.id           as offercompanyid,
		  newCount          as newcount,
		  activeCount       as activecount,
		  cam.adCategory2   as adcategory2id,
		  adc2.name         as adcategory2name,
		  coalesce(p.ampaId, a_p.ampaId)    as publisherampaid,
		  coalesce(p_amp.name, ap_amp.name) as publisherampaname,
		  ad.amaaId                         as advertiseramaaid,
		  a_ama.name                        as advertiseramaaname,
		  dwr.eventName                     as eventname,
		  dwr.recommender                   as recommender,
		  dwr.ratertype                     as ratertype,
		  dwr.raterid                       as raterid
		from {className}_dwr dwr
		  left join campaign cam      on cam.id = dwr.campaignId
		  left join advertiser ad     on ad.id = cam.adverId
		  left join employee a_am     on a_am.id = ad.amid
		  left join offer o           on o.id = dwr.offerId
		  left join publisher p       on p.id = dwr.publisherId
		  left join employee p_am     on p_am.id = p.amid
		  left join app a             on a.id = dwr.appId
		  left join iab i             on i.iab1 = a.iab1 and i.iab2 = a.iab2
		  left join app_mode m        on m.id = a.mode
		  left join country c         on c.id = dwr.countryId
		  left join carrier ca        on ca.id = dwr.carrierId
		  left join ad_type adt       on adt.id = dwr.adType
		  left join version_control v on v.version = dwr.versionName
		  left join ad_category1 adc  on adc.id =  cam.adCategory1
		  left join ad_category2 adc2 on adc2.id =  cam.adCategory2
		  left join publisher a_p     on  a_p.id = a.publisherId
		  left join employee ap_am    on  ap_am.id  = a_p.amId
		  left join proxy pro         on  pro.id  = ap_am.proxyId
		  left join company co        on  co.id = pro.companyId
		  left join other_smart_link s  on s.ID = dwr.smartId
		  left join smartlink_rules sr  on sr.ID = dwr.ruleId
		  left join campaign cam_o    on cam_o.id = o.campaignId
		  left join advertiser ad_o   on ad_o.id = cam_o.adverId
		  left join employee em_o     on em_o.id = ad_o.amId
		  left join company co_o      on co_o.id = em_o.companyId
		  left join employee p_amp    on p_amp.id = p.ampaId
		  left join employee ap_amp   on  ap_amp.id  = a_p.ampaId
		  left join employee a_ama    on a_ama.id = ad.amaaId
		where v.id is not null and b_date > "2018-01-15"
		"""
	}, {
	  // Entry 3 (month table): b_time and l_time are formatted as 'yyyy-MM-01 00:00:00'
	  // and b_date as 'yyyy-MM-01', i.e. every row is stamped with the first day of its month.
	  class = "com.mobikok.ssp.data.streaming.handler.dm.online.ClickHouseDMPersistMonthHandler"
	  hive.table = "test__ssp_report_overall_dwr"
	  clickhouse.table = "test__ssp_report_overall_dm_month"
	  sql = """
		select
		  coalesce(dwr.publisherId, a.publisherId) as publisherid,
		  dwr.appId         as appid,
		  dwr.countryId     as countryid,
		  dwr.carrierId     as carrierid,
		  dwr.adType        as adtype,
		  dwr.campaignId    as campaignid,
		  dwr.offerId       as offerid,
		  dwr.imageId       as imageid,
		  dwr.affSub        as affsub,
		  dwr.requestCount  as requestcount,
		  dwr.sendCount     as sendcount,
		  dwr.showCount     as showcount,
		  dwr.clickCount    as clickcount,
		  dwr.feeReportCount as feereportcount,
		  dwr.feeSendCount   as feesendcount,
		  dwr.feeReportPrice as feereportprice,
		  dwr.feeSendPrice  as feesendprice,
		  dwr.cpcBidPrice   as cpcbidprice,
		  dwr.cpmBidPrice   as cpmbidprice,
		  dwr.conversion    as conversion,
		  dwr.allConversion as allconversion,
		  dwr.revenue       as revenue,
		  dwr.realRevenue   as realrevenue,
		  date_format(dwr.b_time, 'yyyy-MM-01 00:00:00') as b_time,
		  date_format(dwr.l_time, 'yyyy-MM-01 00:00:00') as l_time,
		  date_format(dwr.b_date, 'yyyy-MM-01')          as b_date,
		  nvl(p.amId, a_p.amId)      as publisheramid,
		  nvl(p_am.name, ap_am.name) as publisheramname,
		  ad.amId           as advertiseramid,
		  a_am.name         as advertiseramname,
		  a.mode            as appmodeid,
		  m.name            as appmodename,
		  cam.adCategory1   as adcategory1id,
		  adc.name          as adcategory1name,
		  cam.name          as campaignname,
		  cam.adverId       as adverid,
		  ad.name           as advername,
		  o.optStatus       as offeroptstatus,
		  o.name            as offername,
		  nvl(p.name, a_p.name ) as publishername,
		  a.name            as appname,
		  i.iab1name        as iab1name,
		  i.iab2name        as iab2name,
		  c.name            as countryname,
		  ca.name           as carriername,
		  dwr.adType        as adtypeid,
		  adt.name          as adtypename,
		  v.id              as versionid,
		  dwr.versionName   as versionname,
		  p_am.proxyId      as publisherproxyid,
		  cast(null as string)    as data_type,
		  feeCpcTimes             as feecpctimes,
		  feeCpmTimes             as feecpmtimes,
		  feeCpaTimes             as feecpatimes,
		  feeCpaSendTimes         as feecpasendtimes,
		  feeCpcReportPrice       as feecpcreportprice,
		  feeCpmReportPrice       as feecpmreportprice,
		  feeCpaReportPrice       as feecpareportprice,
		  feeCpcSendPrice         as feecpcsendprice,
		  feeCpmSendPrice         as feecpmsendprice,
		  feeCpaSendPrice         as feecpasendprice,
		  c.alpha2_code           as countrycode,
		  dwr.respStatus          as respstatus,
		  dwr.winPrice            as winprice,
		  dwr.winNotices          as winnotices,
		  a.isSecondHighPriceWin  as issecondhighpricewin,
		  co.id                   as companyid,
		  co.name                 as companyname,
		  dwr.test,
		  dwr.ruleId        as ruleid,
		  dwr.smartId       as smartid,
		  pro.id            as proxyid,
		  s.name            as smartname,
		  sr.name           as rulename,
		  co.id             as appcompanyid,
		  co_o.id           as offercompanyid,
		  newCount          as newcount,
		  activeCount       as activecount,
		  cam.adCategory2   as adcategory2id,
		  adc2.name         as adcategory2name,
		  coalesce(p.ampaId, a_p.ampaId)    as publisherampaid,
		  coalesce(p_amp.name, ap_amp.name) as publisherampaname,
		  ad.amaaId                         as advertiseramaaid,
		  a_ama.name                        as advertiseramaaname,
		  dwr.eventName                     as eventname,
		  dwr.recommender                   as recommender,
		  dwr.ratertype                     as ratertype,
		  dwr.raterid                       as raterid
		from {className}_dwr dwr
		  left join campaign cam      on cam.id = dwr.campaignId
		  left join advertiser ad     on ad.id = cam.adverId
		  left join employee a_am     on a_am.id = ad.amid
		  left join offer o           on o.id = dwr.offerId
		  left join publisher p       on p.id = dwr.publisherId
		  left join employee p_am     on p_am.id = p.amid
		  left join app a             on a.id = dwr.appId
		  left join iab i             on i.iab1 = a.iab1 and i.iab2 = a.iab2
		  left join app_mode m        on m.id = a.mode
		  left join country c         on c.id = dwr.countryId
		  left join carrier ca        on ca.id = dwr.carrierId
		  left join ad_type adt       on adt.id = dwr.adType
		  left join version_control v on v.version = dwr.versionName
		  left join ad_category1 adc  on adc.id =  cam.adCategory1
		  left join ad_category2 adc2 on adc2.id =  cam.adCategory2
		  left join publisher a_p     on  a_p.id = a.publisherId
		  left join employee ap_am    on  ap_am.id  = a_p.amId
		  left join proxy pro         on  pro.id  = ap_am.proxyId
		  left join company co        on  co.id = pro.companyId
		  left join other_smart_link s  on s.ID = dwr.smartId
		  left join smartlink_rules sr  on sr.ID = dwr.ruleId
		  left join campaign cam_o    on cam_o.id = o.campaignId
		  left join advertiser ad_o   on ad_o.id = cam_o.adverId
		  left join employee em_o     on em_o.id = ad_o.amId
		  left join company co_o      on co_o.id = em_o.companyId
		  left join employee p_amp    on p_amp.id = p.ampaId
		  left join employee ap_amp   on  ap_amp.id  = a_p.ampaId
		  left join employee a_ama    on a_ama.id = ad.amaaId
		where v.id is not null and b_date > "2018-01-15"
		"""
	}]

	// Kafka source for this module: partition 0 of topic_ad_fee.
	// NOTE(review): the key is spelled "partitoins" in every module of this file;
	// presumably that is the spelling the loader reads -- check the code before renaming it.
	kafka.consumer {
	  partitoins = [
		{
		  topic = "topic_ad_fee"
		  partition = 0
		}
	  ]
	}

  }

  // test__overall_win: module configured with Kafka topic topic_ad_win (partition 0),
  // DWI table test__ssp_overall_win_dwi and DWR table test__ssp_report_overall_dwr.
  // Only winPrice (sum of winPrice) and winNotices (count(1)) are real aggregates;
  // every other metric is the constant 0.
  // NOTE(review): the constant-0 metrics presumably keep this module's rows aligned
  // with the column set of the shared dwr.table -- confirm.
  test__overall_win {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//    class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	// NOTE(review): presumably the record field used as the business time -- confirm.
	business.time.extract.by = "winTime"
	commit.batch.size = 1
	// NOTE(review): unit is not stated here (seconds?) -- confirm.
	commit.time.interval = 1800
	// uuid fields are listed, but dwi.uuid.enable is false.
	dwi.uuid.enable = false
	dwi.uuid.fields = ["clickId"]
	dwi.enable = true
	dwi.table = "test__ssp_overall_win_dwi"
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"
	dwr.enable = true
	// Group-by dimensions: each entry maps a source expression (expr) to an output column name (as).
	// NOTE(review): operatingSystem(), language(), machineModel(), deviceType() and browserKernel()
	// are presumably UDFs registered by the framework -- confirm.
	dwr.groupby.fields = [
	  {
		expr = "publisherId", as = "publisherId"
	  }, {
		expr = "subId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "adType", as = "adType"
	  }, {
		expr = "campaignId", as = "campaignId"
	  }, {
		expr = "offerId", as = "offerId"
	  }, {
		expr = "imageId", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "domain", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "respStatus", as = "respStatus"
	  }, {
		expr = "test", as = "test"
	  }, {
		expr = "ruleId", as = "ruleId"
	  }, {
		expr = "smartId", as = "smartId"
	  }, {
		expr = "eventName", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Metrics: expr is the aggregate expression, as is the output column, union is a sum over that column.
	// NOTE(review): union is presumably applied when merging partial results into the DWR table -- confirm.
	dwr.groupby.aggs = [
	  {
		expr = "0", as = "requestCount", union = "sum(requestCount)"
	  }, {
		expr = "0", as = "sendCount", union = "sum(sendCount)"
	  }, {
		expr = "0", as = "showCount", union = "sum(showCount)"
	  }, {
		expr = "0", as = "clickCount", union = "sum(clickCount)"
	  }, {
		expr = "0", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	  }, {
		expr = "0", as = "feeSendCount", union = "sum(feeSendCount)"    // billed display record count
	  }, {
		expr = "0", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	  }, {
		expr = "0", as = "feeSendPrice", union = "sum(feeSendPrice)"    // billed display amount (revenue)
	  }, {
		expr = "0", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	  }, {
		expr = "0", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	  }, {
		expr = "0", as = "conversion", union = "sum(conversion)"      // conversions; currently excludes those produced by impressions and clicks
	  }, {
		expr = "0", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by impressions and clicks
	  }, {
		expr = "0", as = "revenue", union = "sum(revenue)"         // revenue
	  }, {
		expr = "0", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	  }, {
		expr = "0", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion record count
	  }, {
		expr = "0", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion record count
	  }, {
		expr = "0", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion record count
	  }, {
		expr = "0", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion record count
	  }, {
		expr = "0", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	  }, {
		expr = "0", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	  }, {
		expr = "0", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	  }, {
		expr = "0", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	  }, {
		expr = "0", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	  }, {
		expr = "0", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	  }, {
		expr = "sum( cast(winPrice as decimal(19,10)) )", as = "winPrice", union = "sum(winPrice)"     // win price
	  }, {
		expr = "count(1)", as = "winNotices", union = "sum(winNotices)"     // win-notice count
	  }, {
		expr = "0", as = "newCount", union = "sum(newCount)"
	  }, {
		expr = "0", as = "activeCount", union = "sum(activeCount)"
	  }
	]
	dwr.include.repeated = true
	dwr.table = "test__ssp_report_overall_dwr"
	// Kafka source: partition 0 of topic_ad_win.
	// NOTE(review): key spelling "partitoins" is presumably what the loader reads -- do not rename without checking.
	kafka.consumer {
	  partitoins = [
		{
		  topic = "topic_ad_win"
		  partition = 0
		}
	  ]
	}
  }

  // test__overall_user_new: module configured with Kafka topic topic_ad_user (partition 0),
  // DWI table test__ssp_overall_user_new_dwi and DWR table test__ssp_report_overall_dwr.
  // The only real aggregate is newCount (count(1)); every other metric is the constant 0,
  // and several dimensions are the literal null.
  // NOTE(review): the null dimensions and constant-0 metrics presumably keep this module's
  // rows aligned with the column set of the shared dwr.table -- confirm.
  test__overall_user_new {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//    class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	// NOTE(review): presumably the record field used as the business time -- confirm.
	business.time.extract.by = "createTime"
	commit.batch.size = 1
	// NOTE(review): unit is not stated here (seconds?) -- confirm.
	commit.time.interval = 1800
	// uuid fields are listed, but dwi.uuid.enable is false.
	dwi.uuid.enable = false
	dwi.uuid.fields = ["clickId"]
	dwi.enable = true
	dwi.table = "test__ssp_overall_user_new_dwi"
	dwi.partition.fields = ["repeated", "l_time", "b_date", "b_time"]
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspUserNewDWISchema"
	dwr.enable = true
	// Group-by dimensions: each entry maps a source expression (expr) to an output column name (as).
	// Entries with expr = "null" emit a null value for that column.
	dwr.groupby.fields = [
	  {
		expr = "null", as = "publisherId"
	  }, {
		expr = "appId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "null", as = "adType"
	  }, {
		expr = "null", as = "campaignId"
	  }, {
		expr = "null", as = "offerId"
	  }, {
		expr = "null", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "null", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "null", as = "respStatus"
	  }, {
		expr = "null", as = "test"
	  }, {
		expr = "null", as = "ruleId"
	  }, {
		expr = "null", as = "smartId"
	  }, {
		expr = "null", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Metrics: expr is the aggregate expression, as is the output column, union is a sum over that column.
	// NOTE(review): union is presumably applied when merging partial results into the DWR table -- confirm.
	dwr.groupby.aggs = [
	  {
		expr = "0", as = "requestCount", union = "sum(requestCount)"
	  }, {
		expr = "0", as = "sendCount", union = "sum(sendCount)"
	  }, {
		expr = "0", as = "showCount", union = "sum(showCount)"
	  }, {
		expr = "0", as = "clickCount", union = "sum(clickCount)"
	  }, {
		expr = "0", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	  }, {
		expr = "0", as = "feeSendCount", union = "sum(feeSendCount)"    // billed display record count
	  }, {
		expr = "0", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	  }, {
		expr = "0", as = "feeSendPrice", union = "sum(feeSendPrice)"    // billed display amount (revenue)
	  }, {
		expr = "0", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	  }, {
		expr = "0", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	  }, {
		expr = "0", as = "conversion", union = "sum(conversion)"      // conversions; currently excludes those produced by impressions and clicks
	  }, {
		expr = "0", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by impressions and clicks
	  }, {
		expr = "0", as = "revenue", union = "sum(revenue)"         // revenue
	  }, {
		expr = "0", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	  }, {
		expr = "0", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion record count
	  }, {
		expr = "0", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion record count
	  }, {
		expr = "0", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion record count
	  }, {
		expr = "0", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion record count
	  }, {
		expr = "0", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	  }, {
		expr = "0", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	  }, {
		expr = "0", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	  }, {
		expr = "0", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	  }, {
		expr = "0", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	  }, {
		expr = "0", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	  }, {
		expr = "0", as = "winPrice", union = "sum(winPrice)"     // win price
	  }, {
		expr = "0", as = "winNotices", union = "sum(winNotices)"     // win-notice count
	  }, {
		expr = "count(1)", as = "newCount", union = "sum(newCount)"
	  }, {
		expr = "0", as = "activeCount", union = "sum(activeCount)"
	  }
	]
	dwr.include.repeated = true
	dwr.table = "test__ssp_report_overall_dwr"
	// Kafka source: partition 0 of topic_ad_user.
	// NOTE(review): key spelling "partitoins" is presumably what the loader reads -- do not rename without checking.
	kafka.consumer {
	  partitoins = [
		{
		  topic = "topic_ad_user"
		  partition = 0
		}
	  ]
	}
  }

  // test__overall_user_active: module configured with Kafka topic topic_ad_user_active
  // (partition 0), DWI table test__ssp_overall_user_active_dwi and DWR table
  // test__ssp_report_overall_dwr. The only real aggregate is activeCount (count(1));
  // every other metric is the constant 0, and several dimensions are the literal null.
  // NOTE(review): the null dimensions and constant-0 metrics presumably keep this module's
  // rows aligned with the column set of the shared dwr.table -- confirm.
  test__overall_user_active {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//    class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	// NOTE(review): presumably the record field used as the business time -- confirm.
	business.time.extract.by = "createTime"
	commit.batch.size = 1
	// NOTE(review): unit is not stated here (seconds?) -- confirm.
	commit.time.interval = 1800
	// uuid fields are listed, but dwi.uuid.enable is false.
	dwi.uuid.enable = false
	dwi.uuid.fields = ["clickId"]
	dwi.enable = true
	dwi.table = "test__ssp_overall_user_active_dwi"
	dwi.partition.fields = ["repeated", "l_time", "b_date", "b_time"]
	// NOTE(review): this is the same schema class that test__overall_user_new uses
	// (SspUserNewDWISchema) -- confirm the reuse is intended for active-user records.
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspUserNewDWISchema"

	dwr.enable = true
	// Group-by dimensions: each entry maps a source expression (expr) to an output column name (as).
	// Entries with expr = "null" emit a null value for that column.
	dwr.groupby.fields = [
	  {
		expr = "null", as = "publisherId"
	  }, {
		expr = "appId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "null", as = "adType"
	  }, {
		expr = "null", as = "campaignId"
	  }, {
		expr = "null", as = "offerId"
	  }, {
		expr = "null", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "null", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "null", as = "respStatus"
	  }, {
		expr = "null", as = "test"
	  }, {
		expr = "null", as = "ruleId"
	  }, {
		expr = "null", as = "smartId"
	  }, {
		expr = "null", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Metrics: expr is the aggregate expression, as is the output column, union is a sum over that column.
	// NOTE(review): union is presumably applied when merging partial results into the DWR table -- confirm.
	dwr.groupby.aggs = [
	  {
		expr = "0", as = "requestCount", union = "sum(requestCount)"
	  }, {
		expr = "0", as = "sendCount", union = "sum(sendCount)"
	  }, {
		expr = "0", as = "showCount", union = "sum(showCount)"
	  }, {
		expr = "0", as = "clickCount", union = "sum(clickCount)"
	  }, {
		expr = "0", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	  }, {
		expr = "0", as = "feeSendCount", union = "sum(feeSendCount)"    // billed display record count
	  }, {
		expr = "0", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	  }, {
		expr = "0", as = "feeSendPrice", union = "sum(feeSendPrice)"    // billed display amount (revenue)
	  }, {
		expr = "0", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	  }, {
		expr = "0", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	  }, {
		expr = "0", as = "conversion", union = "sum(conversion)"      // conversions; currently excludes those produced by impressions and clicks
	  }, {
		expr = "0", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by impressions and clicks
	  }, {
		expr = "0", as = "revenue", union = "sum(revenue)"         // revenue
	  }, {
		expr = "0", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	  }, {
		expr = "0", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion record count
	  }, {
		expr = "0", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion record count
	  }, {
		expr = "0", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion record count
	  }, {
		expr = "0", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion record count
	  }, {
		expr = "0", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	  }, {
		expr = "0", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	  }, {
		expr = "0", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	  }, {
		expr = "0", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	  }, {
		expr = "0", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	  }, {
		expr = "0", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	  }, {
		expr = "0", as = "winPrice", union = "sum(winPrice)"     // win price
	  }, {
		expr = "0", as = "winNotices", union = "sum(winNotices)"     // win-notice count
	  }, {
		expr = "0", as = "newCount", union = "sum(newCount)"
	  }, {
		expr = "count(1)", as = "activeCount", union = "sum(activeCount)"
	  }
	]
	dwr.include.repeated = true
	dwr.table = "test__ssp_report_overall_dwr"

	// Kafka source: partition 0 of topic_ad_user_active.
	// NOTE(review): key spelling "partitoins" is presumably what the loader reads -- do not rename without checking.
	kafka.consumer {
	  partitoins = [
		{
		  topic = "topic_ad_user_active"
		  partition = 0
		}
	  ]
	}
  }

  // test__overall_events: module configured with Kafka topic topic_ad_event (partition 0),
  // DWI table test__ssp_overall_events_dwi and DWR table test__ssp_report_overall_dwr.
  // NOTE(review): every aggregate in dwr.groupby.aggs below is the constant 0, so this
  // module adds no metric value to the DWR table as configured -- confirm that only the
  // dimension rows (e.g. eventName) are wanted, or whether an event counter is missing.
  test__overall_events {
	class = "com.mobikok.ssp.data.streaming.module.PluggableModule"
	//    class = "com.mobikok.ssp.data.streaming.module.MixModuleForBTime"
	// NOTE(review): presumably the record field used as the business time -- confirm.
	business.time.extract.by = "reportTime"
	commit.batch.size = 1
	// NOTE(review): unit is not stated here (seconds?) -- confirm.
	commit.time.interval = 1800
	// uuid fields are listed, but dwi.uuid.enable is false.
	dwi.uuid.enable = false
	dwi.uuid.fields = ["clickId"]
	dwi.enable = true
	dwi.table = "test__ssp_overall_events_dwi"
	dwi.kafka.schema = "com.mobikok.ssp.data.streaming.schema.dwi.kafka.SspTrafficDWISchema"
	dwr.enable = true
	// Group-by dimensions: each entry maps a source expression (expr) to an output column name (as).
	// NOTE(review): operatingSystem(), language(), machineModel(), deviceType() and browserKernel()
	// are presumably UDFs registered by the framework -- confirm.
	dwr.groupby.fields = [
	  {
		expr = "publisherId", as = "publisherId"
	  }, {
		expr = "subId", as = "appId"
	  }, {
		expr = "countryId", as = "countryId"
	  }, {
		expr = "carrierId", as = "carrierId"
	  }, {
		expr = "sv", as = "versionName"
	  }, {
		expr = "adType", as = "adType"
	  }, {
		expr = "campaignId", as = "campaignId"
	  }, {
		expr = "offerId", as = "offerId"
	  }, {
		expr = "imageId", as = "imageId"
	  }, {
		expr = "affSub", as = "affSub"
	  }, {
		expr = "packageName", as = "packageName"
	  }, {
		expr = "domain", as = "domain"
	  }, {
		expr = "operatingSystem(userAgent)", as = "operatingSystem"
	  }, {
		expr = "language(userAgent)", as = "systemLanguage"
	  }, {
		expr = "machineModel(userAgent)", as = "deviceBrand"
	  }, {
		expr = "deviceType(userAgent)", as = "deviceType"
	  }, {
		expr = "browserKernel(userAgent)", as = "browserKernel"
	  }, {
		expr = "respStatus", as = "respStatus"
	  }, {
		expr = "test", as = "test"
	  }, {
		expr = "ruleId", as = "ruleId"
	  }, {
		expr = "smartId", as = "smartId"
	  }, {
		expr = "eventName", as = "eventName"
	  }, {
		expr = "recommender", as = "recommender"
	  }, {
		expr = "raterType", as = "raterType"
	  }, {
		expr = "raterId", as = "raterId"
	  }
	]
	// Metrics: expr is the aggregate expression, as is the output column, union is a sum over that column.
	// NOTE(review): union is presumably applied when merging partial results into the DWR table -- confirm.
	dwr.groupby.aggs = [
	  {
		expr = "0", as = "requestCount", union = "sum(requestCount)"
	  }, {
		expr = "0", as = "sendCount", union = "sum(sendCount)"
	  }, {
		expr = "0", as = "showCount", union = "sum(showCount)"
	  }, {
		expr = "0", as = "clickCount", union = "sum(clickCount)"
	  }, {
		expr = "0", as = "feeReportCount", union = "sum(feeReportCount)"  // billed record count
	  }, {
		expr = "0", as = "feeSendCount", union = "sum(feeSendCount)"    // billed display record count
	  }, {
		expr = "0", as = "feeReportPrice", union = "sum(feeReportPrice)"  // billed amount (real revenue)
	  }, {
		expr = "0", as = "feeSendPrice", union = "sum(feeSendPrice)"    // billed display amount (revenue)
	  }, {
		expr = "0", as = "cpcBidPrice", union = "sum(cpcBidPrice)"
	  }, {
		expr = "0", as = "cpmBidPrice", union = "sum(cpmBidPrice)"
	  }, {
		expr = "0", as = "conversion", union = "sum(conversion)"      // conversions; currently excludes those produced by impressions and clicks
	  }, {
		expr = "0", as = "allConversion", union = "sum(allConversion)"   // conversions, including those produced by impressions and clicks
	  }, {
		expr = "0", as = "revenue", union = "sum(revenue)"         // revenue
	  }, {
		expr = "0", as = "realRevenue", union = "sum(realRevenue)"     // real revenue
	  }, {
		expr = "0", as = "feeCpcTimes", union = "sum(feeCpcTimes)"     // fee CPC conversion record count
	  }, {
		expr = "0", as = "feeCpmTimes", union = "sum(feeCpmTimes)"     // fee CPM conversion record count
	  }, {
		expr = "0", as = "feeCpaTimes", union = "sum(feeCpaTimes)"     // fee CPA conversion record count
	  }, {
		expr = "0", as = "feeCpaSendTimes", union = "sum(feeCpaSendTimes)"     // fee CPA send conversion record count
	  }, {
		expr = "0", as = "feeCpcReportPrice", union = "sum(feeCpcReportPrice)"     // fee CPC upstream revenue
	  }, {
		expr = "0", as = "feeCpmReportPrice", union = "sum(feeCpmReportPrice)"     // fee CPM upstream revenue
	  }, {
		expr = "0", as = "feeCpaReportPrice", union = "sum(feeCpaReportPrice)"     // fee CPA upstream revenue
	  }, {
		expr = "0", as = "feeCpcSendPrice", union = "sum(feeCpcSendPrice)"     // fee CPC downstream revenue
	  }, {
		expr = "0", as = "feeCpmSendPrice", union = "sum(feeCpmSendPrice)"     // fee CPM downstream revenue
	  }, {
		expr = "0", as = "feeCpaSendPrice", union = "sum(feeCpaSendPrice)"     // fee CPA downstream revenue
	  }, {
		expr = "0", as = "winPrice", union = "sum(winPrice)"     // win price
	  }, {
		expr = "0", as = "winNotices", union = "sum(winNotices)"     // win-notice count
	  }, {
		expr = "0", as = "newCount", union = "sum(newCount)"
	  }, {
		expr = "0", as = "activeCount", union = "sum(activeCount)"
	  }
	]
	dwr.include.repeated = true
	dwr.table = "test__ssp_report_overall_dwr"
	// Kafka source: partition 0 of topic_ad_event.
	// NOTE(review): key spelling "partitoins" is presumably what the loader reads -- do not rename without checking.
	kafka.consumer {
	  partitoins = [
		{
		  topic = "topic_ad_event"
		  partition = 0
		}
	  ]
	}
  }

}