Spark - parquet 加载时间过长 && parquet 指定 schema 无数据

2022-09-03 12:18:45

一.引言

Parquet 是一种列式存储格式,常用于表结构数据存储,也是 sparkSql 的默认存储格式。spark 读取 parquet 文件时,偶发读取时间过长,正常  parquet 时间在 1-5 s,异常期间最长可达 10 min +,于是开始踩坑之旅。下面是读取日志,正常情况只需 1s 以内,异常时却需要很久。

二.Parquet 读取问题定位与解决

1.代码变化 && 数据变化

1s 左右的加载时间和出现异常导致 10min + 的加载时间前后代码没有发生过变化,唯一的改变就是读取的 parquet 数量增加了一倍,总大小未改变。因此问题应该出在和 parquet 相关的代码上,这里执行的是官方的 api :

spark.read.parquet(path)

 所以解决问题就需要看下读取 parquet 的具体逻辑了。

2.Schema Infer And Merge

spark 读取 read parquet 时有一个参数 .schema 用于指定 parquet 的列名与列属性,如果直接调用 .read.parquet() 时,sparkSession 需要自己 infer parquet 的 schema , schema 可以理解为 parquet 每列数据的属性,包含 name,type 等等:

override def inferSchema(
     sparkSession: SparkSession,
     parameters: Map[String, String],
     files: Seq[FileStatus]): Option[StructType] = {
  val parquetOptions = new ParquetOptions(parameters, sparkSession.sessionState.conf)

  // Should we merge schemas from all Parquet part-files?
  val shouldMergeSchemas = parquetOptions.mergeSchema

  ...

  ParquetFileFormat.mergeSchemasInParallel(filesToTouch, sparkSession)
}

这里截距了一部分代码,其中 parquetOptions.mergeSchema 参数代表是否合并 schema,默认为 true,在 spark 中配置该参数:

conf spark.sql.parquet.mergeSchema=true \

这段代码还有一段解释含义大致如下 :

如果用户指定mergeSchema = false 参数,程序会首先尝试摘要文件,如果没有摘要文件,则返回某个随机的部分文件,所以当该参数为 false 时,infer 会随机选取 parquet 文件进行推断,如果 所有 parquet 文件格式一致则不受影响,如果不一致则会 infer 出错。

如果用户指定mergeSchema = true  参数, 则程序认为所有的部分文件都是相同的,此时如果出现 parquet 不匹配,例如名称匹配,类型不匹配时 infer 会报错。但是当文件过多时,如果串行执行效率会很低,所以这里采取并行处理的方式进行 schema 推断,也就是上面 infer 函数的最后一行 mergeSchemaInParallel :

  def mergeSchemasInParallel(
      filesToTouch: Seq[FileStatus],
      sparkSession: SparkSession): Option[StructType] = {
    val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
    val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
    val serializedConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())

    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
      sparkSession.sparkContext.defaultParallelism)

    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

    // Issues a Spark job to read Parquet schema in parallel.
    val partiallyMergedSchemas =
      sparkSession
        .sparkContext
        .parallelize(partialFileStatusInfo, numParallelism)
        .mapPartitions { iterator =>
          // Resembles fake `FileStatus`es with serialized path and length information.

          ...

        }.collect()

    if (partiallyMergedSchemas.isEmpty) {
      None
    } else {
      var finalSchema = partiallyMergedSchemas.head
      partiallyMergedSchemas.tail.foreach { schema =>
        try {
          finalSchema = finalSchema.merge(schema)
        } catch { case cause: SparkException =>
          throw new SparkException(
            s"Failed merging schema:\n${schema.treeString}", cause)
        }
      }
      Some(finalSchema)
    }
  }

这里给出部分代码,如果想要查看完整的 infer,mergeSchemaInParallel 代码可以查看 :

org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

上述代码通过 parallelize + mapPartitions 实现 schema 的并行推断,这里涉及到 fileStatus 信息与 Executor 的信息交互,最终合并得到 partiallyMergedSchemas[iterator],所有 parquet 得到的 scheme 构成了一个迭代器,然后通过可变变量 var finalSchema 进行遍历与合并,结果返回 Some(finalSchema),代表推断与merge的结果不可控。

3.原因分析

上面只是简单看了源码的实现逻辑,结合前面的代码、数据变化,在 mergeShema 参数为 true 时 读取 parquet 中涉及和 parquet 数量有关的操作有下述:

(1)parallel + mapPartitions:在 numParallelism 数量不变时,parquet 数量增加,则 mapPartitions 处理的 partition 数量增加,耗时增加

(2)fileStatus 与 Executor 的通信:parquet 数量增多时,FileStatus 增多,executor 通信量增大,耗时增加,查看运行异常日志发现任务所在机器已出现 IO 负载较大的情况:

 (3)merge:处理数据量增多,iterator 数量增加,merge 操作增加,耗时增加

4.问题解决

基于上面耗时的分析,问题解决也很简单,减少 parquet 数量,避免 mergeSchema:

A. spark.sql.parquet.mergeSchema = false

最偷懒的办法,如果可以保证每个 parquet 数据的结构一致性,则可以取消该参数,只选用少量 parquet 文件进行 infer 推断,这样处理文件的数量变小,推断 schema 的速度也会相应增加。

B. spark.sql.parquet.mergeSchema = true & 不指定 schema

该参数默认为 true,在不手动指定的情况下,可以人为缩减 parquet 数量,这样并行执行的效率会得到提升

C.指定 schema

spark.read.parquet 方法支持直接传入 schema :

spark.read.schema(TABLE_SCHEME).parquet(path)

该方法会指定 Schema 从而避免后续复杂的 infer 阶段,从而跳过 parquet 阶段,直接进入下一个 action 算子。

  def schema(schema: StructType): DataFrameReader = {
    this.userSpecifiedSchema = Option(schema)
    this
  }

三.Read parquet 指定 Schema

1.配置 schema

parquet 采用列式存储,相关的存储类型上一篇文章Spark Sql:RDD - DF 互转 中已经提到过,

第一个参数代表列名,第二个参数代表列属性 :

  final val TABLE_SCHEME = StructType(Array(
    StructField("A", StringType),
    StructField("B", StringType),
    StructField("C", StringType),
    StructField("D", StringType),
    StructField("E", StringType),
    StructField("F", StringType),
    StructField("G", StringType),
    StructField("H", StringType)
  ))

    val conf = new SparkConf().setMaster("local[*]")

    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()
            
     spark.read.schema(TABLE_SCHEME).parquet(path).rdd.take(10).foreach(row => {
      println(row)
    })

2.读取数据为空

看似任务已经搞定了,但是还有坑,设置 schema 时读取数据为 null,不设置 schema 时读取正常

设置 schema:

不设置 schema :

于是查找了设定 schema 的规则,当我们指定了 schema 时,相当于指定了每个对应位置字段的名称和类型,读取内容时参照如下规则:

指定的 schema 中的字段在 parquet 中默认的 schema 中不存在则返回 null :

null,null,null,null,null,null,null,null,null,null,null,null

指定的 schema 中字段在 parquet 中默认的 schema 中存在但类型不匹配,返回 false :

null,null,null,null,null,null,null,null,null,null,null,false

3.原因分析

添加 schema 后显示为 null,根据上述规则可以判定是设定的 schema 的列名在原始 parquet 的 schema 中不存在,相当于 map.getOrElse("col", null) 触发了 else 逻辑,通过原始 df 的 schema 方法可以获得原始 parquet 的 schema :

    spark.read.parquet(path).schema.map(x => {
      println(x.name, x.dataType)
    })
(_c0,StringType)
(_c1,StringType)
(_c2,StringType)
(_c3,StringType)
(_c4,StringType)
(_c5,StringType)

...

(_c41,StringType)
(_c42,StringType)

可以看到原始 schema 为 _1,_2 形式,而设置的 schema 为 A,B... 与之不匹配,所以得到的数据为 null,为什么不匹配,这个涉及到 parquet 的生成逻辑:

(1) 读取原始 hive 表,字段为 A,B,C... 生成 parquet 过程中未设置 schema ,从而存储的 parquet 采用了默认 schema 形式,即 _index 的形式

(2) 而读取 parquet 时认为 parquet 的 schema 为原始 hive 结构,所以设置了错误的 schema,从而导致 schema 规则不匹配,从而得到全 null 的数据

4.问题解决

A.存储 parquet

存储 parquet 时将 hive 表对应列名与属性的 schema 存入 parquet,这样读取时指定相同 schema 即可

B.读取 schema

数据预处理时可以调用 .schema 方法打印数据的 name 和 dataType,指定相同的 name 和 dataType,例如本例中将 A,B,C ... 替换为 _c0,_c1,_c2... 即可正常显示数据

四.总结

spark 加载 parquet 文件时最好可以手动指定 schema ,这样可以避免前期不必要的 merge 操作,优化代码执行速度,非常的好用 👍

  • 作者:BIT_666
  • 原文链接:https://blog.csdn.net/BIT_666/article/details/121957680
    更新时间:2022-09-03 12:18:45