Pyspark实际是基于spark scala core的一层语言外壳,其执行代码经由scala/java发送到worker nodes,并由python进程执行。这里,就涉及如何将用户的python代码分发出去呢?而分发必然涉及序列化过程,又是如何实现的?

本文解释python代码的序列化过程。一句话而言,Pyspark是依赖pickle实现的,但又基于其做了不少定制化工作。

Callback TypeGlobal vars pickledInstance vars pickledClass vars pickledKey methods in cloudpickle.py
__main__.functionY--save_function -> save_function_tuple -> extract_func_data
other_module.functionN--save_function -> save_global and sendRef = True
__main__.instancemethodYYYsave_instancemethod
save_function for im_func, default __reduce_ex__ for im_self, save_global for im_class and sendRef = False
other_module.instancemethodYYNsame with __main__.instancemethod, but sendRef = True
__main__.classmethodY-Ysame with __main__.instancemethod, but im_self is class object, and im_class is TypeType
__main__.staticmethodY-Ysave_function,
class is one elem of f_globals
other_module.classmethodY-Nsave_instancemethod,
sendRef = True
other_module.staticmethodY-Nsave_function,
sendRef = True

上面这张表是基于spark 1.2, python 2.7,针对callback function的不同type,得出的结论。以下将分3部分进行介绍:

  1. spark的serialize过程
  2. python function/method type解析
  3. pickle过程剖析

为了描述方便,以sc.textFile(…).reduce(callback)为例。

spark的serialize过程

序列化和与scala的交互是以pyspark.SparkContext开始的,其__init__方法完成了两件事情:

  1. 调用_ensure_initialized方法,launch_gateway及JVM,通过py4j基于本地环回套接口,建立了与java/scala的通信通道
  2. 调用_do_init方法,默认将self.serializer = AutoBatchedSerializer(self._unbatched_serializer)

Pyspark中需要序列化的包括code和data,这儿的serializer不是用来pickle代码的,别被迷惑!Pyspark的serializer继承关系如下,与code serialize相关的是CloudPickleSerializer。

pyspark-serilizer

有了这些背景知识,我们来看示例代码的执行过程。

1. textFile()新建并返回RDD:

 

 Python |  copy code |? 
1
RDD(
2
    self._jsc.textFile(name, minPartitions),  // jrdd
3
    self,                                     // python sparkcontext 
4
    UTF8Deserializer(use_unicode))            // jrdd_deserializer

其中self._jsc是JavaSparkContext的实例。

2. reduce(callback)迫使DAG生成一个stage,并向cluster发送callback方法。

需要注意的是,reduce等利用nested function层层对我们的callback进行了封装,封装后function的type是“function”,而我们传入的callback则有可能是function、instancemethod、lambda、staticmethod、classmethod等。

reduce间接调用mapPartitionsWithIndex,生成PipelinedRDD:

 Python |  copy code |? 
1
PipelinedRDD(
2
    self,  // prev rdd, 即textFile生成的RDD
3
    f,       // reduce封装的function
4
    preservesPartitioning) 

示例代码比较简单,这儿的prev rdd只是简单的RDD,而非PipelinedRDD,故直接设置新PipelinedRDD主要属性如下:

  • self.func = func ,即reduce封装的function
  • self._prev_jrdd = prev._jrdd
  • self._prev_jrdd_deserializer = prev._jrdd_deserializer
  • self._jrdd_val = None, 第一次调用_jrdd时会触发生成
  • self._jrdd_deserializer = self.ctx.serializer ,默认是PickleSerializer

reduce里此时触发collect方法,通过self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())发起task。由于此时是第一次调用self._jrdd,故会计算生成_jrdd_val对象,该过程里也有一些关键代码:

 Python |  copy code |? 
1
        command = (self.func, profileStats, self._prev_jrdd_deserializer,
2
                   self._jrdd_deserializer)
3
        # the serialized command will be compressed by broadcast
4
        ser = CloudPickleSerializer()
5
        pickled_command = ser.dumps(command)
6
        if len(pickled_command) > (1 << 20):  # 1M
7
            self._broadcast = self.ctx.broadcast(pickled_command)
8
            pickled_command = ser.dumps(self._broadcast) 

这儿的command就是实际发送给worker的指令了,其中的self.func层层嵌套调用了我们的callback。序列化是通过CloudPickleSerializer完成的,如果序列化后的大于1M,则会走broadcast方式。pickled_command最终通过py4j,生成JavaRDD对象,走scala正常的DAG过程。

以上大体了解我们自己的callback怎样变成最终的command。

python function/method type解析

在看serialize过程前,我们还得解释下python function的type,因为pickle会根据不同的object type区别处理。虽然python内部所有都是object,但object也是有type的。

 Python |  copy code |? 
01
def my_function():
02
    pass
03
 
04
class AClass(object):
05
    def my_method(self):
06
        pass
07
 
08
    @staticmethod
09
    def my_static_method():
10
        pass
11
 
12
    @classmethod
13
    def my_class_method(cls):
14
        pass
15
 
16
print "my_function: %s" % type(my_function)  # function
17
print "my_method: %s" % type(AClass().my_method) # instancemethod
18
print "my_static_method: %s" % type(AClass.my_static_method) # function
19
print "my_class_method: %s" % type(AClass.my_class_method) # instancemethod

pickle过程剖析

上面可以看到serialize是调用CloudPickleSerializer().dumps(command)完成的,command这时是一个tuple。该方法是对cloudpickle.dumps(obj, 2)的封装,后者才是pyspark serialize的关键,它overwrite了python pickle类的一些方法,尤其是save_[function|instancemethod|…]之类的序列化callbacks。

pickle.Pickler.dump的调用,会进入Python/Lib/pickle.py或Modules/_pickle.c 的self.save(obj)方法。该方法有几个重要分支来决定对obj序列化的callback方法:

  1. f = self.dispatch.get(t),其中t = type(obj),而dispatch table中存储的就是save_*方法。如果type在cloudpickle里有对应方法,那就调用它处理
  2. 否则,issc = issubclass(t, TypeType),那就调用self.save_global(obj) 处理,注意这个方法也被cloudpickle overwrite了
  3. 否则,reduce = dispatch_table.get(t),如果有就调用处理,copy_reg.dispatch_table
  4. 否则,reduce = getattr(obj, “__reduce_ex__”, None) ,object有默认的__reduce_ex__
  5. 否则,reduce = getattr(obj, “__reduce__”, None),object有默认的__reduce__

对于想细致了解pickle过程的同学,建议在Lib/pickle.py中加入下面的print语句,能够帮助我们了解进入了哪个分支:

 Python |  copy code |? 
1
        f = self.dispatch.get(t)                                                                                         
2
        print "###### issc: %s, obj: %s, type: %s #######" % (issubclass(t, TypeType), obj, t)    

那么对于function,我们都希望传递什么呢?功能考虑,可调用的代码是第一步,更重要的是数据,例如使用到的global module变量、传入参数、所在object的vars、所在class的vars等。性能考虑,当然希望尽量不要重复传递,也不要传递不需要的数据。把握这个原则之后,就可以参考下面的关键流程图,在脑子里跑pickle过程了:(补充说明下command拆解出callback的过程:save_tuple – save(func wrapper) – save_function – extract_func_data – save( …, closure, …), callback是func wrapper的closure方法。)

main_function main.classmethod main.instancemethod main.staticmethodother_module.classmethod other_module.function other_module.instancemethod other_module.staticmethod

最后,再啰嗦下,extract_func_data的f_globals里会包含所有用到的、module层级可见的“objects”(python 一切皆对象),例如global vars、import的modules、class等。

其实通过分析pickle过程,也可以看出python内部的instancemethod,实际是由类型信息,以及 一个翻译后的function、包裹该方法的object对象、以及object所属的class信息构成的。而翻译后的function与普通的function没有本质区别。

可能的改进措施

在多个stages间,wrapper、serializer等信息,也会被pickle、传递,从而消耗driver、workers的cpu、内存和IO资源,当然,在节点和代码量小时都可以忽略,但随着节点数的增长,也是一种浪费。

是否可以考虑降低重复呢?毕竟我们都已经把整个python app打包为egg分发到worker上了啊。

3 Comments

  1. 傻子奔 says:

    你好博主,我想问一下您有没有遇到python的代码无法序列化的情况导致无法进行下去的情况呀?

  2. flykobe says:

    有,所以才会梳理了pickle的过程,保证在executor上需要的数据等都可以被序列化

  3. 傻子奔 says:

    其实我是加载了一个分词的模型
    我尝试了将模型实现分词部分声明为 @staticmethod或@classmethod 都还是出错了
    可以询问一下这样的话,博主会怎么做呢?或者是要怎么找出原因。。

Leave a Reply