Archive for July, 2015

业务场景及问题描述

一个分析类项目,由上层用户定制多种分析需求,每个需求可能涉及GB甚至TB级别数据。源数据是被打上多种标签的用户行为日志,如果每个标签KEY对应一列,则可能存在几千列,且较为稀疏。由于标签是策略挖掘出来的,故经常发生变化,对新入库数据产生影响。

这就要求:

  1. 底层存储读取性能高,不拖累计算耗时;压缩比高,尽量节省存储空间
  2. 数据Schema支持动态扩展,且后向兼容性好

计算平台基于 Spark,存储结构自然想到使用 columnar 结构,在多篇 paper 和性能测试里看到 Parquet 基本满足需求,倾向于选择它。但是标签数据应该用 tags map<string, array<string>> 这样一个字段存储,还是拆成 tag_xxx array<string> 这样每个标签一列的格式呢?两者是否都能使用到 Spark 的 pushdown 功能,以跳过不需要的 columns 呢?

以下将从源码和日志两方面进行分析,结论是两种方式都可以使用到 pushdown,性能理论上基本持平(未经数据验证)。而显然 map 方式的扩展性更好,map key 的增删可以完全由业务层控制。
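
为了后文讨论方便,先把两种 schema 布局直观摆出来。下面是一段示意性的 Scala 草图(基于 Spark 1.1 前后的 SchemaRDD API,假设在 spark-shell 中 sc 已存在;路径、表名、字段和数据均为虚构,不同小版本的类型别名、默认参数可能略有出入):

import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)

// 方案一:每个标签单独一列(tag_brand array<string>、tag_effects array<string>……),
//         新增标签意味着 schema 变更;
// 方案二:统一放进 properties map<string, array<string>>,新增标签只是多一个 map key。
// 下面按方案二手工构造 schema 并写出 Parquet 文件。
val schema = StructType(Seq(
  StructField("query", StringType, true),
  StructField("uid", StringType, true),
  StructField("properties", MapType(StringType, ArrayType(StringType)), true)))

val rows = sc.parallelize(Seq(
  Row("某品牌怎么样", "u_001", Map("brand" -> Seq("某品牌"), "effects" -> Seq("评价"))),
  Row("省油的suv", "u_002", Map("intent" -> Seq("省油", "suv")))))

val logs = sqlContext.applySchema(rows, schema)
logs.registerTempTable("logs")
logs.saveAsParquetFile("hdfs:///tmp/logs_parquet")

// 查询时只引用少数几列,理论上扫描也只涉及这些列(后文用源码和日志说明)
val parquet = sqlContext.parquetFile("hdfs:///tmp/logs_parquet")
parquet.registerTempTable("logs_parquet")
sqlContext.sql("SELECT uid, properties FROM logs_parquet WHERE uid = 'u_001'").collect()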

SparkSQL源码分析

网上对SparkSQL的解析处理过程分析较多,例如:

  • http://www.csdn.net/article/2014-07-15/2820658/2
  • http://blog.csdn.net/oopsoom/article/details/37658021
  • http://mmicky.blog.163.com/blog/static/150290154201487102349800/ (共十篇,比较全面)

SparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。

SparkSQL 1.1 总体上由四个模块组成:

  • core 处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;
  • catalyst 处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;
  • hive 对hive数据的处理
  • hive-ThriftServer 提供CLI和JDBC/ODBC接口

saveAsParquetFile

以下直接深入与Parquet文件读写相关代码,假设我们调用了schemaRDD.saveAsParquetFile(…),调用链上核心的类如下:

sparksql-uml-class

关键的调用关系如图:

sparksql-saveAsParquetFile

结合代码来看。

  def saveAsParquetFile(path: String): Unit = {
    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
  }

可以看到 saveAsParquetFile 生成了 WriteToFile 这个 LogicalPlan 子类,Lazy 调度后,在 ParquetOperations.apply() 方法里,会触发真正的执行过程:

  object ParquetOperations extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      // TODO: need to support writing to other types of files.  Unify the below code paths.
      case logical.WriteToFile(path, child) =>
        val relation =
          ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
        // Note: overwrite=false because otherwise the metadata we just created will be deleted
        InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil

在 InsertIntoParquetTable.execute() 方法里会调用 self 的 saveAsHadoopFile(),这个方法针对 RDD 的每一个 partition,调用 writeShard 执行写入:

    def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      /* "reduce task" <split #> <attempt # = spark task #> */
      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
        attemptNumber)
      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
      val format = new AppendingParquetOutputFormat(taskIdOffset)
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)
      val writer = format.getRecordWriter(hadoopContext)
      try {
        while (iter.hasNext) {
          val row = iter.next()
          writer.write(null, row)
        }
      } finally {
        writer.close(hadoopContext)
      }
      committer.commitTask(hadoopContext)
      1
    }

这里有几个重要的变量,都标注在核心类那张图上了。writer.write依次触发了ParquetRecordWriter.write()、InternalParquetRecordWriter.write()、RowWriteSupport.write()将数据写入内存,并“定期”调用InternalParquetRecordWriter.checkBlockSizeReached()检查是否需要格式化并flush到磁盘(一般是HDFS)。

schema生成

在说write和flush前,得先确定schema在哪里生成的。writeShard里调用val writer = format.getRecordWriter(hadoopContext),触发:

    WriteContext init = writeSupport.init(conf);
    ParquetFileWriter w = new ParquetFileWriter(conf, init.getSchema(), file);

其中很关键的方法是ParquetTypes.fromDataType()

public static parquet.schema.Type fromDataType(org.apache.spark.sql.catalyst.types.DataType ctype,
                               String name,
                               boolean nullable,
                               boolean inArray)

Converts a given Catalyst DataType into the corresponding Parquet Type. The conversion follows the rules below:

  • Primitive types are converted into Parquet’s primitive types.
  • StructTypes are converted into Parquet’s GroupType with the corresponding field types.
  • ArrayTypes are converted into a 2-level nested group, where the outer group has the inner group as sole field. The inner group has name values and repetition level REPEATED and has the element type of the array as schema. We use Parquet’s ConversionPatterns for this purpose.
  • MapTypes are converted into a nested (2-level) Parquet GroupType with two fields: a key type and a value type. The nested group has repetition level REPEATED and name map. We use Parquet’s ConversionPatterns for this purpose.

Parquet’s repetition level is generally set according to the following rule:

  • If the call to fromDataType is recursive inside an enclosing ArrayType or MapType, then the repetition level is set to REPEATED.
  • Otherwise, if the attribute whose type is converted is nullable, the Parquet type gets repetition level OPTIONAL and otherwise REQUIRED.
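
按照这些规则,本文关心的 properties: map<string, array<string>> 字段大致会映射成下面这样的嵌套 Parquet schema。这里借 parquet 自带的 MessageTypeParser 把它写出来(纯属手工示意:message 名、字段顺序未与 Spark 实际输出逐字核对,map/key/value/array 这些名字取自上述规则和 Spark 源码里的常量):

import parquet.schema.MessageTypeParser

val schema = MessageTypeParser.parseMessageType(
  """
    |message example {
    |  optional binary query (UTF8);
    |  optional group properties (MAP) {
    |    repeated group map (MAP_KEY_VALUE) {
    |      required binary key (UTF8);
    |      optional group value (LIST) {
    |        repeated binary array (UTF8);
    |      }
    |    }
    |  }
    |}
  """.stripMargin)

println(schema)   // 打印结果应与上面的文本结构一致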

write过程

回到write这条线,会递归调用writeValue()、writePrimitive()、writeStruct()、writeMap()、writeArray()将基本类型和struct、map、array保存起来。其中根据schema,又会触发MessageColumnIORecordConsumer内部类的startMessage()、startField()、startGroup()等方法。这3个概念很重要,应该说就是基于这些才实现了dremel论文里的数据结构:

  • Message: a new record,对应一行
  • Field: a field in a group or message. if the field is repeated the field is started only once and all values added in between start and end,对应一个kv对,key是field name,value可以是group,从而嵌套其他数据结构
  • Group: a group in a field

由于采用了抽象嵌套的方式,map<string, array<string>> 首先是一个 field 整体,而其中的每一个 key、value 又各自构成子 field。在底层存储和建索引的时候,应该是 nested 最深的 field 对应一个 column,并针对这些 column 计算 r(repetition level)和 d(definition level)的值。
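
把这组 start/end 调用按嵌套关系串起来,写一行 properties = Map("brand" -> Seq("bmw")) 的过程大致如下。这是一段示意性的 Scala 草图:假设已经按上面的示意 schema 拿到了一个 RecordConsumer(实际由 MessageColumnIO 创建),字段 index 与名字按该 schema 手工写出,并不是对 Spark RowWriteSupport 行为的逐字还原:

import parquet.io.api.{Binary, RecordConsumer}

def writeOneRow(rc: RecordConsumer): Unit = {
  rc.startMessage()                          // 一个 message 对应一行
  rc.startField("properties", 1)             // 外层 field:properties(示意 schema 中 index = 1)
  rc.startGroup()                            //   map 整体是一个 group
  rc.startField("map", 0)                    //   repeated group map
  rc.startGroup()                            //     一个 key/value 对
  rc.startField("key", 0)
  rc.addBinary(Binary.fromString("brand"))
  rc.endField("key", 0)
  rc.startField("value", 1)
  rc.startGroup()                            //     value 本身又是一个 array 的 group
  rc.startField("array", 0)
  rc.addBinary(Binary.fromString("bmw"))     //       嵌套最深的 repeated 字段,对应一个 column
  rc.endField("array", 0)
  rc.endGroup()
  rc.endField("value", 1)
  rc.endGroup()
  rc.endField("map", 0)
  rc.endGroup()
  rc.endField("properties", 1)
  rc.endMessage()
}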

flush过程:

再来看flush这条线:

// InternalParquetRecordWriter
  public void write(T value) throws IOException, InterruptedException {
    writeSupport.write(value);
    ++ recordCount;
    checkBlockSizeReached();
  }

如果需要flush,则checkBlockSizeReached会再调用flushRowGroupToStore()落地磁盘,并调用initStore()重新初始化内存存储区。

SparkSQL日志分析

以上源码分析的推论是,nested结构的每一个field都会被存储为一列,以下通过写入和查询日志再予以证明。

数据结构类似:

CREATE TABLE IF NOT EXISTS orc_qiche (
    query string,
    uid string,
    country string,
    province string,
    city string,
    sttime string,
    properties map<string, array<string>>,
    intension array<string>
    )

其中properties含有brand、effects等key。

写入日志:

15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 88B for [properties, brand, array] BINARY: 2 values, 34B raw, 50B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 85B for [properties, effects, array] BINARY: 2 values, 33B raw, 48B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 69B for [properties, brand, array] BINARY: 1 values, 21B raw, 36B comp, 1 pages, encodings: [RLE, PLAIN]
15/07/21 14:02:09 INFO ColumnChunkPageWriteStore: written 72B for [query] BINARY: 1 values, 16B raw, 35B comp, 1 pages, encodings: [RLE, BIT_PACKED, PLAIN]

查询日志:

(查询日志截图)

from: http://blog.thislongrun.com/2015/07/Forfeit-Partition-Tolerance-Distributed-System-CAP-Theorem.html?m=1
The CA–consistent, available, but not network partition tolerant–category in CAP has a very specific history. Not only can forfeiting “network partition tolerance” be understood as impossible in theory and crazy in practice (P as an illusion of a choice), but there is also an overlap between the CA and CP categories. As a result, many consider that it’s impossible to build a production CA system. But you can actually build a system without network partition tolerance, and sometimes you should.

A brief history of the CA category

Let’s look at the academic history of the CA category in CAP:
  • In 2000, Eric Brewer presents the CAP conjecture. In his presentation, CA exists, for example for systems using the two-phase commit protocol. He considers that “the whole space is useful.”
  • In 2002, Seth Gilbert and Nancy Lynch publish the CAP proof. CA exists: “Systems that run on intranets and LANs are an example of these types of algorithms.”
  • In 2010, Daniel Abadi raises the point that there is an overlap between CA and CP: “What does ‘not tolerant’ mean? In practice, it means that they lose availability if there is a partition. Hence CP and CA are essentially identical.”
  • Still in 2010, Michael Stonebraker publishes multiple documents around the limited importance of partitions, with the tagline “Myth #6: In CAP, choose AP over CA”, considering that with the capacity of modern hardware, small distributed systems can solve most real-life issues, and that “it doesn’t much matter what you do when confronted with network partitions.”
  • And again in 2010, Coda Hale publishes a blog post: “You cannot sacrifice partition tolerance”, explaining that only AP and CP are possible.
  • This triggers a feedback from Stonebraker, who restates all his points.
  • 2 years later, in 2012, referring to these works, Eric Brewer states that “exactly what it means to forfeit P is unclear” and then clarifies: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures, such as disasters or multiple simultaneous faults.”


So we need to sort out the following issues:
  • There is an overlap between CP and CA.
  • There is a theoretical impossibility: network partitions are a given, you can choose between ‘A’ and ‘C’ when a partition happens but not if partitions happen.
  • There is a practical impossibility: network partitions are too likely to happen on a real life system to be ignored, so CA is impossible in practice.
What does CA mean?
CA is about “forfeiting network partition tolerance”, i.e. being “network partition intolerant”. Partition intolerance does not mean that network partitions cannot happen, it means network partitions are a critical issue. It’s a bit like gluten: being “gluten intolerant” does not mean that you cannot eat any, it means that you should not. Like for gluten, a CA system should also have a means of recovery should a network partition actually happen. The two-phase commit is a perfect example: it comes with a repair tool to fix the transactions broken by a heuristic resolution.

The fact that CA does not mean “I have a network that cannot be partitioned” is important, because it implies a partition can actually happen. This is stressed by Brewer: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures.” To estimate this probability you must be quite clear about what a partition actually is. This whole post is only about network partitions.

Let’s summarize: CA describes the specification of an operating range, and not a behavior. CP, AP describe the behavior when a partition occurs. This obviously leaves room for an overlap between CP and CA. Let’s look at this overlap now.
The overlap between CP and CA
It’s the point identified by Abadi: “What does “not tolerant” mean? In practice, it means that they lose availability if there is a partition. Hence CP and CA are essentially identical.” A system that does not do anything once partitioned is trivially CP: it does not present a non-consistent history. Such a system could also be considered as CA: it stops working when there is a partition–hence the overlap. This overlap is minimal however:
  • Many CA systems are not CP: for example, the two-phase commit protocol is not consistent (nor available, nor ACID-atomic) when there is a partition.
  • Many CP systems are not CA: for example, a consensus server like ZooKeeper is totally tolerant to partitions.
Systems that belong to these two categories are only systems that stop working during the partition, but are consistent once the partition is fixed (trivially a webserver connected to a database). I personally prefer to call these systems ‘CA’ rather than ‘CP’, even if the CAP theorem allows for both: this expresses that a partition is a severe issue for the system. Ultimately, it’s your choice.
Partitions are a given in the CAP theorem
That’s exactly CAP: if there is a partition, you have to choose between ‘A’ and ‘C’. We have a model that allows partitions, and a theorem that says we have to choose ‘A’ or ‘C’ when there is a partition, so we cannot “refuse to see partitions”.  But actually “forfeiting partitions” is exactly that: it’s removing partitions from the model and building our application on a brand new model without partitions.


From a theoretical point of view, forfeiting partitions means removing them from the model. They will never happen in our theoretical model.
From a practical point of view, forfeiting partitions means removing them from the operating range. They may happen in reality.


By definition a model differs from reality. The question is always: is this model a good representation of reality?

Partitions happen too often in real life to be ignored

Well, here ended the debate between Coda Hale and Michael Stonebraker: Hale saying that there are a lot of partitions in his datacenters, Stonebraker saying that there are problems more probable than partitions that are not fixed anyway, and that surviving partitions will not “move the needle” on availability.
Without data agreed upon, there is no real way out from this debate. The good news is we don’t have to revive it to say that CA can be used to describe a distributed system: a CA system is a system built by someone who thinks he can forfeit partitions.
But the key point of the discussion is the difficulty to reason about failures without describing the system. In the debate above, Hale was speaking about systems of “any interesting scale”, while Stonebraker was considering small systems of high range servers on a LAN (“if you need 200 nodes to support a specific SQL application, then VoltDB can probably do the same application on 4 nodes”). But these two types of distributed systems are totally different animals. When discussing a design remember the old programming rule–“fancy algorithms are slow when n is small, and n is usually small”, and check the value of n.

When to use CA

The branch can be partitioned from the tree, but it
may not be the monkey’s main concern.


Let’s recall what Brewer wrote in 2012: “choosing CA should mean that the probability of a partition is far less than that of other systemic failures, such as disasters or multiple simultaneous faults.”


Eric Brewer detailed in a mail he sent me (quoted here with his permission):
I tend to explain it a few different ways:
1) it is trivial to get CA in a non-distributed system, such as a single node
2) it is also fine to assume CA on a LAN, especially if it is (over) engineered for multiple paths or even for fail stop.  The CM-5 had an over-engineered network that would halt if it detected any errors, but it almost never did (in fact I don’t know of case where it actually stopped, but there probably were some).  The CM-5 case thus really was an operating range argument.
3) If the probability of a partition is lower than other major system failures that would take out an application, then you can claim CA.  For example, you might lose a quorum due to correlated failures (such as power or a disaster), which would also lose availability even though not a partition.  If your network is 5 9s, you can probably ignore the partition case in terms of the code you write (but you should at least detect it!).


“CA should mean that the probability of a partition is far less than that of other systemic failures” says we can call a system CA if the “probability of a partition” is minimal–the non-distributed or over-engineered network case. These systems are often not of “any interesting scale” but that doesn’t mean they don’t have any business value.


There is a more complex case: the probability of “multiple simultaneous faults” depends on many things, including the software itself. Many non-critical software are more likely to get a data corruption from a software bug than from a network partition, just because simple error scenarios like wrong user-inputs are not tested enough. A complicated administration interface is also a common source of downtime. In other words, choosing CA depends on the network quality and the software quality itself.


Network partition tolerance is a feature like any other. It has to be planned, implemented and tested. And, as any feature, the decision to implement it or not must take into account the benefits of the feature compared to its implementation cost. For such a feature it is:


expected number of partitions * cost per partition (unavailability, reputation, repair …)
vs.
cost of supporting partitions (testing effort included).


Even if the ratio is positive, i.e. the system should be partition tolerant, there could be other features that have a better ratio and they will be prioritized. That’s a well known engineering drama: it’s not because a feature is useful and brings value that it’s implemented in the end.


An example of such CA systems would be those GPU-based machine learning systems. The one built by Baidu was “comprised of 36 server nodes, each with 2 six-core Intel Xeon E5-2620 processors. Each server contains 4 Nvidia Tesla K40m GPUs and one FDR InfiniBand (56Gb/s) which is a high-performance low-latency interconnection and supports RDMA. The peak single precision floating point performance of each GPU is 4.29TFlops and each GPU has 12GB of memory.” For such a system, partition tolerance is not an imperious necessity: if a partition occurs the calculation can be restarted from scratch once the partition is fixed. As already stated, this does not mean partition tolerance is not useful. Partition tolerance would be typically useful should the calculation take weeks. But such systems can also exist without partition tolerance.

Conclusion: “the whole space is useful”

Being partition tolerant is comfortable. You have to be Stonebraker to claim partition intolerance. On the other hand, Kyle ‘Aphyr’ Kingsbury proves regularly with smart but simple tests that many systems used in production are not network partition tolerant.
It’s not really that network partition tolerance can be easily forfeited, especially if the system is of “any interesting scale.” But first it is worth checking the system’s size: is it of “any interesting scale?” Exactly like a system that does not need to be distributed should not be distributed, a distributed system that can be kept small should be kept small.
There is also a catch in how CAP is sometimes (mis)understood: “node failures, processes crashes and network partitions are partitions so you have to be partition tolerant”. This is not only false but also dangerous: it hides the fact that each of these faults could be tackled independently with a specific priority. Before trying to be available during network partition, you should first validate that you don’t lose data with a single process crash. With fault tolerance like with any other problem, decomposing it makes it easier to fix. Network partition is just one type of fault out of many.
So, sometimes using CA just makes sense. As already stated by Eric Brewer: “the whole space is useful.”
原文comments:https://aphyr.com/posts/325-comments-on-you-do-it-too

zz from: http://www.cloudera.com/content/cloudera/zh-CN/documentation/core/v5-3-x/topics/impala_parquet.html

Impala 可帮助您创建、管理和查询 Parquet 表。Parquet 是一种面向列的二进制文件格式,其设计目标是为 Impala 最擅长的大规模查询类型提供高效率支持。Parquet 对于在表中扫描特定列的查询特别有效,例如查询一个包含许多列的表,或执行需要处理列中绝大部分或全部值的 SUM()、AVG() 等聚合操作。每个数据文件包含一系列行(行组)的值。在数据文件里,每列的值被重新组织以便它们相邻,从而对这些列的值进行良好的压缩。针对 Parquet 表的查询可以以最小的 I/O 从任意列快速获取并分析这些数据。

Table 1. Impala 对 Parquet 文件的支持

  • 文件类型:Parquet
  • 格式:结构化
  • 压缩编码解码器:Snappy、gzip;当前默认为 Snappy
  • Impala 能否支持 CREATE?:是
  • Impala 能否支持 INSERT?:是,支持 CREATE TABLE、INSERT、LOAD DATA 和查询。


在 Impala 中创建 Parquet 表

要创建一个名为 PARQUET_TABLE 的 Parquet 格式的表,请使用类似下方的命令,并替换为你自己的表名、列名和数据类型:

[impala-host:21000] > create table parquet_table_name (x INT, y STRING) STORED AS PARQUET;

或者,要克隆一个现有表的列名和数据类型:

[impala-host:21000] > create table parquet_table_name LIKE other_table_name STORED AS PARQUET;

在 Impala 1.4.0 及更高版本中,您可以从原始 Parquet 数据文件中导出列定义,即使没有一个现存的 Impala 表。例如,您可以基于该目录中某文件的列定义,创建一个指向 HDFS 目录的外部表:

CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
  STORED AS PARQUET
  LOCATION '/user/etl/destination';

或者,您可以参考现有的数据文件,创建带有合适列名的新的空表。 然后使用 INSERT 创建新的数据文件 或者使用 LOAD DATA 将现有数据文件 导入至新表。

CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
  STORED AS PARQUET;

新创建的表的默认属性与其它任何 CREATE TABLE 语句相同。 例如,默认的文件格式是文本文件;如果您希望新建的表使用 Parquet 文件格式, 请再加上STORED AS PARQUET

本例中,新建的表根据年、月、日进行分区。这些分区关键列不是数据文件的一部分,因此您需要在 CREATE TABLE 语句中指定:

CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
  PARTITIONED BY (year INT, month TINYINT, day TINYINT)
  STORED AS PARQUET;

参见 创建表语句 了解 CREATE TABLE LIKE PARQUET 语法的详细信息。

当创建了表后,请使用类似下面的命令向表中插入数据,请再次使用您自己的表名:

[impala-host:21000] > insert overwrite table parquet_table_name select * from other_table_name;

如果 Parquet 表具有与其它表不同数量的列或不同的列名,请在对其它表的 SELECT 语句中指定列名,以代替 *

加载数据到 Parquet 表

根据原始数据是存在于 Impala 表中,还是存在于 Impala 表之外的数据文件中,选择不同技术将数据加载到 Parquet 表里。

如果您的数据已经在 Impala 或 Hive 表里,可能是在不同的文件格式或分区模式下,您可以使用 Impala 语法 INSERT…SELECT 将这些数据导入 Parquet 表。可以在同一个 INSERT 语句中,对数据执行转换、过滤、重新分区,以及其它类似操作。参见 Parquet 数据文件的 Snappy 和 GZip 压缩 查看一些示例,了解如何在 Parquet 表中插入数据。

插入分区表时,尤其是使用 Parquet 文件格式时,可以在 INSERT 语句中包含提示来微调操作的整体性能及其资源使用率:

  • 这些提示在 Impala 1.2.2 及更高版本中可用。
  • 只有当由于容量限制而导致 INSERT 插入分区 Parquet 表的操作失败时,或者 INSERT 虽然成功却低于最佳性能时,才可以使用这些提示。
  • 要使用这些提示,请将提示关键字 [SHUFFLE] 或 [NOSHUFFLE](包括方括号)放置在 PARTITION 子句之后、紧邻 SELECT 关键字之前。
  • [SHUFFLE] 选择的执行计划应能将同时写入到 HDFS 的文件数量以及保留分区数据的内存缓冲区的数量降到最少。它允许某些原本有可能失败的INSERT 操作成功执行,从而降低了 INSERT 操作的整体资源使用率。它涉及节点之间的一些数据传输,以便在同一个节点上构建某个特定分区的数据文件。
  • [NOSHUFFLE] 选择的执行计划应整体速度更快,但也能够生成大量的小数据文件或超出容量限制而导致 INSERT 操作失败。当由于所有节点尝试构建所有分区的数据而导致 INSERT 语句失败或运行无效时,使用 [SHUFFLE]
  • 如果 INSERT … SELECT 查询中提及的来源表有任何分区键列缺少列统计信息,则 Impala 会自动使用 [SHUFFLE] 方法。在这种情况下,实际上只有 [NOSHUFFLE] 提示才会产生影响。
  • 如果 INSERT … SELECT 查询中提及的来源表的所有分区键列均有列统计信息,Impala 会根据这些列中预计的不同值数量以及 INSERT 操作中涉及的节点数量,选择使用 [SHUFFLE] 还是 [NOSHUFFLE]。在这种情况下,您可能需要用 [SHUFFLE] 或 [NOSHUFFLE] 提示来覆盖 Impala 选择的执行计划。

Parquet 表的任何 INSERT 语句都需要在 HDFS 文件系统中有足够的空间才能写入一个块。由于默认情况下 Parquet 数据文件使用的块大小为 1 GB,因此如果 HDFS 的运行空间不足,INSERT 就有可能失败(即使是极少量的数据)。

避免对 Parquet 表使用 INSERT…VALUES 语法,因为每一个 INSERT…VALUES 语句都会产生一个极小的单独数据文件,而 Parquet 的强项在于以大块数据为单位进行处理(压缩、并行等操作)。

假如您有一个或多个 Impala 之外生成的 Parquet 数据文件,可通过以下方法之一,快速地让这些数据可以在 Impala 中查询:

  • 使用 LOAD DATA 语句,将一个单独文件或某个目录下所有数据文件移动到 Impala 表对应的数据目录中。这不会验证或转换数据。原始数据文件必须位于 HDFS 中,而不能是本地文件系统中。
  • 使用包含 LOCATION 子句的 CREATE TABLE 语句创建一个表,将数据继续存放在 Impala 数据目录之外。原始数据文件必须位于 HDFS 中,而不能是本地文件系统中。为加强安全性,假如这些数据长时间存在并被其他应用重用,您可以使用 CREATE EXTERNAL TABLE 语法,使得这些数据文件不会被 Impala 语句 DROP TABLE 删除。
  • 假如 Parquet 表已经存在,您可以直接复制 Parquet 数据文件到表的目录中,然后使用 REFRESH 语句使得 Impala 得以识别新添加的数据。请记住使用 hdfs distcp -pb 命令而不是 -put-cp 操作,以保留 Parquet 数据文件的块大小。参见 复制 Parquet 数据文件示例 了解以上操作的示例。

如果数据存在于 Impala 之外,并且是其它格式,请结合使用之前提到的两种技术。首先,使用 LOAD DATA 或 CREATE EXTERNAL TABLE … LOCATION 语句把数据导入使用对应文件格式的 Impala 表中。然后,使用 INSERT…SELECT 语句将数据复制到 Parquet 表中,并转换为 Parquet 格式。

加载数据到 Parquet 表是内存密集型操作,因为输入数据会在内存中被缓存,直到其大小达到 一个数据块,然后这些块的数据会进行组织和压缩,最终写出。把数据插入到分区 Parquet 表时,内存消耗会更大,因为对分区关键列值的每种组合都要写入一个单独的数据文件中,可能需要内存同时操作几个 区块。

当向分区 Parquet 表插入数据时,Impala 会在节点之间重新分布数据以减少内存消耗。但是在插入操作时,您可能仍然需要临时增加 Impala 专用的内存量,或者把加载操作拆分到几个 INSERT 语句中,或者两种方式都采用。

  Note: 之前所有的技术都假定你所加载的数据与你的目标表的结构相匹配,包括列的顺序、列的名称以及分区布局 (layout)。要转换或重组数据,请先将数据加载到与其底层数据相匹配的表中,然后使用 CREATE TABLE AS SELECT 或 INSERT … SELECT 之一的表复制技术,对列进行重新排序或重命名、将数据拆分到多个分区等等。举例来说,要加载单个包含所有数据的 Parquet 文件到分区表中,您应当使用包含动态分区的 INSERT … SELECT 语句,让 Impala 创建具有合适分区数值的单独的数据文件;示例参见 INSERT 语句。

Impala Parquet 表的查询性能

Parquet 表的查询性能,取决于处理 SELECT 列表和 WHERE 子句时所需读取的列的个数、数据被划分为块大小与文件大小相当的大数据文件的方式、读取压缩格式下每列数据时 I/O 的降低、可以跳过哪些数据文件(分区表),以及解压每列数据时的 CPU 负载。

例如,以下对 Parquet 表的查询是高效的:
select avg(income) from census_data where state = 'CA';

这个查询只处理大量列中的两个列。假如表是根据 STATE 分区的,它甚至更有效率,因为查询仅仅需要对每个数据文件读取和解码 1 列,并且它可以只读取 state ‘CA’分区目录下的数据文件,跳过所有其它 state 的、物理上的位于其它目录的所有数据文件。
以下对 Parquet 表的查询相对低效:
select * from census_data;

Impala 不得不读取每个 数据文件的全部内容,并解压每一个行组中每一列的内容,浪费了面向列格式的 I/O 优化。与其它文件格式的表相比,该查询对 Parquet 表可能还是会更快,但它没有利用 Parquet 数据文件格式的独特优势。

Impala 可以优化对 Parquet 表的查询,特别是联接查询,在对所涉及的全部表收集了统计信息时效果更加明显。当表中加载或附加了大量数据后,请对每个表发出 COMPUTE STATS 语句。详细信息,请参见 COMPUTE STATS 语句。

  Note: 目前,某个已知问题 (IMPALA-488) 会导致在 Parquet 表上执行 COMPUTE STATS 操作时使用的内存过多。解决办法是,发布 COMPUTE STATS语句前,在 impala-shell 中发布命令 SET NUM_SCANNER_THREADS=2。接着,发布 UNSET NUM_SCANNER_THREADS,然后继续查询。

Parquet 表的分区

正如 Impala 表分区所述,对 Impala 而言,分区是一项重要而通用的性能技术。本章节介绍一些关于分区 Parquet 表的性能考虑。

Parquet 文件格式非常适合包含许多列,并且绝大部分查询只涉及少数几列的表。正如 Parquet 数据文件如何组织所述,Parquet 数据文件的物理分布使得对于许多查询 Impala 只需读取数据的一小部分。当您结合使用 Parquet 表和分区时,这种方法的性能优势会进一步凸显。基于 WHERE 子句引用的分区关键列,Impala 可以完全跳过特定分区的数据文件。例如,分区表上的查询通常基于 YEAR、MONTH 和/或 DAY 列进行期间趋势分析,或是地理区域分析。请记住,Parquet 数据文件使用了较大的块尺寸,所以在确定如何精细地分区数据时,请尝试找到一个粒度,使得每个分区包含 256 MB 或更多数据,而不是创建大量属于多个分区的小文件。

插入到分区 Parquet 表的操作可能是一个资源密集型操作,因为对于每个不同分区关键列的组合,每个 Impala 节点都可能会将一个单独的数据文件写入 HDFS。大量同时打开的文件数可能会达到 HDFS transceivers 限制。考虑采用以下技术,避免达到这一限制:

  • 使用单独的 INSERT 语句加载不同的数据子集,每个语句的 PARTITION 子句包含特定值,例如 PARTITION (year=2010)
  • 增加 HDFS 的 transceivers 值,有时候写作 xcievers (sic)。即配置文件 hdfs-site.xml 中的 dfs.datanode.max.transfer.threads属性。例如,如果您加载 12 年的数据,根据年、月、日进行分区,那么即使该值设为 4096 也不够。这篇 博文 使用 HBase 例子作为说明,探讨了增加或减小这一数值的考虑。
  • 使用 COMPUTE STATS 语句在数据被复制的源表上采集 列统计信息,这样 Impala 查询可以评估分区关键列不同数值的个数,从而均衡工作负载。
  Note: 目前,某个已知问题 (IMPALA-488) 会导致在 Parquet 表上执行 COMPUTE STATS 操作时使用的内存过多。解决办法是,发布 COMPUTE STATS语句前,在 impala-shell 中发布命令 SET NUM_SCANNER_THREADS=2。接着,发布 UNSET NUM_SCANNER_THREADS,然后继续查询。

Parquet 数据文件的 Snappy 和 GZip 压缩

当 Impala 使用 INSERT 语句写入 Parquet 数据文件时,底层的压缩受 COMPRESSION_CODEC 查询选项控制。(Impala 2.0 版本以前,该查询选项的名称为 PARQUET_COMPRESSION_CODEC。)这一查询选项允许的值包括 snappy(默认值)、gzip 和 none。选项值不区分大小写。如果该选项设为了其它无法识别的值,那么所有类型的查询都会因无效的选项设置而失败,不仅仅是涉及 Parquet 表的查询。

使用 Snappy 压缩的 Parquet 表示例

默认情况下,Parquet 表的底层数据文件采用 Snappy 压缩。快速压缩和解压,对于许多数据集来说是一个好选择。为确保使用了 Snappy 压缩,例如在试验了其它压缩编解码之后,请在插入数据之前设置 COMPRESSION_CODEC 查询选项为 snappy

[localhost:21000] > create database parquet_compression;
[localhost:21000] > use parquet_compression;
[localhost:21000] > create table parquet_snappy like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=snappy;
[localhost:21000] > insert into parquet_snappy select * from raw_text_data;
Inserted 1000000000 rows in 181.98s

使用 GZip 压缩的 Parquet 表示例

如果您需要更密集的压缩(代价是,查询时需要更多的 CPU 周期以进行解压),请在插入数据之前设置 COMPRESSION_CODEC 查询选项为 gzip

[localhost:21000] > create table parquet_gzip like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=gzip;
[localhost:21000] > insert into parquet_gzip select * from raw_text_data;
Inserted 1000000000 rows in 1418.24s

未压缩 Parquet 表示例

假如您的数据压缩效果非常有限,或者您想彻底避免压缩和解压缩带来的 CPU 负载,请在插入数据前设置 COMPRESSION_CODEC 查询选项为none

[localhost:21000] > create table parquet_none like raw_text_data;
[localhost:21000] > insert into parquet_none select * from raw_text_data;
Inserted 1000000000 rows in 146.90s

已压缩 Parquet 表的大小和速度示例

下面的例子演示了 10 亿行合成数据在数据大小和查询速度方面的差异,它们分别使用了不同的编解码器进行压缩。与往常一样,请使用你自己的真实数据集进行类似的测试。实际的压缩比、对应的插入和查询速度,会根据实际数据特征的不同而有所不同。

本例中,压缩方式从 Snappy 改为 GZip,数据大小减少了 40%,而从 Snappy 改为不压缩,大小增加了 40%:

$ hdfs dfs -du -h /user/hive/warehouse/parquet_compression.db
23.1 G  /user/hive/warehouse/parquet_compression.db/parquet_snappy
13.5 G  /user/hive/warehouse/parquet_compression.db/parquet_gzip
32.8 G  /user/hive/warehouse/parquet_compression.db/parquet_none

因为 Parquet 数据文件通常较大,每一个目录都包含数量不同的数据文件,行组的安排也各不相同。

同时,压缩比更小,解压速度就更快。在上面包含 10 亿行记录的表中,对于评估特定列全部数值的查询,不使用压缩的比使用 Snappy 压缩的快,使用 Snappy 压缩的比使用 Gzip 压缩的快。查询性能依赖于多个不同因素,因此请如往常一样,使用你自己的数据进行自己的基准测试,以获得数据大小、CPU 效率、以及插入和查询操作的速度等方面的理想均衡。

[localhost:21000] > desc parquet_snappy;
Query finished, fetching results ...
+-----------+---------+---------+
| name      | type    | comment |
+-----------+---------+---------+
| id        | int     |         |
| val       | int     |         |
| zfill     | string  |         |
| name      | string  |         |
| assertion | boolean |         |
+-----------+---------+---------+
Returned 5 row(s) in 0.14s
[localhost:21000] > select avg(val) from parquet_snappy;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 4.29s
[localhost:21000] > select avg(val) from parquet_gzip;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 6.97s
[localhost:21000] > select avg(val) from parquet_none;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 3.67s

复制 Parquet 数据文件示例

下面是最后一个示例,演示了使用不同压缩编解码器的数据文件在读操作上是如何相互兼容的。压缩格式的相关元数据会写入到每个数据文件中,无论当时COMPRESSION_CODEC 设置为什么值,在查询过程中都可以正常解码。本例中,我们从之前示例中使用的 PARQUET_SNAPPYPARQUET_GZIP,和PARQUET_NONE 表中复制数据文件,这几个表中每个表都包含 10 亿行记录,全都复制到新表 PARQUET_EVERYTHING的数据目录中。一对简单的查询表明,新表包含了 30 亿行记录,并且数据文件使用了不同的压缩编解码器。

首先,我们在 Impala 中创建表,以便在 HDFS 中有一个存放数据文件的目标目录:

[localhost:21000] > create table parquet_everything like parquet_snappy;
Query: create table parquet_everything like parquet_snappy

然后在 shell 中,我们将对应的数据文件复制到新表的数据目录中。 不采用 hdfs dfs -cp 这一常规文件的操作方式,我们采用 hdfs distcp -pb 命令以确保 Parquet 数据文件特有的 块大小 继续保留。

$ hdfs distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_snappy \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hdfs distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_gzip  \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hdfs distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_none  \
  /user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...

返回 impala-shell 解释器,我们采用 REFRESH 语句让 Impala 服务器识别表中新的数据文件,然后运行查询,结果表明数据文件包含 30 亿行记录,并且其中某个数值列的值与原来的小表相匹配:

[localhost:21000] > refresh parquet_everything;
Query finished, fetching results ...

Returned 0 row(s) in 0.32s
[localhost:21000] > select count(*) from parquet_everything;
Query finished, fetching results ...
+------------+
| _c0        |
+------------+
| 3000000000 |
+------------+
Returned 1 row(s) in 8.18s
[localhost:21000] > select avg(val) from parquet_everything;
Query finished, fetching results ...
+-----------------+
| _c0             |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 13.35s

与其他 Hadoop 组件交换 Parquet 数据文件

自 CDH 4.5 开始,您可以在 Hive、Pig、MapReduce 中读取和写入 Parquet 数据文件。参考 CDH 4 安装指南 了解详细信息。

之前,不支持在 Impala 中创建 Parquet 数据然后在 Hive 中重用这个表。现在 CDH 4.5 中,Hive 开始支持 Parquet,在 Hive 中重用已有的 Impala Parquet 数据文件需要更新表的元数据。假如您已经使用 Impala 1.1.1 或更高版本,请使用以下命令:

ALTER TABLE table_name SET FILEFORMAT PARQUET;

假如您使用比 Impala 1.1.1 老的版本,通过 Hive 执行元数据的更新:

ALTER TABLE table_name SET SERDE 'parquet.hive.serde.ParquetHiveSerDe';
ALTER TABLE table_name SET FILEFORMAT
  INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
  OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat";

Impala 1.1.1 及以上版本可以重用 Hive 中创建的 Parquet 数据文件,不需要执行任何操作。

Impala 支持 Parquet 数据文件中编码的标量数据类型,但不支持复合 (composite) 或嵌套 (nested) 类型,如 map/array。假如表的任何一列采用了不受支持的类型,Impala 将无法访问该表。

假如你在不同节点之间、乃至在相同节点的不同目录之间复制 Parquet 数据文件,请使用 hadoop distcp -pb 命令以确保保留原有的块大小。发出 hdfs fsck -blocks HDFS_path_of_impala_table_dir 命令并检查平均块大小是否接近 256 MB(或是由 PARQUET_FILE_SIZE 查询选项所定义的其它大小),以验证是否保留了块大小。(hadoop distcp 操作通常会生成一些名称为 _distcp_logs_* 的子目录,您可以之后从目标目录中删除这些目录。)直接发出 hadoop distcp 命令,可查看 distcp 命令的语法详情。

Parquet 数据文件如何组织

尽管 Parquet 是一个面向列的文件格式,但不要期望每列一个数据文件。Parquet 在同一个数据文件中保存一行中的所有数据,以确保在处理过程中,某行的所有列在同一个节点上都可用。Parquet 所做的是设置 HDFS 块大小和与之相匹配的最大数据文件大小,以确保 I/O 和网络传输请求适用于大批量数据。

在数据文件中,多个行的数据会重新排列,以便第一列的所有值都会被重新组织到一个连续的块中,然后是第二列的所有值,依此类推。相同列的值彼此相邻,从而 Impala 可以对这些列的值使用高效的压缩技术。

  Note:

Impala INSERT 语句通过一个大小 与数据文件大小相匹配的 HDFS 块来写入 Parquet 数据文件,以确保每个数据文件对应一个 HDFS 块,并且整个文件可以在单个节点上处理,不需要任何远程读取。

如果 Parquet 数据文件不是通过 Impala 创建的,例如来自 MapReduce 或 Pig 作业,请确保 HDFS 块大小大于或等于文件大小,从而维持“每个文件对应一个块”的关系。可将 dfs.block.size 或 dfs.blocksize 属性设置为足够大,使每个文件都能放进单独的 HDFS 块,即使该大小比正常的 HDFS 块要大。

如果在复制文件时块大小被重设为较小的值,你会发现涉及这些文件的查询性能会下降,并且 PROFILE 语句会显示出一些远程读取,说明 I/O 并非最优。参见 复制 Parquet 数据文件示例,了解在复制 Parquet 数据文件时如何保留块大小。

当 Impala 检索或测试特定列的数据时,它会打开所有数据文件,但只会读取每个文件中包含该列数据的部分。这些列的值是连续存放,使得处理同一列的数据所需的 I/O 最小化。如果其它的列在 SELECT 列表或 WHERE 子句中列出,在同一个数据文件中同一行的所有列的数据都可用。

假如一个 INSERT 语句导入的数据小于 一个 Parquet 数据块的大小,那么最终的数据文件将小于理想大小。因此,如果您把一个 ETL 作业拆分成多个INSERT 语句,请尽量确保每一个 INSERT 语句插入的数据量接近 256 MB,或 256 MB 的倍数

Parquet 数据文件的 RLE 编码和字典编码

Parquet 基于实际数据值的分析,使用一些自动压缩技术,例如游程编码 (RLE) 和字典编码。当数据值被编码成紧凑的格式后,使用压缩算法,编码的数据可能会被进一步压缩。Impala 创建的 Parquet 数据文件 可以使用 Snappy、GZip,或不进行压缩;Parquet 规格还支持 LZO 压缩,但是目前不支持 LZO 压缩的 Parquet 文件。

除了应用到整个数据文件的 Snappy 或 GZip 压缩之外,RLE 和字典编码是 Impala 自动应用到 Parquet 数据值组的压缩技术。这些自动优化可以节省您的时间,并可省去传统数据仓库通常需要的规划。例如,字典编码降低了创建数值型 ID 作为长字符串缩写的需求。

游程编码压缩了一组重复的数据值。例如,如果多个连续行都包含相同的国家编码,那么可以用该值以及连续出现的次数来表示这些重复的值。

字典编码取出存在于列中的不同的值,并用紧凑的 2 字节值替代原始值进行表示,而原始值可能有多个字节。(对编码后的值还可进行进一步压缩,以节省更多空间。)当列的不同值的个数少于 2**16 (65,536) 时,使用这一类型的编码。对于 BOOLEAN 数据类型的列不会应用该编码,因为原始值已经足够短。TIMESTAMP 列有时每行的值都不同,这时可能很快就超过 2**16 个不同值的限制。2**16 的列不同值限制是针对每个数据文件分别计算的,因此如果有多个不同的数据文件,每个文件都包含 10,000 个不同的城市名,每一个数据文件中的城市名一列依然可以使用字典编码来进行压缩。

为 Parquet 表压缩数据文件

如果您对 Parquet 表重用了现有的表结构或 ETL 过程,您可能会遇到 很多小文件 的情况,这时查询效率不是最优的。例如,类似以下的语句可能会产生组织低效的数据文件:

-- In an N-node cluster, each node produces a data file
-- for the INSERT operation. If you have less than
-- N GB of data to copy, some files are likely to be
-- much smaller than the default Parquet block size.
insert into parquet_table select * from text_table;

-- Even if this operation involves an overall large amount of data,
-- when split up by year/month/day, each partition might only
-- receive a small amount of data. Then the data files for
-- the partition might be divided between the N nodes in the cluster.
-- A multi-gigabyte copy operation might produce files of only
-- a few MB each.
insert into partitioned_parquet_table partition (year, month, day)
  select year, month, day, url, referer, user_agent, http_code, response_time
  from web_stats;

以下技术可帮助你在 Parquet INSERT 操作中产生大的数据文件,并压缩现有的过小的数据文件:

  • 向分区 Parquet 表中插入数据时,请使用静态分区INSERT 语句,这样分区关键值将被指定为常量。理想情况下,为每个分区使用一个单独的 INSERT语句。

  • 执行 INSERT 或 CREATE TABLE AS SELECT 语句期间,可以暂时将 NUM_NODES 选项设为 1。通常情况下,这些语句针对每个数据节点生成一个或多个数据文件。如果写入操作涉及少量数据、Parquet 表和/或分区表,即使您直觉上只希望输出一个文件,默认行为也会生成多个小文件。SET NUM_NODES=1 会关闭写入操作的分布式执行,使其更有可能只生成一个或几个数据文件。

  • 与你习惯的传统分析数据库系统相比,做好减少分区关键列个数的准备。

  • 不要期望 Impala 写入的 Parquet 文件会填满整个 Parquet 区块。Impala 在计算为每个 Parquet 文件写入多少数据时,会采取保守策略。典型情况下,经过 Parquet 文件格式的编码和压缩,内存中的未压缩数据写到磁盘后会显著变小。最终的数据文件大小取决于数据的可压缩程度。因此,如果一个 256 MB 的文本文件被写成 2 个 Parquet 数据文件,并且每个文件都小于 256 MB,这是正常现象。

  • 如果一个表最终积累了很多小的数据文件,可以考虑使用前述技术中的一种或多种,通过 CREATE TABLE AS SELECT 或 INSERT … SELECT 语句,将所有数据复制到一个新的 Parquet 表中。

    为避免因更换表名而重写查询,你可以采用一个约定:始终在一个视图上运行重要的查询。只要更改视图定义,所有后续查询就会立即切换到新的基础表:

    create view production_table as select * from table_with_many_small_files;
    -- CTAS or INSERT...SELECT all the data into a more efficient layout...
    alter view production_table as select * from table_with_few_big_files;
    select * from production_table where c1 = 100 and c2 < 50 and ...;
    

Parquet 表的模式 (Schema) 演进

模式演进是指使用 ALTER TABLE … REPLACE COLUMNS 语句,更改列的名称、数据类型或列的个数。对 Parquet 表执行模式演进时,可以参考以下几点:

  • Impala ALTER TABLE 语句不会更改表中的任何数据文件。对 Impala 而言,模式演进涉及根据新的表定义,对相同的数据文件进行解释。某些类型的模式变化合乎情理,可以正确表示。其它类型的模式变化可能无法以合理的方式表示,会在查询过程中产生特殊的结果值或是出现转换错误。

  • INSERT 语句总是按最新的表定义创建数据。如果你交替执行了 INSERT 和 ALTER TABLE … REPLACE COLUMNS 语句,你可能得到列数不同或内部数据表示不同的数据文件。

  • 如果你使用 ALTER TABLE … REPLACE COLUMNS 在表末尾定义更多的列,当查询用到原始数据文件时,新增的列将全部返回 NULL 值。

  • 如果你使用 ALTER TABLE … REPLACE COLUMNS 定义了比过去更少的列,当查询用到原始数据文件时,数据文件中多出的那些列会被忽略。

  • Parquet 视 TINYINT、SMALLINT 和 INT 为相同的内部类型,都以 32 位整数的方式存储。

    • 这意味着很容易将 TINYINT 列升级为 SMALLINT 或 INT,或是将 SMALLINT 列升级为 INT。在数据文件中数字将以完全相同的方式展示,升级后的列不会包含任何超出范围的值。
    • 如果你把其中任何一个列类型更改为小一些的类型,那么在新类型下超出范围的值,其返回值将发生错误,典型地为负数。

    • 你无法将 TINYINT、SMALLINT 或 INT 列更改为 BIGINT,或做其它类似的转换。尽管 ALTER TABLE 执行成功,但是任何对这些列的查询都会导致转换错误。

    • 查询过程中,列的任何其它类型转换都会产生转换错误。例如,INT 转换至 STRING、FLOAT 转换至 DOUBLE、TIMESTAMP 转换至 STRING、DECIMAL(9,0) 转换至 DECIMAL(5,2),等等。

Parquet 表的数据类型考虑

Parquet 格式定义了一系列数据类型,其名称与相应的 Impala 数据类型名称不同。如果你在准备 Parquet 文件过程中使用的是其它 Hadoop 组件(如 Pig 或 MapReduce),你可能需要使用 Parquet 定义的类型名称。下面列出了 Parquet 定义的类型,以及对应的 Impala 类型。

基本类型:

BINARY -> STRING
BOOLEAN -> BOOLEAN
DOUBLE -> DOUBLE
FLOAT -> FLOAT
INT32 -> INT
INT64 -> BIGINT
INT96 -> TIMESTAMP

逻辑类型:

BINARY + OriginalType UTF8 -> STRING
BINARY + OriginalType DECIMAL -> DECIMAL

 

zz from: http://sunyi514.github.io/2014/11/15/%E7%9B%98%E7%82%B9sql-on-hadoop%E4%B8%AD%E7%94%A8%E5%88%B0%E7%9A%84%E4%B8%BB%E8%A6%81%E6%8A%80%E6%9C%AF/

自打Hive出现之后,经过几年的发展,SQL on Hadoop相关的系统已经百花齐放,速度越来越快,功能也越来越齐全。本文并不是要去比较所谓“交互式查询哪家强”,而是试图梳理出一个统一的视角,来看看各家系统有哪些技术上相通之处。
考虑到系统使用的广泛程度与成熟度,在具体举例时一般会拿Hive和Impala为例,当然在调研的过程中也会涉及到一些其他系统,如Spark SQL,Presto,TAJO等。而对于hawq这样的商业产品和apache drill这样成熟度还不是很高的开源方案就不做过多了解了。

系统架构

runtime framework v.s. mpp

在SQL on Hadoop系统中,有两种架构,一种是基于某个运行时框架来构建查询引擎,典型案例是Hive;另一种是仿照过去关系数据库的MPP架构。前者是先有运行时框架,然后套上SQL层,后者则是从头打造一个一体化的查询引擎。有时我们能听到一种声音,说后者的架构优于前者,至少在性能上。那么是否果真如此?
一般来说,对于SQL on Hadoop系统很重要的一个评价指标就是:快。后面提到的所有内容也大多是为了查询速度更快。在Hive逐渐普及之后,就逐渐有了所谓交互式查询的需求,因为无论是BI系统,还是adhoc,都不能按照离线那种节奏玩。这时候无论是有实力的大公司(比如Facebook),还是专业的供应商(比如Cloudera),都试图去解决这个问题。短期可以靠商业方案或者关系数据库去支撑一下,但是长远的解决方案就是参考过去的MPP数据库架构打造一个专门的系统,于是就有了Impala,Presto等等。从任务执行的角度说,这类引擎的任务执行其实跟DAG模型是类似的,当时也有Spark这个DAG模型的计算框架了,但这终究是别人家的孩子,而且往Spark上套sql又是Hive的那种玩法了。于是在Impala问世之后就强调自己“计算全部在内存中完成”,性能也是各种碾压当时还只有MR作为计算模型的Hive。那么Hive所代表的“基于已有的计算模型”方式是否真的不行?
不可否认,按照这种方式去比较,那么类MPP模式确实有很多优势:

  • DAG v.s. MR:最主要的优势,中间结果不写磁盘(除非内存不够),一气呵成。
  • 流水线计算:上游stage一出结果马上推送或者拉到下一个stage处理,比如多表join时前两个表有结果直接给第三个表,不像MR要等两个表完全join完再给第三个表join。
  • 高效的IO:本地查询没有多余的消耗,充分利用磁盘。这个后面细说。
  • 线程级别的并发:相比之下MR每个task要启动JVM,本身就有很大延迟,占用资源也多。

当然MPP模式也有其劣势,一个是扩展性不是很高,这在关系数据库时代就已经有过结论;另一个是容错性差,对于Impala来说一旦运行过程中出点问题,整个查询就挂了。
但是,经过不断的发展,Hive也能跑在DAG框架上了,不仅有Tez,还有Spark。上面提到的一些劣势,其实大都也可以在计算模型中解决,只不过考虑到计算模型的通用性和本身的设计目标,不会去专门满足(所以如果从这个角度分类,Impala属于“专用系统”,Spark则属于“通用系统”)。在最近Cloudera做的benchmark中,虽然Impala仍然一路领先,但是基于Spark的Spark SQL完全不逊色于Presto,基于Tez的Hive也不算很差,至少在并发模式下能超过Presto,足见MPP模式并不是绝对占上风的。所以这种架构上的区别在我看来并不是制胜的关键,至少不是唯一的因素,真正要做到快速查询,各个方面的细节都要有所把握。后面说的都是这些细节。

核心组件

不管是上面提到的那种架构,一个SQL on Hadoop系统一般都会有一些通用的核心组件,这些组件根据设计者的考虑放在不同的节点角色中,在物理上节点都按照master/worker的方式去做,如果master压力太大,一些本来适合放在master上的组件可以放到一个辅助master上。

  • UI层负责提供用户输入查询的接口。一般有Web/GUI,命令行,编程方式3类。
  • QL层负责把用户提交的查询解析成可以运行的执行计划(比如MR Job)。这部分在后面会专门提到。
  • 执行层就是运行具体的Job。一般会有一个master负责query的运行管理,比如申请资源,观察进度等等,同时master也负责最终聚合局部结果到全局结果。而每个节点上会有相应的worker做本地计算。
  • IO层提供与存储层交互的接口。对于HDFS来说,需要根据I/O Format把文件转换成K/V,Serde再完成K/V到数据行的映射。对于非HDFS存储来说就需要一些专门的handler/connector。
  • 存储层一般是HDFS,但也有可以查询NoSQL,或者关系数据库的。
  • 系统另外还需要一个元数据管理服务,管理表结构等。

arch

执行计划

编译流程

从SQL到执行计划,大致分为5步。

  • 第一步将SQL转换成抽象语法树AST。这一步一般都有第三方工具库可以完成,比如antlr。
  • 第二步对AST进行语义分析,比如表是否存在,字段是否存在,SQL语义是否有误(比如select中被判定为聚合的字段在group by中有没有出现)。
  • 第三步生成逻辑执行计划,这是一个由逻辑操作符组成的DAG。比如对于Hive来说扫表会产生TableScanOperator,聚合会产生GroupByOperator。对于类MPP系统来说,情况稍微有点不同。逻辑操作符的种类还是差不多,但是会先生成单机版本,然后生成多机版本。多机版本主要是把aggregate,join,还有top n这几个操作并行化,比如aggregate会分成类似MR那样的本地aggregate,shuffle和全局aggregate三步。
  • 第四步对逻辑执行计划做优化,这步在下面单独介绍。
  • 第五步把逻辑执行计划转换成可以在机器上运行的物理计划。对于Hive来说,就是MR/Tez Job等;对于Impala来说,就是plan fragment。其他类MPP系统也是类似的概念。物理计划中的一个计算单元(或者说Job),有“输入,处理,输出”三要素组成,而逻辑执行计划中的operator相对粒度更细,一个逻辑操作符一般处于这三要素之一的角色。
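
以 Spark SQL 为例,可以把这条流水线直接打印出来对照(示意草图,基于 Spark 1.1 前后的 SchemaRDD API,假设在 spark-shell 中 sc 已存在,表和数据均为虚构):

import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)
val schema = StructType(Seq(
  StructField("uid", StringType, true),
  StructField("country", StringType, true)))
val logs = sqlContext.applySchema(
  sc.parallelize(Seq(Row("u_001", "CN"), Row("u_002", "US"))), schema)
logs.registerTempTable("logs")

val q = sqlContext.sql(
  "SELECT uid, count(1) AS pv FROM logs WHERE country = 'CN' GROUP BY uid")

// queryExecution 会依次展示 parsed / analyzed / optimized 的逻辑计划和最终的物理计划,
// 正好对应上面说的解析、语义分析(绑定)、优化、物理计划这几步
println(q.queryExecution)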

下面分别举两个例子,直观的认识下sql、逻辑计划、物理计划之间的关系,具体解释各个operator的话会比较细碎,就不展开了。

Hive on MR:

select count(1) from status_updates where ds = '2009-08-01'

Hive_compile

Presto(引用自美团技术团队,其中SubPlan就是物理计划的一个计算单元):

select c1.rank, count(*) 
from dim.city c1 join dim.city c2 on c1.id = c2.id 
where c1.id > 10 group by c1.rank limit 10;

Presto_compile

优化器

关于执行计划的优化,虽然不一定是整个编译流程中最难的部分,但却是最有看点的部分,而且目前还在不断发展中。Spark系之所以放弃Shark另起炉灶做Spark SQL,很大一部分原因是想自己做优化策略,避免受Hive的限制,为此还专门独立出优化器组件Catalyst(当然Spark SQL目前还是非常新,其未来发展给人不少想象空间)。总之这部分工作可以不断的创新,优化器越智能,越傻瓜化,用户就越能解放出来解决业务问题。
早期在Hive中只有一些简单的规则优化,比如谓词下推(把过滤条件尽可能的放在table scan之后就完成),操作合并(连续的filter用and合并成一个operator,连续的projection也可以合并)。后来逐渐增加了一些略复杂的规则,比如相同key的join + group by合并为1个MR,还有star schema join。在Hive 0.12引入的相关性优化(correlation optimizer)算是规则优化的一个高峰,他能够减少数据的重复扫描,具体来说,如果查询的两个部分用到了相同的数据,并且各自做group by / join的时候用到了相同的key,这个时候由于数据源和shuffle的key是一样的,所以可以把原来需要两个job分别处理的地方合成一个job处理。
比如下面这个sql:

SELECT 
    sum(l_extendedprice) / 7.0 as avg_yearly 
FROM 
    (SELECT l_partkey, l_quantity, l_extendedprice 
     FROM lineitem JOIN part ON (p_partkey = l_partkey) 
     WHERE p_brand = 'Brand#35' AND p_container = 'MED PKG') touter 
JOIN 
    (SELECT l_partkey as lp, 0.2 * avg(l_quantity) as lq 
     FROM lineitem GROUP BY l_partkey) tinner 
ON (touter.l_partkey = tinner.lp) 
WHERE touter.l_quantity < tinner.lq

这个查询中两次出现lineitem表,group by和两处join用的都是l_partkey,所以本来两个子查询和一个join用到三个job,现在只需要用到一个job就可以完成。

correlation_optimizer

但是,基于规则的优化(RBO)不能解决所有问题。在关系数据库中早有另一种优化方式,也就是基于代价的优化CBO。CBO通过收集表的数据信息(比如字段的基数,数据分布直方图等等)来对一些问题作出解答,其中最主要的问题就是确定多表join的顺序。CBO通过搜索join顺序的所有解空间(表太多的情况下可以用有限深度的贪婪算法),并且算出对应的代价,可以找到最好的顺序。这些都已经在关系数据库中得到了实践。
目前Hive已经启动专门的项目,也就是Apache Optiq来做这个事情,而其他系统也没有做的很好的CBO,所以这块内容还有很大的进步空间。

执行效率

即使有了高效的执行计划,如果在运行过程本身效率较低,那么再好的执行计划也会大打折扣。这里主要关注CPU和IO方面的执行效率。

CPU

在具体的计算执行过程中,低效的cpu会导致系统的瓶颈落在CPU上,导致IO无法充分利用。在一项针对Impala和Hive的对比时发现,Hive在某些简单查询上(TPC-H Query 1)也比Impala慢主要是因为Hive运行时完全处于CPU bound的状态中,磁盘IO只有20%,而Impala的IO至少在85%。
在SQL on Hadoop中出现CPU bound的主要原因有以下几种:

  • 大量虚函数调用:这个问题在多处出现,比如对于a + 2 * b之类的表达式计算,解释器会构造一个expression tree,解释的过程就是递归调用子节点做evaluation的过程。又比如以DAG形式的operator/task在执行的过程中,上游节点会层层调用下游节点来获取产生的数据。这些都会产生大量的调用。
  • 类型装箱:由于表达式解释器需要对不同数据类型的变量做解释,所以在Java中需要把这些本来是primitive的变量包装成Object,累积起来也消耗不少资源。这算是上面一个问题附带出来的。
  • branch instruction: 现在的CPU都是有并行流水线的,但是如果出现条件判断会导致无法并行。这种情况可能出现在判断数据的类型(是string还是int),或者在判断某一列是否因为其他字段的过滤条件导致本行不需要被读取(列存储情况下)。
  • cache miss:每次处理一行数据的方式导致cpu cache命中率不高。(这么说已经暗示了解决方案)

针对上面的问题,目前大多数系统中已经加入了以下两个解决办法中至少一个。
一个方法是动态代码生成,也就是不使用解释性的统一代码。比如a + 2 * b这个表达式就会生成对应的执行语言的代码,而且可以直接用primitive type,而不是用固定的解释性代码。具体实现来说,JVM系的如Spark SQL,Presto可以用反射,C++系的Impala则使用了llvm生成中间码。对于判断数据类型造成的分支判断,动态代码的效果可以消除这些类型判断,还可以展开循环,可以对比下面这段代码,左边是解释性代码,右边是动态生成代码。

codegen
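
用一小段 Scala 粗略体会这两种方式的差别(纯属概念演示,类名、函数均为虚构,与 Impala/Spark SQL 的真实实现无关):

// 解释执行:对每一行递归遍历表达式树,伴随虚函数调用与装箱
sealed trait Expr { def eval(row: Array[Any]): Any }
case class Col(i: Int) extends Expr { def eval(row: Array[Any]): Any = row(i) }
case class Lit(v: Long) extends Expr { def eval(row: Array[Any]): Any = v }
case class Add(l: Expr, r: Expr) extends Expr {
  def eval(row: Array[Any]): Any =
    l.eval(row).asInstanceOf[Long] + r.eval(row).asInstanceOf[Long]
}
case class Mul(l: Expr, r: Expr) extends Expr {
  def eval(row: Array[Any]): Any =
    l.eval(row).asInstanceOf[Long] * r.eval(row).asInstanceOf[Long]
}

// a + 2 * b 对应的表达式树
val interpreted = Add(Col(0), Mul(Lit(2L), Col(1)))
interpreted.eval(Array[Any](3L, 4L))   // 11

// “动态代码生成”之后,等价于直接编译出这样一个专用函数:
// 没有树的遍历、虚函数调用和装箱,直接使用 primitive 类型
def generated(a: Long, b: Long): Long = a + 2 * b
generated(3L, 4L)                      // 11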

另一个方法是vectorization(向量化),基本思路是放弃每次处理一行的模式,改用每次处理一小批数据(比如1k行),当然前提条件是使用列存储格式。这样一来,这一小批连续的数据可以放进cache里面,cpu不仅减少了branch instruction,甚至可以用SIMD加快处理速度。具体的实现参考下面的代码,对一个long型的字段增加一个常量。通过把数据表示成数组,过滤条件也用selVec装进数组,形成了很紧凑的循环:

void add(int vecNum, long[] result, long[] col1, int[] col2, int[] selVec) 
{   
  if (selVec == null)   
     for (int i = 0; i < vecNum; i++) 
         result[i] = col1[i] + col2[i];
  else 
     for (int i = 0; i < vecNum; i++) 
     {
         int selIdx = selVec[i];
         result[selIdx] = col1[selIdx] + col2[selIdx];
     }
}

IO

由于SQL on Hadoop存储数据都是在HDFS上,所以IO层的优化其实大多数都是HDFS的事情,各大查询引擎则提出需求去进行推动。要做到高效IO,一方面要低延迟,屏蔽不必要的消耗;另一方面要高吞吐,充分利用每一块磁盘。目前与这方面有关的特性有:

  • short-circuit local reads:当发现读取的数据是本地数据时,不走DataNode(因为要走一次socket连接),而是用DFS Client直接读本地的block replica。HDFS参数是dfs.client.read.shortcircuitdfs.domain.socket.path
  • zero copy:避免数据在内核buffer和用户buffer之间反复copy,在早期的HDFS中已经有这个默认实现。
  • disk-aware scheduling:通过知道每个block所在磁盘,可以在调度cpu资源时让不同的cpu读不同的磁盘,避免查询内和查询间的IO竞争。HDFS参数是dfs.datanode.hdfs-blocks-metadata.enabled

存储格式

对于分析类型的workload来说,最好的存储格式自然是列存储,这已经在关系数据库时代得到了证明。目前hadoop生态中有两大列存储格式,一个是由Hortonworks和Microsoft开发的ORCFile,另一个是由Cloudera和Twitter开发的Parquet。
ORCFile顾名思义,是在RCFile的基础之上改造的。RCFile虽然号称列存储,但是只是“按列存储”而已,将数据先划分成row group,然后row group内部按照列进行存储。这其中没有列存储的一些关键特性,而这些特性在以前的列式数据库中(比如我以前用过的Infobright)早已用到。好在ORCFile已经弥补了这些特性,包括:

  • 块过滤与块统计:每一列按照固定行数或大小进一步切分,对于切分出来的每一个数据单元,预先计算好这些单元的min/max/sum/count/null值,min/max用于在过滤数据的时候直接跳过数据单元,而所有这些统计值则可以在做聚合操作的时候直接采用,而不必解开这个数据单元做进一步的计算。
  • 更高效的编码方式:RCFile中没有标注每一列的类型,事实上当知道数据类型时,可以采取特定的编码方式,本身就能很大程度上进行数据的压缩。常见的针对列存储的编码方式有RLE(大量重复数据),字典(字符串),位图(数字且基数不大),级差(排序过的数据,比如日志中用户访问时间)等等。

ORCFile的结构如下图,数据先按照默认256M分为row group,也叫stripe。每个stripe配一个index,存放每个数据单元(默认10000行)的min/max值用于过滤;数据按照上面提到的编码方式序列化成stream,然后再进行snappy或gz压缩。footer提供读取stream的位置信息,以及更多的统计值如sum/count等。尾部的file footer和post script提供全局信息,如每个stripe的行数,各列数据类型,压缩参数等。

orcfile

Parquet的设计原理跟ORC类似,不过它有两个特点:

  • 通用性:相比ORCFile专门给Hive使用而言,Parquet不仅仅是给Impala使用,还可以给其他查询工具使用,如Hive、Pig,进一步还能对接avro/thrift/pb等序列化格式。
  • 基于Dremel思想的嵌套格式存储:关系数据库设计模式中反对存储复杂格式(违反第一范式),但是现在的大数据计算不仅出现了这种需求(半结构化数据),也能够高效的实现存储和查询效率,在语法上也有相应的支持(各种UDF,Hive的lateral view等)。Google Dremel就在实现层面做出了范例,Parquet则完全仿照了Dremel。

对嵌套格式做列存储的难点在于,存储时需要标记某个数据对应于哪一个存储结构,或者说是哪条记录,所以需要用数据清楚的进行标记。 在Dremel中提出用definition level和repetition level来进行标记。definition level指的是,这条记录在嵌套结构中所处于第几层,而repetition level指的是,这条记录相对上一条记录,在第几层重复。比如下图是一个二级嵌套数组。图中的e跟f在都属于第二层的重复记录(同一个level2),所以f的r值为2,而c跟d则是不同的level2,但属于同一个level1,所以d的r值为1。对于顶层而言(新的一个嵌套结构),r值就为0。

parquet_nested

但是仅仅这样还不够。上图说明了r值的作用,但是还没有说明d值的作用,因为按照字面解释,d值对于每一个字段都是可以根据schema得到的,那为什么还要从行记录级别标记?这是因为记录中会插入一些null值,这些null值代表着他们“可以存在”但是因为是repeated或者是optional所以没有值的情况,null值是用来占位的(或者说是“想象”出来的),所以他们的值需要单独计算。null的d值就是说这个结构往上追溯到哪一层(不包括平级)就不是null(不是想象)了。在dremel paper中有完整的例子,例子中country的第一个null在code = en所在的结构里面,那么language不是null(不考虑code,他跟country平级),他就是第二层;又比如country的第二个null在url = http://B 所在的结构里面,那么name不是null(不考虑url,因为他跟本来就是null的language平级),所以就是第一层。

dremel_data
dremel_representation

通过这种方式,就对一个树状的嵌套格式完成了存储。在读取的时候可以通过构造一个状态机进行遍历。
有意思的是,虽然parquet支持嵌套格式,但是Impala还没有来得及像Hive那样增加array,map,struct等复杂格式,当然这项功能已经被列入roadmap了,相信不久就会出现。
在最近我们做的Impala2.0测试中,顺便测试了存储格式的影响。parquet相比sequencefile在压缩比上达到1:5,查询性能也相差5-10倍,足见列存储一项就给查询引擎带来的提升。

资源控制

运行时资源调整

对于一个MR Job,reduce task的数量一直是需要人为估算的一个麻烦事,基于MR的Hive也只是根据数据源大小粗略的做估计,不考虑具体的Job逻辑。但是在之后的框架中考虑到了这个情况,增加了运行时调整资源分配的功能。Tez中引入了vertex manager,可以根据运行时收集到的数据智能的判断reduce动作需要的task。类似的功能在TAJO中也有提到,叫progressive query optimization,而且TAJO不仅能做到调整task数量,还能调整join顺序。

资源集成

在Hadoop已经进入2.x的时代,所有想要得到广泛应用的SQL on Hadoop系统势必要能与YARN进行集成。虽然这是一个有利于资源合理利用的好事,但是由于加入了YARN这一层,却给系统的性能带来了一定的障碍,因为启动AppMaster和申请container也会占用不少时间,尤其是前者,而且container的供应如果时断时续,那么会极大的影响时效性。在Tez和Impala中对这些问题给出了相应的解决办法:

  • AppMaster启动延迟的问题,采取long lived app master,AppMaster启动后长期驻守,而非像是MR那样one AM per Job。具体实现时,可以给fair scheduler或capacity scheduler配置的每个队列配上一个AM池,有一定量的AM为提交给这个队列的任务服务。
  • container供应的问题,在Tez中采取了container复用的方式,有点像jvm复用,即container用完以后不马上释放,等一段时间,实在是没合适的task来接班了再释放,这样不仅减少container断供的可能,而且可以把上一个task留下的结果cache住给下一个task复用,比如做map join;Impala则采取比较激进的方式,一次性等所有的container分配到位了才开始执行查询,这种方式也能让它的流水线式的计算不至于阻塞。

其他

到这里为止,已经从上到下顺了一遍各个层面用到的技术,当然SQL on Hadoop本身就相当复杂,涉及到方方面面,时间精力有限不可能一一去琢磨。比如其他一些具有技术复杂度的功能有:

  • 多数据源查询:Presto支持从mysql,cassandra,甚至kafka中去读取数据,这就大大减少了数据整合时间,不需要放到HDFS里才能查询。Impala和Hive也支持查询hbase。Spark SQL也在1.2版本开始支持External Datasource。国内也有类似的工作,如秒针改造Impala使之能查询postgres。
  • 近似查询:count distinct(基数估计)一直是sql性能杀手之一,如果能接受一定误差的话可以采用近似算法。Impala中已经实现了近似算法(ndv),Presto则是请blinkDB合作完成。两者都是采用了HyperLogLog Counting。当然,不仅仅是count distinct可以使用近似算法,其他的如取中位数之类的也可以用。

结束语

尽管现在相关系统已经很多,也经过了几年的发展,但是目前各家系统仍然在不断的进行完善,比如:

  • 增加分析函数,复杂数据类型,SQL语法集的扩展。
  • 对于已经成形的技术也在不断的改进,如列存储还可以增加更多的encoding方式。
  • 甚至对于像CBO这样的领域,开源界拿出来的东西还算是刚刚起步,相比HAWQ中的ORCA这种商业系统提供的优化器还差的很多。

毕竟相比已经比较成熟的关系数据库,分布式环境下需要解决的问题更多,未来一定还会出现很多精彩的技术实践,让我们在海量数据中更快更方便的查到想要的数据。

zz from: https://www.mapr.com/blog/what-kind-hive-table-best-your-data#.VaM5YZO1ZGw

Over the last few releases, the options for how you store data in Hive has advanced in many ways. In this post, let’s take a look at how to go about determining what Hive table storage format would be best for the data you are using. Starting with a basic table, we’ll look at creating duplicate tables for each of the storage format options, and then comparing queries and data compression. Just keep in mind that the goal of this post is to talk about ways of comparing table formats and compression options, and not define the fastest Hive setup for all things data. After all, the fun is in figuring out the Hive table storage format for your own Hive project, and not just reading about mine.

The Hadoop Cluster Layout

For our discussion today, I used a MapR Hadoop cluster consisting of 5 nodes in an Amazon EC2 environment. The MapR Version is 4.0.1 (Hadoop 2.4.1) running Hive 0.13. I have MRv1 and YARN running on all nodes, so I can run comparisons between legacy MRv1 jobs and YARN-controlled jobs.

Cluster Diagram

In this cluster, I spun up 5 nodes in an Amazon EC2 multi-tenant environment. The systems are running CentOS 6.5. I’ll focus on the Hive aspects for the purpose of this blog post, and save the specifics of supporting separate projects with separate SLAs for a later post.

1. Our Starting Data

We have two tables that we will start with. One table consists of information that details bike rental stations. The second table contains information on trips where bikes were rented and where they were turned in. The starting dataset is stored as a standard text table with delimited columns.

1.1      Table 1: Bike Stations
This table has information on the rental stations. Here is an example of some of the fields and data:

1.2      Table 2: Bike Trips
This table has information on bike rental activity. Here is an example of some of the fields and data:

2. Our Table Storage Options

Now that we have an idea of what is in our original data set, we can take a look at the storage options available in Hive. You can add or subtract to the list of storage formats, but for this example, we will look at our starting text tables, RC and ORC. This section covers what some of these formats mean and “why” tables are laid out this way in my MapR cluster as opposed to the “how.” Don’t worry, we’ll cover the “how” later.

2.1      Different Storage Formats

2.1.1      Text File
Text is where Hive started, and has evolved into handling just about any text file with any separation you may be looking for. It’s one of the things that gives Hive the ability to get your data from files into SQL-fed tools.

This is our text file setup:

ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

2.1.2      RC: Record Columnar File
The RC format was designed for clusters with MapReduce in mind. It is a huge step up over standard text files. It’s a mature format with ways to ingest into the cluster without ETL. It is supported in several Hadoop system components. For our comparison, we will ETL the data from text into the RC table using Hive.

The full table creation and load process is covered later, but this is what our table format looks like:

ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'

2.1.3      ORC: Optimized Row Columnar File
The ORC format showed up in Hive 0.11. As the name implies, it is more optimized than the RC format. If you want to hold onto speed and compress the data as much as possible, then ORC is for you. We won’t be digging into the how or why the “O” in ORC works—we’re just taking it for granted that it does and will be using it in our comparison.

Our ORC settings in our ORC table:

ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'

2.2      Table Locations and Partitions
Do I use an external table? Do I use partitions? Do file system settings make a difference? All good questions, but in this section we’ll lay out some of those options to give a scope of reference on deciding what you want to include in your comparison for your project.

2.2.1      External or Internal
The main difference between internal and external tables is a matter of perspective. What tool do I expect will control the data—Hive, or something else? To oversimplify this, external tables are normally used when alterations to the data could happen with some other tool, or the data already exists and you want to keep it in its original form and use it in Hive.

For our example, you could use either, or do comparisons around both. In the table creation process later we will do both, but specify the storage location for the tables on the file system.

2.2.2      File System Volumes
One of the great things about using MapR is the power of logical file system volumes in my Hadoop cluster. On a small cluster with one use case this is not a big deal, but when you get to 300 nodes and 20 projects, all with specific SLAs, then it makes a huge difference. It’s one of the reasons you can support all those SLAs and projects in one cluster. But in this case, we are using separate volumes to help get a solid assessment on the statistics of how the data is stored for each table in our comparison.

3. Data Compression Options

For those of you who can remember Schoolhouse Rock: “Compression, Compression…what’s my compression?” There are several options for compression. For those of you who don’t want to worry about compression, you can just pick an option for the MapR Hadoop file system compression and not worry about it. For those of you who have a drive to tweak all things, then you can run comparisons on the file system, Hive, and mixes of both till smoke comes out your ears. In this post, we are sticking to one set of values for each table format. Different compression settings can affect data in projects differently but the combinations picked, while not the final say in all things data, will hopefully result in some interesting comparisons.

3.1      Hive Compression Options
Here’s a list of some of the Hive compression options looked at in our example:

3.2      MapR File System Options
The file system itself can also be set for specific compression formats. The tradeoff is always compression vs. speed. Below is a list of the file system compression options in MapR:

3.3      Our Comparison Matrix

4. Setting Up Hadoop File System Locations in MapR

The ability to set up logical file system volumes inside Hadoop is a powerful capability not found anywhere else in Hadoop world. It allows you to isolate the access and locality of data, which is handy when your Hadoop system graduates to production or past one use case. Here we are using it to help isolate the statistics of our Hive tables for the purposes of comparison, but if you’re interested in this capability, you can read more about it on our “Get Real with Hadoop: True Multi-tenancy” blog post.

This step is optional, but this capability opens up a world of possibilities when you gain such granular control of your data.

Let’s quickly step through setting up our volumes in our Hadoop cluster where I created our tables.

4.1      Using MapR Control System UI to Establish File System Volumes
The most common way to create volumes is using the MapR Control System (MCS). All aspects of block replication, snapshot scheduling, data mirroring, access, data locality and data quotas can be set through the MCS.

4.2      Using MapR REST API to Establish File System Volumes
Anything that is done through the MCS can be done through the REST API. The full documentation for this can be found on our MapR Volume Creation documentation.

To create the same volume pictured above, the following curl command to the REST API would get the job done:

curl -X GET -k -H "Authorization: Basic bWFwcjpyb290NG1hcHI=" -H 
"Cache-Control: no-cache" 
'https://jbates1:8443/rest/volume/create?name=hive_txt&path=/data/hive/text
&quota=500M&replication=3&schedule=2&topology=/data&type=0&advisoryquota=100M'

4.3      Using MapR CLI to Establish File System Volumes
The last method to create a volume is using the MapR CLI. This is the method I used for my volume creation process.
maprcli volume create -name hive_txt -path /data/hive/text -advisoryquota 
100M -quota 500M -replication 3 -schedule 2 -topology "/data" -type 0

4.4      Volumes Used for This Comparison
Here are all the volumes created from the methods above:

4.5      Setting the File System Compression Options
As mentioned above, you can use Hive or the file system to set the compression. Compression settings are managed at the directory level. Since MapR is a Hadoop system, I can use a Hadoop command to set the compression settings for my directories.

Set compression with something like this: hadoop mfs -setcompression zlib /data/hive/orc1

Verifying the compression setting can be done with this command: hadoop mfs -ls /data/hive

That’s all there is to adjusting file system compression. All new data will be compressed with the provisioned setting. More details on compression can be found at MapR Compression Documentation.
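As a convenience, the per-directory settings can be applied in one pass. The sketch below is only illustrative: which directory gets which codec should come from your own comparison matrix in section 3.3, and it assumes the zlib/lz4/off values supported by the MapR file system.

# Sketch: assign a file system codec to each table-group directory, then verify.
# The specific codec-to-directory pairing here is illustrative only.
hadoop mfs -setcompression zlib /data/hive/orc1
hadoop mfs -setcompression off /data/hive/orc2
hadoop mfs -setcompression lz4 /data/hive/rc
hadoop mfs -ls /data/hive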

5. Creating the Tables

The text data, in CSV format, was loaded into the file system at /data/hive/text. We have external tables created in Hive, partitioned by year and month.

5.1      Original Text Tables
Here are the commands used to create our original Hive tables:

5.1.1      Bike stations table

CREATE EXTERNAL TABLE `STATIONS`(
  `id` int,
  `installdate` string,
  `name` string,
  `longitude` float,
  `latitude` float)
PARTITIONED BY (
  `year` int,
  `month` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/text/bikestations';

5.1.2      Bike trips table
CREATE EXTERNAL TABLE `TRIPS`(
  `bike_nr` string,
  `duration` int,
  `start_date` string,
  `start_station` string,
  `end_station` string)
PARTITIONED BY (
  `year` int,
  `month` string)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'maprfs:/mapr/my_cluster/data/hive/text/trips';
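For reference, here is a sketch of how a month of CSV data can land in one of these external tables. The staging file name is made up, and it assumes the cluster’s NFS mount at /mapr/my_cluster used elsewhere in this post; the point is simply that the files go into a year=/month= directory and the partition then gets registered in Hive.

# Sketch: copy one month of trip data into its partition directory over NFS,
# then register that partition with Hive. Staging path and file name are made up.
mkdir -p /mapr/my_cluster/data/hive/text/trips/year=2014/month=dec
cp ~/staging/trips_2014_dec.csv /mapr/my_cluster/data/hive/text/trips/year=2014/month=dec/
hive -e "USE bikes;
ALTER TABLE trips ADD IF NOT EXISTS PARTITION (year=2014, month='dec')
LOCATION 'maprfs:/mapr/my_cluster/data/hive/text/trips/year=2014/month=dec';"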

5.2      Hive RC Tables
The RC tables in Hive should give a significant performance increase over our original text files. Table creation is almost identical. In this case the table location was specified, but the table was not built as an external table. Dropping an external table does not drop the data, but dropping this one discards the dataset.

5.2.1      stations_rc table

CREATE TABLE `STATIONS_RC`(
  `id` int,
  `installdate` string,
  `name` string,
  `longitude` float,
  `latitude` float)
PARTITIONED BY (
  `year` int,
  `month` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/rc/bikestations';

5.2.2      trips_rc table
CREATE TABLE `TRIPS_RC`(
  `bike_nr` string,
  `duration` int,
  `start_date` string,
  `start_station` string,
  `end_station` string)
PARTITIONED BY (
  `year` int,
  `month` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/rc/trips';

5.3      Hive ORC Tables
With the ORC tables, we added the wrinkle of setting a table property at creation time that controls the compression for the table.

5.3.1      stations_orc1 table

CREATE EXTERNAL TABLE `STATIONS_ORC1`(
  `id` int,
  `installdate` string,
  `name` string,
  `longitude` float,
  `latitude` float)
PARTITIONED BY (
  `year` int,
  `month` string)
STORED AS ORC
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/orc1/bikestations'
TBLPROPERTIES ( "orc.compress"="NONE" );

5.3.2      trips_orc1 table
CREATE EXTERNAL TABLE `TRIPS_ORC1`(
  `bike_nr` string,
  `duration` int,
  `start_date` string,
  `start_station` string,
  `end_station` string)
PARTITIONED BY (
  `year` int,
  `month` string)
STORED AS ORC
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/orc1/trips'
TBLPROPERTIES ( "orc.compress"="NONE" );

5.3.3      stations_orc2 table
CREATE EXTERNAL TABLE `STATIONS_ORC2`(
  `id` int,
  `installdate` string,
  `name` string,
  `longitude` float,
  `latitude` float)
PARTITIONED BY (
  `year` int,
  `month` string)
STORED AS ORC
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/orc2/bikestations'
TBLPROPERTIES ( "orc.compress"="SNAPPY" );

5.3.4      trips_orc2 table
CREATE EXTERNAL TABLE `TRIPS_ORC2`(
  `bike_nr` string,
  `duration` int,
  `start_date` string,
  `start_station` string,
  `end_station` string)
PARTITIONED BY (
  `year` int,
  `month` string)
STORED AS ORC
LOCATION
  'maprfs:/mapr/demo.jbates.mapr/data/hive/orc2/trips'
TBLPROPERTIES ( "orc.compress"="SNAPPY" );

6. Loading Our Data into the Tables

Now that our tables are all created, we can load the RC and ORC tables from the original text dataset. There are all kinds of examples on creating a new table from an old table, but those are for simple tables. When you add in some partitions, things get more complicated. All the tables we have created are partitioned. In general, partitions improve performance on larger datasets. When writing data into your table, the partitions must be named. Below are examples of copying data into the new tables.

6.1      Writing to the RC Tables
The RC tables in Hive should give a significant performance increase over our original tables. In order to get the correct compression, we need to set the codec before we load the data. The commands here will write the data into the RC tables with the correct compression values.

6.1.1      Loading stations_rc

set mapred.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set hive.default.rcfile.serde=org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;
set zlib.compress.level=BEST_SPEED;
set hive.exec.compress.output=true;
set mapred.output.compress=true;
INSERT INTO TABLE bikes.stations_rc partition(year=2014,month="nov") 
SELECT `id`, `installdate`, `name`, `longitude`, `latitude`
FROM bikes.stations WHERE year=2014 AND month="nov";

6.1.2      Loading trips_rc
set mapred.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set hive.default.rcfile.serde=org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec;
set zlib.compress.level=BEST_SPEED;
set hive.exec.compress.output=true;
set mapred.output.compress=true;
INSERT INTO TABLE bikes.trips_rc partition(year=2014,month="dec") 
SELECT `bike_nr`, `duration`, `start_date`, `start_station`, `end_station`
FROM bikes.trips WHERE year=2014 AND month="dec";

6.2      Writing to the ORC Tables
The process of putting data into the ORC tables is a little simpler, since the compression is set in the table properties. Because we are using partitioned tables, that part remains unchanged.

6.2.1      Loading stations_orc1

INSERT INTO TABLE bikes.stations_orc1 partition(year=2014,month="nov") 
SELECT `id`, `installdate`, `name`, `longitude`, `latitude`
FROM bikes.stations WHERE year=2014 AND month="nov";

6.2.2      Loading trips_orc1
INSERT INTO TABLE bikes.trips_orc1 partition(year=2014,month="dec") 
SELECT `bike_nr`, `duration`, `start_date`, `start_station`, `end_station`
FROM bikes.trips WHERE year=2014 AND month="dec";

6.2.3      Loading stations_orc2
INSERT INTO TABLE bikes.stations_orc2 partition(year=2014,month="nov") 
SELECT `id`, `installdate`, `name`, `longitude`, `latitude`
FROM bikes.stations WHERE year=2014 AND month="nov";

6.2.4      Loading trips_orc2
INSERT INTO TABLE bikes.trips_orc2 partition(year=2014,month="dec") 
SELECT `bike_nr`, `duration`, `start_date`, `start_station`, `end_station`
FROM bikes.trips WHERE year=2014 AND month="dec";

6.3      Hive Table Partitions
The examples above all use partitions. Partitions are transparent for the most part, except in two areas: you must name the partition when writing data into your tables, and you must make sure the partition is registered in Hive before reading from the tables. Some quick scripting, like the sketch below, gets you past this without much effort.
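A minimal sketch of that scripting, assuming the partition directories follow Hive’s year=/month= naming. The INSERT … PARTITION statements above already register partitions on the RC and ORC tables, so only the original text tables need a sweep; MSCK REPAIR TABLE scans the table location and registers any partitions the metastore doesn’t know about yet.

# Sketch: register any partition directories Hive hasn't seen yet, then list them.
for t in stations trips; do
  hive -S -e "USE bikes; MSCK REPAIR TABLE $t;"
done
hive -S -e "USE bikes; SHOW PARTITIONS trips;"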

7. Comparing Data Compression

OK…the tables are made… compression values set… table format specified…the data is loaded…what does it look like on disk?

Now that we have data in our tables, we can look at the effect of the compression settings on disk. Since I’m using a MapR Hadoop cluster, I have a read/write POSIX file system with native NFS on every node. I can use standard Linux commands to see how large my data set is on disk.

I also put each table in its own logical file system volume, so I can pull stats for each one of those over REST API, CLI or from the MapR Control System UI.

7.1      Pulling Data Size from the File System
Nothing complicated here. I ran a du command on each of the table directories.

du -h /mapr/my_cluster/data/hive/text

From that I pulled the values below:

From the data above, it looks like the table using ORC SNAPPY compression is the winner, but that may not be the case. If you remember, we also set up different file system compression settings earlier, and the command above does not reflect those.

7.2      Pulling Data Size from the MapR Volumes
It looks like the SNAPPY compression in the ORC table worked best, but let’s look at the information from the MapR file system volumes. I used a simple CLI command to pull my values. I’m pulling both the logical space used, which should be close to the numbers in the table above, and the actual space used on disk.

maprcli volume info -name hive_txt -columns logicalUsed,used

The results are…

It looks like the MapR file system zlib setting compressed the first ORC table group well, with the snappy compression in the second ORC table group landing right next to it, but there is always the question of performance.
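If you want both views side by side, a short loop over the table groups pulls the du numbers and the volume accounting together. The /mapr/my_cluster mount point matches the du example above; volume names other than hive_txt are assumptions.

# Sketch: raw directory size via the NFS mount plus MapR volume accounting
# for each table group. Volume names other than hive_txt are assumptions.
declare -A vols=( [hive_txt]=text [hive_rc]=rc [hive_orc1]=orc1 [hive_orc2]=orc2 )
for vol in "${!vols[@]}"; do
  dir="/mapr/my_cluster/data/hive/${vols[$vol]}"
  echo "== $vol ($dir) =="
  du -sh "$dir"
  maprcli volume info -name "$vol" -columns logicalUsed,used
done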

8. Executing Query Comparisons

So far, we have created and loaded our Hive tables, and we have looked at the storage and compression options. Now we need to execute some queries and see what we see. This post is not the end-all for everything Hive; the goal is just to get you started looking at your own data in your own cluster. With that in mind, I quickly picked a few queries that I thought were interesting, and used them to evaluate the settings I had configured in my tables.

8.1      The Queries
My query goals: I wanted some counting, some joins, some conversions, and some MapReduce jobs to run. With that in mind, this is what I ended up with:

8.1.1      Query 1: Joining My Trip and Station Info
This query joins my trip data with my station data, so I can get full information on where the trip started and where it ended.

SELECT a.start_date, a.bike_nr, a.duration, b.start_station_id, b.start_latitude, b.start_longitude, b.stat_station, c.end_station_id, c.end_latitude, c.end_longitude, c.end_station
FROM (SELECT duration, bike_nr, start_date, cast(split(trips_rc.start_station, '/')[4] as int) as start_station, cast(split(trips_rc.end_station, '/')[4] as int) as end_station FROM trips_rc) a 
JOIN (SELECT id as start_station_id, longitude as start_longitude, latitude as start_latitude, name as stat_station FROM stations_rc) b ON a.start_station = b.start_station_id
JOIN (SELECT id as end_station_id,longitude as end_longitude, latitude as end_latitude, name as end_station FROM stations_rc) c ON a.end_station = c.end_station_id;

8.1.2      Query 2: Bike Utilization and Movement
This query adds up the time the bikes were in service, with the joins added in to get more info on the starting and ending stations.
SELECT a.bike_nr, sum(a.duration)
FROM (SELECT duration, gender, subscription_type, status, bike_nr, start_date, cast(split(trips_rc.start_station, '/')[4] as int) as start_station, cast(split(trips_rc.end_station, '/')[4] as int) as end_station FROM trips_rc) a 
JOIN (SELECT id as start_station_id, longitude as start_longitude, latitude as start_latitude, name as stat_station FROM stations_rc) b ON a.start_station = b.start_station_id
JOIN (SELECT id as end_station_id,longitude as end_longitude, latitude as end_latitude, name as end_station FROM stations_rc) c ON a.end_station = c.end_station_id
WHERE a.duration > 0
GROUP BY a.bike_nr;

8.1.3      Query 3: Station A
This query takes a look at the number of times a bike leaves a station compared to the number of times one is returned to the station.
SELECT s.station_id as station_id, e.end_count as end_count, s.start_count as start_count FROM
(SELECT a.end_station as station_id, count(a.end_station) as end_count FROM (SELECT cast(split(trips_rc.end_station, '/')[4] as int) as end_station FROM trips_rc) a group by a.end_station) e
FULL JOIN
(SELECT b.start_station as station_id, count(b.start_station) as start_count FROM (SELECT cast(split(trips_rc.start_station, '/')[4] as int) as start_station FROM trips_rc ) b group by b.start_station) s
WHERE e.station_id = s.station_id;

8.2      Comparison Scripts
Now that I have my queries, it’s time to run them in a script. I wanted to run each query against each table group multiple times and record the results. Using some of the methods from my earlier posts, I put together the script below to execute each query a number of times against each table and log the results.
#!/bin/bash
# run_compare.sh


# this will print the usage statements and exit
usage() {
	case $1 in
		"")
			echo ""
			echo "Usage: run_compare.sh [-l /my/log/file.txt] [-c run_count] [-h|--help]"
			echo ""
			echo "  This is a quick example of comparing some hive queries to different tables  with bash"
			echo "     The queries and tables are hard coded in the script"
			echo ""
			echo "Params:"
			echo "      -c|--count run_count: default is 10"
			echo "      -h|--help: print this help info and exit"
			echo "      -l|--logfile: default is run_compare.csv in execution dir"
			echo "Examples:"
			echo ""
			echo "		./run_compare.sh -c 100 -l myfile.csv"
			echo ""
			;;
		
	esac
	exit
}

# this will process command line arguments enough 
# to get you to a specific function
args() {
	run_compare "$@"
}

run_compare() {
	# init params
	database="bikes"
	table_extensions="null rc orc1 orc2"
	count=10
	logfile="run_compare.csv"
	row_count=""
	start_time=""
	end_time=""
	my_value=""
	my_query=""
	name=""

	# process args for this block
	while test $# -gt 0
	do
    	case $1 in
    		-l|--logfile)
            	shift
            	logfile=$1
            	;;
            -c|--count)
            	shift
            	count=$1
            	;;
        	-h|--help)
            	usage ""
            	;;
        	*)
            	echo >&2 "Invalid argument: $1"
            	usage ""
        	    ;;
    	esac
    	shift
	done
	
	# clean out data from existing log file
	cat /dev/null > $logfile
	
	# execute comparison for specified count
	c=0
	while test $c -lt $count
	do
		let c++
		echo "running round $c"
		for ext in $table_extensions; do
			if [ "$ext" = "null" ]; then
				ext=""
				name="text"
			else
				name=$ext
				ext="_$ext"
			fi
			
			echo "round $c: table group $name" 
			
			# execute Query1
			my_query_name="Query1"
			my_query="set mapred.reduce.tasks=30;"
			my_query="$my_query use $database;"
			my_query="$my_query SELECT a.start_date, a.bike_nr, a.duration, b.start_station_id, b.start_latitude, b.start_longitude, b.stat_station, c.end_station_id, c.end_latitude, c.end_longitude, c.end_station"
			my_query="$my_query FROM (SELECT duration, bike_nr, start_date, cast(split(trips$ext.start_station, '/')[4] as int) as start_station, cast(split(trips$ext.end_station, '/')[4] as int) as end_station FROM trips$ext) a"
			my_query="$my_query JOIN (SELECT id as start_station_id, longitude as start_longitude, latitude as start_latitude, name as stat_station FROM stations$ext) b ON a.start_station = b.start_station_id"
			my_query="$my_query JOIN (SELECT id as end_station_id,longitude as end_longitude, latitude as end_latitude, name as end_station FROM stations$ext) c ON a.end_station = c.end_station_id"
			
			start_time=`date "+%s"`
			my_value=$(hive -S -e "$my_query")
			end_time=`date "+%s"`
			r_count=`echo "$my_value"| wc -l`
			log_it $logfile $name $start_time $end_time $my_query_name "$r_count" "$my_query" 
			
			
			# execute Query 2
			my_query_name="Query2"
			my_query="set mapred.reduce.tasks=30;"
			my_query="$my_query use $database;"
			my_query="$my_query SELECT a.bike_nr, sum(a.duration)"
			my_query="$my_query FROM (SELECT duration, gender, subscription_type, status, bike_nr, start_date, cast(split(trips$ext.start_station, '/')[4] as int) as start_station, cast(split(trips$ext.end_station, '/')[4] as int) as end_station FROM trips$ext) a"
			my_query="$my_query JOIN (SELECT id as start_station_id, longitude as start_longitude, latitude as start_latitude, name as stat_station FROM stations$ext) b ON a.start_station = b.start_station_id"
			my_query="$my_query JOIN (SELECT id as end_station_id,longitude as end_longitude, latitude as end_latitude, name as end_station FROM stations$ext) c ON a.end_station = c.end_station_id"
			my_query="$my_query WHERE a.duration > 0"
			my_query="$my_query GROUP BY a.bike_nr"
			
			start_time=`date "+%s"`
			my_value=$(hive -S -e "$my_query")
			#my_value="MyValue"
			end_time=`date "+%s"`
			r_count=`echo "$my_value"| wc -l`
			log_it $logfile $name $start_time $end_time $my_query_name "$r_count" "$my_query"
			
			# execute Query 3
			my_query_name="Query3"
			my_query="set mapred.reduce.tasks=30;"
			my_query="$my_query use $database;"
			my_query="$my_query SELECT s.station_id as station_id, e.end_count as end_count, s.start_count as start_count FROM"
			my_query="$my_query (SELECT a.end_station as station_id, count(a.end_station) as end_count FROM (SELECT cast(split(trips$ext.end_station, '/')[4] as int) as end_station FROM trips$ext) a group by a.end_station) e"
			my_query="$my_query FULL JOIN"
			my_query="$my_query (SELECT b.start_station as station_id, count(b.start_station) as start_count FROM (SELECT cast(split(trips$ext.start_station, '/')[4] as int) as start_station FROM trips$ext ) b group by b.start_station) s"
			my_query="$my_query WHERE e.station_id = s.station_id"
			
			start_time=`date "+%s"`
			my_value=$(hive -S -e "$my_query")
			end_time=`date "+%s"`
			r_count=`echo "$my_value"| wc -l`
			log_it $logfile $name $start_time $end_time $my_query_name "$r_count" "$my_query" 
			
		done
	done
	exit	
}

# pass in logfile, name, start, end, query_name, result_count, query
# count result set, and log the data to csv file
log_it() {
	log_file=$1
	n=$2
	start_t=$3
	end_t=$4
	q_name=$5
	result_count=$6
	q=$7
		
	let duration=$end_t-$start_t
	time_run=`date -d @$start_t`
	echo "$n,$q_name,\"$time_run\",$duration,$result_count,\"$q\"" >> $log_file
}

# -------------------------------------------------------------------------------------
# Beginning of script execution
#

args "$@"
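Once the runs finish, a quick pass over the log turns the raw rows into per-table, per-query averages. A sketch, assuming the CSV layout written by log_it above (table group, query name, start time, duration, row count, query):

# Sketch: average query duration (field 4) grouped by table group (field 1)
# and query name (field 2). These fields sit before the quoted query text,
# so splitting on commas is safe for them.
awk -F',' '
  { sum[$1 "," $2] += $4; cnt[$1 "," $2]++ }
  END { for (k in sum) printf "%s,avg_seconds=%.1f\n", k, sum[k] / cnt[k] }
' run_compare.csv | sort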

9. What Performance Did We Get?

My goal here is to help anyone new to the game of SQL on Hadoop, as they step through what can be done with their data. I even tweaked some of the queries and ran them against Impala and Apache Drill, since they were all running in my cluster. I ran iterations in YARN as well as MRv1. Below are the average performance times for the little setup I ran. Hopefully this will help you in kick-starting your thoughts on running this with your data.

After running 20 iterations on this data set, the results are…

In the compression results you can see that using the ORC format saved space just by the nature of its architecture. Adding in the MapR file compression took it the rest of the way, but the snappy compression that ORC uses also works well. When you look at the RC tables compressed with gzip, you can see a performance increase and significant space savings. For this data set, the combination that performed the best was the ORC table where the MapR file system handled the compression, but the difference was close enough that you could go with either option.

When comparing the Hive response times to Impala or Apache Drill, you can start to see where things are going with SQL on Hadoop. Even the query that took the longest in Hive showed significant improvements in response time when moved to engines that query in memory without needing MapReduce. Still, these gains are not free, and for queries where you don’t mind the wait, Hive works well.

10. Closing Thoughts

The journey into Hive and Hadoop has been an eye opener for me, and has helped me re-think how I can deal with data. Using MapR has helped make the process enjoyable. From a data science perspective, I like having the options of running the Hadoop tools and even pulling in legacy scripts that have no knowledge of Hadoop. From a data engineering perspective, I like the read/write file system with standard Linux capabilities. Just being able to edit the data where it sits has saved me a large amount of time. From an IT perspective, I like the ability to run all my Hadoop projects in one cluster. As things grow, the level of effort needed to monitor and maintain the gear stays fairly constant.

As SQL on Hadoop and Hive continues to evolve, it is beneficial to re-look at what you are doing or what you may have done in the past. Being able to look at all the capabilities of Hive as well as all the other SQL possibilities on Hadoop in one cluster just makes life so much simpler as you go from project to production. The point here is not to go with the results that I found with this data set, but to jump into your own data and see where the tables take you.

Hadoop Weekly

Issue #128

5 July 2015

It was a long weekend in the US this week, but there’s still quite a bit of great content. In addition to several technical posts on Spark and Drill, there are some great news articles and several releases. Specifically, the interview with AMPLab’s Michael Franklin and the post on Hadoop ecosystem projects are both highly recommended.

Technical

=======

The MapR blog has an introduction to Spark DataFrames that uses two sample data sets (eBay auctions and the SFPD Crime Incident Reporting system) to illustrate them. The demo shows two strategies for building a DataFrame from a text file—the first using the RDD.toDF() method and the second using the spark-csv library. It also demonstrates how to use explain() to see the physical plan for materializing a DataFrame.

https://www.mapr.com/blog/using-apache-spark-dataframes-processing-tabular-data

This post entitled “The Tragedy of Tez” describes the weakness (which is also the strength) of Tez. Namely, Tez is close to the MapReduce paradigm, which makes it easy to integrate into existing projects but also doesn’t offer as big a leap as other projects like Spark. There’s a lot of background on the project in the post and the comments have some more discussion.

http://bigdatapage.com/tragedy-tez/

This post is a good introduction to Apache HBase. It describes the data model, how HBase scales horizontally, and more. There are useful visualizations to help understand how HBase compares to an RDBMS and how data is stored in HBase across rows, column families, and columns.

https://www.mapr.com/blog/hbase-and-mapr-db-designed-distribution-scale-and-speed

The Duchess France blog has an introduction to and collection of exercises for getting started with Apache Spark. The exercises are available both in Scala and Java 8, and they cover the core Spark API, Spark streaming (consuming the Twitter firehose), and the Spark DataFrame API.

A getting-started tutorial for Spark, with sample source code and explanations; a good reference if I ever need to walk someone through a Spark introduction.

http://www.duchess-france.org/starting-with-spark-in-practice/

The MapR blog also has two tutorials for Apache Drill. The first demonstrates how to use Drill to process delimited data in a file on disk, convert it to Parquet, and query it using the embedded Drill engine. The second looks at how to access data via Drill’s ODBC drivers from Python, R, and Perl.

Apache Drill is the open-source SQL-on-Hadoop product from MapR.

https://www.mapr.com/blog/drilling-healthy-choices

https://www.mapr.com/blog/using-drill-programmatically-python-r-and-perl

The JW Player blog has a post on their data platform, and how they’ve overcome massive skew in their main MapReduce processing. JW Player collects data via nginx, loads it into Kafka, re-encodes data as Avro, aggregates using MapReduce, and writes output to archival storage. The aggregation step collects statistics at many different layers, such as geo/video/device, and the makeup of high-level networks resulted in major skew. The post describes their current solution for this problem.

How JW Player tackled data skew while doing data analysis on Hadoop: they started with impressive-looking approaches such as clever algorithms, data sampling and detection, and automated intervention, but in the end random partitioning proved more reliable.

http://www.jwplayer.com/blog/jw-at-scale/

The Cloudera blog has a post with several practical suggestions for deploying Kafka. It covers topics like SSDs, encryption, the role of Zookeeper for storing offsets, cross-data center replication, suggested configurations for compression/brokers/consumers, and more.

http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

Version 4.0 of Apache Phoenix added a phoenix-spark module for reading and writing data stored in Phoenix via Spark. This walkthrough shows an example of using the functionality by computing PageRank using Spark’s GraphX on data in a Phoenix table (and writing back the results).

https://blogs.apache.org/phoenix/entry/spark_integration_in_apache_phoenix

The Amazon Web Services blog has a guest post describing how Expedia integrated AWS Lambda, DynamoDB, EMR, and S3 for data processing. As data arrives in S3, a Lambda routine updates state in DynamoDB and potentially triggers EMR jobs. There’s a good overview of the Lambda javascript routine and how to deploy the project. This seems like a pretty compelling and interesting pattern for folks that are in AWS.

http://blogs.aws.amazon.com/bigdata/post/Tx1R28PXR3NAO1I/How-Expedia-Implemented-Near-Real-time-Analysis-of-Interdependent-Datasets

One of the novel capabilities of MapReduce is the flexibility to crunch over many types of data, since input is stored in arbitrary files. A traditional RDBMS, though, requires table definitions before any data is loaded. There are advantages to both strategies—schema on read and schema on write—and this post describes and contrasts both in detail.

http://adaptivesystemsinc.com/blog/to-schema-on-read-or-to-schema-on-write-that-is-the-hadoop-data-lake-question/

News

====

The IBM Big Data blog has a recap of a recent #SparkInsight CrowdChat. The post contains popular answers from questions relating to what Spark is (and what the important parts are) to how mature Spark is and how it’s likely to evolve.

http://www.ibmbigdatahub.com/blog/answers-more-your-burning-questions-about-spark

S C A L E has a two-part interview with AMPLab co-creator and UC Berkeley professor Michael Franklin. In the article, Michael discusses the origins of AMPLab, what has positioned it to spin off successful projects and companies, why he thinks Spark has taken off so well, plans for making machine learning easy to use, how database architecture has changed in recent years, the importance of SQL, and much more.

https://medium.com/s-c-a-l-e/amplab-s-co-creator-on-where-big-data-is-headed-and-why-spark-is-so-big-f0c0da2f7c0f

https://medium.com/s-c-a-l-e/database-guru-on-why-nosql-matters-and-sql-still-matters-c64239fe84fd

The Hortonworks Gallery is a collection of Ambari views/extensions, big data tutorials, and sample big data applications. The gallery is open-source and powered by github so users are encouraged to contribute via pull requests. Some initial entries include an Ambari extension for deploying OpenTSDB, a tutorial for Apache Spark, and a real-time data ingestion sample application.

A collection of open-source big data utilities with source code, including a small Spark tutorial and tutorials for the Hadoop ecosystem.

http://hortonworks.com/blog/announcing-hortonworks-gallery/

Gartner is trying to help pin down the definition of Hadoop (which is something that I struggle with a lot for content in this newsletter). To that end, this post describes the expansion of the number of projects in the ecosystem, and it contains a matrix of which 39(!) projects/products are supported by six of the major vendors.

A summary of the current state of the Hadoop ecosystem, including an overview of support and usage across several major vendors.

http://blogs.gartner.com/merv-adrian/2015/07/02/now-what-is-hadoop/

The agenda for the upcoming Strata + Hadoop World NYC, which takes place from September 29-Oct 1, has been posted. There are three days of training and two days of keynotes and sessions.

http://strataconf.com/big-data-conference-ny-2015/public/schedule/grid/public/2015-09-30

Releases

=======

Apache Hive 1.2.1 was released this week. The new version contains a number of bug fixes and some performance improvements.

http://mail-archives.us.apache.org/mod_mbox/www-announce/201506.mbox/%3CCAPNSvC6_zcOGjB1tzHCwuTMEkf+-vpk6BKf-LqcB0=mM3BUwYQ@mail.gmail.com%3E

Corc is a new open-source project from Hotels.com for reading and writing data in the Apache ORC file format from within Cascading. The implementation supports all ORC types, optimizations like column projection and predicate pushdown, and can read data from Hive’s ACID datasets.

http://hortonworks.com/blog/hotels-com-announces-corc-1-0-0/

Apache Falcon, the feed management and processing system, released version 0.6.1, which is the first release since Falcon became a top-level project. The Hortonworks blog has a summary of key improvements in the release: the web-based user interface (build feeds with a UI rather than XML), Hive replication while preserving metadata (like views and annotations), and a new UI for Hive/HDFS replication.

http://hortonworks.com/blog/announcing-apache-falcon-0-6-1/

Apache Tajo, the data warehouse system for Hadoop (and more), released version 0.10.1 this week. The release includes a number of bug fixes and improvements.

https://blogs.apache.org/tajo/entry/apache_tajo_0_10_1

Apache Accumulo 1.5.3, the latest bug-fix release for the 1.5.x branch, was announced this week. Key changes include disabling of SSLv3 to secure against POODLE and several stability-related bug fixes.

https://accumulo.apache.org/release_notes/1.5.3.html

Cloudera Enterprise 5.4.3 was released, with fixes for YARN rolling upgrades, a potential data loss bug, and a speedup for NameNode startup. The release also includes fixes to Cloudera Manager and Navigator.

http://community.cloudera.com/t5/Release-Announcements/Announcing-Cloudera-Enterprise-5-4-3/m-p/29218#U29218

GridGain announced GridGain Enterprise Edition v7.1 and GridGain Community Edition v1.1. The in-memory compute framework is powered by Apache Ignite (incubating), and the new version adds several new features. These include a mechanism to share state in-memory across Spark jobs, an integration with Mesos and YARN, and an integration with Apache Zeppelin (incubating).

http://www.datanami.com/this-just-in/latest-releases-of-gridgain-enterprise-edition-and-community-edition-announced/

Events

=====

Curated by Datadog ( http://www.datadoghq.com )

UNITED STATES

California

Spark Trends and Spark Analytics (Los Angeles) – Wednesday, July 8

http://www.meetup.com/Big-Data-Developers-in-Los-Angeles/events/223443493/

Semantic Indexing of 4 Million Documents with Apache Spark (San Francisco) – Thursday, July 9

http://www.meetup.com/SF-Spark-and-Friends/events/222858278/

SF / East Bay Area Stream Processing Meetup (Emeryville) – Thursday, July 9

http://www.meetup.com/Bay-Area-Stream-Processing/events/222940260/

DataFrame: Spark’s New Abstraction for Data Science, by Reynold Xin of Databricks (Redondo Beach) – Thursday, July 9

http://www.meetup.com/Los-Angeles-Apache-Spark-Users-Group/events/223497971/

Texas

An Introduction to Apache Drill (Addison) – Monday, July 6

http://www.meetup.com/DFW-Data-Science/events/223215366/

Michigan

Big Data Discovery: Leveraging Oracle GraphX DB and Spark! (Grand Rapids) – Wednesday, July 8

http://www.meetup.com/Big-Data-and-Hadoop-Users-Group-of-West-Michigan/events/222542581/

Georgia

A Java Developer’s Companion to the Hadoop World  (Atlanta) – Thursday, July 9

http://www.meetup.com/BigData-Atlanta/events/223589254/

UNITED KINGDOM

Hadoop: Big Data or Big Deal? (London) – Monday, July 6

http://www.meetup.com/London-Business-Analytics-Group/events/222607253/

Distributed Stream Processing (London) – Thursday, July 9

http://www.meetup.com/hadoop-users-group-uk/events/223444014/