业务场景及问题描述

一个分析类项目,由上层用户定制多种分析需求,每个需求可能涉及GB甚至TB级别数据。源数据是被打上多种标签的用户行为日志,如果每个标签KEY对应一列,则可能存在几千列,且较为稀疏。由于标签是策略挖掘出来的,故经常发生变化,对新入库数据产生影响。

这就要求:

  1. 底层存储读取性能高,不拖累计算耗时;压缩比高,尽量节省存储空间
  2. 数据Schema支持动态扩展,且后向兼容性好

计算平台基于Spark,存储结构自然想到使用columnar结构,在多种paper和性能测试里看到parquet基本满足需求,倾向于选择它。但是使用tags map<string, array>存储标签数据,还是tag_xxx array的格式呢?两者是否都能使用到spark的pushdown功能,以跳过不需要的columns呢?

以下将从源码和日志两方面进行分析,结论是都可以使用到pushdown,性能理论上基本持平(未数据验证)。而显然map方式的扩展性更好,map key的增删可以完全由业务层控制。

SparkSQL源码分析

网上对SparkSQL的解析处理过程分析较多,例如:

  • http://www.csdn.net/article/2014-07-15/2820658/2
  • http://blog.csdn.net/oopsoom/article/details/37658021
  • http://mmicky.blog.163.com/blog/static/150290154201487102349800/ (共十篇,比较全面)

SparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。

sparkSQL1.1总体上由四个模块组成:

  • core 处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;
  • catalyst 处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;
  • hive 对hive数据的处理
  • hive-ThriftServer 提供CLI和JDBC/ODBC接口

saveAsParquetFile

以下直接深入与Parquet文件读写相关代码,假设我们调用了schemaRDD.saveAsParquetFile(…),调用链上核心的类如下:

sparksql-uml-class

关键的调用关系如图:

sparksql-saveAsParquetFile

结合代码来看。

 Scala |  copy code |? 
1
  def saveAsParquetFile(path: String): Unit = {
2
    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
3
  }

可以看到saveAsParquetFile生成了WriteToFile这个LogicPlan子类,Lazy调度后,在ParquetOperations.apply()方法里,会触发真正的执行过程:

 Scala |  copy code |? 
1
  object ParquetOperations extends Strategy {
2
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
3
      // TODO: need to support writing to other types of files.  Unify the below code paths.
4
      case logical.WriteToFile(path, child) =&gt;
5
        val relation =
6
          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
7
        // Note: overwrite=false because otherwise the metadata we just created will be deleted
8
        InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil

在InsertIntoParquetTable.execute()方法里会调用self的saveAsHadoopFile(),这个方法针对RDD每一行,调用writeShard执行写入:

 Scala |  copy code |? 
01
    def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
02
      // Hadoop wants a 32−bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
03
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
04
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
05
      /* "reduce task" &lt;split #&gt; &lt;attempt # = spark task #&gt; */
06
      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
07
        attemptNumber)
08
      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
09
      val format = new AppendingParquetOutputFormat(taskIdOffset)
10
      val committer = format.getOutputCommitter(hadoopContext)
11
      committer.setupTask(hadoopContext)
12
      val writer = format.getRecordWriter(hadoopContext)
13
      try {
14
        while (iter.hasNext) {
15
          val row = iter.next()
16
          writer.write(null, row)
17
        }
18
      } finally {
19
        writer.close(hadoopContext)
20
      }
21
      committer.commitTask(hadoopContext)
22
      1
23
    }

这里有几个重要的变量,都标注在核心类那张图上了。writer.write依次触发了ParquetRecordWriter.write()、InternalParquetRecordWriter.write()、RowWriteSupport.write()将数据写入内存,并“定期”调用InternalParquetRecordWriter.checkBlockSizeReached()检查是否需要格式化并flush到磁盘(一般是HDFS)。

schema生成

在说write和flush前,得先确定schema在哪里生成的。writeShard里调用val writer = format.getRecordWriter(hadoopContext),触发:

 Scala |  copy code |? 
1
    WriteContext init = writeSupport.init(conf);
2
    ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);

其中很关键的方法是ParquetTypes.fromDataType()

public static parquet.schema.Type fromDataType(org.apache.spark.sql.catalyst.types.DataType ctype,
                               String name,
                               boolean nullable,
                               boolean inArray)

Converts a given Catalyst DataType into the corresponding Parquet Type.The conversion follows the rules below:

  • Primitive types are converted into Parquet’s primitive types.
  • StructTypes are converted into Parquet’s GroupType with the corresponding field types.
  • ArrayTypes are converted into a 2-level nested group, where the outer group has the inner group as sole field. The inner group has name values and repetition level REPEATED and has the element type of the array as schema. We use Parquet’s ConversionPatterns for this purpose.
  • MapTypes are converted into a nested (2-level) Parquet GroupType with two fields: a key type and a value type. The nested group has repetition level REPEATED and name map. We use Parquet’sConversionPatterns for this purpose

Parquet’s repetition level is generally set according to the following rule:

  • If the call to fromDataType is recursive inside an enclosing ArrayType or MapType, then the repetition level is set to REPEATED.
  • Otherwise, if the attribute whose type is converted is nullable, the Parquet type gets repetition level OPTIONAL and otherwise REQUIRED.

write过程

回到write这条线,会递归调用writeValue()、writePrimitive()、writeStruct()、writeMap()、writeArray()将基本类型和struct、map、array保存起来。其中根据schema,又会触发MessageColumnIORecordConsumer内部类的startMessage()、startField()、startGroup()等方法。这3个概念很重要,应该说就是基于这些才实现了dremel论文里的数据结构:

  • Message: a new record,对应一行
  • Field: a field in a group or message. if the field is repeated the field is started only once and all values added in between start and end,对应一个kv对,key是field name,value可以是group,从而嵌套其他数据结构
  • Group: a group in a field

由于采用了抽象嵌套的方式,map<string, array>首先是一个field整体,而其中每一个string, array也构成一个子field。在底层存储和建索引的时候,应该是nested最深的field对应一个column,针对这些column计算r和d的值。

flush过程:

再来看flush这条线:

 Scala |  copy code |? 
1
# InternalParquetRecordWriter
2
  public void write(T value) throws IOException, InterruptedException {
3
    writeSupport.write(value);
4
    ++ recordCount;
5
    checkBlockSizeReached();
6
  }

如果需要flush,则checkBlockSizeReached会再调用flushRowGroupToStore()落地磁盘,并调用initStore()重新初始化内存存储区。

SparkSQL日志分析

以上源码分析的推论是,nested结构的每一个field都会被存储为一列,以下通过写入和查询日志再予以证明。

数据结构类似:

 SQL |  copy code |? 
01
CREATE TABLE IF NOT EXISTS orc_qiche (
02
    query string,
03
    uid string,
04
    country string,
05
    province string,
06
    city string,
07
    sttime string,
08
    properties map&lt;string, array&lt;string&gt;&gt;,
09
    intension array&lt;string&gt;
10
    )
11

其中properties含有brand、effects等key。

写入日志:

 Scala |  copy code |? 
1
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 88B for [properties, brand, array] BINARY: 2 values, 34B raw, 50B comp, 1 pages,
2
encodings: [RLE, PLAIN]
3
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 85B for [properties, effects, array] BINARY: 2 values, 33B raw, 48B comp, 1 pages
4
, encodings: [RLE, PLAIN]
5
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 69B for [properties, brand, array] BINARY: 1 values, 21B raw, 36B comp, 1 pages,
6
encodings: [RLE, PLAIN]
7
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 72B for [query] BINARY: 1 values, 16B raw, 35B comp, 1 pages, encodings: [RLE, BI
8
T_PACKED, PLAIN]

查询日志:

8A542FE1-9484-4B2C-9184-DEF07113B83B

Leave a Reply