Archive for the 'Uncategorized' Category

Fixing slow HDFS reads ("read block too slow")

Looking at the metric distributions in the Spark UI, we found a stage whose 75th-percentile task duration was about 20s, but a small number of tasks (around 4) were much slower, up to 8min. Checking the worker's error log, we found the following:

15/05/21 17:48:33 INFO FSInputChecker: read block too slow, expect:512000 cost:513232
15/05/21 17:48:33 WARN FMSClient: DFS Read: org.apache.hadoop.hdfs.protocol.ReadSlowException: read block too slow, expect:512000 cost:513232

So the slow tasks were caused by slow block reads. Our block size is 256MB, the expected read time is 512000ms, i.e. about 8.5min, and the read actually took 513s, which matches the slow tasks' duration. This threshold is controlled by our internal HDFS build, but the open-source world may have something similar. We changed the setting in spark/conf/hadoop-site.xml to 5242880, which means an expected download speed of 5MB/s; any read slower than that retries against another DataNode. After the change, the max task duration dropped to 1.3min. The worker behind the new max still showed the slow-read log, just with a shorter timeout.



In the Duration metric we also found a stage whose 75th-percentile task duration was 9+ min, with GC time at most 2s and each task processing 2.2GB of data. Without optimizing the internal processing logic, can we speed up the tasks by increasing parallelism?

Since these tasks come from a join, we changed its numPartitions parameter from 1k to 2k. But because the driver runs in a virtual machine, worker nodes repeatedly failed to communicate with it during the run, which actually made the tasks slower and triggered retries and recomputation! Setting the value to 1500 instead, each task now processes about 1.5GB of data, and the processing time dropped as well.
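A minimal PySpark sketch of the knob in question (the RDD names here are placeholders, not the real job):

# left_rdd and right_rdd stand in for the real join inputs
joined = left_rdd.join(right_rdd, numPartitions=1500)  # was 1000; 2000 overwhelmed the driver VM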


   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 62603140 388783.337    0.006 388783.337    0.006 {method 'read' of 'file' objects}
 31301070 21929.022    0.001 21929.022    0.001 {cPickle.loads}
 16958730 8177.087    0.000 8177.087    0.000 {_basic_image_lib.flash_decode}
 55610843 1719.077    0.000 1810.741    0.000
 55610843 1088.828    0.000 10036.427    0.000
     1000  664.456    0.664 17152.500   17.152
 55611841  482.160    0.000  482.160    0.000
 55610843  385.117    0.000 14211.060    0.000
     1000  361.858    0.362 411390.715  411.391
 55611841  246.357    0.000 1453.533    0.000
 55610843  234.460    0.000  520.973    0.000
 55610843  232.927    0.000 12084.700    0.000
 55610843  219.946    0.000 1593.674    0.000
 55610843  201.820    0.000  284.889    0.000
 49136554  189.099    0.000  222.585    0.000
 31300074  184.696    0.000 410952.765    0.013
 55431498  182.844    0.000  254.908    0.000
 55611841  180.865    0.000  778.554    0.000
   272005  157.849    0.001  157.860    0.001 {cPickle.dumps}
 16958894  145.608    0.000  351.860    0.000
 55610843  139.166    0.000  590.473    0.000
221930724  138.625    0.000  210.047    0.000 {hasattr}
476047721  125.609    0.000  125.609    0.000
287337626  124.806    0.000  223.212    0.000 {len}
 55610843  111.992    0.000  157.770    0.000
 31300074  107.931    0.000 410768.069    0.013

Python Memory Management

One of the major challenges in writing (somewhat) large-scale Python programs is to keep memory usage at a minimum. However, managing memory in Python is easy—if you just don’t care. Python allocates memory transparently, manages objects using a reference count system, and frees memory when an object’s reference count falls to zero. In theory, it’s swell. In practice, you need to know a few things about Python memory management to get a memory-efficient program running. One of the things you should know, or at least get a good feel about, is the sizes of basic Python objects. Another thing is how Python manages its memory internally.

So let us begin with the size of basic objects. In Python, there’s not a lot of primitive data types: there are ints, longs (an unlimited precision version of ints), floats (which are doubles), tuples, strings, lists, dictionaries, and classes.

Basic Objects

What is the size of an int? A programmer with a C or C++ background will probably guess that the size of a machine-specific int is something like 32 bits, maybe 64, and that therefore it occupies at most 8 bytes. But is that so in Python?

Let us first write a function that shows the sizes of objects (recursively if necessary):

import sys

def show_sizeof(x, level=0):

    print "\t" * level, x.__class__, sys.getsizeof(x), x

    if hasattr(x, '__iter__'):
        if hasattr(x, 'items'):
            for xx in x.items():
                show_sizeof(xx, level + 1)
        else:
            for xx in x:
                show_sizeof(xx, level + 1)

We can now use the function to inspect the sizes of the different basic data types:
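The calls themselves are missing from this copy; judging by the outputs below, they were presumably:

show_sizeof(None)
show_sizeof(3)
show_sizeof(2**63)
show_sizeof(102947298469128649161972364837164)
show_sizeof(918659326943756134897561304875610348756384756193485761304875613948576297485698417)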


If you have a 32-bit 2.7x Python, you’ll see:

8 None
12 3
22 9223372036854775808
28 102947298469128649161972364837164
48 918659326943756134897561304875610348756384756193485761304875613948576297485698417

and if you have a 64-bit 2.7x Python, you’ll see:

16 None
24 3
36 9223372036854775808
40 102947298469128649161972364837164
60 918659326943756134897561304875610348756384756193485761304875613948576297485698417

Let us focus on the 64-bit version (mainly because that's what we need the most often in our case). None takes 16 bytes. An int takes 24 bytes, three times as much memory as a C int64_t, despite being some kind of "machine-friendly" integer. Long integers (unbounded precision), used to represent integers larger than 2^63-1, have a minimum size of 36 bytes. Then the size grows linearly in the logarithm of the integer represented.

Python’s floats are implementation-specific but seem to be C doubles. However, they do not eat up only 8 bytes:
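The call that produced the next outputs is again missing here; presumably it was simply:

show_sizeof(3.14159265359)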



16 3.14159265359

on a 32-bit platform and

24 3.14159265359

on a 64-bit platform. That’s again, three times the size a C programmer would expect. Now, what about strings?

show_sizeof("My hovercraft is full of eels")

outputs, on a 32 bit platform:

50 My hovercraft is full of eels


and, on a 64-bit platform:

66 My hovercraft is full of eels

An empty string costs 37 bytes in a 64-bit environment! The memory used by a string then grows linearly with the length of the (useful) string.

* * *

Other commonly used structures, namely tuples, lists, and dictionaries, are worth examining. Lists (which are implemented as array lists, not as linked lists, with everything that entails) are arrays of references to Python objects, allowing them to be heterogeneous. Let us look at our sizes:

show_sizeof([])
show_sizeof([4, "toaster", 230.1])


32 []
44 [4, 'toaster', 230.1]

on a 32-bit platform and

72 []
96 [4, 'toaster', 230.1]

on a 64-bit platform. An empty list eats up 72 bytes. The size of an empty, 64-bit C++ std::list() is only 16 bytes, 4 to 5 times less. What about tuples? And dictionaries?

show_sizeof({})
show_sizeof({'a':213, 'b':2131})

outputs, on a 32-bit box

136 {}
 136 {'a': 213, 'b': 2131}
        32 ('a', 213)
                22 a
                12 213
        32 ('b', 2131)
                22 b
                12 2131


and

280 {}
 280 {'a': 213, 'b': 2131}
        72 ('a', 213)
                38 a
                24 213
        72 ('b', 2131)
                38 b
                24 2131

for a 64-bit box.

This last example is particularly interesting because it "doesn't add up." If we look at individual key/value pairs, they take 72 bytes (while their components take 38+24=62 bytes, leaving 10 bytes for the pair itself), but the dictionary takes 280 bytes (rather than a strict minimum of 144=72×2 bytes). The dictionary is supposed to be an efficient data structure for search, and the two likely implementations will use more space than strictly necessary. If it's some kind of tree, then we should pay the cost of internal nodes that contain a key and two pointers to children nodes; if it's a hash table, then we must have some room with free entries to ensure good performance.

The (somewhat) equivalent std::map C++ structure takes 48 bytes when created (that is, empty). An empty C++ string takes 8 bytes (then the allocated size grows linearly with the size of the string). An integer takes 4 bytes (32 bits).

* * *

Why does all this matter? It seems that whether an empty string takes 8 bytes or 37 doesn’t change anything much. That’s true. That’s true until you need to scale. Then, you need to be really careful about how many objects you create to limit the quantity of memory your program uses. It is a problem in real-life applications. However, to devise a really good strategy about memory management, we must not only consider the sizes of objects, but how many and in which order they are created. It turns out to be very important for Python programs. One key element to understand is how Python allocates its memory internally, which we will discuss next.

Internal Memory Management

To speed up memory allocation (and reuse), Python uses a number of free lists for small objects. Each list will contain objects of similar size: there will be a list for objects 1 to 8 bytes in size, one for 9 to 16, etc. When a small object needs to be created, either we reuse a free block in the list, or we allocate a new one.

There are some internal details on how Python manages those lists into blocks, pools, and "arenas": a number of blocks form a pool, pools are gathered into arenas, etc., but they're not very relevant to the point we want to make (if you really want to know, read Evan Jones' ideas on how to improve Python's memory allocation). The important point is that those lists never shrink.

Indeed: if an item (of size x) is deallocated (freed by lack of reference) its location is not returned to Python’s global memory pool (and even less to the system), but merely marked as free and added to the free list of items of size x. The dead object’s location will be reused if another object of compatible size is needed. If there are no dead objects available, new ones are created.

If the memory for small objects is never freed, then the inescapable conclusion is that, like goldfish, these small object lists only keep growing, never shrinking, and that the memory footprint of your application is dominated by the largest number of small objects allocated at any given point.

* * *

Therefore, one should work hard to allocate only the number of small objects necessary for one task, favoring (otherwise unpythonesque) loops where only a small number of elements are created/processed rather than (more pythonesque) patterns where lists are created using list comprehension syntax and then processed.

While the second pattern is more à la Python, it is rather the worst case: you end up creating lots of small objects that come to populate the small object lists, and even once the list is dead, the dead objects (now all in the free lists) still occupy a lot of memory.
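For illustration (not from the original article), the two patterns could look like this; the second creates and discards one small int at a time instead of materializing a million of them:

# list-comprehension style: builds a temporary million-element list whose
# items all end up on the small-object free lists once the list dies
total = sum([i * i for i in range(1000000)])

# streaming style: only a handful of small objects are alive at any moment
total = 0
for i in xrange(1000000):
    total += i * i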

* * *

The fact that the free lists grow does not seem like much of a problem because the memory they contain is still accessible to the Python program. But from the OS's perspective, your program's size is the total (maximum) memory allocated to Python. Since Python returns memory to the OS on the heap (which allocates objects other than small objects) only on Windows, if you run on Linux you can only see the total memory used by your program increase.

* * *

Let us prove my point using memory_profiler, a Python add-on module (which depends on the python-psutil package) by Fabian Pedregosa (see the module's github page). This add-on provides the decorator @profile that allows one to monitor the memory usage of one specific function. It is extremely simple to use. Let us consider the following program:

import copy
import memory_profiler

@profile
def function():
    x = list(range(1000000))  # allocate a big list
    y = copy.deepcopy(x)
    del x
    return y

if __name__ == "__main__":
    function()

Running the script under memory_profiler with

python -m memory_profiler

prints, on a 64-bit computer:


Line #    Mem usage    Increment   Line Contents
     4                             @profile
     5      9.11 MB      0.00 MB   def function():
     6     40.05 MB     30.94 MB       x = list(range(1000000)) # allocate a big list
     7     89.73 MB     49.68 MB       y = copy.deepcopy(x)
     8     82.10 MB     -7.63 MB       del x
     9     82.10 MB      0.00 MB       return y

This program creates a list of n=1,000,000 ints (n x 24 bytes = ~23 MB) and an additional list of references (n x 8 bytes = ~7.6 MB), which amounts to a total memory usage of ~31 MB. copy.deepcopy copies both lists, which allocates again ~50 MB (I am not sure where the additional overhead of 50 MB – 31 MB = 19 MB comes from). The interesting part is del x: it deletes x, but the memory usage only decreases by 7.63 MB! This is because del only deletes the reference list, not the actual integer values, which remain on the heap and cause a memory overhead of ~23 MB.

This example allocates in total ~73 MB, which is more than twice the amount of memory needed to store a single list of ~31 MB. You can see that memory can increase surprisingly if you are not careful!

Note that you might get different results on a different platform or with a different python version.


On a related note: is pickle wasteful?

Pickle is the standard way of (de)serializing Python objects to file. What is its memory footprint? Does it create extra copies of the data or is it rather smart about it? Consider this short example:

import memory_profiler
import pickle
import random

def random_string():
    return "".join([chr(64 + random.randint(0, 25)) for _ in xrange(20)])

@profile
def create_file():
    x = [(random.random(),
          random_string(),
          random.randint(0, 2 ** 64))
         for _ in xrange(1000000)]

    pickle.dump(x, open('machin.pkl', 'w'))

@profile
def load_file():
    y = pickle.load(open('machin.pkl', 'r'))
    return y

if __name__ == "__main__":
    create_file()
    #load_file()
With one invocation to profile the creation of the pickled data, and one invocation to re-read it (you comment out the function not to be called). Using memory_profiler, the creation uses a lot of memory:


Line #    Mem usage    Increment   Line Contents
     8                             @profile
     9      9.18 MB      0.00 MB   def create_file():
    10      9.33 MB      0.15 MB       x=[ (random.random(),
    11                                      random_string(),
    12                                      random.randint(0,2**64))
    13    246.11 MB    236.77 MB           for _ in xrange(1000000) ]
    15    481.64 MB    235.54 MB       pickle.dump(x,open('machin.pkl','w'))

and re-reading a bit less:


Line #    Mem usage    Increment   Line Contents
    18                             @profile
    19      9.18 MB      0.00 MB   def load_file():
    20    311.02 MB    301.83 MB       y=pickle.load(open('machin.pkl','r'))
    21    311.02 MB      0.00 MB       return y

So somehow, pickling is very bad for memory consumption. The initial list takes up more or less 230MB, but pickling it creates an extra 230-something MB worth of memory allocation.

Unpickling, on the other hand, seems fairly efficient. It does create more memory than the original list (300MB instead of 230-something) but it does not double the quantity of allocated memory.

Overall, then, (un)pickling should be avoided for memory-sensitive applications. What are the alternatives? Pickling preserves all the structure of a data structure, so you can recover it exactly from the pickled file at a later time. However, that might not always be needed. If the file is to contain a list as in the example above, then maybe a simple flat, text-based, file format is in order. Let us see what it gives.

A naïve implementation would give:

import memory_profiler
import random
import pickle

def random_string():
    return "".join([chr(64 + random.randint(0, 25)) for _ in xrange(20)])

@profile
def create_file():
    x = [(random.random(),
          random_string(),
          random.randint(0, 2 ** 64))
         for _ in xrange(1000000)]

    f = open('machin.flat', 'w')
    for xx in x:
        print >>f, xx

@profile
def load_file():
    y = []
    f = open('machin.flat', 'r')
    for line in f:
        y.append(eval(line))
    return y

if __name__ == "__main__":
    create_file()
    #load_file()

Creating the file:


Line #    Mem usage    Increment   Line Contents
     8                             @profile
     9      9.19 MB      0.00 MB   def create_file():
    10      9.34 MB      0.15 MB       x=[ (random.random(),
    11                                      random_string(),
    12                                      random.randint(0, 2**64))
    13    246.09 MB    236.75 MB           for _ in xrange(1000000) ]
    15    246.09 MB      0.00 MB       f=open('machin.flat', 'w')
    16    308.27 MB     62.18 MB       for xx in x:
    17                                     print >>f, xx

and reading the file back:


Line #    Mem usage    Increment   Line Contents
    20                             @profile
    21      9.19 MB      0.00 MB   def load_file():
    22      9.34 MB      0.15 MB       y=[]
    23      9.34 MB      0.00 MB       f=open('machin.flat', 'r')
    24    300.99 MB    291.66 MB       for line in f:
    25    300.99 MB      0.00 MB           y.append(eval(line))
    26    301.00 MB      0.00 MB       return y

Memory consumption on writing is now much better. It still creates a lot of temporary small objects (for 60MB’s worth), but it’s not doubling memory usage. Reading is comparable (using only marginally less memory).

This particular example is trivial but it generalizes to strategies where you don’t load the whole thing first then process it but rather read a few items, process them, and reuse the allocated memory. Loading data to a Numpy array, for example, one could first create the Numpy array, then read the file line by line to fill the array: this allocates one copy of the whole data. Using pickle, you would allocate the whole data (at least) twice: once by pickle, and once through Numpy.
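A sketch of that streaming idea against the flat file above (the dtype and which columns to keep are assumptions made for illustration):

import numpy as np

n = 1000000
data = np.zeros((n, 2))                 # preallocate the final array once
with open('machin.flat') as f:
    for i, line in enumerate(f):
        values = eval(line)             # one small tuple alive at a time
        data[i, 0] = values[0]          # the float column
        data[i, 1] = values[2]          # the integer column (loses precision as float64)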

Or, better yet, use Numpy (or PyTables) arrays. But that's a different topic. In the meantime, you can have a look at loading and saving, another tutorial in the Theano/doc/tutorial directory.

* * *

Python's design goals are radically different from, say, C's. While the latter is designed to give you good control over what you are doing at the expense of more complex and explicit programming, the former is designed to let you code rapidly while hiding most (if not all) of the underlying implementation details. While this sounds nice, in a production environment ignoring the implementation inefficiencies of a language can bite you hard, sometimes when it is too late. I think that having a good feel for how inefficient Python is with memory management (by design!) plays an important role in whether or not your code meets production requirements, scales well, or, on the contrary, becomes a burning hell of memory.


Reposted from:


In the driver program, distribute the data like this: b_data = sc.broadcast(value), where value must be a variable already loaded into the driver program, not an RDD.
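A minimal usage sketch (the lookup dict and RDD contents are made up):

lookup = {"a": 1, "b": 2}                      # a plain Python object already in the driver
b_data = sc.broadcast(lookup)
rdd = sc.parallelize(["a", "b", "a", "c"])
print rdd.map(lambda k: b_data.value.get(k, 0)).collect()   # read it on the workers via .value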




The following analysis is based on the Spark 1.2 source on GitHub. Since Spark has not really stabilized yet, there can be significant differences between versions; rdd.collect(), for example, is implemented quite differently across versions.

To understand how broadcast works under the Python wrapper, we also need some knowledge of Spark internals; the architecture is shown in the figure above.



The Java child process is initialized in python/pyspark/java_gateway.py. It first calls the exec ./bin/spark-submit script via subprocess.Popen (as a child process), which indirectly launches the Java "grandchild" process. Popen's stdout and stderr arguments are both set to PIPE, which automatically sets up pipes between the Python parent and the shell child, so the shell's output during the run can be collected and printed to stderr by the EchoOutputThread thread (note: at this point the PIPE cannot be used for Python-Java communication).

After the Java child process starts, it returns the socket port it is listening on to the Python process via spark-submit. This can be simulated with the following command:

$IS_SUBPROCESS=1 ./bin/spark-submit pyspark-shell



After the Python process receives that port, it uses py4j's JavaGateway to establish a socket channel between the Python VM and the JVM. It provides some convenient interaction/invocation facilities, for example: The `jvm` field of `JavaGateway` enables users to access classes, static members (fields and methods) and call constructors.
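For illustration, here is what that `jvm` field looks like with a standalone py4j gateway (this assumes a JVM-side GatewayServer is already listening on the default port; inside PySpark the gateway is created for you):

from py4j.java_gateway import JavaGateway

gateway = JavaGateway()                                     # connect to the JVM's GatewayServer
millis = gateway.jvm.java.lang.System.currentTimeMillis()   # static method call
rnd = gateway.jvm.java.util.Random()                        # constructor call
print millis, rnd.nextInt(10)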


As sbin/start-slave.sh shows, the worker process uses org.apache.spark.deploy.worker.Worker as its entry point. It manages network requests with Akka and defines case handlers via the receiveWithLogging attribute. Among them, LaunchExecutor starts the executor process, which executes the RDD requests issued by the driver. The details of the processing flow are described later.


Because parent and child processes share memory copy-on-write (COW), forking a Python child from a Java process that has already allocated a lot of memory can easily cause OOM. So Spark forks a Python daemon process via the pyspark/daemon.py script early on, before the Java process has consumed much memory; later, the real Python workers are forked by that Python daemon upon requests from the Java process. Of course this only works on unix-like systems (including Linux); Windows cannot enjoy this benefit.

Following this idea, in the PythonRDD.compute() method, val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) produces the Python worker. The method first checks whether there is an idle worker forked earlier; if so it is reused directly, otherwise a usable worker is forked through the Python daemon.




As the figure above shows (white is the Python side, grey the Scala side), map does not actually trigger a computation request; it merely creates a PipelinedRDD object. At reduce time, the environment needed for the map and reduce computation (including broadcast vars) and the parameters are assembled into a JavaRDD, and in the end the Scala RDD scheduling path is reused to issue the cluster computation request. After the Scala side receives the results, it pushes the data to the Python process over a local socket (in some versions the exchange happens through temporary files).

Also worth noting: when the PipelinedRDD._jrdd property is generated, the dumps method of python/pyspark/cloudpickle.py is called to produce pickled_command, and some Python-related information is filled in. In one run, the un-serialized pickled_command looked like this:

(<function pipeline_func at 0x10c0f2758>, None, BatchedSerializer(PickleSerializer(), 1), BatchedSerializer(PickleSerializer(), 0))
cloud dumped (<function pipeline_func at 0x10c0f2758>, None, BatchedSerializer(PickleSerializer(), 1), BatchedSerializer(PickleSerializer(), 0)) set([<module '__builtin__' (built-in)>, <module 'pyspark.serializers' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/serializers.pyc'>, <module 'pyspark.cloudpickle' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/'>, <module 'itertools' from '/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/lib-dynload/'>, <module 'pyspark.rdd' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/'>])





    def __init__(self, sc=None, value=None, pickle_registry=None, path=None):
        """
        Should not be called directly by users -- use L{SparkContext.broadcast()} instead.
        """
        if sc is not None:
            f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)  # create a temporary file handle
            self._path = self.dump(value, f)  # serialize value, write it to the file, return the file path
            self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
            self._pickle_registry = pickle_registry
        else:
            self._jbroadcast = None
            self._path = path


As shown in the figure above, during this process the Python and Java processes actually exchange the broadcast data through a temporary file. After the Java process reads the data from the file, it still has to write it twice more, as shown by the writeBlocks method in core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala: first the un-chunked data is written to the driver's BlockManager, then the data is chunked (blockify) and written to the BlockManager again. The reason for writing twice is given in the code comment below. Personally I feel the I/O and memory overhead here is a bit high, and the Python interaction in particular looks rather crude, although it partly fits Spark's philosophy of trading memory for time. One can also guess from this that the driver's Java process may use about twice as much memory for a broadcast as the Python process.


/**
 * Divide the object into multiple blocks and put those blocks in the block manager.
 * @param value the object to divide
 * @return number of blocks this broadcast variable is divided into
 */
private def writeBlocks(value: T): Int = {
    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    // do not create a duplicate copy of the broadcast variable's value.
    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
      tellMaster = false)
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    blocks.zipWithIndex.foreach { case (block, i) =>
      SparkEnv.get.blockManager.putBytes(
        BroadcastBlockId(id, "piece" + i),
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
    }
    blocks.length
}







1. For the driver's



Let us take the Python Imaging Library (PIL) as an example of installing a package. The constraints are:

  • No write permission to the directory where the Python libraries live
  • The server's jpeg .so library is not in the default path but under /usr/lib64

If you install the default way, you get the error IOError: decoder jpeg not available, because setup.py did not link against the image libraries it depends on.

1. Download the source package and unpack it

Choose the appropriate version on the download page; I chose Python Imaging Library 1.1.7 Source Kit (all platforms) (November 15, 2009)

wget “”; tar -xzvf  Imaging-1.1.7.tar.gz; cd Imaging-1.1.7

2. Modify the setup configuration so that it can find libjpeg.so

# Use None to look for the libraries in well-known library locations.
# Use a string to specify a single directory, for both the library and
# the include files. Use a tuple to specify separate directories:
# (libpath, includepath).

JPEG_ROOT = ("/usr/lib64", "/usr/include")

3. Install to a custom path (note: the path can be anything, as long as you have read/write permission for it)

python setup.py install --home /path/to/your/pythonlib/


*** TKINTER support not available
— JPEG support available
— ZLIB (PNG/ZIP) support available
— FREETYPE2 support available
*** LITTLECMS support not available

4. Add the pythonlib path to PYTHONPATH; note that it must go all the way down into lib/python

$ tail -1 ~/.bash_profile

export PYTHONPATH=/path/to/your/pythonlib/lib/python/:$PYTHONPATH
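A quick, hedged way to check that the build actually picked up libjpeg (the image path is a placeholder):

from PIL import Image        # or "import Image", depending on how the package ended up on the path
print Image.VERSION          # should print 1.1.7
Image.open('/path/to/any.jpg').load()   # raises "IOError: decoder jpeg not available" if linking failed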

Logs for completed jobs


No event logs were found for this application! To enable event logging, set spark.eventLog.enabled to true and spark.eventLog.dir to the directory to which your event logs are written.

After setting spark.eventLog.enabled to true and spark.eventLog.dir to a local file path in the Spark UI configuration, clicking through now shows:

No event logs found for application chengyi02_test_load_1GB in file:/home/users/chengyi02/spark-resource/test_perf/log//chengyi02_test_load_1gb-1427419993180. Did you specify the correct logging directory?

The reason is that my local path naturally cannot be read by the history server. But looking into that path, the logs are already there, and they look like JSON.

Changing spark.eventLog.dir again to an HDFS path, running Spark produced an error. After talking to the administrators, it turned out the history server of this Spark cluster was simply not running.
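A sketch of the two settings from PySpark (the HDFS path is a placeholder):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "hdfs:///path/to/event-logs"))  # must be readable by the history server
sc = SparkContext(conf=conf)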

15/03/27 09:50:32 WARN ServletHandler: /stages/


        at org.apache.spark.SparkContext.getAllPools(SparkContext.scala:892)


        at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68)

        at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68)

        at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:70)

        at javax.servlet.http.HttpServlet.service(

        at javax.servlet.http.HttpServlet.service(

        at org.eclipse.jetty.servlet.ServletHolder.handle(

        at org.eclipse.jetty.servlet.ServletHandler.doHandle(

        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(

        at org.eclipse.jetty.servlet.ServletHandler.doScope(

        at org.eclipse.jetty.server.handler.ContextHandler.doScope(

        at org.eclipse.jetty.server.handler.ScopedHandler.handle(


"too many values to unpack" caused by partitionBy


Test 1: 60GB of data, 1108 files, sizes ranging from 20MB to 100MB+. I set max cores to 160; 1108 tasks were launched, apparently on only 5 servers. Reading took 1.3min. I ran the write twice: once it took 1.75min, once 8min (hung on one task).

Test 2: max cores raised to 1000, and spark.speculation set to True so that slow tasks get re-launched. Read time improved to 29s and write time to 41s, a clear improvement! Since the number of files did not change, there were still 1108 tasks, but 32 servers were used.

Test 3: max cores 1000, spark.speculation set to True. Files normalized to about 165MB each, 359 files in total, 60GB of data. Reading took 40s, writing 55s. Because the number of tasks dropped, the overall time went down.

Test 4: max cores back to 160, and no spark.speculation. Files normalized to about 165MB each, 359 files, 60GB of data. Reading took 51s, writing 442s (hung by 2 tasks). So setting spark.speculation really matters!
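For reference, a sketch of setting these knobs from PySpark (values taken from the tests above):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.cores.max", "1000")
        .set("spark.speculation", "true"))   # re-launch slow tasks
sc = SparkContext(conf=conf)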

Test 5: ~3TB of data, 15846 files, sizes from 27MB to 260MB, max cores set to 10,000. At startup 32 servers were used, each running 32 concurrent tasks, so the overall parallelism was 1664; later this grew to 50+ servers, and then Spark ran into FULL GC. org.apache.hadoop.hdfs.FMSClient$RetryableIOException: Could not obtain block: blk_31525197851421116_494327281


Test 6: ~3TB of data, 15846 files, 27MB-260MB, max cores 10,000, but with Spark 1.2. Still got errors:


15/03/27 14:56:56 ERROR DAGScheduler: Failed to update accumulators for ResultTask(0, 4815) Broken pipe

Error type 2: org.apache.hadoop.hdfs.FMSClient$RetryableIOException: Could not obtain block: blk_31525197851421116_494327281

at org.apache.hadoop.hdfs.FMSClient$



Test 7: Spark 1.2. 1.3TB of data in 114 files, sizes from a few hundred bytes to 57GB, mostly large files. max cores set to 10,000, spark.speculation set to True. 5353 tasks were launched; reading took 5.1min!

Test 8: same as test 7, but with Spark 1.1. Reading took 3.6min.


TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory


broadcast an RDD in PySpark

>>> data = sc.textFile('/app/ecom/aries/sf_public/chengyi02/uclogin/20141120/cq02-dr-uclogin206.cq02/*')


Error: py4j.Py4JException: Method __getnewargs__([]) does not exist


Explanation: what you broadcast must be a key-value map. The collectAsMap here is not doing a full collect computation; it just turns that textFile RDD into key-value pairs and sends them out.
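A sketch of that workaround (the path and the line parsing are assumptions): materialize the RDD as a plain dict on the driver first, then broadcast the dict.

data = sc.textFile('/path/to/input')                      # placeholder path
kv = data.map(lambda line: tuple(line.split('\t', 1)))    # assumes key<TAB>value lines
b_data = sc.broadcast(kv.collectAsMap())                  # broadcast a dict, never the RDD itself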

Reading HDFS sequence files

Job aborted due to stage failure: Task 8 in stage 0.0 failed 4 times, most recent failure: Lost task 8.3 in stage 0.0 (TID 47, com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 64
Driver stacktrace:


SparkDeploySchedulerBackend: Asked to remove non-existent executor

Delete /tmp/spark-* and /tmp/fetchFileTemp*



363 15/03/30 09:51:17 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
364 15/03/30 09:51:17 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Master removed our application: FAILED

The reason is that the job asked for too many cores (max cores set to 10,000), while the cluster overall has Cores: 2272 Total, 472 Used. So if max cores > 2272, the application gets killed! If it is merely larger than the number of unused cores, it first grabs whatever is available and the remaining tasks queue up.


353 15/03/30 09:51:17 INFO AppClient$ClientActor: Executor updated: app-20150330095112-25033/62 is now EXITED (Command exited with code 1)


345 15/03/30 09:51:16 INFO AppClient$ClientActor: Executor added: app-20150330095112-25033/62 on ( with 32 cores

On the Spark cluster monitoring page, find the corresponding worker. Under Finished Executors there are a lot of failed tasks; locate my task and check its stdout log:

Error occurred during initialization of VM
Could not reserve enough space for object heap


Out of Memory


15/03/30 17:35:15 WARN DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x22dfe421, / => /] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)

java.lang.OutOfMemoryError: Java heap space

at java.lang.Object.clone(Native Method)

at akka.util.CompactByteString$.apply(ByteString.scala:410)

at akka.util.ByteString$.apply(ByteString.scala:22)

at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)

at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)







A job submitted via spark-submit exited without any error message. The history UI list shows the application as finished, but clicking into it reveals there are still active jobs.


15/04/07 13:07:47 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://] -> [akka.tcp://] disassociated! Shutting down.
15/04/07 13:07:47 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

Following a suggestion found online, changing cf.set('spark.akka.heartbeat.interval', 10000) had no effect.



lowest memcgroup 19

[ test_perf]$ strace -p 19970
Process 19970 attached – interrupt to quit
futex(0x40e4e9f0, FUTEX_WAIT, 19974, NULL

Running Python unit tests


SPARK_TESTING=1  /path/to/spark-*-client/bin/pyspark tests/



On a Mac it is very simple: brew install swig. On other platforms you can download and install it, which shouldn't be hard either.


1. Prepare a simple C file, palindrome.c:

#include <string.h>

/*
 * return: 0 -- not palindrome
 *         1 -- is  palindrome
 */
int is_palindrome(char* text)
{
    int i, n = strlen(text);

    for (i = 0; i <= n/2; i++) {
        if (text[i] != text[n-i-1]) {
            return 0;
        }
    }

    return 1;
}


2. Then, as SWIG requires, prepare a .i file (similar to a .h), palindrome.i:

%module palindrome

%{
#include <string.h>
%}

extern int is_palindrome(char* text);

3. Run SWIG to generate the wrapper; this produces two new files, palindrome_wrap.c and palindrome.py

$ swig -python palindrome.i

4. Compile into a shared .so library

gcc -shared -I/System/Library/Frameworks/Python.framework/Versions/2.7/include/python2.7 -L/System/Library/Frameworks/Python.framework/Versions/2.7/ *.c -lpython2.7 -o _palindrome.so

(If you don't know where Python is installed, you can check with python -c "import sys; import pprint; pprint.pprint(sys.path)".)

5. Call it from Python

import _palindrome

print dir(_palindrome)

mystr1 = "nopalindrome"

mystr2 = "ipreferpi"

print _palindrome.is_palindrome(mystr1)

print _palindrome.is_palindrome(mystr2)



1. Interactive


2. Batch:

/path/to/spark/bin/spark-submit ~/spark-resource/spark-training/spark/examples/src/main/python/


Basic RDDs

element-wise transformations

  • map
  • filter
  • flatMap

pseudo set operations transformations


  • distinct, expensive!
  • union, does not deduplicate
  • intersection, deduplicates, expensive!
  • subtract, expensive!
  • cartesian, expensive!

actions

  • reduce
  • fold
  • aggregate
  • collect, returns all results to the driver, so the result set must fit in a single server's memory. It can be applied to a list (returning a list) or to a dict (returning a list of keys).
  • take
  • top
  • takeSample
  • foreach, applies func to every element and returns nothing
  • count
  • countByValue
  • takeOrdered
Note: unlike Scala and Java, which have DoubleRDD and the like in addition to the base RDD, Python only has the base RDD. All RDD methods operate on the base RDD, and if the contained data types are wrong, the job simply dies.


  • persist = cache. The corresponding levels are pyspark.StorageLevel.MEMORY_ONLY_SER and so on
  • unpersist
  • is_cached, an attribute, not a method

 Key/Value pairs = pair RDDs

Note: in Python Spark, a pair RDD means a list of tuples, not a dict handled directly, e.g. [('k1', 'v1'), ('k2', 'v2')].

create pair RDDs

  • pairs = lines.map(lambda line: (line.split(" ")[0], line)), built from a text file, using the first word of each line as the key
  • data = sc.parallelize([('k1', 'v1'), ('k2', 'v2')]), generated via parallelize from in-memory data
  • Read directly from a file (TODO

transformations on pair RDDs

  • All the transformations listed above for base RDDs can also be used on pair RDDs. Each item of a pair RDD is a tuple (key, val), so t[0] is the key and t[1] the value.
transformations on one pair RDD
  • reduceByKey
  • groupByKey
  • combineByKey; createCombiner is called the first time a key is seen within each partition, so over the whole dataset it is called multiple times (see the sketch after this list)
  • mapValues
  • flatMapValues
  • keys
  • values
  • sortByKey
  • foldByKey
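
A small combineByKey sketch (computing per-key averages; the data is made up):

pairs = sc.parallelize([('a', 1), ('a', 3), ('b', 5)])
sums = pairs.combineByKey(
    lambda v: (v, 1),                          # createCombiner: first value of a key in a partition
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # mergeCombiners
print sums.mapValues(lambda (s, n): float(s) / n).collectAsMap()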

transformations on two pair RDDs

  • subtractByKey
  • join, inner join, only keys that are present in both pair RDDs are output
  • rightOuterJoin
  • leftOuterJoin 
  • cogroup

 actions on pair RDDs

  • All the actions listed above for base RDDs can also be applied to pair RDDs
  • countByKey
  • collectAsMap, works on a list of tuples and returns a dict; it cannot be applied to a dict or to a list of non-tuples
  • lookup

Tuning joins on pair RDDs

  • partitionBy partitions the large dataset onto a "fixed" set of servers, so that the large dataset does not have to be shuffled again when it is later joined with a small dataset; Spark will distribute the small dataset the same way. It is a transformation, and its second parameter partitionFunc controls how items are assigned to partitions (see the sketch after this list).
  • For pair RDDs, when you do not change the key, prefer mapValues and flatMapValues so the partitioning is preserved (you could emulate them with map, but Spark does not analyze map's func, so it cannot preserve the partitioning).
  • Partitioning has a huge impact on performance; to be expanded after more hands-on experience (TODO
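
A sketch of the partitionBy-then-join pattern described above (the datasets are made up):

big = sc.parallelize([(i % 100, i) for i in xrange(10000)])
big = big.partitionBy(100).cache()        # shuffle the big RDD once and keep it resident
small = sc.parallelize([(k, 'meta-%d' % k) for k in xrange(100)])
joined = big.join(small)                  # only the small side moves now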


  • Python's saveAsSequenceFile can handle a list of tuples, e.g. sc.parallelize([('k1', 'v1'), ('k2', 'v2')]).saveAsSequenceFile('...'). If you feed it a dict or a list of non-tuples, it fails with: RDD element of type java.lang.String cannot be used. Also, if the keyClass/valueClass you pass do not match the actual types, they are treated as strings by default.
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system
  1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
  2. Serialization is attempted via Pyrolite pickling
  3. If this fails, the fallback is to call ‘toString’ on each key and value
  4. PickleSerializer is used to deserialize pickled objects on the Python side
  • saveAsNewAPIHadoopFile also handles a list of tuples; it can be used for protobuf-format data.

shared variables

  • You can broadcast lists, dicts and other data types, and read the value with b_var.value
  • Accumulators updated in actions are guaranteed to be counted exactly once under fault tolerance; accumulators updated in transformations have no such guarantee (see the sketch below)
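
A minimal accumulator sketch (counting blank lines; the data is made up):

blank_lines = sc.accumulator(0)

def mark_blank(line):
    global blank_lines
    if line == '':
        blank_lines += 1
    return line

sc.parallelize(['a', '', 'b', '']).map(mark_blank).count()   # the action actually runs the job
print blank_lines.value                                      # 2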

spark transformations:

  • map(func) , and func is:  mixed func(x)

example: f.map(lambda x: (x, 1)) uses a Python lambda to build an anonymous function that takes each line x of the map input and returns (x, 1). So this statement transforms every line x into (x, 1).

  • filter(func), and func is: bool func(x)

example: f.filter(lambda x: x.find("spark") != -1) finds, within the RDD f, the lines containing the word spark.

  • flatMap(func), and func is: Seq func(x)

example: f.flatMap(lambda x: x.split(" ")).collect() splits every input line on spaces, so one flatMap input line maps to multiple output lines, which is why func returns a Seq. Note that with the statement above each output row is a single word; if you use f.map(lambda x: x.split(" ")).collect() instead, each output row is a seq of words.

  • sample(withReplacement, fraction, seed) takes 3 parameters: a bool, a float and an int.

example: f.flatMap(lambda x: x.split(" ")).sample(withReplacement = True, fraction = 0.01, seed = 181341).collect()

  • union(otherDataSet); it seems otherDataSet also has to be an RDD, not a plain array


new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
tc = tc.union(new_edges).distinct().cache()

  • distinct([numTasks]), removes duplicates

example: f.flatMap(lambda x: x.split(" ")).distinct()

  • groupByKey([numTasks]), similar to SQL's group by: turns (k, v) => (k, seq(v))
  • reduceByKey(func, [numTasks]), similar to MapReduce's reduce: func is applied repeatedly over all values of the same key


contribs.reduceByKey(add)  # uses Python's add directly as func

f.reduceByKey(lambda x, _: x)  # builds an anonymous function to use as func

pointStats = closest.reduceByKey(lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))  # builds an anonymous function to use as func

  • sortByKey([ascending], [numTasks])
  • join(otherDataSet, [numTasks])
  • cogroup(otherDataSet, [numTasks])
  • cartesian(otherDataSet), similar to PHP's array_combine

Spark Actions:

  • reduce(func), func is: mixed  func(mixed first, mixed second)
  • collect()
  • count()
  • first(), equivalent to calling take(1)
  • take(n); note: this is currently not computed in parallel, the driver program computes it by itself
  • takeSample(withReplacement, fraction, seed)
  • saveAsTextFile(path), where path is a directory; toString is called on each element to convert it to a string before writing
  • saveAsSequenceFile(path), where path is a directory. Only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc.)
  • countByKey()
  • foreach(func)

pyspark package

  • parallelize(c, numSlices=None), where numSlices controls how many tasks are launched to compute in parallel

example:count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)

Python help

>>> help('operator')

>>> help('operator.add')





  1. Kernel Mode: In kernel mode, the executing code has complete and unrestricted access to the underlying hardware. It can execute any CPU instruction and reference any memory address. Kernel mode is generally reserved for the lowest-level, most trusted functions of the operating system. Crashes in kernel mode are catastrophic; they will halt the entire PC.
  2. User Mode: In user mode, the executing code has no ability to directly access hardware or reference memory. Code running in user mode must delegate to system APIs to access hardware or memory. Due to the protection afforded by this sort of isolation, crashes in user mode are always recoverable. Most of the code running on your computer will execute in user mode.
The implementation may differ across hardware; x86 divides this with 4 hardware protection rings, 0 through 3. Reportedly Linux only uses ring 0 for kernel mode and ring 3 for user mode, leaving rings 1-2 unused, while on Windows some drivers use rings 1-2.


Typically, there are 2 points of switching:

  1. When calling a System Call: after calling a System Call, the task voluntary calls pieces of code living in Kernel Mode
  2. When an IRQ (or exception) comes: after the IRQ, an IRQ handler (or exception handler) is called, then control returns to the task that was interrupted as if nothing had happened.

IRQ stands for Interrupt Request; its role in our computers is to carry out hardware interrupt requests.




  1. Extract the kernel stack's ss0 and esp0 from the current process's descriptor.
  2. Using the kernel stack pointed to by ss0 and esp0, save the current process's cs, eip, eflags, ss and esp. This completes the switch from the user stack to the kernel stack, and also saves the next instruction of the program being suspended.
  3. Load the interrupt handler's cs and eip (looked up earlier through the interrupt vector) into the corresponding registers and start executing the handler; execution has now switched to kernel-mode code.


As the steps above show, a mode switch involves copying a lot of data and requires hardware cooperation, so it is fairly expensive. When no mode switch happens, the CPU just executes instructions sequentially. So mode switches should be minimized as much as possible!


On Mac 10.10, printing always fails with a "Hold for Authentication" error.


1. In Finder, use "Go" and enter smb://172.22.2.xx (the printer server's address) to mount a share

2. Configure the printer as usual

3. Print, enter your username and password, and it works

In HBase: The Definitive Guide, Lars George first discusses the similarities and differences between NoSQL and traditional RDBMSs along several dimensions. So along those dimensions, what choices does HBase make and how does it implement them?

Data Model

HBase is generally considered column-oriented. But in the same book the author stresses that HBase's column orientation differs from the column orientation found in an RDBMS: HBase does store data per column family, but queries are key-based.

Note, though,that HBase is not a column-oriented database in the typical RDBMS sense, but utilizes an on-disk column storage format. This is also where the majority of similarities end, because although HBase store data on disk in a column-oriented format, it is distinctly different from traditional columnar databases: whereas columnar databases excel at providing real-time analytical access to data, HBase excels at providing key-based access to a specific cell of data, or a sequential range of cells.

Storage Model

HBase is persistent, but it is really a combination of memory and disk. On inserts, updates and deletes, data is first written to the WAL (configurable) and then to the memstore; only when the memstore is full is it flushed to persistent storage (usually HDFS, 3 replicas, all configurable) as an HFile. A query merges data from the memstore and multiple HFiles. Moreover, to avoid disk I/O latency, the region server caches hot data in blocks, which means that in the best case your queries may be served entirely from memory.

Consistency model

Consistency here, as I understand it, refers to the C in CAP (consistency, availability, partition tolerance) and covers both atomicity and isolation. HBase guarantees that writes to multiple column families or columns under one row key are atomic, but writes across row keys are not. This means that if two clients A and B access HBase and A performs a batch write across row keys, B may observe intermediate states between rows while reading, but never an intermediate state within a single row.

Physical model

HBase is distributed by nature (although for learning purposes you can configure all processes on a single server). It relies on ZooKeeper to coordinate the various process roles, e.g. leader election between the master and backup masters and liveness checks of the region servers, and to hold key data such as the root and meta tables.

Adding new region servers is essentially transparent to users; the master shifts traffic to the new servers at appropriate moments (e.g. during load balancing or splitting).

Read/write performance

HBase supports random access, range scans, sequential access, and so on. Since its data is stored sorted by key and there is a block cache, range scans and sequential access are the more efficient patterns. Heavy random access defeats the block cache and causes frequent disk I/O, and can even create Java heap pressure and GC, so performance suffers.


Secondary indexes

HBase does not support secondary indexes. But through careful design of the row key, column families, versions and so on, some secondary-index scenarios can be simulated. Moreover, in HBase's design philosophy you can also improve query performance through denormalization, e.g. redundant storage, so whether you really need secondary indexes is debatable.

Failure handling

First, in the HBase architecture the master is a single point, but since clients do not go through the master for data operations (they talk directly to ZooKeeper and the region servers), the service remains available when the master fails.

However, until the master is recovered or failed over, table schemas and cluster configuration cannot be modified, and load balancing, splitting and so on cannot run. If a region server fails at the same time, it cannot be recovered either. So master failures still deserve serious attention.

Its fault tolerance works through leader election between the backup masters and the ZooKeeper ensemble. Since the master holds no unrecoverable data (region server configuration lives in ZooKeeper, and each region server maintains and reports which shards it owns), a master failover generally does not lose data or metadata.

Second, if a region server fails, since it is logically a single point, the shards (regions) it owns are unavailable for a short period (??), but no data is lost.

The master watches ZooKeeper ephemeral nodes for changes and thus detects region server failures (whether caused by hardware or the network). It then chooses new region servers to take over the failed server's work. Before the switch, the failed region server's WAL must be split (done by the master together with several region servers) and replayed (done by the new region server, which first loads each region's already-persisted HFiles from HDFS). After that, the new region server is available.

At the underlying storage layer, if HDFS is used, failure handling is guaranteed by HDFS.



Load balancing

The HMaster periodically runs load balancing (every 5 min by default), checking for imbalanced load and handling it. The idea is simple: spread regions evenly across the region servers.

Changing the metadata should be straightforward and cheap. But suppose region server A is heavily loaded and some regions need to be moved off it; before that can happen, it has to flush the WAL and the memstores, and doing that while already overloaded is itself troublesome. HBase has a dedicated config limiting how long balancing may take, 2.5 min by default.


Atomic read-modify-write

Row-level transactions were already covered under consistency above. In addition, HBase provides compare-and-set (CAS) operations, e.g. checkAndPut and checkAndDelete: for the same row key, first check whether a cell equals a given value, then perform the put or delete.

As described earlier, HBase writes (insert/update/delete) first go to the optional WAL and then to the memstore. I would guess the memstore can use row-level locks to avoid races, but what about the WAL? If a write touches multiple columns of one row and is written to the log as multiple KV pairs, what happens if it fails midway? Replay would then break atomicity! To solve this, HBase wraps the data of one write, for one row key, spanning multiple columns, into a single WALEdit object and writes it to the log in one shot, which preserves atomicity on failure. (What if the asynchronous flush of this WALEdit fails? By then the client has probably already received a true return value and considers the write successful, so the data is simply lost! That topic belongs in a discussion of durability.)

Locking, waits and deadlocks

Most of HBase's data-related locks should be row-level locks. Writes take a row-level lock; get/scan operations rely on multiversion concurrency control-style mechanisms, so they need no locks.






When a write crosses column families, multiple stores are involved, i.e. multiple memstores within the region, so is this lock relatively independent?