Archive for 五月, 2015

在我们的一个项目中,使用pyspark,并通过swig调用了C++ so包,对一堆textFile和sequenceFile进行join、parser等处理。但在某些特定的输入下,会报Python worker exited unexpectedly的Exception,导致任务失败。虽然我们的python和C++代码都通过了unittest,并且本地模式运行正确,但线上cluster环境、输入数据源都与测试环境存在差异,以下记录线上环境定位与解决问题的方法。

问题描述

spark任务失败时,driver端会打印一些error log,但基本上没有太大作用。建议先去UI界面,找到失败的stage,大概心里明白问题可能出现在哪些阶段:

屏幕快照 2015-05-29 13.21.29

然后找到fail task对应executor的stderr,搜exception关键词:

 Python |  copy code |? 
01
15/05/29 11:35:03 ERROR Executor: Exception in task 237.3 in stage 5.0 (TID 2568)
02
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
03
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:170)
04
 at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
05
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
06
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
07
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
08
 at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
09
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
10
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
11
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
12
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
13
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
14
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
15
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
16
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
17
 at java.lang.Thread.run(Thread.java:744)
18
Caused by: java.io.EOFException
19
 at java.io.DataInputStream.readInt(DataInputStream.java:392)
20
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
21
 ... 14 more

问题定位

这个exception是什么意思呢?从spark代码可以看到:

 Scala |  copy code |? 
01
      private def read(): Array[Byte] = {
02
        if (writerThread.exception.isDefined) {
03
          throw writerThread.exception.get
04
        }
05
        try {
06
          stream.readInt() match {
07
            case length if length > 0 =>
08
              val obj = new Array[Byte](length)
09
              stream.readFully(obj)
10
              obj
11
            case 0 => Array.empty[Byte]
12
            case SpecialLengths.TIMING_DATA =>
13
              // Timing data from worker
14
              val bootTime = stream.readLong()
15
              ……
16
              read()
17
            case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
18
              // Signals that an exception has been thrown in python
19
              val exLength = stream.readInt()
20
              val obj = new Array[Byte](exLength)
21
              stream.readFully(obj)
22
              throw new PythonException(new String(obj, UTF_8),
23
                writerThread.exception.getOrElse(null))
24
            case SpecialLengths.END_OF_DATA_SECTION =>
25
              // We've finished the data section of the output, but we can still
26
              // read some accumulator updates:
27
              val numAccumulatorUpdates = stream.readInt()
28
             ……
29
              }
30
              null
31
          }
32
        } catch {
33
 
34
          case e: Exception if context.isInterrupted =>
35
            logDebug("Exception thrown after task interruption", e)
36
            throw new TaskKilledException
37
 
38
          case e: Exception if env.isStopped =>
39
            logDebug("Exception thrown after context is stopped", e)
40
            null  // exit silently
41
 
42
          case e: Exception if writerThread.exception.isDefined =>
43
            logError("Python worker exited unexpectedly (crashed)", e)
44
            logError("This may have been caused by a prior exception:", writerThread.exception.get)
45
            throw writerThread.exception.get
46
 
47
         <strong> case eof: EOFException =&gt;</strong>
48
<strong>            throw new SparkException("Python worker exited unexpectedly (crashed)", eof)</strong>
49
        }
50
      }

exception应该是由最后那个case抛出的。如果对pyspark模式下,executor工作原理不清楚的,可以参考pyspark与spark的集成方式。scala进程这时会等待python的len+data格式的输出,但在使用readInt等待len的时候,python关闭了管道,于是scala接收到EOF,抛出EOFException。

所以,问题出在python进程里是没有疑问了。那会不会是python内存爆了呢?我们通过调用python/pyspark/shuffle.py的get_used_memory()方法,在worker端定期打印内存使用,发现也就300M+,所以排除之。

这时,拜托spark集群的OP帮忙看了下问题executor所在服务器的dmesg,发现有:

 Python |  copy code |? 
1
python[39160]: segfault at 0 ip 00007ff99a9671d0 sp 00007fff9b806880 error 6 in _basic_image_lib.so[7ff99a8f1000+126000]

但线上服务器没有产生core文件,所以仍然无法具体定位。而cluster环境我们也没有权限登录、修改。

如果能用local模式跑,就很方便了。但由于数据量太大,所以我们采用了笨方法,由于通过对stage的分析,可以知道问题应该是由于一组input files里的某些异常输入格式导致的,所以我们用二分法逐步将bad case定位到一个input file上。

为了简化问题,我们把对_basic_image_lib.so的调用提取为一个test脚本,以bad case input file为输入,setMaster(‘local[*]’)在本地模式运行,成功的产生了core file。这时用gdb python core.xxxx可以将问题缩小到具体的C++函数:

屏幕快照 2015-05-29 13.39.12

这时问题就相对简单了,可以由该c++ so包的开发同学具体跟进了。

昨天升级了wordpress到新版本,结果Developer Formatter在插入代码时失败,第一个报错信息是:

 Javascript |  copy code |? 
1
Uncaught TypeError: undefined is not a function

firebug排查定位其错误应该是在wp-content/plugins/devformatter/devinterface.php生成的js文件,调用execInstanceCommand方法出错。

解决方法如下:

#vim wp-content/plugins/devformatter/devinterface.php,找到execInstanceCommand哪一行,修改为:

 Javascript |  copy code |? 
1
      if(HtmlEditor){
2
        edInsertContent(edCanvas, DevFmt_ContentStart + DevFmt_TheContent + DevFmt_ContentEnd);
3
      }else{
4
        alert(DevFmt_ContentStart + DevFmt_TheContent + DevFmt_ContentEnd);
5
        tinyMCE.execCommand('mceReplaceContent', false,
6
          switchEditors.wpautop(DevFmt_ContentStart + DevFmt_TheContent + DevFmt_ContentEnd));
7
      }

第二个问题是,插入带空格的代码后,页面上出现大量DVFMTSC字样,修改wp-content/plugins/devformatter/devfmt_editor.js文件如下:

 Javascript |  copy code |? 
1
2
block = block.replace(/{{DVFMTSC}}/gi, '<!--DVFMTSC-->&').replace(/\n/gi, "<br />");
3
 
4
修改为:
5
block = block.replace(/{{DVFMTSC}}/gi, '&').replace(/\n/gi, "<br />");

参考:

  • http://stackoverflow.com/questions/22813970/typeerror-window-tinymce-execinstancecommand-is-not-a-function

Pyspark实际是基于spark scala core的一层语言外壳,其执行代码经由scala/java发送到worker nodes,并由python进程执行。这里,就涉及如何将用户的python代码分发出去呢?而分发必然涉及序列化过程,又是如何实现的?

本文解释python代码的序列化过程。一句话而言,Pyspark是依赖pickle实现的,但又基于其做了不少定制化工作。

Callback TypeGlobal vars pickledInstance vars pickledClass vars pickledKey methods in cloudpickle.py
__main__.functionY--save_function -> save_function_tuple -> extract_func_data
other_module.functionN--save_function -> save_global and sendRef = True
__main__.instancemethodYYYsave_instancemethod
save_function for im_func, default __reduce_ex__ for im_self, save_global for im_class and sendRef = False
other_module.instancemethodYYNsame with __main__.instancemethod, but sendRef = True
__main__.classmethodY-Ysame with __main__.instancemethod, but im_self is class object, and im_class is TypeType
__main__.staticmethodY-Ysave_function,
class is one elem of f_globals
other_module.classmethodY-Nsave_instancemethod,
sendRef = True
other_module.staticmethodY-Nsave_function,
sendRef = True

上面这张表是基于spark 1.2, python 2.7,针对callback function的不同type,得出的结论。以下将分3部分进行介绍:

  1. spark的serialize过程
  2. python function/method type解析
  3. pickle过程剖析

为了描述方便,以sc.textFile(…).reduce(callback)为例。

spark的serialize过程

序列化和与scala的交互是以pyspark.SparkContext开始的,其__init__方法完成了两件事情:

  1. 调用_ensure_initialized方法,launch_gateway及JVM,通过py4j基于本地环回套接口,建立了与java/scala的通信通道
  2. 调用_do_init方法,默认将self.serializer = AutoBatchedSerializer(self._unbatched_serializer)

Pyspark中需要序列化的包括code和data,这儿的serializer不是用来pickle代码的,别被迷惑!Pyspark的serializer继承关系如下,与code serialize相关的是CloudPickleSerializer。

pyspark-serilizer

有了这些背景知识,我们来看示例代码的执行过程。

1. textFile()新建并返回RDD:

 

 Python |  copy code |? 
1
RDD(
2
    self._jsc.textFile(name, minPartitions),  // jrdd
3
    self,                                     // python sparkcontext 
4
    UTF8Deserializer(use_unicode))            // jrdd_deserializer

其中self._jsc是JavaSparkContext的实例。

2. reduce(callback)迫使DAG生成一个stage,并向cluster发送callback方法。

需要注意的是,reduce等利用nested function层层对我们的callback进行了封装,封装后function的type是“function”,而我们传入的callback则有可能是function、instancemethod、lambda、staticmethod、classmethod等。

reduce间接调用mapPartitionsWithIndex,生成PipelinedRDD:

 Python |  copy code |? 
1
PipelinedRDD(
2
    self,  // prev rdd, 即textFile生成的RDD
3
    f,       // reduce封装的function
4
    preservesPartitioning) 

示例代码比较简单,这儿的prev rdd只是简单的RDD,而非PipelinedRDD,故直接设置新PipelinedRDD主要属性如下:

  • self.func = func ,即reduce封装的function
  • self._prev_jrdd = prev._jrdd
  • self._prev_jrdd_deserializer = prev._jrdd_deserializer
  • self._jrdd_val = None, 第一次调用_jrdd时会触发生成
  • self._jrdd_deserializer = self.ctx.serializer ,默认是PickleSerializer

reduce里此时触发collect方法,通过self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())发起task。由于此时是第一次调用self._jrdd,故会计算生成_jrdd_val对象,该过程里也有一些关键代码:

 Python |  copy code |? 
1
        command = (self.func, profileStats, self._prev_jrdd_deserializer,
2
                   self._jrdd_deserializer)
3
        # the serialized command will be compressed by broadcast
4
        ser = CloudPickleSerializer()
5
        pickled_command = ser.dumps(command)
6
        if len(pickled_command) &gt; (1 &lt;&lt; 20):  # 1M
7
            self._broadcast = self.ctx.broadcast(pickled_command)
8
            pickled_command = ser.dumps(self._broadcast) 

这儿的command就是实际发送给worker的指令了,其中的self.func层层嵌套调用了我们的callback。序列化是通过CloudPickleSerializer完成的,如果序列化后的大于1M,则会走broadcast方式。pickled_command最终通过py4j,生成JavaRDD对象,走scala正常的DAG过程。

以上大体了解我们自己的callback怎样变成最终的command。

python function/method type解析

在看serialize过程前,我们还得解释下python function的type,因为pickle会根据不同的object type区别处理。虽然python内部所有都是object,但object也是有type的。

 Python |  copy code |? 
01
def my_function():
02
    pass
03
 
04
class AClass(object):
05
    def my_method(self):
06
        pass
07
 
08
    @staticmethod
09
    def my_static_method():
10
        pass
11
 
12
    @classmethod
13
    def my_class_method(cls):
14
        pass
15
 
16
print "my_function: %s" % type(my_function)  # function
17
print "my_method: %s" % type(AClass().my_method) # instancemethod
18
print "my_static_method: %s" % type(AClass.my_static_method) # function
19
print "my_class_method: %s" % type(AClass.my_class_method) # instancemethod

pickle过程剖析

上面可以看到serialize是调用CloudPickleSerializer().dumps(command)完成的,command这时是一个tuple。该方法是对cloudpickle.dumps(obj, 2)的封装,后者才是pyspark serialize的关键,它overwrite了python pickle类的一些方法,尤其是save_[function|instancemethod|…]之类的序列化callbacks。

pickle.Pickler.dump的调用,会进入Python/Lib/pickle.py或Modules/_pickle.c 的self.save(obj)方法。该方法有几个重要分支来决定对obj序列化的callback方法:

  1. f = self.dispatch.get(t),其中t = type(obj),而dispatch table中存储的就是save_*方法。如果type在cloudpickle里有对应方法,那就调用它处理
  2. 否则,issc = issubclass(t, TypeType),那就调用self.save_global(obj) 处理,注意这个方法也被cloudpickle overwrite了
  3. 否则,reduce = dispatch_table.get(t),如果有就调用处理,copy_reg.dispatch_table
  4. 否则,reduce = getattr(obj, “__reduce_ex__”, None) ,object有默认的__reduce_ex__
  5. 否则,reduce = getattr(obj, “__reduce__”, None),object有默认的__reduce__

对于想细致了解pickle过程的同学,建议在Lib/pickle.py中加入下面的print语句,能够帮助我们了解进入了哪个分支:

 Python |  copy code |? 
1
        f = self.dispatch.get(t)                                                                                         
2
        print "###### issc: %s, obj: %s, type: %s #######" % (issubclass(t, TypeType), obj, t)    

那么对于function,我们都希望传递什么呢?功能考虑,可调用的代码是第一步,更重要的是数据,例如使用到的global module变量、传入参数、所在object的vars、所在class的vars等。性能考虑,当然希望尽量不要重复传递,也不要传递不需要的数据。把握这个原则之后,就可以参考下面的关键流程图,在脑子里跑pickle过程了:(补充说明下command拆解出callback的过程:save_tuple – save(func wrapper) – save_function – extract_func_data – save( …, closure, …), callback是func wrapper的closure方法。)

main_function main.classmethod main.instancemethod main.staticmethodother_module.classmethod other_module.function other_module.instancemethod other_module.staticmethod

最后,再啰嗦下,extract_func_data的f_globals里会包含所有用到的、module层级可见的“objects”(python 一切皆对象),例如global vars、import的modules、class等。

其实通过分析pickle过程,也可以看出python内部的instancemethod,实际是由类型信息,以及 一个翻译后的function、包裹该方法的object对象、以及object所属的class信息构成的。而翻译后的function与普通的function没有本质区别。

可能的改进措施

在多个stages间,wrapper、serializer等信息,也会被pickle、传递,从而消耗driver、workers的cpu、内存和IO资源,当然,在节点和代码量小时都可以忽略,但随着节点数的增长,也是一种浪费。

是否可以考虑降低重复呢?毕竟我们都已经把整个python app打包为egg分发到worker上了啊。

HDFS read slow解决方法

通过spark的UI界面,我们可以看到多个metrics的分布情况,发现有个stage的75%的duration 20s,但有少量(4个左右)特别慢,达到8min。查看worker的errlog,发现如下日志:

 Python |  copy code |? 
1
15/05/21 17:48:33 INFO FSInputChecker: read block too slow, expect:512000 cost:513232
2
15/05/21 17:48:33 WARN FMSClient: DFS Read: org.apache.hadoop.hdfs.protocol.ReadSlowException: read block too slow, expect:512000 cost:513232

可见是由于读取block数据太慢导致的,我们的block大小是256MB,期望的读取时间是512000ms,即8.5min,而最终使用了513s,与慢task耗时一致。这个值是内部HDFS版本控制的,但开源届可能也有类似功能。通过修改spark/conf/hadoop-site.xml的dfs.client.slow.read.limit配置,调整为5242880,代表期望的下载速度是5MB,如果比这个速度慢就retry其他DataNode节点。调整后,max task duration变为1.3min。在max对应的worker上也看到了slow log,只不过超时时间变短了。

当然,也可能也意味着我们的HDFS集群中有部分慢节点,后续也得解决掉。

平均Duration过大的解决方法

Duration metric里,又发现有一个task,其75%的耗时9min+,GC时间最多2s,每个task处理2.2GB的数据。在不优化内部处理流程的情况下,能否通过增大并发度来提高task处理速度呢?

由于该task由join操作生成,修改其numPartition参数从1k变到2k。但由于driver运行在一个虚拟机上,期间多次发生worker node与其通信失败,导致task反而变慢,并引发retry重算!将该值该为1500,由于每个task处理数据变成1.5GB左右,故处理时间也下降了!

另,从pstats数据也可以看到,最耗时的操作还是shuffle的read过程。

 Python |  copy code |? 
01
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
02
 62603140 388783.337    0.006 388783.337    0.006 {method 'read' of 'file' objects}
03
 31301070 21929.022    0.001 21929.022    0.001 {cPickle.loads}
04
 16958730 8177.087    0.000 8177.087    0.000 {_basic_image_lib.flash_decode}
05
 55610843 1719.077    0.000 1810.741    0.000 feature_murmur_sign.py:33(run)
06
 55610843 1088.828    0.000 10036.427    0.000 feature_flash_decoder.py:37(run)
07
     1000  664.456    0.664 17152.500   17.152 shuffle.py:245(mergeValues)
08
 55611841  482.160    0.000  482.160    0.000 encoder.py:206(iterencode)
09
 55610843  385.117    0.000 14211.060    0.000 callbacks.py:65(add_media_info)
10
     1000  361.858    0.362 411390.715  411.391 shuffle.py:282(mergeCombiners)
11
 55611841  246.357    0.000 1453.533    0.000 data.py:374(desc)
12
 55610843  234.460    0.000  520.973    0.000 policy_beidou_black_image.py:20(run)
13
 55610843  232.927    0.000 12084.700    0.000 review_policy.py:250(run)
14
 55610843  219.946    0.000 1593.674    0.000 review_policy.py:114(run)
15
 55610843  201.820    0.000  284.889    0.000 review_policy.py:168(_lack_image)
16
 49136554  189.099    0.000  222.585    0.000 join.py:56(dispatch)
17
 31300074  184.696    0.000 410952.765    0.013 serializers.py:127(load_stream)
18
 55431498  182.844    0.000  254.908    0.000 policy_reject_url_and_bin.py:20(run)
19
 55611841  180.865    0.000  778.554    0.000 encoder.py:180(encode)
20
   272005  157.849    0.001  157.860    0.001 {cPickle.dumps}
21
 16958894  145.608    0.000  351.860    0.000 basic_image_lib.py:183(__init__)
22
 55610843  139.166    0.000  590.473    0.000 review_policy.py:149(run)
23
221930724  138.625    0.000  210.047    0.000 {hasattr}
24
476047721  125.609    0.000  125.609    0.000 data.py:49(image)
25
287337626  124.806    0.000  223.212    0.000 {len}
26
 55610843  111.992    0.000  157.770    0.000 review_policy.py:185(_lack_flash)
27
 31300074  107.931    0.000 410768.069    0.013 serializers.py:144(_read_with_length)

Spark的很多transformation方法是基于combineBy的,会导致shuffle过程,所以一般认为成本较大。那么具体过程如何?涉及到哪些spark配置,又该如何调整呢?数据倾斜问题又会对此造成什么影响呢?

本文暂不涉及调优分析,仅从代码层面基于pyspark分析combineBy的实现原理和其中的shuffle过程,期望抛砖引玉。

接口定义如下:

 Python |  copy code |? 
1
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
2
                     numPartitions=None)

约定Combiner和value含义为:

  • Value 是PariRDD中的value值
  • Combiner是combineByKey完成后PairRDD的value值,可以与Value类型不一样

前3个参数都是callback方法:

  • Combiner createCombiner(Value),通过一个Value元素生成Combiner元素,会被多次调用
  • Combiner mergeValue(Combiner, Value),将一个Value元素合并到Combiner里,会被多次调用
  • Combiner mergeCombiners(Combiner, Combiner),将两个Combiners合并,也会被多次调用

从python/pyspark/rdd.py的combineBy代码看到,其处理过程分为3步:

  1. locally_combined = self.mapPartitions(combineLocally),在python进程进行combine
  2. shuffled = locally_combined.partitionBy(numPartitions) ,进行python进程内部的shuffle和基于scala的worker nodes间shuffle
  3. return shuffled.mapPartitions(_mergeCombiners, True) ,类似MR的reducer,聚合shuffle的结果

下面来详细看每一个steps。

Step1,locally_combined = self.mapPartitions(combineLocally)。

 Python |  copy code |? 
1
        def combineLocally(iterator):
2
            merger = ExternalMerger(agg, memory * 0.9, serializer) \
3
                if spill else InMemoryMerger(agg)
4
            merger.mergeValues(iterator)
5
            return merger.iteritems()

该方法根据spark.shuffle.spill方法决定是使用ExternalMerger还是InMemoryMerger,其中ExternalMerger的内存是由spark.python.worker.memory限定的。以下主要关注ExternalMerger,磁盘+内存的外部排序。另外,merger.mergeValues也是有可能调用全部3个callbacks方法的,不仅仅是mergeValue callback,不要被它的名称迷惑了。

ExternalMerger的关键成员变量:

  • data: dict of {K: V},unpartitioned merged data,还没有spill前的数据都是放在该dict里的
  • pdata:list of dicts,list长度由self.partitions数目决定,该值与numPartitions不一样。partitioned merged data,如果已经发生过spill,则后续的数据都读入pdata并hash到不同的dict槽位里。

ExternalMerger的外存文件路径是/path/to/localdir// 。其中localdir路径可以有多个,当位于不同的磁盘时,可以提高并发写入速度。spill_num是spill的轮次,partition_num是self.partitions对应的分片数。文件中的每一行是序列化后的Key-Combiner list。

再来看mergeValues的方法主体:

 Python |  copy code |? 
01
def mergeValues(self, iterator):
02
 
03
    """ Combine the items by creator and combiner """
04
 
05
    iterator = iter(iterator)
06
 
07
    # speedup attribute lookup
08
 
09
    creator, comb = self.agg.createCombiner, self.agg.mergeValue
10
 
11
    d, c, batch = self.data, 0, self.batch
12
 
13
 
14
    for k, v in iterator:
15
 
16
         d[k] = comb(d[k], v) if k in d else creator(v)
17
 
18
    c += 1
19
 
20
 
21
    if c % batch == 0 and get_used_memory() > self.memory_limit:
22
 
23
        self._spill() // 按实时计算的内部partition idself.data中的数据 flush到disk,为了兼容pdata的flush,这里每行一个单元素的dict
24
 
25
        self._partitioned_mergeValues(iterator, self._next_limit())
26
 
27
        break // iterator指针已经交给_partitioned_mergeValues了,这里就直接跳出了

针对当前mapPartition里的每一个元素,调用createCombiner或mergeValue callback生成新的Combiner对象,并存入self.data[k]里。定期(根据c%batch,注意batch的值会动态调整)检查内存,如果不超限就继续读入。直到内存不足时,首先调用_spill方法把self.data flush,然后重新计算memory_limit并把处理权交给_partitioned_mergeValues。

 Python |  copy code |? 
01
    def _partitioned_mergeValues(self, iterator, limit=0):
02
        """ Partition the items by key, then combine them """
03
        # speedup attribute lookup
04
        creator, comb = self.agg.createCombiner, self.agg.mergeValue
05
        c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch
06
 
07
        for k, v in iterator:
08
            d = pdata[hfun(k)]
09
            d[k] = comb(d[k], v) if k in d else creator(v)
10
            if not limit:
11
                continue
12
 
13
            c += 1                                                                                                       
14
            if c % batch == 0 and get_used_memory() > limit:                                                             
15
                self._spill()                                                                                            
16
                limit = self._next_limit() 

该方法也是继续读入元素,但不同的是,将新生成的Combiner存入self.pdata[hfun(k)][k]里,即根据内部partitions方法进行分片。如果内存不足,则调用_spill写入磁盘,并调整limit值。

上面两次调用了_spill方法,由于只有第一次是针对self.data的,故应只有spill_num=1时的partition file里,才有多行;后续spill_num的partition file都是一个大行。

mergeValues把当前partition里具有相同key的values合并到一个Combiner里了,但这些数据可能在内存和磁盘上,这就需要iteritems()来把它们归并起来返回给上层调用者。如果没用spill过,证明数据都在self.data里,就直接返回其迭代器即可。否则就要调用_external_items()来归并了。

_external_items()按partition处理文件,即每次处理多轮spill生成的一个partition id对应的文件。为了最大程度利用内存并降低复杂度,首先会把pdata里的数据也spill掉。

 Python |  copy code |? 
01
        try:
02
            for i in range(self.partitions):
03
                self.data = {}
04
                for j in range(self.spills):
05
                    path = self._get_spill_dir(j)
06
                    p = os.path.join(path, str(i))
07
                    # do not check memory during merging
08
                    self.mergeCombiners(self.serializer.load_stream(open(p)),
09
                                        False)                                                                           
10
 
11
                    # limit the total partitions                                                                         
12
                    if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS                                         
13
                            and j < self.spills - 1                                                                      
14
                            and get_used_memory() > hard_limit):                                                         
15
                        self.data.clear()  # will read from disk again                                                   
16
                        gc.collect()  # release the memory as much as possible                                           
17
                        """ chengyi02: Because the yield values before are also using memory(in the caller),             
18
                        so if now more than limit, the following partitions will almostly exceed too.                    
19
                        So recursive merged all the remaining partitions.                                                
20
                         """                                                                                             
21
                        for v in self._recursive_merged_items(i):                                                        
22
                            yield v                                                                                      
23
                        return                                                                                           
24
 
25
                for v in self.data.iteritems():                                                                          
26
                    yield v                                                                                              
27
                self.data.clear()                                                                                        
28
 
29
                # remove the merged partition                                                                            
30
                for j in range(self.spills):                                                                             
31
                    path = self._get_spill_dir(j)                                                                        
32
                    os.remove(os.path.join(path, str(i))) 

在调用self.mergeCombiner处理一个/ 文件时,不检查内存是否超限,否则可能会继续向当前/path/to/localdir/下flush数据,就破坏文件数据结构了。而是在一个文件处理完成之后,检查内存若超限且分片数可控,则调用_recursive_merged_items进行递归的合并。

这里需要注意的是,_recursive_merged_items会所有>=i(当前partition id)的数据,而非仅处理当前partition。这是因为,当前内存消耗主要包含两块:rdd.py里调用者保存的yield返回内存块,以及当前partition已经读入的数据。假设partition比较平均,则后者数据量相对稳定;而前者不断增长。所以后续内存超限的几率会更大。

_recursive_merged_items会new 一个ExternalMerger对象m,将文件里的数据依次读入、merge,并按需spill(这时会spill到其他localdir目录,spill_num也重新开始计数,可以视为在不同的数据空间),最后通过m._external_items返回合并后的数据。在m._external_items里还会不会再次发生递归调用呢?几率很小,因为这个方法里的内存消耗基本等同于m的一个partition分片数据大小。而m的所有数据 == self的一个partition,故m的一个partition数据量非常小。即使再次发生递归调用,m的子m 分片数据量会依次递减,故会再次降低spill几率。

上面的描述基本上符合外部排序算法,但工程的世界里还需要考虑GC问题。在_spill的最后和_extern_items里都调用了python的gc.collect()方法,同步释放引用计数为0的内存块。但由于collect不一定能那么完美的释放,一些reachabled还是无法释放的,如果这部分存量较大(例如很逼近本次limit),那极端情况下一个元素就又会触发spill了,这显然逼近耗时也没用意义。所以ExternalMerger会动态调整limit值,max(self.memory_limit, get_used_memory() * 1.05)。所以,这能看出来了,python进程实际消耗内存可能会大于python.worker.memory值。

以上step1结束,返回的locally_combined_rdd的结构为:[(k, Combiner), …]。k还是原来的k,但当前worker task下所有相同k的values都聚合到了Combiner中。

step2,shuffled = locally_combined.partitionBy(numPartitions)

这里有一个问题,为什么不直接把rdd给scala的partitionBy,而在python代码里实现了一些内部shuffle的逻辑呢?首先,partitionBy的回调方法是可定制的,python里默认是portable_hash,如果用scala实现,如何回调python的分片函数呢?其次,python和scala间的通信是需要序列化的,一条一条的成本有点大,所以python shuffle后也做了batch。具体如下。

(注:partitionBy完之后,数据量不会有变化,以上kv变化仅为了代表shuffle后一个task里只会有属于自己分片的key了。)

从代码可以看到python里计算了partition_id,所以scala仅根据确定的分片情况,进行shuffle。在深入之前,先看下python partitionBy里涉及到的几个rdds:

  • 自身,即self,PairRDD(k,v),通过mapPartitions向下转化
  • keyed,PairRDD(paritition_id, [(k,v),(k,v),…]),通过copy constructor向下转化
  • pairRDD,JavaPairRDD,java会调用适配到scala,并通过scala的partitionBy向下转化
  • jrdd,java代理的scala的ShuffledRDD,通过copy constructor向下转化
  • rdd,PairRDD(k,v),分片完成

keyed的生成过程比较简单,由内部方法add_shuffle_key实现,完成k到partition_id的计算,并将一批kv打包、序列化作为keyed rdd的v。Every 1k items, will check used_memroy once, and if equal to batch+1, will also flush。并且batch的值会动态调整,尽量使生成的批量包大小在1M和10M之间。

在jrdd的构造过程中,会生成shuffleDependency,当stage提交时,DAG调度发现该dependency,就会发起ShuffleMapTask,生成shuffle数据(相当于MR里map端的shuffle工作):

ExternalSorter是shuffle的关键,在该分支中生成ExternalSorter的参数:

 Scala |  copy code |? 
1
sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)

即将aggregate=None;dep.partitioner此处是PythonPartitioner,直接使用rdd的key作为paritition_id,在该场景就是python里计算好的partition_id;ordering=None。其中aggregate和ordering为None即代表无需再combine和sort,只单纯shuffle就好,以下仅关注这个分支,由insertAll和writePartitionedFile合作完成。

insertAll的一个可能分支是bypassMergeSort == True,即如果partitions数目较少,且无需aggregate和ordering,则直接写numPartitions个文件,随后再在writePartitionedFile里简单concate。缺点是每个worker node上都同时打开numPartitions个文件,有额外内存消耗,且可能too many open files。

另一个可能分支是大数据时的常见场景(无需combine且大量写),针对每条数据:

 Scala |  copy code |? 
1
buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
2
maybeSpillCollection(usingMap = false)

由于python里做了批量打包,故这儿的一条数据的v对应python的一批数据的kv了。每插入一条数据,检查是否需要spill。这儿的实现思路与python的externalMerger也类似。简单介绍下,insertAll的maybeSpillCollection最终调用spillToMergeableFiles,根据partition_id和key对内容排序(这里partition_id==key),每次spill写入一个临时文件,并且把file信息记录在spills数组里。

随后writePartitionedFile得把之前分布在多个临时文件里的数据归并为最终输出的partition文件。与spilled bypassMergeSort对应的分支,因为一个partition的数据都在一个partition文件里,所以简单的concate即可。否则,数据分散在内存和多个文件里,整合的过程由PartitionedIterator迭代器触发完成,返回的iter依次写文件。

以上过程,将shuffled数据写入disk了,但怎样被下一个计算单元使用呢?

step3,return shuffled.mapPartitions(_mergeCombiners, True) 

虽然spark1.2之后已默认使用sort based shuffle,但sort shuffle还是使用HashShuffleReader读取数据:SortShuffleManager -> HashShuffleReader -> BlockStoreShuffleFetcher.fetch() 。其目的是收集上游多个worker node产生的shuffled数据,所以必然有network I/O。

BlockStoreShuffleFetcher.fetch里,先获取shuffleId和reduceId对应的blocks上游节点信息,信息包括((address, size), index),其中address是BlockManageId类型。之后调用ShuffleBlockFetcherIterator从local block manager和remote BlockTransferService处获取blocks数据。

ShuffleBlockFetcherIterator的initialize方法调用splitLocalRemoteBlocks方法根据address.executeId生成出remoteRequests 对象,针对同一address的多个block,若size < maxBytesInFlight/5 则合并为一个request。从而通过这种方式,确保最大并发度为5。也可以看出,如果有大的block,则request size可能大于maxBytesInFlight。随后会对remoteRequests列表随机化,以保证请求尽量均衡。

initialize方法随后立即调用sendRequest发送多个请求,并打印日志:logInfo(“Started ” + numFetches + ” remote fetches in” + Utils.getUsedTimeMs(startTime))。sendRequest调用shuffleClient.fetchBlocks()读取远端数据,并注册BlockFetchingListener,后者的onBlockFetchSuccess方法会接收buf数据并添加到results队列里。

ShuffleBlockFetcherIterator的next()方法先“释放”内存,并在内存充足的情况下再发出sendRequest请求。随后再读取results队列里的数据,并解压、反序列化,返回迭代器iterator。这里有两个小的优化点,“释放”内存并不是真的触发gc,而是从bytesInFlight里减去已接收到results队列的数据长度,因为后者随后将被读取,而bytesInFlight限制的是网络缓存。另一个是先sendRequest再处理results中的数据,原因是前者是异步调用,且耗时可能较长。

还需要注意的是,sendRequest调用的shuffleClient实际是BlockTransferService的一个实例,有netty和nio两种实现方式。

以上完成了shuffle数据的接收,next返回的iterator最终回到python代码进入step3的剩余阶段,这就很简单了,实际上此时才真正执行shuffled.mapPartitions(_mergeCombiners, True)。

通过以上3个步骤,python、java和scala多语言的交互,最终完成了pyspark的combineByKey。个人理解,由于scala的partitionBy和shuffle过程早已实现,所以pyspark主要解决的是 如何更优的进行多语言交互,以达到较优的性能、扩展性、复用度。