Archive for 二月, 2016

虽然Parquet等列式存储号称可提升性能N倍,但在实际中,正如新近发布的Apache Arrow指出的,其本身的序列化、反序列化耗时占比可达70%。Spark 1.6中,针对flat schema进行了一些优化,本文尝试源码级别的解读。

在SqlNewHadoopRDD类中,若开启了spark.sql.parquet.enableUnsafeRowRecordReader、数据源是parquet格式,且tryInitialize成功的话,就会采用UnsafeRowParquetRecordReader替代之前的parquet-hadoop jar提供的ParquetRecordReader类,作为parquet文件解析的入口。

在tryInitialize中,首先调用parent方法,处理了task.side.metadata不同设置时,对footer meta的处理。随后才是检查当前schema是否满足特殊处理的要求:

  • 不可包含非primitive类型 或 repeated类型(例如 struct、array、map)
  • 由于parquet里有type和original type的概念,需要再对original检查,original仅为null、decimal、utf8或date
  • 不支持original type为decimal,且精度超过18的数字
  • 不支持int96 type
  • 不支持schema演进

接下来loadBatch方法就很关键了。

首先调用ParquetFileReader.readNextRowGroup()读入未解压、未序列化的数据,会一次性读入一个row group里涉及到的所有columns,这也是压缩和序列化的单位。

随后,loadBatch进行批量加载。与ParquetRecordReader相比,它最大的区别之一应该在于批量反序列化。可以想象,此时内存中有一个row group的n个columns数据,针对每个column一次性解压缩后,每轮最多反序列化64个values存放到缓存里。

 Scala |  copy code |? 
01
  private boolean loadBatch() throws IOException {
02
    // no more records left
03
    if (rowsReturned >= totalRowCount) { return false; }
04
    checkEndOfRowGroup();
05
 
06
    int num = (int)Math.min(rows.length, totalCountLoadedSoFar − rowsReturned);
07
    rowsReturned += num;
08
 
09
    if (containsVarLenFields) {
10
      for (int i = 0; i < rowWriters.length; ++i) {
11
        rowWriters[i].holder().resetTo(fixedSizeBytes);
12
      }
13
    }
14
 
15
    for (int i = 0; i < columnReaders.length; ++i) {
16
      switch (columnReaders[i].descriptor.getType()) {
17
        case BOOLEAN:
18
          decodeBooleanBatch(i, num);
19
          break;
20
        case INT32:
21
          if (originalTypes[i] == OriginalType.DECIMAL) {
22
            decodeIntAsDecimalBatch(i, num);
23
          } else {
24
            decodeIntBatch(i, num);
25
          }
26
          break;
27
        case INT64:
28
          Preconditions.checkState(originalTypes[i] == null
29
              || originalTypes[i] == OriginalType.DECIMAL,
30
              "Unexpected original type: " + originalTypes[i]);
31
          decodeLongBatch(i, num);
32
          break;
33
        case FLOAT:
34
          decodeFloatBatch(i, num);
35
          break;
36
        case DOUBLE:
37
          decodeDoubleBatch(i, num);
38
          break;
39
        case BINARY:
40
          decodeBinaryBatch(i, num);
41
          break;
42
        case FIXED_LEN_BYTE_ARRAY:
43
          Preconditions.checkState(originalTypes[i] == OriginalType.DECIMAL,
44
              "Unexpected original type: " + originalTypes[i]);
45
          decodeFixedLenArrayAsDecimalBatch(i, num);
46
          break;
47
        case INT96:
48
          throw new IOException("Unsupported " + columnReaders[i].descriptor.getType());
49
      }
50
      numBatched = num;
51
      batchIdx = 0;
52
    }
53
    return true;
54
  }

另一个重要区别是,这里使用unsafeRow,backed by raw memory instead of Java objects。每行包含numFields个tuples,一个tuple对应parquet的一列,每个tuple又由3部分组成:

[null bit set] [values] [variable length portion]

The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.以bit位记录列的null情况,8字节对齐。

In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length primitive types, such as long, double, or int, we store the value directly in the word. For fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the base address of the row) that points to the beginning of the variable-length field, and length (they are combined into a long).

values部分则采用了经典的变长存储方法。对于基本数字类型,直接存储在value字段里;其他变长类型,则在value里存储地址偏移量,而在第三部分variable length portion里存储真正的数据。

通过这种unsafe的方式,一方面可以减少GC的消耗。另一方面,也减少了java对象wrap的冗余存储,由于parquet flat格式里,有很多small size数据,这方面的优化对应内存用量应该也有比较大的效果。

最后结合UnsafeRow的get方法,看下内存复用程度。由于我不是Java出身,措辞可能不专业,如果有更好的解释方式,请留言。

针对基本数据类型,例如int直接返回变量本身(因为基本类型本身的内存是不可复用的):

 Scala |  copy code |? 
1
  @Override
2
  public int getInt(int ordinal) {
3
    assertIndexIsValid(ordinal);
4
    return Platform.getInt(baseObject, getFieldOffset(ordinal));
5
  }

而针对可修改的binary,则返回内存copy,以免row被后续mapPartitions等处理时修改了:

 Scala |  copy code |? 
01
02
  @Override
03
  public byte[] getBinary(int ordinal) {
04
    if (isNullAt(ordinal)) {
05
      return null;
06
    } else {
07
      final long offsetAndSize = getLong(ordinal);
08
      final int offset = (int) (offsetAndSize >> 32);
09
      final int size = (int) offsetAndSize;
10
      final byte[] bytes = new byte[size];
11
      Platform.copyMemory(
12
        baseObject,
13
        baseOffset + offset,
14
        bytes,
15
        Platform.BYTE_ARRAY_OFFSET,
16
        size
17
      );
18
      return bytes;
19
    }
20
  }

另外,没有看到unsaferow的内存何时释放。

Spark1.6在2016年1月发布,其中包含内存模型的优化,将之前shuffle、storage、other内存静态配置的方式,修改为动态抢占式。其对应设计稿中,包含了多种折衷和思考,对于理解新老版本内存模型都很有好处,故翻译如下。其中部分专有名词维持英文。

英文原文地址:https://issues.apache.org/jira/secure/attachment/12765646/unified-memory-management-spark-10000.pdf

Unified Memory Management in Spark 1.6

本文提出一种新的Spark内存管理模型,打破了老版本中execution和storage内存空间之间不可逾越的边界。Execution可以借用空闲的storage内存,反之亦然。而当内存压力上升时,被借用的内存会被回收。但考虑到实现复杂度,不会回收被借用于exection的内存(即仅会将execution借给storage的内存,回收还给execution)。

概览

Spark的内存可以大体归为两类:execution和storage。前者包括shuffles、joins、sorts和aggregations所需内存,后者包括cache和节点间数据传输所需内存。

在Spark 1.5和之前版本里,两者是静态配置的,不支持借用。这种方式有如下限制:

  • 不存在普适的默认配置
  • 内存配置的调优,需要使用者了解spark内部原理
  • (默认配置下)即使application无需cache,也只能使用很小一部分内存

我们的目标是,通过内存空间的融合,消除以上限制。

最终结果是提供更好的性能,且无须使用者调优就可以获得较优的内存使用。而且,由于无须静态配置内存分配方式,可以使用一个application支撑多种不同负载,而不会导致过多的spill。

老版本的内存管理

老版本Spark采用静态分配的内存空间,即将全部内存划分为3个独立区块,通过Spark配置设定每个区块相对JVM heap的百分比。

Execution:用于缓存shuffle、join、sort和aggregation的临时数据,通过spark.shuffle.memoryFraction(默认0.2)配置。

Storage:主要用于缓存未来可能会访问的数据,但也用于broadcast和发送较大的task结果(缓存和部分网络应用),通过spark.storage.memoryFraction(默认0.6)配置。

Other:剩余内存主要用于用户代码和spark内部meta数据。由于该类型内存不可控(默认0.2),故本文不再予以讨论。

在可控内存区块,一旦达到配置上限,内存中的数据会被spill至磁盘。而storage区块的数据(在ONLY_MEM配置时)甚至会被抛弃。不论哪种情况,都会导致IO或重算,造成性能下降。

内存配置详解

为了降低OOM风险,例如应对超大数据单元,Spark内存管理过于小心了 ,引入了safety配置解决数据倾斜。内存相关配置如下:

spark.shuffle.memoryFraction(default0.2)

spark.shuffle.safetyFraction(default0.8)

spark.storage.memoryFraction(default0.6)

spark.storage.safetyFraction(default0.9)

spark.storage.unrollFraction(default0.2)

即execution内存最多仅占JVM heap的0.2*0.8=16%!对于无需cache数据的应用,大部分heap内存都被浪费了,而(shuffle等)中间数据却被频繁spill到磁盘并读取。unroll相关配置将在后面介绍。

Execution内存管理

execution内存被分配给JVM里的多个task线程。与上面不同的是,task间的execution内存分配是动态的,如果没有其他tasks存在,Spark允许一个task占用所有可用execution内存。

有以下几个内存相关类:

ShuffleMemoryManager负责全局计数和内存调度(policy enforcement)。它是核心仲裁者,根据task当前内存用量决定如何进行分配。一个JVM里仅有一个实例。

TaskMemoryManager负责每个task的内存分配和跟踪。它通过page table追踪on-heap内存块,task退出时它若检查到有page未释放则会抛出异常。它使用ExecutorMemoryManager真正执行分配和释放。每个task一个实例。

ExecutorMemoryManager负责具体实现,它管理on-heap和off-heap内存,并实现了一个weak reference pool支持跨tasks的空闲页复用。一个JVM里仅有一个实例。

这几个类如下进行交互。一旦task需要分配较大内存,它首先向ShuffleMemoryManager申请X bytes。如果申请可以被满足,task会向TaskMemoryManager要求分配X bytes内存。后者更新自己的page table,并且要求ExecutorMemoryManager真正分配内存。

内存分配策略

每个task可以向ShuffleMemoryManager申请1/N的execution内存,N是当前存在的tasks数量(通过cores配置)。如果内存申请无法满足,可能会导致数据spill从而释放一些内存。依赖于不同的操作类型,该task随后可能继续尝试,或要求分配较小一些的内存。

为了避免过多spill,task仅当已分配到1/(2N)内存后才会spill。如果还没达到1/(2N)就内存不足了,那么该task会阻塞等待其他tasks释放内存。否则,如果先启动的task占用过多内存,会导致后启动的tasks持续spill。

例子:某executor先启动一个task A,并在task B启动前快速占用了所有可用内存。(B启动后)N变成2,task B会阻塞直到task A spill,自己可获得1/(2N)=1/4的execution内存。而一旦task B获取到了1/4的内存,A和B就都有可能spill了。

注意:直到A无法获得更多内存,它才会spill。与此同时,由于内存被占用,其他新tasks可能被饿死。可用通过一些强制spill机制予以规避,但不在本文讨论范围。

Storage内存管理

Storage内存通过BlockManager管理,虽然主要用于缓存RDD分片,它也被用于torrent broadcast和将较大的结果数据发送给driver。

Storage level

每个block都设置了一个storage level,用于指定是存储在内存、磁盘还是off-heap。Block也可以设置为存储在内存和磁盘,当内存不足的时候就可以转移至磁盘了。

Storage level也指定是否以序列化的方式进行存储。需要注意MEMORY_AND_DISK_SER,即内存数据也是以序列化方式存储的,故当需要转移至磁盘时无需额外序列化了,这种配置时,内存回收代价较低。

回收(evict)策略

老版本的回收策略基本就是LRU,且仅用于内存存储数据。有两个例外需要注意。首先,永远不会为了保存当前RDD的新blocks而回收其已存在的blocks。其次,如果unrolling失败,那所涉及的新block直接被回收。

Unrolling

如果BlockManager收到需存储在内存中的iterator,需要将其unroll为数组。但由于迭代数据可能较大、无法全部放入内存,BlockManager需要逐步unroll以避免OOM,并间歇性的检查内存是否足够。

Unroll所需内存消耗storage空间。如果当前没有缓存数据,unrolling可占用全部storage空间。否则,unrolling内存上限由spark.storage.unrollFraction(默认0.2)配置。注意,这些sub-region(unrolling、cache等)都不是静态保留,而是随着抛弃现存blocks而冬天变化的。

如果在抛弃现存blocks后unrolling还是失败了,BlockManager会直接把unrolling对应的block存储到磁盘里。

设计方案

下面给出设计方案,并讨论折衷点。这里仅涉及high level内存管理策略。

建议

Execution和storage的内存空间可交叉。如果execution内存不足,可以借用storage的空闲空间,反之亦然。被借用的内存可以随时被回收,但考虑到实现复杂度,第一版设计里,借给execution的内存用于不会被强制回收。将引入以下新配置:

  • spark.memory.fraction(默认0.75):用于execution和storage的堆内存比例。该值越低,越容易发生spill和缓存数据回收。该配置实际上也限定了OTHER内存大小,以及偶发超大record的内存消耗。
  • spark.memory.storageFraction(默认0.5):spark.memory.fraction中storage内存的比例。缓存数据仅在该内存超限时回收。
  • spark.memory.useLegacyMode(默认false):是否使用老版本的内存管理模型。老版本将堆内存分为固定大小的区块,在未调优时可能导致过度spill。以下deprecated内存配置仅该配置打开时生效:

    • spark.storage.memoryFraction
    • spark.storage.safetyFraction
    • spark.storage.unrollFraction
    • spark.shuffle.memoryFraction
    • spark.shuffle.safetyFraction

以下将讨论折衷。

如何处理内存压力

由于execution和storage共享同一块内存空间,在内存压力上升导致竞争时,我们必须明确内存回收规则。有以下三种可能的选择:

  • 优先回收execution内存
  • 优先回收storage内存
  • 不回收内存

内存回收代价

Storage内存的回收依赖于storage level。MEMORY_ONLY的数据回收代价最大,因为后续的访问将导致重算。MEMORY_AND_DISK_SER代价最小,因为内存回收过程不涉及序列化,仅增加了I/O。

Execution内存的回收场景比较单一,所有被回收的数据将被spill到磁盘,不涉及重算。而且随着最近新增的unsafe功能,execution数据大多以紧凑格式存储,序列化成本也较低。

但有一点需要注意的是,被spill到磁盘的数据必须被回读,所以虽然没有重算,但execution内存回收仍有一定成本。

实现复杂度

Storage回收比较简单,基于现有机制就可将数据落地磁盘。但execution回收较为复杂,有以下方式:

  • 为所有execution内存块注册一个spill callback
  • 修改回读(poll)和spill以实现联动

每种方式,我们都需要关注是否有一个页存储了cache数据(我的理解是,都需要跟踪存储位置)。而且,一些操作还假设至少有一个page内存可用,如果我们强制spill execution内存,就需要小心、以免回收掉保留内存,而把这些操作饿死。

另外,当回收execution内存时,还需要解决如何处理即将被cache的blocks的问题。最简单的方式是等待直到有足够的内存被释放了,但这可能会导致死锁,尤其是待缓存块还依赖内存中exection数据的时候(例如,persist shuffled数据的时候)。一种替代方案是,将待缓存块直接落地磁盘,随后一旦有空闲内存再读取出来。为了避免落地所有待缓存块,可预留少量(例如5%的堆)内存用于存储一部分待缓存块。

但考虑到复杂度,第一版中暂不实现。如果今后证明该功能确实重要,再考虑升级。

选型

最终选择回收缓存数据。由于设计目的是期望提供更多的execution内存,而强制回收execution内存于事无补。另外该选择也更容易实现。

但如果允许所有回收缓存数据,那对于依赖缓存的applications也会有显著影响。所以,我们也需要为缓存块保留一部分内存。

最小保留(Minimum reservations)

不论是回收缓存块还是spill execution内存,都需要允许使用者保留一部分内存以免任一方被饿死(注:因为回收和spill都会释放出空闲的、可被借用的内存)。有动态和静态两者设置方法。老版本为execution和storage都提供了静态保留方式,不足之处仍然是割裂。

这次新的设计里提供动态最小预留内存的方式。即storage可以借用execution内存,但一旦后者需要就会立即被回收并归还。反之亦然,但有一个例外,如果application尝试缓存数据块,但所有内存都用于execution了,那我们不会回收execution内存,而是直接将待缓存数据块落地磁盘。

另外,我们还需为OTHER用途设置内存,老版本默认是20%的堆空间,且在application执行期不变。本次设计不予以改变,但配置方式改为通过spark.memory.fraction(默认0.75),即剩余空间用于OTHER。

其他设计方式

以上分析了相关技术折衷点,这里再对比其他一些方面。

在所有新设计里(A-C),execution和storage内存之和都由spark.memory.fraction限制。另外,为了后向兼容性,使用者可开启spark.memory.useLegacyMode以使用老版本。

(X)Existing behavior。将execution和storage内存分别限制在独立空间,不允许交叉。各自内存空间大小是静态配置的。用于Spark1.5及之前版本。

(A)Evict cached blocks,full fluidity。Execution和storage共享统一的内存空间。当内存压力大时,回收缓存数据。仅当抢占storage后依然内存不足时,execution才会spill。

(B)Evict cached blocks,static storage reservation。与A类似,但为storage保留一定内存不可被借用,缓存块仅当超出该空间时才会被回收和借用。通过spark.memory.storageFraction(默认0.0)进行配置,执行期不可变。

(C)Evict cached blocks,dynamic storage reservation。与B类似,但storage的保留内存也是动态可变的。区别是当内存空闲时,execution可以最大限量的借用。这也是我们选择的方案。

未选择A的原因是,它没有解决多租赁(multi-tenancy)和严重依赖缓存的application问题。所以提出B,添加了保留内存。

B的问题是,保留内存在某些场景下仍然需要使用者配置。在共享环境里(注:一个application支撑多处理请求时)默认值0并不奏效;通过较大的shuffle,一个使用者可能回收另一个使用者的缓存数据。另外,不论设置什么非0的配置,都会导致内存块割裂。例如,为了避免共享环境下潜在的性能倒退,可能设置该值为0.6(旧版本默认storage比例),从而导致execution内存仅占0.4*0.75 = 0.3的堆空间,并不比之前好,尤其是根本无需缓存数据时。

C没有设立强制的内存隔离。当storage空间有空闲时,execution可以借用。唯一的问题是,当storage需要缓存数据,但内存都被execution占用时怎么办?初始版本直接将待缓存数据落地磁盘。

C也是唯一满足如下要求的:

  • Storage内存没有上限(X不满足)
  • Execution内存没有上限(X和B不满足)
  • 可保证最小storage空间(A不满足)

所以我们选择了方案C

(注,后面是实现层面,看代码更合适,故不翻译了)

========================

最后总结下,在新版本且legacy=false时,有如下内存借用规则:

  • execution可借用所有空闲的storage内存
  • execution也可以收回storage向自己借用的内存
  • 但storage已使用的、属于storage的内存,不可被借用
  • storage可借用execution空闲内存
  • 但storage无法收回已经被execution借用的内存

故可以看到规则整体倾向于execution,尽量保证shuffle内存充沛。

另外,该模型不涉及内存实际的分配与释放,仍是由JVM保证的。它做的是内存计数器。

Dynamo使用一致性hash来实现partition and replication,从而达到高扩展和高可用。在实现上,对经典一致性hash进行了一些优化,本文尝试予以解释。

partitioning algorithm

One of the key design requirements for Dynamo is that it must scale incrementally. This requires a mechanism to dynamically partition the data over the set of nodes (i.e., storage hosts) in the system. Dynamo’s partitioning scheme relies on consistent hashing to distribute the load across multiple storage hosts. In consistent hashing [10], the output range of a hash function is treated as a fixed circular space or “ring” (i.e. the largest hash value wraps around to the smallest hash value). Each node in the system is assigned a random value within this space which represents its “position” on the ring. Each data item identified by a key is assigned to a node by hashing the data item’s key to yield its position on the ring, and then walking the ring clockwise to find the first node with a position larger than the item’s position.

假设我们认为hash ring的取值范围是pow(2, 32),那么经典一致性hash算法采用随机的方式计算position,例如直接random(0, pow(2,32)-1),将存储节点映射到ring上,两个存储节点的position就圈出了一个region。在读写item时,也采用任意hash函数计算范围是pow(2,32)的position,看它落在哪个region里以决定由哪个存储节点负责。

Thus, each node becomes responsible for the region in the ring between it and its predecessor node on the ring. The principle advantage of consistent hashing is that departure or arrival of a node only affects its immediate neighbors and other nodes remain unaffected.

采用经典一致性hash的好处是,当有存储节点变化时,影响较为平滑,仅邻居节点需要向新加入的节点迁移数据。

The basic consistent hashing algorithm presents some challenges. First, the random position assignment of each node on the ring leads to non-uniform data and load distribution. Second, the basic algorithm is oblivious to the heterogeneity in the performance of nodes. To address these issues, Dynamo uses a variant of consistent hashing (similar to the one used in [10, 20]): instead of mapping a node to a single point in the circle, each node gets assigned to multiple points in the ring. To this end, Dynamo uses the concept of “virtual nodes”. A virtual node looks like a single node in the system, but each node can be responsible for more than one virtual node. Effectively, when a new node is added to the system, it is assigned multiple positions (henceforth, “tokens”) in the ring. The process of fine-tuning Dynamo’s partitioning scheme is discussed in Section 6.

但是经典方法也有一些弊端。随机计算的节点position会导致不均衡,而且也没有考虑节点间的异构性。Dynamo引入virtual nodes的概念予以解决。

Replication

To achieve high availability and durability, Dynamo replicates its data on multiple hosts. Each data item is replicated at N hosts, where N is a parameter configured “per-instance”. Each key, k, is assigned to a coordinator node (described in the previous section). The coordinator is in charge of the replication of the data items that fall within its range. In addition to locally storing each key within its range, the coordinator replicates these keys at the N-1 clockwise successor nodes in the ring. This results in a system where each node is responsible for the region of the ring between it and its Nth predecessor. In Figure 2, node B replicates the key k at nodes C and D in addition to storing it locally. Node D will store the keys that fall in the ranges (A, B], (B, C], and (C, D].

为了获得高可用,Dynamo采用多副本的方式,将数据copy到ring上连续的N个节点。

The list of nodes that is responsible for storing a particular key is called the preference list. The system is designed, as will be explained in Section 4.8, so that every node in the system can determine which nodes should be in this list for any particular key. To account for node failures, preference list contains more than N nodes. Note that with the use of virtual nodes, it is possible that the first N successor positions for a particular key may be owned by less than N distinct physical nodes (i.e. a node may hold more than one of the first N positions). To address this, the preference list for a key is constructed by skipping positions in the ring to ensure that the list contains only distinct physical nodes.

regions与存储节点间的一对多映射关系被称为preference list,这个list需要在Dynamo节点间传播、共享。并保证仅存储distinct物理节点。

更进一步的优化

Dynamo uses consistent hashing to partition its key space across its replicas and to ensure uniform load distribution. A uniform key distribution can help us achieve uniform load distribution assuming the access distribution of keys is not highly skewed. In particular, Dynamo’s design assumes that even where there is a significant skew in the access distribution there are enough keys in the popular end of the distribution so that the load of handling popular keys can be spread across the nodes uniformly through partitioning. This section discusses the load imbalance seen in Dynamo and the impact of different partitioning strategies on load distribution.

关于数据倾斜,这里有几个假设的前提。首先,在keys的流量分布不是严重倾斜时,均匀的keys分布可以获得较为均衡的负载。其次,Dynamo认为当popular keys足够多时,即使流量不均衡,只要把popular keys分散开,也可以获得均衡的负载。但这反过来也就意味着,当流量不太大、且仅集中于少量popular keys的时候,可能有些节点会过载。Dynamo的分片策略也在演进如下。(另外,这里有一个隐含的概念,即负载均衡,需要数据分布均衡、流量分布均衡)

Strategy 1: T random tokens per node and partition by token value: This was the initial strategy deployed in production (and described in Section 4.2). In this scheme, each node is assigned T tokens (chosen uniformly at random from the hash space). The tokens of all nodes are ordered according to their values in the hash space. Every two consecutive tokens define a range. The last token and the first token form a range that “wraps” around from the highest value to the lowest value in the hash space. Because the tokens are chosen randomly, the ranges vary in size. As nodes join and leave the system, the token set changes and consequently the ranges change. Note that the space needed to maintain the membership at each node increases linearly with the number of nodes in the system.

策略1是最原始的方式,每个物理节点虚出T个virtual nodes,每个virtual node随机分配一个token。tokens圈出的range决定其存储数据的范围。由于采用随机的方式,故需要一个完整的映射表,才可以获悉整个Dynamo集群的存储映射关系,而该表又随着节点数的线性增长。

While using this strategy, the following problems were encountered. First, when a new node joins the system, it needs to “steal” its key ranges from other nodes. However, the nodes handing the key ranges off to the new node have to scan their local persistence store to retrieve the appropriate set of data items. Note that performing such a scan operation on a production node is tricky as scans are highly resource intensive operations and they need to be executed in the background without affecting the customer performance. This requires us to run the bootstrapping task at the lowest priority. However, this significantly slows the bootstrapping process and during busy shopping season, when the nodes are handling millions of requests a day, the bootstrapping has taken almost a day to complete. Second, when a node joins/leaves the system, the key ranges handled by many nodes change and the Merkle trees for the new ranges need to be recalculated, which is a non-trivial operation to perform on a production system. Finally, there was no easy way to take a snapshot of the entire key space due to the randomness in key ranges, and this made the process of archival complicated. In this scheme, archiving the entire key space requires us to retrieve the keys from each node separately, which is highly inefficient.

这种方式仍存在弊端。首先,新节点的加入,需要从一些old节点迁移数据,由于数据不是连续存储的,扫描较为耗时,且需后台执行。其次,由于range变化导致数据变化了,所涉及的merkle trees需要重算。最后,snapshot也不好做。(这里由于对Dynamo底层存储细节不太了解,无法细致推导)

The fundamental issue with this strategy is that the schemes for data partitioning and data placement are intertwined. For instance, in some cases, it is preferred to add more nodes to the system in order to handle an increase in request load. However, in this scenario, it is not possible to add nodes without affecting data partitioning. Ideally, it is desirable to use independent schemes for partitioning and placement. To this end, following strategies were evaluated:

根本原因是数据分片与数据分布耦合在一起了!

Strategy 2: T random tokens per node and equal sized partitions: In this strategy, the hash space is divided into Q equally sized partitions/ranges and each node is assigned T random tokens. Q is usually set such that Q >> N and Q >> S*T, where S is the number of nodes in the system. In this strategy, the tokens are only used to build the function that maps values in the hash space to the ordered lists of nodes and not to decide the partitioning. A partition is placed on the first N unique nodes that are encountered while walking the consistent hashing ring clockwise from the end of the partition. Figure 7 illustrates this strategy for N=3. In this example, nodes A, B, C are encountered while walking the ring from the end of the partition that contains key k1. The primary advantages of this strategy are: (i) decoupling of partitioning and partition placement, and (ii) enabling the possibility of changing the placement scheme at runtime.

策略2将data partition和location的概念分开,partition是数据分片,而location是由token圈定的范围决定的。另外partition是等分的,一般取值非常大。在读写item时,hash出posittion,看它落在哪几个tokens圈定的范围里。这时partition的概念感觉意义还不大,从论文里也可以看到,策略2仅是一个过渡方案。真正重要的是策略3。

Strategy 3: Q/S tokens per node, equal-sized partitions: Similar to strategy 2, this strategy divides the hash space into Q equally sized partitions and the placement of partition is decoupled from the partitioning scheme. Moreover, each node is assigned Q/S tokens where S is the number of nodes in the system. When a node leaves the system, its tokens are randomly distributed to the remaining nodes such that these properties are preserved. Similarly, when a node joins the system it “steals” tokens from nodes in the system in a way that preserves these properties.

策略3也是将hash space等分为取值较大的Q个partitions,而且partition与存储位置解耦(映射关系,而不是相等关系)。同时,每个存储节点分配Q/S个tokens(即形成Q/S个virtual nodes)。所以virtual node的存储范围其实与partition大小是一致的了,但由于物理节点与virtual节点间是1对多关系,故可以灵活的以virtual node为单位进行数据迁移。当有存储节点离开时,它的tokens随机分布到其他节点上;有节点加入时,不是随机分配新tokens了,而是从现有tokens里随机获取所需。

另外,由于tokens总量都不变,故映射表也恒定。猜测:在put/get item时,需要通过item hash出的position算出partition,再由partition映射到token(由于两者都是等分且数量一致,可以直接=或算出)、token映射到node。有节点增删时,需要all token list,从中随机获取tokens后,根据token到node的映射关系,迁移数据,并修改token到node的映射关系,这时partition到token的关系是不需要变化的。

Hadoop生态主要是基于GitHub和jira构建开源社区,今年希望可以参与进去,记录使用方法如下。

由于每个开源项目的要求都可能不同,所以在开始之前,必须先阅读其Contribute页面,一般从README里就可以找到链接。

JIRA操作

当有了代码修改想法时,不论是bugfix,还是功能改进,都可以到项目JIRA页面上,提交一个issue,用英文大致描述想要做的事情。这里需要注意两点:

  1. 提交issue前,先用英文关键词search一下,确认该功能没有实现、且没有其他人提交类似issue
  2. issue粒度最好足够细,一个独立的小功能就好,类似我们敏捷中的task卡片。

如果想自己动手提交代码,可以将该issue assign给自己。真正动手时,可以将issue状态修改为 in progress。

GitHub 操作

在正式coding前,得在github上找到该项目,点击fork按钮fork出一个自己的分支,这样后续在这个分支上的工作只要不被管理员merge回去,就不会有任何影响。

git操作

这时可以建立本地代码仓库了:

git clone https://<path-to-your-repo> <your-local-prj-name> 将远端项目拉取到本地

cd <your-local-prj-name>

git checkout -b <new-branch-name> 建立一个分支,该issue相关的功能都会在这个分支里进行(git与svn不同,git的分支很轻量级,可以认为是功能隔离的单位)

git push origin <new-branch-name>提交新分支,这时github上自己的project下这个分支可见、但为空。

这时可以在新分支上编码了,完成并通过自测后,可以先提交到分支上

git add <new-files>

git commit -m ‘comments’

git push origin <new-branch-name> 这时代码在新分支上可见,但放心,不会影响到社区版本

如果编码持续了一段时间,可能需要从社区版本更新代码下来

git remote add upstream https://<path-to-public-repo> 添加社区版本为upstream源

git fetch upstream 下载社区版本的更新到本地隐藏目录

git checkout master 切换到master分支

git merge upstream/master 合并代码到本地master分支

git push origin master 将合并的结果提交到自己远端的master分支

GitHub操作

这时需要让别人看到自己的代码了,在github自己的project页面上,点击 pull request发起请求。在收到别人回复时,可以进行交流、重复代码修改过程。

JIRA操作

这时还需要将pull request与JIRA关联起来,点击issue页面More/Link,添加一个Web Link,将pull reqeust的uri填进去,link text可以写PR #xxx。

最后,当pull request完成,即被merge回社区或被彻底拒绝后,可以在git里删除分支,并且关闭issue。