如何解决 spark 写 hive 慢的问题

在使用 spark 写 hive 过程中,发现最耗时的部分是将产生的结果写入 hive,举个例子,对 3g*1G 表的 join 来讲,将结果使用以下方式直接写入 hive 表需要超过半小时的时间:

dataframe.registerTempTable("result")
sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from result""")

而整个结果数据的产生只需要 4 分钟左右的时间,比如以下方式:将结果以 textfile 存入 hdfs:

result.rdd.saveAsTextFile(output_tmp_dir)

由此可见,对 hive 的写入操作耗用了大量的时间。

对此现象的优化可以是,将文件存为符合 hive table 文件的格式,然后使用 hive load 将产生的结果文件直接 move 到指定目录下。代码如下:

result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir)

sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")

详解:

  1. result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir)
    hive column 默认分隔符在 scala/java 中的表示为“/001”,r.mkString(“/001”) 既是将 column 以分隔符 /001 进行分割,hive 在导入时会自动识别。
    repartition(partitions) 是为了防止 hdfs 中产生大量小文件。partitions 的设定与最终结果大小有关,一般是 result_size/hdfs_block_size。
  2. sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")
    此处使用 hive load data 命令,将 hdfs 文件 load 到 hive 表中。后台操作为直接将目录下的文件移到 hive table 所在目录,所以只是 hdfs move 数据的过程,执行非常快。 
  3. 需要注意的是,此处要求 hive 建表时,已 textfile 格式建表。orc 的方式不支持。对 orc 的表,可以建立临时表使用 textfile 临时存储,然后用以下命令进行导入:
    sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from $tmp_table where dt='$dt'""") 
    

    在资源配置为–num-executors 20 –executor-cores 4,结果数据为 1.8g 的情况下,需要额外耗时 50s。好处是结果数据使用列式、压缩方式存储,压缩比 12.7 左右。

使用优化后的方式,原有 test case 的耗时从半小时降到 4 分钟,效率提升明显。