Spark1.6在2016年1月发布,其中包含内存模型的优化,将之前shuffle、storage、other内存静态配置的方式,修改为动态抢占式。其对应设计稿中,包含了多种折衷和思考,对于理解新老版本内存模型都很有好处,故翻译如下。其中部分专有名词维持英文。

英文原文地址:https://issues.apache.org/jira/secure/attachment/12765646/unified-memory-management-spark-10000.pdf

Unified Memory Management in Spark 1.6

本文提出一种新的Spark内存管理模型,打破了老版本中execution和storage内存空间之间不可逾越的边界。Execution可以借用空闲的storage内存,反之亦然。而当内存压力上升时,被借用的内存会被回收。但考虑到实现复杂度,不会回收被借用于exection的内存(即仅会将execution借给storage的内存,回收还给execution)。

概览

Spark的内存可以大体归为两类:execution和storage。前者包括shuffles、joins、sorts和aggregations所需内存,后者包括cache和节点间数据传输所需内存。

在Spark 1.5和之前版本里,两者是静态配置的,不支持借用。这种方式有如下限制:

  • 不存在普适的默认配置
  • 内存配置的调优,需要使用者了解spark内部原理
  • (默认配置下)即使application无需cache,也只能使用很小一部分内存

我们的目标是,通过内存空间的融合,消除以上限制。

最终结果是提供更好的性能,且无须使用者调优就可以获得较优的内存使用。而且,由于无须静态配置内存分配方式,可以使用一个application支撑多种不同负载,而不会导致过多的spill。

老版本的内存管理

老版本Spark采用静态分配的内存空间,即将全部内存划分为3个独立区块,通过Spark配置设定每个区块相对JVM heap的百分比。

Execution:用于缓存shuffle、join、sort和aggregation的临时数据,通过spark.shuffle.memoryFraction(默认0.2)配置。

Storage:主要用于缓存未来可能会访问的数据,但也用于broadcast和发送较大的task结果(缓存和部分网络应用),通过spark.storage.memoryFraction(默认0.6)配置。

Other:剩余内存主要用于用户代码和spark内部meta数据。由于该类型内存不可控(默认0.2),故本文不再予以讨论。

在可控内存区块,一旦达到配置上限,内存中的数据会被spill至磁盘。而storage区块的数据(在ONLY_MEM配置时)甚至会被抛弃。不论哪种情况,都会导致IO或重算,造成性能下降。

内存配置详解

为了降低OOM风险,例如应对超大数据单元,Spark内存管理过于小心了 ,引入了safety配置解决数据倾斜。内存相关配置如下:

spark.shuffle.memoryFraction(default0.2)

spark.shuffle.safetyFraction(default0.8)

spark.storage.memoryFraction(default0.6)

spark.storage.safetyFraction(default0.9)

spark.storage.unrollFraction(default0.2)

即execution内存最多仅占JVM heap的0.2*0.8=16%!对于无需cache数据的应用,大部分heap内存都被浪费了,而(shuffle等)中间数据却被频繁spill到磁盘并读取。unroll相关配置将在后面介绍。

Execution内存管理

execution内存被分配给JVM里的多个task线程。与上面不同的是,task间的execution内存分配是动态的,如果没有其他tasks存在,Spark允许一个task占用所有可用execution内存。

有以下几个内存相关类:

ShuffleMemoryManager负责全局计数和内存调度(policy enforcement)。它是核心仲裁者,根据task当前内存用量决定如何进行分配。一个JVM里仅有一个实例。

TaskMemoryManager负责每个task的内存分配和跟踪。它通过page table追踪on-heap内存块,task退出时它若检查到有page未释放则会抛出异常。它使用ExecutorMemoryManager真正执行分配和释放。每个task一个实例。

ExecutorMemoryManager负责具体实现,它管理on-heap和off-heap内存,并实现了一个weak reference pool支持跨tasks的空闲页复用。一个JVM里仅有一个实例。

这几个类如下进行交互。一旦task需要分配较大内存,它首先向ShuffleMemoryManager申请X bytes。如果申请可以被满足,task会向TaskMemoryManager要求分配X bytes内存。后者更新自己的page table,并且要求ExecutorMemoryManager真正分配内存。

内存分配策略

每个task可以向ShuffleMemoryManager申请1/N的execution内存,N是当前存在的tasks数量(通过cores配置)。如果内存申请无法满足,可能会导致数据spill从而释放一些内存。依赖于不同的操作类型,该task随后可能继续尝试,或要求分配较小一些的内存。

为了避免过多spill,task仅当已分配到1/(2N)内存后才会spill。如果还没达到1/(2N)就内存不足了,那么该task会阻塞等待其他tasks释放内存。否则,如果先启动的task占用过多内存,会导致后启动的tasks持续spill。

例子:某executor先启动一个task A,并在task B启动前快速占用了所有可用内存。(B启动后)N变成2,task B会阻塞直到task A spill,自己可获得1/(2N)=1/4的execution内存。而一旦task B获取到了1/4的内存,A和B就都有可能spill了。

注意:直到A无法获得更多内存,它才会spill。与此同时,由于内存被占用,其他新tasks可能被饿死。可用通过一些强制spill机制予以规避,但不在本文讨论范围。

Storage内存管理

Storage内存通过BlockManager管理,虽然主要用于缓存RDD分片,它也被用于torrent broadcast和将较大的结果数据发送给driver。

Storage level

每个block都设置了一个storage level,用于指定是存储在内存、磁盘还是off-heap。Block也可以设置为存储在内存和磁盘,当内存不足的时候就可以转移至磁盘了。

Storage level也指定是否以序列化的方式进行存储。需要注意MEMORY_AND_DISK_SER,即内存数据也是以序列化方式存储的,故当需要转移至磁盘时无需额外序列化了,这种配置时,内存回收代价较低。

回收(evict)策略

老版本的回收策略基本就是LRU,且仅用于内存存储数据。有两个例外需要注意。首先,永远不会为了保存当前RDD的新blocks而回收其已存在的blocks。其次,如果unrolling失败,那所涉及的新block直接被回收。

Unrolling

如果BlockManager收到需存储在内存中的iterator,需要将其unroll为数组。但由于迭代数据可能较大、无法全部放入内存,BlockManager需要逐步unroll以避免OOM,并间歇性的检查内存是否足够。

Unroll所需内存消耗storage空间。如果当前没有缓存数据,unrolling可占用全部storage空间。否则,unrolling内存上限由spark.storage.unrollFraction(默认0.2)配置。注意,这些sub-region(unrolling、cache等)都不是静态保留,而是随着抛弃现存blocks而冬天变化的。

如果在抛弃现存blocks后unrolling还是失败了,BlockManager会直接把unrolling对应的block存储到磁盘里。

设计方案

下面给出设计方案,并讨论折衷点。这里仅涉及high level内存管理策略。

建议

Execution和storage的内存空间可交叉。如果execution内存不足,可以借用storage的空闲空间,反之亦然。被借用的内存可以随时被回收,但考虑到实现复杂度,第一版设计里,借给execution的内存用于不会被强制回收。将引入以下新配置:

  • spark.memory.fraction(默认0.75):用于execution和storage的堆内存比例。该值越低,越容易发生spill和缓存数据回收。该配置实际上也限定了OTHER内存大小,以及偶发超大record的内存消耗。
  • spark.memory.storageFraction(默认0.5):spark.memory.fraction中storage内存的比例。缓存数据仅在该内存超限时回收。
  • spark.memory.useLegacyMode(默认false):是否使用老版本的内存管理模型。老版本将堆内存分为固定大小的区块,在未调优时可能导致过度spill。以下deprecated内存配置仅该配置打开时生效:

    • spark.storage.memoryFraction
    • spark.storage.safetyFraction
    • spark.storage.unrollFraction
    • spark.shuffle.memoryFraction
    • spark.shuffle.safetyFraction

以下将讨论折衷。

如何处理内存压力

由于execution和storage共享同一块内存空间,在内存压力上升导致竞争时,我们必须明确内存回收规则。有以下三种可能的选择:

  • 优先回收execution内存
  • 优先回收storage内存
  • 不回收内存

内存回收代价

Storage内存的回收依赖于storage level。MEMORY_ONLY的数据回收代价最大,因为后续的访问将导致重算。MEMORY_AND_DISK_SER代价最小,因为内存回收过程不涉及序列化,仅增加了I/O。

Execution内存的回收场景比较单一,所有被回收的数据将被spill到磁盘,不涉及重算。而且随着最近新增的unsafe功能,execution数据大多以紧凑格式存储,序列化成本也较低。

但有一点需要注意的是,被spill到磁盘的数据必须被回读,所以虽然没有重算,但execution内存回收仍有一定成本。

实现复杂度

Storage回收比较简单,基于现有机制就可将数据落地磁盘。但execution回收较为复杂,有以下方式:

  • 为所有execution内存块注册一个spill callback
  • 修改回读(poll)和spill以实现联动

每种方式,我们都需要关注是否有一个页存储了cache数据(我的理解是,都需要跟踪存储位置)。而且,一些操作还假设至少有一个page内存可用,如果我们强制spill execution内存,就需要小心、以免回收掉保留内存,而把这些操作饿死。

另外,当回收execution内存时,还需要解决如何处理即将被cache的blocks的问题。最简单的方式是等待直到有足够的内存被释放了,但这可能会导致死锁,尤其是待缓存块还依赖内存中exection数据的时候(例如,persist shuffled数据的时候)。一种替代方案是,将待缓存块直接落地磁盘,随后一旦有空闲内存再读取出来。为了避免落地所有待缓存块,可预留少量(例如5%的堆)内存用于存储一部分待缓存块。

但考虑到复杂度,第一版中暂不实现。如果今后证明该功能确实重要,再考虑升级。

选型

最终选择回收缓存数据。由于设计目的是期望提供更多的execution内存,而强制回收execution内存于事无补。另外该选择也更容易实现。

但如果允许所有回收缓存数据,那对于依赖缓存的applications也会有显著影响。所以,我们也需要为缓存块保留一部分内存。

最小保留(Minimum reservations)

不论是回收缓存块还是spill execution内存,都需要允许使用者保留一部分内存以免任一方被饿死(注:因为回收和spill都会释放出空闲的、可被借用的内存)。有动态和静态两者设置方法。老版本为execution和storage都提供了静态保留方式,不足之处仍然是割裂。

这次新的设计里提供动态最小预留内存的方式。即storage可以借用execution内存,但一旦后者需要就会立即被回收并归还。反之亦然,但有一个例外,如果application尝试缓存数据块,但所有内存都用于execution了,那我们不会回收execution内存,而是直接将待缓存数据块落地磁盘。

另外,我们还需为OTHER用途设置内存,老版本默认是20%的堆空间,且在application执行期不变。本次设计不予以改变,但配置方式改为通过spark.memory.fraction(默认0.75),即剩余空间用于OTHER。

其他设计方式

以上分析了相关技术折衷点,这里再对比其他一些方面。

在所有新设计里(A-C),execution和storage内存之和都由spark.memory.fraction限制。另外,为了后向兼容性,使用者可开启spark.memory.useLegacyMode以使用老版本。

(X)Existing behavior。将execution和storage内存分别限制在独立空间,不允许交叉。各自内存空间大小是静态配置的。用于Spark1.5及之前版本。

(A)Evict cached blocks,full fluidity。Execution和storage共享统一的内存空间。当内存压力大时,回收缓存数据。仅当抢占storage后依然内存不足时,execution才会spill。

(B)Evict cached blocks,static storage reservation。与A类似,但为storage保留一定内存不可被借用,缓存块仅当超出该空间时才会被回收和借用。通过spark.memory.storageFraction(默认0.0)进行配置,执行期不可变。

(C)Evict cached blocks,dynamic storage reservation。与B类似,但storage的保留内存也是动态可变的。区别是当内存空闲时,execution可以最大限量的借用。这也是我们选择的方案。

未选择A的原因是,它没有解决多租赁(multi-tenancy)和严重依赖缓存的application问题。所以提出B,添加了保留内存。

B的问题是,保留内存在某些场景下仍然需要使用者配置。在共享环境里(注:一个application支撑多处理请求时)默认值0并不奏效;通过较大的shuffle,一个使用者可能回收另一个使用者的缓存数据。另外,不论设置什么非0的配置,都会导致内存块割裂。例如,为了避免共享环境下潜在的性能倒退,可能设置该值为0.6(旧版本默认storage比例),从而导致execution内存仅占0.4*0.75 = 0.3的堆空间,并不比之前好,尤其是根本无需缓存数据时。

C没有设立强制的内存隔离。当storage空间有空闲时,execution可以借用。唯一的问题是,当storage需要缓存数据,但内存都被execution占用时怎么办?初始版本直接将待缓存数据落地磁盘。

C也是唯一满足如下要求的:

  • Storage内存没有上限(X不满足)
  • Execution内存没有上限(X和B不满足)
  • 可保证最小storage空间(A不满足)

所以我们选择了方案C

(注,后面是实现层面,看代码更合适,故不翻译了)

========================

最后总结下,在新版本且legacy=false时,有如下内存借用规则:

  • execution可借用所有空闲的storage内存
  • execution也可以收回storage向自己借用的内存
  • 但storage已使用的、属于storage的内存,不可被借用
  • storage可借用execution空闲内存
  • 但storage无法收回已经被execution借用的内存

故可以看到规则整体倾向于execution,尽量保证shuffle内存充沛。

另外,该模型不涉及内存实际的分配与释放,仍是由JVM保证的。它做的是内存计数器。

Leave a Reply