
Python Memory Management

One of the major challenges in writing (somewhat) large-scale Python programs is to keep memory usage at a minimum. However, managing memory in Python is easy—if you just don’t care. Python allocates memory transparently, manages objects using a reference count system, and frees memory when an object’s reference count falls to zero. In theory, it’s swell. In practice, you need to know a few things about Python memory management to get a memory-efficient program running. One of the things you should know, or at least get a good feel about, is the sizes of basic Python objects. Another thing is how Python manages its memory internally.

So let us begin with the sizes of basic objects. In Python, there aren't many primitive data types: there are ints, longs (an unlimited-precision version of ints), floats (which are doubles), tuples, strings, lists, dictionaries, and classes.

Basic Objects

What is the size of an int? A programmer with a C or C++ background will probably guess that the size of a machine-specific int is something like 32 bits, maybe 64, and that it therefore occupies at most 8 bytes. But is that so in Python?

Let us first write a function that shows the sizes of objects (recursively if necessary):

import sys

def show_sizeof(x, level=0):
    # Print the class, the size in bytes, and the value of x,
    # recursing into containers (dicts go through items()).
    print "\t" * level, x.__class__, sys.getsizeof(x), x

    if hasattr(x, '__iter__'):
        if hasattr(x, 'items'):
            for xx in x.items():
                show_sizeof(xx, level + 1)
        else:
            for xx in x:
                show_sizeof(xx, level + 1)

We can now use the function to inspect the sizes of the different basic data types:

show_sizeof(None)
show_sizeof(3)
show_sizeof(2**63)
show_sizeof(102947298469128649161972364837164)
show_sizeof(918659326943756134897561304875610348756384756193485761304875613948576297485698417)

If you have a 32-bit 2.7x Python, you’ll see:

8 None
12 3
22 9223372036854775808
28 102947298469128649161972364837164
48 918659326943756134897561304875610348756384756193485761304875613948576297485698417

and if you have a 64-bit 2.7x Python, you’ll see:

16 None
24 3
36 9223372036854775808
40 102947298469128649161972364837164
60 918659326943756134897561304875610348756384756193485761304875613948576297485698417

Let us focus on the 64-bit version (mainly because that's what we need most often in our case). None takes 16 bytes. int takes 24 bytes, three times as much memory as a C int64_t, despite being some kind of "machine-friendly" integer. Long integers (unbounded precision), used to represent integers larger than 2**63 - 1, have a minimum size of 36 bytes. From there, the size grows linearly with the logarithm of the integer represented.

Python’s floats are implementation-specific but seem to be C doubles. However, they do not eat up only 8 bytes:

show_sizeof(3.14159265358979323846264338327950288)

Outputs

16 3.14159265359

on a 32-bit platform and

24 3.14159265359

on a 64-bit platform. That is, again, three times the size a C programmer would expect. Now, what about strings?

show_sizeof("")
show_sizeof("My hovercraft is full of eels")

outputs, on a 32-bit platform:

21
50 My hovercraft is full of eels

and, on a 64-bit platform:

37
66 My hovercraft is full of eels

An empty string costs 37 bytes in a 64-bit environment! The memory used by a string then grows linearly with the length of the (useful) string.
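If you want to convince yourself of that linear growth, a quick check (a minimal sketch reusing sys.getsizeof as above) is:

import sys

for n in (0, 1, 10, 100, 1000):
    print n, sys.getsizeof("x" * n)   # fixed overhead plus one byte per character

On a 64-bit 2.7 build, this should show the 37-byte overhead plus one byte per character of payload.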

* * *

Other commonly used structures, namely tuples, lists, and dictionaries, are worth examining. Lists (which are implemented as array lists, not as linked lists, with everything that entails) are arrays of references to Python objects, allowing them to be heterogeneous. Let us look at our sizes:

show_sizeof([])
show_sizeof([4, "toaster", 230.1])

outputs

32 []
44 [4, 'toaster', 230.1]

on a 32-bit platform and

72 []
96 [4, 'toaster', 230.1]

on a 64-bit platform. An empty list eats up 72 bytes. The size of an empty, 64-bit C++ std::list is only 16 bytes, 4 to 5 times less. What about tuples? And dictionaries?

show_sizeof({})
show_sizeof({'a':213, 'b':2131})

outputs, on a 32-bit box

136 {}
 136 {'a': 213, 'b': 2131}
        32 ('a', 213)
                22 a
                12 213
        32 ('b', 2131)
                22 b
                12 2131

and

280 {}
 280 {'a': 213, 'b': 2131}
        72 ('a', 213)
                38 a
                24 213
        72 ('b', 2131)
                38 b
                24 2131

for a 64-bit box.

This last example is particularly interesting because it "doesn't add up." If we look at individual key/value pairs, they take 72 bytes (while their components take 38 + 24 = 62 bytes, leaving 10 bytes for the pair itself), but the dictionary takes 280 bytes (rather than a strict minimum of 144 = 72 × 2 bytes). The dictionary is supposed to be an efficient data structure for search, and the two likely implementations use more space than strictly necessary. If it's some kind of tree, then we pay the cost of internal nodes that contain a key and two pointers to child nodes; if it's a hash table, then we must have some headroom of free entries to ensure good performance.

The (somewhat) equivalent C++ std::map structure takes 48 bytes when created (that is, empty). An empty C++ string takes 8 bytes (and the allocated size then grows linearly with the size of the string). An integer takes 4 bytes (32 bits).

* * *

Why does all this matter? It seems that whether an empty string takes 8 bytes or 37 doesn't change anything much. That's true, until you need to scale. Then you need to be really careful about how many objects you create to limit the quantity of memory your program uses. It is a problem in real-life applications. However, to devise a really good strategy about memory management, we must consider not only the sizes of objects, but how many are created and in which order. It turns out to be very important for Python programs. One key element to understand is how Python allocates its memory internally, which we will discuss next.

Internal Memory Management

To speed up memory allocation (and reuse), Python uses a number of lists for small objects. Each list contains objects of similar size: there is a list for objects 1 to 8 bytes in size, one for 9 to 16 bytes, etc. When a small object needs to be created, either we reuse a free block in the list, or we allocate a new one.

There are some internal details on how Python manages those lists into blocks, pools, and "arenas": a number of blocks form a pool, pools are gathered into arenas, etc., but they're not very relevant to the point we want to make (if you really want to know, read Evan Jones' ideas on how to improve Python's memory allocation). The important point is that those lists never shrink.

Indeed: if an item (of size x) is deallocated (freed by lack of references), its location is not returned to Python's global memory pool (and even less to the system), but merely marked as free and added to the free list of items of size x. The dead object's location will be reused if another object of compatible size is needed. If there are no dead objects available, new ones are created.

If the memory for small objects is never freed, then the inescapable conclusion is that, like goldfish, these small-object lists only keep growing, never shrinking, and that the memory footprint of your application is dominated by the largest number of small objects allocated at any given point.
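Here is a minimal sketch of that behavior, in the same spirit as the memory_profiler example further below (it assumes memory_profiler is installed and the file is run with python -m memory_profiler):

import memory_profiler

@profile
def churn():
    x = [i for i in xrange(1000000)]   # allocate a lot of small int objects
    del x                              # the ints are dead, but their slots stay on the free lists
    y = [i for i in xrange(1000000)]   # the new ints largely reuse those freed slots
    return y

if __name__ == "__main__":
    churn()

You should see that del gives back only a fraction of the memory (roughly the list's own array of references), and that the second allocation adds little on top: the freed slots are simply reused.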

* * *

Therefore, one should work hard to allocate only the number of small objects necessary for one task, favoring (otherwise unpythonesque) loops where only a small number of elements are created/processed rather than (more pythonesque) patterns where lists are created using list comprehension syntax and then processed.

While the second pattern is more à la Python, it is rather the worst case: you end up creating lots of small objects that come to populate the small-object lists, and even once the list is dead, the dead objects (now all in the free lists) still occupy a lot of memory.
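For example, here is a minimal sketch of the two patterns (do_something is just a placeholder for the per-item work):

def do_something(i):
    return i * i

# More pythonesque, but the worst case for memory: the whole list of
# intermediate objects exists at once and then feeds the free lists.
total = sum([do_something(i) for i in xrange(1000000)])

# Less pythonesque, but memory-friendly: only a handful of objects
# are alive at any given time.
total = 0
for i in xrange(1000000):
    total += do_something(i)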

* * *

The fact that the free lists grow does not seem like much of a problem, because the memory they contain is still accessible to the Python program. But from the OS's perspective, your program's size is the total (maximum) memory allocated to Python. Since Python returns memory to the OS from the heap (which allocates objects other than small objects) only on Windows, if you run on Linux you will only see the total memory used by your program increase.

* * *

Let us prove my point using memory_profiler, a Python add-on module (which depends on the python-psutil package) by Fabian Pedregosa (see the module's github page). This add-on provides the decorator @profile that allows one to monitor the memory usage of one specific function. It is extremely simple to use. Let us consider the following program:

import copy
import memory_profiler

@profile
def function():
    x = list(range(1000000))  # allocate a big list
    y = copy.deepcopy(x)
    del x
    return y

if __name__ == "__main__":
    function()

invoking

python -m memory_profiler memory-profile-me.py

prints, on a 64-bit computer

Filename: memory-profile-me.py

Line #    Mem usage    Increment   Line Contents
================================================
     4                             @profile
     5      9.11 MB      0.00 MB   def function():
     6     40.05 MB     30.94 MB       x = list(range(1000000)) # allocate a big list
     7     89.73 MB     49.68 MB       y = copy.deepcopy(x)
     8     82.10 MB     -7.63 MB       del x
     9     82.10 MB      0.00 MB       return y

This program creates a list of n=1,000,000 ints (n x 24 bytes = ~23 MB) and an additional list of references (n x 8 bytes = ~7.6 MB), which amounts to a total memory usage of ~31 MB. copy.deepcopy copies both lists, which allocates again ~50 MB (I am not sure where the additional overhead of 50 MB – 31 MB = 19 MB comes from). The interesting part is del x: it deletes x, but the memory usage only decreases by 7.63 MB! This is because del only deletes the reference list, not the actual integer values, which remain on the heap and cause a memory overhead of ~23 MB.

This example allocates in total ~73 MB, which is more than twice the amount of memory needed to store a single list of ~31 MB. You can see that memory can increase surprisingly if you are not careful!

Note that you might get different results on a different platform or with a different python version.

Pickle

On a related note: is pickle wasteful?

Pickle is the standard way of (de)serializing Python objects to file. What is its memory footprint? Does it create extra copies of the data or is it rather smart about it? Consider this short example:

import memory_profiler
import pickle
import random

def random_string():
    return "".join([chr(64 + random.randint(0, 25)) for _ in xrange(20)])

@profile
def create_file():
    x = [(random.random(),
          random_string(),
          random.randint(0, 2 ** 64))
         for _ in xrange(1000000)]

    pickle.dump(x, open('machin.pkl', 'w'))

@profile
def load_file():
    y = pickle.load(open('machin.pkl', 'r'))
    return y

if __name__=="__main__":
    create_file()
    #load_file()

One invocation profiles the creation of the pickled data, and one profiles re-reading it (comment out the function you do not want called). Using memory_profiler, the creation uses a lot of memory:

Filename: test-pickle.py

Line #    Mem usage    Increment   Line Contents
================================================
     8                             @profile
     9      9.18 MB      0.00 MB   def create_file():
    10      9.33 MB      0.15 MB       x=[ (random.random(),
    11                                      random_string(),
    12                                      random.randint(0,2**64))
    13    246.11 MB    236.77 MB           for _ in xrange(1000000) ]
    14
    15    481.64 MB    235.54 MB       pickle.dump(x,open('machin.pkl','w'))

and re-reading a bit less:

Filename: test-pickle.py

Line #    Mem usage    Increment   Line Contents
================================================
    18                             @profile
    19      9.18 MB      0.00 MB   def load_file():
    20    311.02 MB    301.83 MB       y=pickle.load(open('machin.pkl','r'))
    21    311.02 MB      0.00 MB       return y

So somehow, pickling is very bad for memory consumption. The initial list takes up more or less 230 MB, but pickling it creates another 230-something MB worth of memory allocations.

Unpickling, on the other hand, seems fairly efficient. It does use more memory than the original list (300 MB instead of 230-something), but it does not double the quantity of allocated memory.

Overall, then, (un)pickling should be avoided for memory-sensitive applications. What are the alternatives? Pickling preserves all the structure of a data structure, so you can recover it exactly from the pickled file at a later time. However, that might not always be needed. If the file is to contain a list as in the example above, then maybe a simple flat, text-based file format is in order. Let us see what that gives.

A naïve implementation would give:

import memory_profiler
import random
import pickle

def random_string():
    return "".join([chr(64 + random.randint(0, 25)) for _ in xrange(20)])

@profile
def create_file():
    x = [(random.random(),
          random_string(),
          random.randint(0, 2 ** 64))
         for _ in xrange(1000000) ]

    f = open('machin.flat', 'w')
    for xx in x:
        print >>f, xx
    f.close()

@profile
def load_file():
    y = []
    f = open('machin.flat', 'r')
    for line in f:
        y.append(eval(line))
    f.close()
    return y

if __name__== "__main__":
    create_file()
    #load_file()

Creating the file:

Filename: test-flat.py

Line #    Mem usage    Increment   Line Contents
================================================
     8                             @profile
     9      9.19 MB      0.00 MB   def create_file():
    10      9.34 MB      0.15 MB       x=[ (random.random(),
    11                                      random_string(),
    12                                      random.randint(0, 2**64))
    13    246.09 MB    236.75 MB           for _ in xrange(1000000) ]
    14
    15    246.09 MB      0.00 MB       f=open('machin.flat', 'w')
    16    308.27 MB     62.18 MB       for xx in x:
    17                                     print >>f, xx

and reading the file back:

Filename: test-flat.py

Line #    Mem usage    Increment   Line Contents
================================================
    20                             @profile
    21      9.19 MB      0.00 MB   def load_file():
    22      9.34 MB      0.15 MB       y=[]
    23      9.34 MB      0.00 MB       f=open('machin.flat', 'r')
    24    300.99 MB    291.66 MB       for line in f:
    25    300.99 MB      0.00 MB           y.append(eval(line))
    26    301.00 MB      0.00 MB       return y

Memory consumption on writing is now much better. It still creates a lot of temporary small objects (about 60 MB's worth), but it does not double memory usage. Reading is comparable (using only marginally less memory).

This particular example is trivial, but it generalizes to strategies where you don't load the whole thing first and then process it, but rather read a few items, process them, and reuse the allocated memory. To load data into a Numpy array, for example, one could first create the Numpy array, then read the file line by line to fill the array: this allocates only one copy of the whole data. Using pickle, you would allocate the whole data (at least) twice: once by pickle, and once through Numpy.
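As a minimal sketch of that idea (assuming Numpy is installed and a file data.flat holding one number per line, which is a simpler layout than the tuples used above):

import numpy

n = 1000000
a = numpy.zeros(n)                 # allocate the destination array once
with open('data.flat') as f:
    for i, line in enumerate(f):
        a[i] = float(line)         # fill in place: only one full copy of the data exists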

Or, even better yet, use Numpy (or PyTables) arrays. But that is a different topic. In the meantime, you can have a look at loading and saving, another tutorial in the Theano/doc/tutorial directory.

* * *

Python's design goals are radically different from, say, C's design goals. While the latter is designed to give you good control over what you're doing at the expense of more complex and explicit programming, the former is designed to let you code rapidly while hiding most (if not all) of the underlying implementation details. While this sounds nice, in a production environment ignoring the implementation inefficiencies of a language can bite you hard, and sometimes when it's too late. I think that having a good feel for how inefficient Python is with memory management (by design!) plays an important role in whether or not your code meets production requirements, scales well, or, on the contrary, becomes a burning hell of memory.

 

Reposted from: http://deeplearning.net/software/theano/tutorial/python-memory-management.html

Although there is plenty of material on Spark, not much of it covers its internals, and even less explains how PySpark works. So for the problems we hit in our project, I had to analyze and solve them myself. The question I have been looking at recently is that broadcasting a large dictionary consumes a huge amount of memory, and the Java process on the driver uses more than twice as much memory as the Python process. Why? To answer that, we first have to understand how PySpark integrates with Spark, because a seemingly simple Spark map, reduce, or broadcast involves a great deal of interaction between processes, over the network, and between threads, plus serialization, deserialization, compression, and so on.

First, let us look at the Spark runtime architecture once PySpark is integrated. Before that, let us recall the simpler picture without Python:

The tasks of a stage are sent from the driver to the master, which picks suitable worker nodes and dispatches the tasks to Executors for execution; the results are returned directly to the Driver Program. (The implementation details may differ depending on the master mode.)

What happens once Python is added? Probably to keep the Spark core code lean and to have a uniform pattern that can be adapted to more languages in the future, the PySpark implementers chose to wrap things from the outside while changing the interaction protocol as little as possible. This also leads to a very large amount of interaction between the Python and Java processes. Here is the modified architecture diagram:

Both the driver and the worker side are modified. The white boxes are the newly added Python processes, which interact with the Java processes through several channels: the py4j open-source package, sockets, the local file system, and so on. There are several interaction mechanisms because they serve different scenarios: py4j lets Python call Scala methods; plain sockets are used when Java actively initiates interaction with Python; the file system is used to exchange large amounts of data, for example broadcast values.

With this basic knowledge in place, let us look at the interaction sequence diagram:

 

  • spark-submit your-app.py launches the Spark job. Since it is a Python script, it is handled by PythonRunner (note: this is Scala code), which first starts the py4j socket server and then forks a child process to run python your-app.py
  • Python parses your-app.py, works out the lineage of RDDs to execute, and calls into Scala via the py4j socket to create a PythonRDD
  • The Scala process on the driver handles the PythonRDD and submits the job to the master
  • The master selects a worker node according to its policy and sends a launchExecutor command to it
  • The Java slave program on the worker node handles the launchExecutor command; most importantly, it reuses or forks a new Python process (ignore the interaction with daemon.py for now, more on that below) and establishes a socket connection with that Python process
  • The Python worker process on the worker node receives the job information over the socket, executes it, and returns the result to the Java worker process over the socket
  • The Java worker process returns the result over the network to the Java process on the driver
  • The Java process on the driver then returns it to the Python process on the driver

The above leaves out many details of fault tolerance, monitoring, and statistics collection, yet you can see it is already quite complex.

Now let us look at the role of daemon.py. Since parent and child processes share memory through copy-on-write (COW), forking a Python child from a Java process that has already allocated a lot of memory can easily cause an OOM. Spark therefore tries to fork a pyspark/daemon.py child process early on, while the Java process has not yet consumed much memory, and that daemon then forks Python worker processes on demand. Of course, this only works on unix-like systems (including Linux); Windows cannot enjoy this benefit.

There are many details involved; you can only really understand them by reading the code. The key code paths are listed below (the master is omitted because I have not read it):

  • Driver side
    • core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
    • python/pyspark/rdd.py
    • core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
    • core/src/main/scala/org/apache/spark/rdd/RDD.scala
    • core/src/main/scala/org/apache/spark/SparkContext.scala
    • core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
  • Worker side
    • core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
    • core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala->compute()
    • core/src/main/scala/org/apache/spark/SparkEnv.scala->createPythonWorker()
    • python/pyspark/daemon.py
    • python/pyspark/worker.py

As for why broadcast consumes so much memory, do you have the answer yet? A separate follow-up article will cover it, haha!

Usage

In the driver program, broadcast like this: b_data = sc.broadcast(value), where value must be a variable already read into the driver program, not an RDD.

In the worker program, the copy of the data is accessed through b_data.value.

In the driver program, once the broadcast data has been used, it should be cleared as soon as possible: b_data.unpersist()
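Putting the three steps together, a minimal sketch (assuming an existing SparkContext named sc) looks like this:

lookup = {"a": 1, "b": 2}                   # a plain driver-side value, not an RDD
b_data = sc.broadcast(lookup)               # driver: ship one read-only copy per executor

rdd = sc.parallelize(["a", "b", "a", "c"])
counts = rdd.map(lambda k: b_data.value.get(k, 0)).sum()   # workers read b_data.value
print counts

b_data.unpersist()                          # driver: release the broadcast blocks when done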

How It Works

The analysis below is based on the Spark 1.2 code on github. Spark has not really stabilized yet, so there can be sizeable differences between versions; rdd.collect(), for instance, is implemented quite differently across versions.

To understand how the Python broadcast wrapper works, we also need some knowledge of Spark internals; the architecture is shown in the figure above.

Process-level design on the driver side

The machine running the driver hosts both a Python process and a Java process, where the Java process exists as a "subprocess" of the Python one; the two interact through sockets.

The Java child process is set up in python/pyspark/java_gateway.py. It first calls exec ./bin/spark-submit (a shell child process) through subprocess.Popen, which in turn starts the Java "grandchild" process. Popen's stdout and stderr parameters are both set to PIPE, so pipes are automatically established between the Python parent and the shell child to capture the shell's output while it runs, and an EchoOutputThread child thread prints that output to stderr (note that these PIPEs cannot be used for Python-Java communication).
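The launch pattern described here can be sketched roughly as follows (an illustrative sketch only, not Spark's actual java_gateway.py code):

import os
import subprocess
import sys
import threading

# Launch spark-submit as a child process; both of its output streams become pipes.
env = dict(os.environ, IS_SUBPROCESS="1")
proc = subprocess.Popen(["./bin/spark-submit", "pyspark-shell"],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)

# The child prints the port its socket server listens on (see the next paragraph).
port = int(proc.stdout.readline())

def echo(stream):
    # Forward everything else the child writes to our stderr so its logs are not lost.
    for line in iter(stream.readline, ""):
        sys.stderr.write(line)

for stream in (proc.stdout, proc.stderr):
    t = threading.Thread(target=echo, args=(stream,))
    t.daemon = True
    t.start()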

After the Java child process starts, it returns the port of the socket server it started to the Python process via spark-submit, which can be simulated with the following command:

$ IS_SUBPROCESS=1 ./bin/spark-submit pyspark-shell
51383
...

After the Python process receives that port, it uses py4j's JavaGateway to establish a socket channel between the Python VM and the JVM. This provides some convenient ways to interact and make calls, for example: The `jvm` field of `JavaGateway` enables users to access classes, static members (fields and methods) and call constructors.
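For reference, basic py4j usage looks like this (a standalone sketch; it assumes a GatewayServer is already listening on py4j's default port, whereas PySpark connects to the port it just read back from spark-submit):

from py4j.java_gateway import JavaGateway

gateway = JavaGateway()                  # connect to the JVM-side gateway server
jvm = gateway.jvm                        # entry point to classes living in the JVM
rnd = jvm.java.util.Random()             # call a constructor
print rnd.nextInt(10)                    # call an instance method
print jvm.java.lang.Math.max(3, 7)       # call a static method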

Process-level design on the worker side

From sbin/start-slave.sh we can see that the worker process uses org.apache.spark.deploy.worker.Worker as its entry point. It manages network requests with the akka package and defines its case handlers through the receiveWithLogging property. Among these, LaunchExecutor starts the executor process, i.e., it runs the RDD request issued by the driver. The details of the processing flow are described further below.

A task execution request is first sent to the Java/Scala process on the worker. All Python tasks are mapped to PythonRDD (core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala), so this class is the key to understanding how the worker operates. Its constructor parameters include broadcastVars.

Since parent and child processes share memory through COW, forking a Python child from a Java process that has already allocated a lot of memory can easily cause an OOM. Spark therefore forks a Python daemon process through the pyspark/daemon.py script early on, while the Java process has not yet consumed much memory; afterwards, the actual Python workers are created by the Java process sending requests to this Python daemon, which then forks the Python child worker processes. Again, this only works on unix-like systems (including Linux); Windows does not get this benefit.

Following this idea, in the PythonRDD.compute() method, val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) produces the Python worker. This method first checks whether an idle worker forked earlier exists; if so, it is reused directly, otherwise a usable worker is forked through the Python daemon.

Interaction flow between driver and worker

So how is a statement like sc.textFile(…).map(func1).reduce(func2) processed? And how many PythonRDDs are created along the way?

First, let us look in detail at how map and reduce turn into tasks on the driver:


From the figure above (white is the Python environment, gray is the Scala environment), we can see that map does not actually issue a computation request; it only creates a PipelinedRDD object. At reduce time, the environment needed by the map and reduce computations (including broadcast vars) and the parameters are assembled into a JavaRDD, and in the end the Scala RDD scheduling channel is reused to issue the cluster computation request. Once Scala receives the return value, it pushes the data to the Python process through a local socket (some versions exchange it through temporary files instead).
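To make this concrete, here is a minimal runnable form of such a statement (assuming an existing SparkContext sc and a text file numbers.txt with one integer per line; func1 and func2 are just placeholders):

def func1(line):
    return int(line)               # map: lazy, only builds a PipelinedRDD

def func2(a, b):
    return a + b                   # reduce: this is what triggers the cluster job

total = sc.textFile("numbers.txt").map(func1).reduce(func2)
print total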

Also worth noting: when the PipelinedRDD._jrdd property is generated, the dumps method of python/pyspark/cloudpickle.py is called to produce pickled_command, and some Python-related information is filled in at that point. In one run, the not-yet-serialized pickled_command looked like this:

(<function pipeline_func at 0x10c0f2758>, None, BatchedSerializer(PickleSerializer(), 1), BatchedSerializer(PickleSerializer(), 0))
cloud dumped (<function pipeline_func at 0x10c0f2758>, None, BatchedSerializer(PickleSerializer(), 1), BatchedSerializer(PickleSerializer(), 0)) set([<module '__builtin__' (built-in)>, <module 'pyspark.serializers' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/serializers.pyc'>, <module 'pyspark.cloudpickle' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/cloudpickle.py'>, <module 'itertools' from '/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/lib-dynload/itertools.so'>, <module 'pyspark.rdd' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/rdd.py'>])

 

When the task is sent to the worker process and triggers the LaunchExecutor branch, the processing flow is as shown in the figure below:

Broadcast: driver-side flow

When a broadcast is initiated through pyspark.SparkContext.broadcast(), the __init__ method of the pyspark.Broadcast class is called indirectly:

    def __init__(self, sc=None, value=None, pickle_registry=None, path=None):
        """
        Should not be called directly by users -- use L{SparkContext.broadcast()}
        instead.
        """
        if sc is not None:
            f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)  # create a temporary file with a random name
            self._path = self.dump(value, f)  # serialize value, write it to the file, return the file path
            self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
            self._pickle_registry = pickle_registry
        else:
            self._jbroadcast = None
            self._path = path

Here, the readBroadcastFromFile method actually registers the broadcast constant through the broadcast method of core/src/main/scala/org/apache/spark/SparkContext.scala.

 

As shown in the figure above, during this process the Python and Java processes actually exchange the broadcast data through a temporary file, and after reading the data from that file the Java process still performs two more writes, as the writeBlocks method of core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala shows: it first writes the un-chunked data into the driver's blockmanager, then splits the data into blocks (blockify) and writes it into the blockmanager a second time! The reason for the double write is given in the code comment below. Personally I feel the I/O and memory overhead here is somewhat excessive, and the design of the Python interaction in particular looks rather crude, although it partly fits Spark's approach of trading memory for time. From this we can also guess that the driver's Java process may use twice as much memory as the Python process for a broadcast.

  /**
   * Divide the object into multiple blocks and put those blocks in the block manager.
   * @param value the object to divide
   * @return number of blocks this broadcast variable is divided into
   */
  private def writeBlocks(value: T): Int = {
    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    // do not create a duplicate copy of the broadcast variable's value.
    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
      tellMaster = false)
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    blocks.zipWithIndex.foreach { case (block, i) =>
      SparkEnv.get.blockManager.putBytes(
        BroadcastBlockId(id, "piece" + i),
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
    }
    blocks.length
  }

Broadcast: worker-side flow

 

Performance considerations

1. For the driver

 

References

https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

https://docs.python.org/2/library/subprocess.html

http://py4j.sourceforge.net/

http://www.scala-lang.org/docu/files/collections-api/collections_10.html

https://docs.python.org/2.7/library/pickle.html

https://docs.python.org/2/library/itertools.html?highlight=imap#itertools.imap

http://akka.io/