虽然Parquet等列式存储号称可提升性能N倍,但在实际中,正如新近发布的Apache Arrow指出的,其本身的序列化、反序列化耗时占比可达70%。Spark 1.6中,针对flat schema进行了一些优化,本文尝试源码级别的解读。

在SqlNewHadoopRDD类中,若开启了spark.sql.parquet.enableUnsafeRowRecordReader、数据源是parquet格式,且tryInitialize成功的话,就会采用UnsafeRowParquetRecordReader替代之前的parquet-hadoop jar提供的ParquetRecordReader类,作为parquet文件解析的入口。

在tryInitialize中,首先调用parent方法,处理了task.side.metadata不同设置时,对footer meta的处理。随后才是检查当前schema是否满足特殊处理的要求:

  • 不可包含非primitive类型 或 repeated类型(例如 struct、array、map)
  • 由于parquet里有type和original type的概念,需要再对original检查,original仅为null、decimal、utf8或date
  • 不支持original type为decimal,且精度超过18的数字
  • 不支持int96 type
  • 不支持schema演进

接下来loadBatch方法就很关键了。

首先调用ParquetFileReader.readNextRowGroup()读入未解压、未序列化的数据,会一次性读入一个row group里涉及到的所有columns,这也是压缩和序列化的单位。

随后,loadBatch进行批量加载。与ParquetRecordReader相比,它最大的区别之一应该在于批量反序列化。可以想象,此时内存中有一个row group的n个columns数据,针对每个column一次性解压缩后,每轮最多反序列化64个values存放到缓存里。

 Scala |  copy code |? 
01
  private boolean loadBatch() throws IOException {
02
    // no more records left
03
    if (rowsReturned >= totalRowCount) { return false; }
04
    checkEndOfRowGroup();
05
 
06
    int num = (int)Math.min(rows.length, totalCountLoadedSoFar − rowsReturned);
07
    rowsReturned += num;
08
 
09
    if (containsVarLenFields) {
10
      for (int i = 0; i < rowWriters.length; ++i) {
11
        rowWriters[i].holder().resetTo(fixedSizeBytes);
12
      }
13
    }
14
 
15
    for (int i = 0; i < columnReaders.length; ++i) {
16
      switch (columnReaders[i].descriptor.getType()) {
17
        case BOOLEAN:
18
          decodeBooleanBatch(i, num);
19
          break;
20
        case INT32:
21
          if (originalTypes[i] == OriginalType.DECIMAL) {
22
            decodeIntAsDecimalBatch(i, num);
23
          } else {
24
            decodeIntBatch(i, num);
25
          }
26
          break;
27
        case INT64:
28
          Preconditions.checkState(originalTypes[i] == null
29
              || originalTypes[i] == OriginalType.DECIMAL,
30
              "Unexpected original type: " + originalTypes[i]);
31
          decodeLongBatch(i, num);
32
          break;
33
        case FLOAT:
34
          decodeFloatBatch(i, num);
35
          break;
36
        case DOUBLE:
37
          decodeDoubleBatch(i, num);
38
          break;
39
        case BINARY:
40
          decodeBinaryBatch(i, num);
41
          break;
42
        case FIXED_LEN_BYTE_ARRAY:
43
          Preconditions.checkState(originalTypes[i] == OriginalType.DECIMAL,
44
              "Unexpected original type: " + originalTypes[i]);
45
          decodeFixedLenArrayAsDecimalBatch(i, num);
46
          break;
47
        case INT96:
48
          throw new IOException("Unsupported " + columnReaders[i].descriptor.getType());
49
      }
50
      numBatched = num;
51
      batchIdx = 0;
52
    }
53
    return true;
54
  }

另一个重要区别是,这里使用unsafeRow,backed by raw memory instead of Java objects。每行包含numFields个tuples,一个tuple对应parquet的一列,每个tuple又由3部分组成:

[null bit set] [values] [variable length portion]

The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.以bit位记录列的null情况,8字节对齐。

In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length primitive types, such as long, double, or int, we store the value directly in the word. For fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the base address of the row) that points to the beginning of the variable-length field, and length (they are combined into a long).

values部分则采用了经典的变长存储方法。对于基本数字类型,直接存储在value字段里;其他变长类型,则在value里存储地址偏移量,而在第三部分variable length portion里存储真正的数据。

通过这种unsafe的方式,一方面可以减少GC的消耗。另一方面,也减少了java对象wrap的冗余存储,由于parquet flat格式里,有很多small size数据,这方面的优化对应内存用量应该也有比较大的效果。

最后结合UnsafeRow的get方法,看下内存复用程度。由于我不是Java出身,措辞可能不专业,如果有更好的解释方式,请留言。

针对基本数据类型,例如int直接返回变量本身(因为基本类型本身的内存是不可复用的):

 Scala |  copy code |? 
1
  @Override
2
  public int getInt(int ordinal) {
3
    assertIndexIsValid(ordinal);
4
    return Platform.getInt(baseObject, getFieldOffset(ordinal));
5
  }

而针对可修改的binary,则返回内存copy,以免row被后续mapPartitions等处理时修改了:

 Scala |  copy code |? 
01
02
  @Override
03
  public byte[] getBinary(int ordinal) {
04
    if (isNullAt(ordinal)) {
05
      return null;
06
    } else {
07
      final long offsetAndSize = getLong(ordinal);
08
      final int offset = (int) (offsetAndSize >> 32);
09
      final int size = (int) offsetAndSize;
10
      final byte[] bytes = new byte[size];
11
      Platform.copyMemory(
12
        baseObject,
13
        baseOffset + offset,
14
        bytes,
15
        Platform.BYTE_ARRAY_OFFSET,
16
        size
17
      );
18
      return bytes;
19
    }
20
  }

另外,没有看到unsaferow的内存何时释放。

Leave a Reply