Archive for the 'Uncategorized' Category

How to fix slow HDFS reads

In the Spark UI we can see the distribution of several metrics. One stage had a 75th-percentile duration of 20s, but a handful of tasks (about 4) were extremely slow, taking up to 8 minutes. Checking the worker's error log, we found the following:

15/05/21 17:48:33 INFO FSInputChecker: read block too slow, expect:512000 cost:513232
15/05/21 17:48:33 WARN FMSClient: DFS Read: org.apache.hadoop.hdfs.protocol.ReadSlowException: read block too slow, expect:512000 cost:513232

So the slowness comes from reading block data too slowly. Our block size is 256MB and the expected read time is 512000ms, i.e. about 8.5 minutes, while the actual read took 513s, which matches the slow tasks' duration. This limit is controlled by our internal HDFS build, though the open-source version may have a similar feature. By changing the dfs.client.slow.read.limit setting in spark/conf/hadoop-site.xml to 5242880, meaning an expected download speed of 5MB/s, the client retries another DataNode whenever a read is slower than that. After the change, the max task duration dropped to 1.3 minutes. The slow log still appeared on the worker running the max-duration task, only with a shorter timeout.

Of course, this may also mean that some nodes in our HDFS cluster are slow, which we will need to fix later as well.

How to fix an overly large average duration

In the Duration metric we also found a stage whose 75th-percentile task time was 9+ minutes, with at most 2s spent in GC and each task processing 2.2GB of data. Without optimizing the internal processing logic, can we speed these tasks up simply by increasing parallelism?

Since these tasks come from a join, we changed its numPartitions parameter from 1k to 2k. But because the driver runs in a virtual machine, worker nodes repeatedly failed to communicate with it, which actually made the tasks slower and triggered retries and recomputation! Changing the value to 1500 instead brought each task's input down to about 1.5GB, and the processing time dropped accordingly (the knob being tuned is sketched below).
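As an illustration of that knob, the partition count can be passed straight to the join; the RDD names below are hypothetical:

# Hypothetical RDDs; numPartitions controls how many tasks (and thus how much data per task) the join produces.
joined = left_rdd.join(right_rdd, numPartitions=1500)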

Also, the pstats data shows that the most expensive operation is still the shuffle read:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 62603140 388783.337    0.006 388783.337    0.006 {method 'read' of 'file' objects}
 31301070 21929.022    0.001 21929.022    0.001 {cPickle.loads}
 16958730 8177.087    0.000 8177.087    0.000 {_basic_image_lib.flash_decode}
 55610843 1719.077    0.000 1810.741    0.000 feature_murmur_sign.py:33(run)
 55610843 1088.828    0.000 10036.427    0.000 feature_flash_decoder.py:37(run)
     1000  664.456    0.664 17152.500   17.152 shuffle.py:245(mergeValues)
 55611841  482.160    0.000  482.160    0.000 encoder.py:206(iterencode)
 55610843  385.117    0.000 14211.060    0.000 callbacks.py:65(add_media_info)
     1000  361.858    0.362 411390.715  411.391 shuffle.py:282(mergeCombiners)
 55611841  246.357    0.000 1453.533    0.000 data.py:374(desc)
 55610843  234.460    0.000  520.973    0.000 policy_beidou_black_image.py:20(run)
 55610843  232.927    0.000 12084.700    0.000 review_policy.py:250(run)
 55610843  219.946    0.000 1593.674    0.000 review_policy.py:114(run)
 55610843  201.820    0.000  284.889    0.000 review_policy.py:168(_lack_image)
 49136554  189.099    0.000  222.585    0.000 join.py:56(dispatch)
 31300074  184.696    0.000 410952.765    0.013 serializers.py:127(load_stream)
 55431498  182.844    0.000  254.908    0.000 policy_reject_url_and_bin.py:20(run)
 55611841  180.865    0.000  778.554    0.000 encoder.py:180(encode)
   272005  157.849    0.001  157.860    0.001 {cPickle.dumps}
 16958894  145.608    0.000  351.860    0.000 basic_image_lib.py:183(__init__)
 55610843  139.166    0.000  590.473    0.000 review_policy.py:149(run)
221930724  138.625    0.000  210.047    0.000 {hasattr}
476047721  125.609    0.000  125.609    0.000 data.py:49(image)
287337626  124.806    0.000  223.212    0.000 {len}
 55610843  111.992    0.000  157.770    0.000 review_policy.py:185(_lack_flash)
 31300074  107.931    0.000 410768.069    0.013 serializers.py:144(_read_with_length)
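These cProfile-style statistics come from the Python workers; one way to obtain similar per-task output is PySpark's built-in profiler (the spark.python.profile setting, available in Spark 1.2 and later, if I recall correctly). A minimal sketch, assuming a trivial local job:

from pyspark import SparkConf, SparkContext

# Enable PySpark's cProfile-based worker profiling.
conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1000000)).map(lambda x: x * x)
rdd.count()

sc.show_profiles()   # dumps pstats output similar to the table above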

Python Memory Management

One of the major challenges in writing (somewhat) large-scale Python programs is to keep memory usage at a minimum. However, managing memory in Python is easy—if you just don’t care. Python allocates memory transparently, manages objects using a reference count system, and frees memory when an object’s reference count falls to zero. In theory, it’s swell. In practice, you need to know a few things about Python memory management to get a memory-efficient program running. One of the things you should know, or at least get a good feel about, is the sizes of basic Python objects. Another thing is how Python manages its memory internally.

So let us begin with the size of basic objects. In Python, there’s not a lot of primitive data types: there are ints, longs (an unlimited precision version of ints), floats (which are doubles), tuples, strings, lists, dictionaries, and classes.

Basic Objects

What is the size of int? A programmer with a C or C++ background will probably guess that the size of a machine-specific int is something like 32 bits, maybe 64; and that therefore it occupies at most 8 bytes. But is that so in Python?

Let us first write a function that shows the sizes of objects (recursively if necessary):

import sys

def show_sizeof(x, level=0):

    print "\t" * level, x.__class__, sys.getsizeof(x), x

    if hasattr(x, '__iter__'):
        if hasattr(x, 'items'):
            for xx in x.items():
                show_sizeof(xx, level + 1)
        else:
            for xx in x:
                show_sizeof(xx, level + 1)

We can now use the function to inspect the sizes of the different basic data types:

show_sizeof(None)
show_sizeof(3)
show_sizeof(2**63)
show_sizeof(102947298469128649161972364837164)
show_sizeof(918659326943756134897561304875610348756384756193485761304875613948576297485698417)

If you have a 32-bit 2.7x Python, you’ll see:

8 None
12 3
22 9223372036854775808
28 102947298469128649161972364837164
48 918659326943756134897561304875610348756384756193485761304875613948576297485698417

and if you have a 64-bit 2.7x Python, you’ll see:

16 None
24 3
36 9223372036854775808
40 102947298469128649161972364837164
60 918659326943756134897561304875610348756384756193485761304875613948576297485698417

Let us focus on the 64-bit version (mainly because that's what we need the most often in our case). None takes 16 bytes. An int takes 24 bytes, three times as much memory as a C int64_t, despite being some kind of "machine-friendly" integer. Long integers (unbounded precision), used to represent integers larger than 2**63-1, have a minimum size of 36 bytes. Their size then grows linearly in the logarithm of the integer represented.

Python’s floats are implementation-specific but seem to be C doubles. However, they do not eat up only 8 bytes:

show_sizeof(3.14159265358979323846264338327950288)

Outputs

16 3.14159265359

on a 32-bit platform and

24 3.14159265359

on a 64-bit platform. That’s again, three times the size a C programmer would expect. Now, what about strings?

show_sizeof("")
show_sizeof("My hovercraft is full of eels")

outputs, on a 32 bit platform:

21
50 My hovercraft is full of eels

and, on a 64-bit platform:

37
66 My hovercraft is full of eels

An empty string costs 37 bytes in a 64-bit environment! Memory used by a string then grows linearly with the length of the (useful) string.

* * *

Other commonly used structures, such as tuples, lists, and dictionaries, are worth examining. Lists (which are implemented as array lists, not as linked lists, with everything that entails) are arrays of references to Python objects, which allows them to be heterogeneous. Let us look at their sizes:

show_sizeof([])
show_sizeof([4, "toaster", 230.1])

outputs

32 []
44 [4, 'toaster', 230.1]

on a 32-bit platform and

72 []
96 [4, 'toaster', 230.1]

on a 64-bit platform. An empty list eats up 72 bytes, while an empty 64-bit C++ std::list takes only 16 bytes, 4 to 5 times less. What about tuples? And dictionaries?

show_sizeof({})
show_sizeof({'a':213, 'b':2131})

outputs, on a 32-bit box

136 {}
 136 {'a': 213, 'b': 2131}
        32 ('a', 213)
                22 a
                12 213
        32 ('b', 2131)
                22 b
                12 2131

and

280 {}
 280 {'a': 213, 'b': 2131}
        72 ('a', 213)
                38 a
                24 213
        72 ('b', 2131)
                38 b
                24 2131

for a 64-bit box.

This last example is particularly interesting because it "doesn't add up." If we look at individual key/value pairs, they take 72 bytes (while their components take 38+24=62 bytes, leaving 10 bytes for the pair itself), but the dictionary takes 280 bytes (rather than a strict minimum of 144=72×2 bytes). The dictionary is supposed to be an efficient data structure for search, and the two likely implementations will use more space than strictly necessary. If it's some kind of tree, then we should pay the cost of internal nodes that contain a key and two pointers to child nodes; if it's a hash table, then we must have some room with free entries to ensure good performance.

The (somewhat) equivalent std::map C++ structure takes 48 bytes when created (that is, empty). An empty C++ string takes 8 bytes (the allocated size then grows linearly with the size of the string). An integer takes 4 bytes (32 bits).

* * *

Why does all this matter? It seems that whether an empty string takes 8 bytes or 37 doesn’t change anything much. That’s true. That’s true until you need to scale. Then, you need to be really careful about how many objects you create to limit the quantity of memory your program uses. It is a problem in real-life applications. However, to devise a really good strategy about memory management, we must not only consider the sizes of objects, but how many and in which order they are created. It turns out to be very important for Python programs. One key element to understand is how Python allocates its memory internally, which we will discuss next.

Internal Memory Management

To speed up memory allocation (and reuse), Python uses a number of lists for small objects. Each list will contain objects of similar size: there will be a list for objects 1 to 8 bytes in size, one for 9 to 16, etc. When a small object needs to be created, either we reuse a free block in the list, or we allocate a new one.

There are some internal details on how Python manages those lists into blocks, pools, and "arenas": a number of blocks form a pool, pools are gathered into arenas, etc., but they're not very relevant to the point we want to make (if you really want to know, read Evan Jones' ideas on how to improve Python's memory allocation). The important point is that those lists never shrink.

Indeed: if an item (of size x) is deallocated (freed by lack of reference), its location is not returned to Python's global memory pool (let alone to the system), but merely marked as free and added to the free list of items of size x. The dead object's location will be reused if another object of compatible size is needed. If there are no dead objects available, new ones are created.

If memory for small objects is never freed, then the inescapable conclusion is that, like goldfish, these small-object lists only keep growing, never shrinking, and that the memory footprint of your application is dominated by the largest number of small objects allocated at any given point.

* * *

Therefore, one should work hard to allocate only the number of small objects necessary for one task, favoring (otherwise un-pythonesque) loops where only a small number of elements are created/processed rather than (more pythonesque) patterns where lists are created using list comprehension syntax and then processed.

While the second pattern is more à la Python, it is rather the worst case: you end up creating lots of small objects that come to populate the small-object lists, and even once the list is dead, the dead objects (now all on the free lists) still occupy a lot of memory.
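As a hedged illustration of the two patterns (the processing function is a stand-in): the comprehension materializes a million small objects at once and leaves them on the free lists afterwards, while the generator keeps only a handful alive at any time.

def process(item):
    return item * 2          # stand-in for real per-item work

# More "pythonesque": builds the full list of 1,000,000 results before summing.
total = sum([process(i) for i in xrange(1000000)])

# Less elegant but memory-friendlier: items are produced and consumed one at a time.
total = sum(process(i) for i in xrange(1000000))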

* * *

The fact that the free lists grow does not seem like much of a problem, because the memory they contain is still accessible to the Python program. But from the OS's perspective, your program's size is the total (maximum) memory allocated to Python. Since Python returns memory to the OS on the heap (which allocates objects other than small objects) only on Windows, if you run on Linux you can only watch the total memory used by your program increase.

* * *

Let us prove my point using memory_profiler, a Python add-on module (which depends on the python-psutil package) by Fabian Pedregosa (see the module's github page). This add-on provides the decorator @profile that allows one to monitor the memory usage of one specific function. It is extremely simple to use. Let us consider the following program:

import copy
import memory_profiler

@profile
def function():
    x = list(range(1000000))  # allocate a big list
    y = copy.deepcopy(x)
    del x
    return y

if __name__ == "__main__":
    function()

invoking

python -m memory_profiler memory-profile-me.py

prints, on a 64-bit computer

Filename: memory-profile-me.py

Line #    Mem usage    Increment   Line Contents
================================================
     4                             @profile
     5      9.11 MB      0.00 MB   def function():
     6     40.05 MB     30.94 MB       x = list(range(1000000)) # allocate a big list
     7     89.73 MB     49.68 MB       y = copy.deepcopy(x)
     8     82.10 MB     -7.63 MB       del x
     9     82.10 MB      0.00 MB       return y

This program creates a list of n=1,000,000 ints (n x 24 bytes = ~23 MB) and an additional list of references (n x 8 bytes = ~7.6 MB), which amounts to a total memory usage of ~31 MB. copy.deepcopy copies both lists, which allocates again ~50 MB (I am not sure where the additional overhead of 50 MB – 31 MB = 19 MB comes from). The interesting part is del x: it deletes x, but the memory usage only decreases by 7.63 MB! This is because del only deletes the reference list, not the actual integer values, which remain on the heap and cause a memory overhead of ~23 MB.

This example allocates in total ~73 MB, which is more than twice the amount of memory needed to store a single list of ~31 MB. You can see that memory can increase surprisingly if you are not careful!

Note that you might get different results on a different platform or with a different python version.

Pickle

On a related note: is pickle wasteful?

Pickle is the standard way of (de)serializing Python objects to file. What is its memory footprint? Does it create extra copies of the data or is it rather smart about it? Consider this short example:

import memory_profiler
import pickle
import random

def random_string():
    return "".join([chr(64 + random.randint(0, 25)) for _ in xrange(20)])

@profile
def create_file():
    x = [(random.random(),
          random_string(),
          random.randint(0, 2 ** 64))
         for _ in xrange(1000000)]

    pickle.dump(x, open('machin.pkl', 'w'))

@profile
def load_file():
    y = pickle.load(open('machin.pkl', 'r'))
    return y

if __name__=="__main__":
    create_file()
    #load_file()

There is one invocation to profile the creation of the pickled data and one to re-read it (comment out the function that should not be called). Using memory_profiler, the creation uses a lot of memory:

Filename: test-pickle.py

Line #    Mem usage    Increment   Line Contents
================================================
     8                             @profile
     9      9.18 MB      0.00 MB   def create_file():
    10      9.33 MB      0.15 MB       x=[ (random.random(),
    11                                      random_string(),
    12                                      random.randint(0,2**64))
    13    246.11 MB    236.77 MB           for _ in xrange(1000000) ]
    14
    15    481.64 MB    235.54 MB       pickle.dump(x,open('machin.pkl','w'))

and re-reading a bit less:

Filename: test-pickle.py

Line #    Mem usage    Increment   Line Contents
================================================
    18                             @profile
    19      9.18 MB      0.00 MB   def load_file():
    20    311.02 MB    301.83 MB       y=pickle.load(open('machin.pkl','r'))
    21    311.02 MB      0.00 MB       return y

So somehow, pickling is very bad for memory consumption. The initial list takes up more or less 230MB, but pickling it creates an extra 230-something MB worth of memory allocation.

Unpickling, on the other hand, seems fairly efficient. It does create more memory than the original list (300MB instead of 230-something) but it does not double the quantity of allocated memory.

Overall, then, (un)pickling should be avoided for memory-sensitive applications. What are the alternatives? Pickling preserves all the structure of a data structure, so you can recover it exactly from the pickled file at a later time. However, that might not always be needed. If the file is to contain a list as in the example above, then maybe a simple flat, text-based, file format is in order. Let us see what it gives.

A naïve implementation would give:

import memory_profiler
import random
import pickle

def random_string():
    return "".join([chr(64 + random.randint(0, 25)) for _ in xrange(20)])

@profile
def create_file():
    x = [(random.random(),
          random_string(),
          random.randint(0, 2 ** 64))
         for _ in xrange(1000000) ]

    f = open('machin.flat', 'w')
    for xx in x:
        print >>f, xx
    f.close()

@profile
def load_file():
    y = []
    f = open('machin.flat', 'r')
    for line in f:
        y.append(eval(line))
    f.close()
    return y

if __name__== "__main__":
    create_file()
    #load_file()

Creating the file:

Filename: test-flat.py

Line #    Mem usage    Increment   Line Contents
================================================
     8                             @profile
     9      9.19 MB      0.00 MB   def create_file():
    10      9.34 MB      0.15 MB       x=[ (random.random(),
    11                                      random_string(),
    12                                      random.randint(0, 2**64))
    13    246.09 MB    236.75 MB           for _ in xrange(1000000) ]
    14
    15    246.09 MB      0.00 MB       f=open('machin.flat', 'w')
    16    308.27 MB     62.18 MB       for xx in x:
    17                                     print >>f, xx

and reading the file back:

Filename: test-flat.py

Line #    Mem usage    Increment   Line Contents
================================================
    20                             @profile
    21      9.19 MB      0.00 MB   def load_file():
    22      9.34 MB      0.15 MB       y=[]
    23      9.34 MB      0.00 MB       f=open('machin.flat', 'r')
    24    300.99 MB    291.66 MB       for line in f:
    25    300.99 MB      0.00 MB           y.append(eval(line))
    26    301.00 MB      0.00 MB       return y

Memory consumption on writing is now much better. It still creates a lot of temporary small objects (for 60MB’s worth), but it’s not doubling memory usage. Reading is comparable (using only marginally less memory).

This particular example is trivial but it generalizes to strategies where you don’t load the whole thing first then process it but rather read a few items, process them, and reuse the allocated memory. Loading data to a Numpy array, for example, one could first create the Numpy array, then read the file line by line to fill the array: this allocates one copy of the whole data. Using pickle, you would allocate the whole data (at least) twice: once by pickle, and once through Numpy.
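A minimal sketch of that strategy, assuming the flat file holds one float per line and the number of lines is known in advance (both the file name and dtype are illustrative):

import numpy as np

n_lines = 1000000                              # assumed known in advance
data = np.empty(n_lines, dtype=np.float64)     # the only full-size allocation

with open('machin.flat', 'r') as f:
    for i, line in enumerate(f):
        data[i] = float(line)                  # fill in place, no second copy of the data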

Or even better yet: use Numpy (or PyTables) arrays. But that's a different topic. In the meantime, you can have a look at loading and saving, another tutorial in the Theano/doc/tutorial directory.

* * *

Python design goals are radically different than, say, C design goals. While the latter is designed to give you good control on what you’re doing at the expense of more complex and explicit programming, the former is designed to let you code rapidly while hiding most (if not all) of the underlying implementation details. While this sounds nice, in a production environment ignoring the implementation inefficiencies of a language can bite you hard, and sometimes when it’s too late. I think that having a good feel of how inefficient Python is with memory management (by design!) will play an important role in whether or not your code meets production requirements, scales well, or, on the contrary, will be a burning hell of memory.

 

Reposted from: http://deeplearning.net/software/theano/tutorial/python-memory-management.html

Usage

In the driver program, distribute data like this: b_data = sc.broadcast(value), where value must be a variable already loaded into the driver program, not an RDD.

In the worker program, the copy of the data is accessed via b_data.value.

In the driver program, once the broadcast data is no longer needed, it should be released as soon as possible: b_data.unpersist(). A minimal end-to-end sketch follows.
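Putting the three steps together (the lookup-table contents below are made up):

# Driver side: broadcast a plain Python dict that already lives in the driver, never an RDD.
lookup = {"a": 1, "b": 2}
b_data = sc.broadcast(lookup)

# Worker side: task closures read the read-only copy through .value.
rdd = sc.parallelize(["a", "b", "a"])
total = rdd.map(lambda k: b_data.value.get(k, 0)).sum()

# Driver side: release the broadcast blocks once they are no longer needed.
b_data.unpersist()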

How it works

The analysis below is based on Spark 1.2 from GitHub. Since Spark has not really stabilized yet, there can be large differences between versions; rdd.collect(), for example, is implemented quite differently across versions.

To understand how the Python broadcast wrapper works, we also need some knowledge of Spark internals; its architecture is shown in the figure above.

Process-level design on the driver side

The machine hosting the driver runs both a Python process and a Java process; the Java process exists as a "subprocess" of the Python one, and the two communicate over a socket.

The Java child process is initialized in python/pyspark/java_gateway.py: it first uses subprocess.Popen to run exec ./bin/spark-submit (a child process), which in turn starts the Java "grandchild" process. Popen's stdout and stderr are both set to PIPE, so pipes are automatically established between the Python parent process and the shell child process; the output produced while the shell runs is captured and printed to stderr by the EchoOutputThread thread (note that this PIPE cannot be used for Python-Java communication).

After the Java child process starts, it returns the socket port it is listening on to the Python process via spark-submit, which can be simulated with the following command:

$ IS_SUBPROCESS=1 ./bin/spark-submit pyspark-shell

51383

……

After receiving that port, the Python process uses py4j's JavaGateway to establish a socket channel between the Python VM and the JVM. It provides some convenient interaction/call methods, for example: the `jvm` field of `JavaGateway` enables the user to access classes, static members (fields and methods) and call constructors.

Process-level design on the worker side

From sbin/start-slave.sh we can see that the worker process uses org.apache.spark.deploy.worker.Worker as its entry point. It manages network requests on top of the akka package and defines its case handlers through the receiveWithLogging attribute. Among these, LaunchExecutor starts the executor process, i.e. it executes the RDD requests initiated by the driver. The details of the processing flow are described later.

Task execution requests are first sent to the Java/Scala process on the worker. All Python tasks are mapped to PythonRDD (core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala), so this class is the key to understanding how the worker works. Its constructor parameters include broadcastVars.

Because parent and child processes share memory copy-on-write, forking a Python child from a Java process that has already allocated a lot of memory can easily cause OOM. Spark's approach is therefore to fork a Python daemon process via the pyspark/daemon.py script early on, while the Java process has not yet consumed much memory; the real Python workers are later created by the Java process sending requests to that Python daemon, which forks the Python worker child processes. Of course this only works on Unix-like systems (including Linux); Windows cannot enjoy this benefit.

Following this idea, in PythonRDD.compute(), val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) creates the Python worker. The method first checks whether there is an idle worker forked earlier; if so, it is reused directly, otherwise a usable worker is forked via the Python daemon.

Interaction flow between driver and worker

So, how is a statement like sc.textFile(…).map(func1).reduce(func2) processed? And how many PythonRDDs are created along the way?

Let us first look closely at how map and reduce form tasks on the driver:


From the figure above (white is the Python side, gray the Scala side) we can see that map does not actually trigger a computation request; it merely creates a PipelinedRDD object. At reduce time, the environment needed by the map and reduce computation (including broadcast vars) and the parameters are assembled into a JavaRDD, and the cluster computation request is ultimately issued by reusing the Scala RDD scheduling path. After Scala receives the result, it pushes the data to the Python process over a local socket (in some versions the exchange goes through a temporary file instead).

Also worth noting: when the PipelinedRDD._jrdd property is built, calling the dumps method of python/pyspark/cloudpickle.py to produce pickled_command fills in some Python-related information. In one run, the unserialized pickled_command looked like this:

(<function pipeline_func at 0x10c0f2758>, None, BatchedSerializer(PickleSerializer(), 1), BatchedSerializer(PickleSerializer(), 0))
cloud dumped (<function pipeline_func at 0x10c0f2758>, None, BatchedSerializer(PickleSerializer(), 1), BatchedSerializer(PickleSerializer(), 0)) set([<module '__builtin__' (built-in)>, <module 'pyspark.serializers' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/serializers.pyc'>, <module 'pyspark.cloudpickle' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/cloudpickle.py'>, <module 'itertools' from '/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/lib-dynload/itertools.so'>, <module 'pyspark.rdd' from '/Users/baidu/Code/spark/spark-1.2.1-bin-hadoop2.4/python/pyspark/rdd.py'>])

 

When a task is sent to the worker process, the LaunchExecutor branch is triggered; the processing flow is shown in the figure below:

Driver-side flow of the broadcast operation

When a broadcast call is initiated via pyspark.SparkContext.broadcast(), it indirectly calls the init method of the pyspark.Broadcast class:

    def __init__(self, sc=None, value=None, pickle_registry=None, path=None):
        """
        Should not be called directly by users -- use L{SparkContext.broadcast()}
        instead.
        """
        if sc is not None:
            f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)  # create a temporary file handle
            self._path = self.dump(value, f)  # serialize value into the file and return the file path
            self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
            self._pickle_registry = pickle_registry
        else:
            self._jbroadcast = None
            self._path = path

The readBroadcastFromFile method actually registers the broadcast data constant through the broadcast method in core/src/main/scala/org/apache/spark/SparkContext.scala.

 

As the figure above shows, during this process the Python and Java processes actually exchange the broadcast data through a temporary file, and after reading that data from the file the Java process still performs two more writes, as shown in the writeBlocks method of core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala: it first writes the un-chunked data into the driver's blockmanager, and then chunks the data (blockify) and writes it into the blockmanager again! The reason for the two writes is given in the code comment below. Personally I feel the I/O and memory overhead here is somewhat excessive, and the design of the Python interaction in particular looks rather crude, though it partly fits Spark's philosophy of trading memory for time. One can also guess from this that the driver's Java process may consume twice as much memory on a broadcast as the Python process does.

  /**
   * Divide the object into multiple blocks and put those blocks in the block manager.
   * @param value the object to divide
   * @return number of blocks this broadcast variable is divided into
   */
  private def writeBlocks(value: T): Int = {
    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    // do not create a duplicate copy of the broadcast variable's value.
    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
      tellMaster = false)
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    blocks.zipWithIndex.foreach { case (block, i) =>
      SparkEnv.get.blockManager.putBytes(
        BroadcastBlockId(id, "piece" + i),
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
    }
    blocks.length
  }

Worker-side flow of the broadcast operation

 

Performance concerns

1. For the driver

 

References

https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

https://docs.python.org/2/library/subprocess.html

http://py4j.sourceforge.net/

http://www.scala-lang.org/docu/files/collections-api/collections_10.html

https://docs.python.org/2.7/library/pickle.html

https://docs.python.org/2/library/itertools.html?highlight=imap#itertools.imap

http://akka.io/

Take the Python Imaging Library (PIL) as an example installation. The constraints are:

  • No write permission for the directory where the Python libs live
  • The server's jpeg .so library is not in the default path but under /usr/lib64

Installing with the default method fails with: IOError: decoder jpeg not available, because setup.py did not link against the image libraries it depends on.

1. Download the source package and unpack it

At http://www.pythonware.com/products/pil/ pick the appropriate version; I chose Python Imaging Library 1.1.7 Source Kit (all platforms) (November 15, 2009).

wget "http://effbot.org/downloads/Imaging-1.1.7.tar.gz"; tar -xzvf Imaging-1.1.7.tar.gz; cd Imaging-1.1.7

2. Edit setup.py so it can find libjpeg.so

# Use None to look for the libraries in well-known library locations.
# Use a string to specify a single directory, for both the library and
# the include files. Use a tuple to specify separate directories:
# (libpath, includepath).

JPEG_ROOT = ("/usr/lib64", "/usr/include")

3. Install into a specified path (note: the path can be anything, as long as you have read/write permission)

python setup.py install --home /path/to/your/pythonlib/

Pay attention to the beginning of its output:

*** TKINTER support not available
— JPEG support available
— ZLIB (PNG/ZIP) support available
— FREETYPE2 support available
*** LITTLECMS support not available

4. Add the pythonlib path to PYTHONPATH; note that it must go all the way down to lib/python

$ tail -1 ~/.bash_profile

export PYTHONPATH=/path/to/your/pythonlib/lib/python/:$PYTHONPATH
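To verify that JPEG support really works after the install, a quick hedged check (the file name is arbitrary; depending on how PIL 1.1.7 lands on PYTHONPATH it is imported either as the top-level Image module or from the PIL package):

try:
    import Image                   # top-level module, as PIL 1.1.7 normally installs it
except ImportError:
    from PIL import Image          # fallback, depending on how it ended up on PYTHONPATH

# Writing and reading a JPEG exercises the jpeg encoder/decoder; this raises an
# "encoder/decoder jpeg not available" IOError if setup.py did not find libjpeg.
img = Image.new("RGB", (16, 16), (255, 0, 0))
img.save("/tmp/pil_jpeg_check.jpg", "JPEG")
print Image.open("/tmp/pil_jpeg_check.jpg").size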

Completed-jobs logs

The company cluster lets you view the list of completed jobs, but with the default configuration, clicking into one shows:

No event logs were found for this application! To enable event logging, set spark.eventLog.enabled to true and spark.eventLog.dir to the directory to which your event logs are written.

After setting spark.eventLog.enabled to true in the Spark UI configuration and setting spark.eventLog.dir to a local file path, clicking in instead shows:

No event logs found for application chengyi02_test_load_1GB in file:/home/users/chengyi02/spark-resource/test_perf/log//chengyi02_test_load_1gb-1427419993180. Did you specify the correct logging directory?

The reason is that my local path obviously cannot be read by the history server. At this point, though, the path already contains logs, which look like JSON.

Changing it again, setting spark.eventLog.dir to an HDFS path, made Spark fail at runtime. After talking to the administrators, it turned out this was because the cluster's history server was simply not running (the event-log settings themselves are sketched after the stack trace below):

15/03/27 09:50:32 WARN ServletHandler: /stages/

java.lang.NullPointerException

        at org.apache.spark.SparkContext.getAllPools(SparkContext.scala:892)

        at org.apache.spark.ui.jobs.JobProgressPage.render(JobProgressPage.scala:50)

        at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68)

        at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:68)

        at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:70)

        at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)

        at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)

        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)

        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)

        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)

        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)

        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)

        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)

        ……
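For reference, the two event-log settings mentioned above can also be supplied from the application side through SparkConf; a minimal sketch (the HDFS directory is illustrative and must be readable by the history server):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "hdfs:///path/to/spark-event-logs"))
sc = SparkContext(conf=conf)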

partitionBy causing "too many values to unpack"

http://stackoverflow.com/questions/7053551/python-valueerror-too-many-values-to-unpack

Tuning HDFS file read/write performance

First test: 60GB of data, 1108 files ranging from 20MB to 100MB+. I set max cores to 160; 1108 tasks were launched, apparently using only 5 servers. Reading took 1.3 min; I ran the write twice, once taking 1.75 min and once 8 min (hung on a single task).

Second test: max cores raised to 1000, and spark.speculation set to true so slow tasks can be re-launched. Read time improved to 29s and write time to 41s, a clear improvement! Since the number of files did not change there were still 1108 tasks, but 32 servers were used.

Third test: max cores at 1000 and spark.speculation set to true, with files normalized to about 165MB each, 359 files in total, 60GB of data. Reads took 40s and writes 55s. Because the number of tasks dropped, the overall elapsed time came down as well.

Fourth test: max cores back to 160, and spark.speculation not set. Files normalized to about 165MB each, 359 files, 60GB of data. Reads took 51s and writes 442s (hung on 2 tasks). So setting spark.speculation really is very important!

Fifth test: 3.TB of data, 15846 files ranging from 27MB to 260MB, max cores set to 10,000. At startup 32 servers were used, each running 32 concurrent tasks, so the overall parallelism was 1664; later this grew to 50+ servers. Then Spark was driven into full GC.

java.io.IOException: org.apache.hadoop.hdfs.FMSClient$RetryableIOException: Could not obtain block: blk_31525197851421116_494327281

The chart is attached below:

Sixth test: 3.TB of data, 15846 files from 27MB to 260MB, max cores set to 10,000, but with Spark 1.2 instead. It still failed:

Error type 1:

15/03/27 14:56:56 ERROR DAGScheduler: Failed to update accumulators for ResultTask(0, 4815)

java.net.SocketException: Broken pipe

Error type 2:

java.io.IOException: org.apache.hadoop.hdfs.FMSClient$RetryableIOException: Could not obtain block: blk_31525197851421116_494327281

at org.apache.hadoop.hdfs.FMSClient$DFSInputStream.read(FMSClient.java:2563)

……

The errors in the fifth and sixth tests above were probably both caused by bad disk sectors on the HDFS servers.

Seventh test: Spark 1.2, 1.3TB of data in 114 files ranging from a few hundred bytes to 57GB, mostly large files. Max cores set to 10,000, spark.speculation set to true. 5353 tasks were launched and reading took 5.1 min!

Eighth test: same as the seventh but with Spark 1.1. Reading took 3.6 min.

Reading an HDFS path without permission

TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

Fix it by changing the UGI setting in spark/conf/hadoop-site.xml on the machine running the application to the correct username and password.

broadcast an RDD in PySpark

>>> data = sc.textFile('/app/ecom/aries/sf_public/chengyi02/uclogin/20141120/cq02-dr-uclogin206.cq02/*')

>>> sc.broadcast(data)

This fails with: py4j.Py4JException: Method __getnewargs__([]) does not exist

Changing it to sc.broadcast(data.collectAsMap()) makes the error go away.

Explanation: what gets broadcast must be key-value data. The collectAsMap here is not doing the usual collect trick; it just turns the textFile RDD into key-value pairs (a local map) and ships that out. A sketch of the working pattern follows.
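A hedged sketch of that pattern (the path and record format are illustrative; each line is assumed to be "key<TAB>value"):

# Broadcast a local dict built on the driver, never the RDD itself.
rdd = sc.textFile('/path/to/kv/data')                     # illustrative path
pairs = rdd.map(lambda line: tuple(line.split('\t', 1)))  # assumes "key\tvalue" lines
b_map = sc.broadcast(pairs.collectAsMap())                # dict is materialized on the driver first

hits = sc.parallelize(['k1', 'k2']).filter(lambda k: k in b_map.value).count()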

Reading an HDFS sequenceFile

Job aborted due to stage failure: Task 8 in stage 0.0 failed 4 times, most recent failure: Lost task 8.3 in stage 0.0 (TID 47, nmg01-spark-a0033.nmg01.baidu.com): com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 64
        com.esotericsoftware.kryo.io.Output.require(Output.java:138)
        com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:420)
        com.esotericsoftware.kryo.io.Output.writeString(Output.java:326)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.write(DefaultArraySerializers.java:274)
        com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.write(DefaultArraySerializers.java:262)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        java.lang.Thread.run(Thread.java:662)
Driver stacktrace:

Job shut down abnormally

SparkDeploySchedulerBackend: Asked to remove non-existent executor

Delete /tmp/spark-* and /tmp/fetchFileTemp*

Job fails to start

Launching the job with spark-submit failed; the error at the bottom of the log was:

363 15/03/30 09:51:17 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our application: FAILED
364 15/03/30 09:51:17 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Master removed our application: FAILED

The cause is that the job's task count was too large, set to 10,000, while the cluster as a whole had Cores: 2272 Total, 472 Used. So if max cores > 2272, the application gets killed! If it is merely larger than the number of unused cores, it first grabs whatever is available and the remaining tasks queue up.

Further up in the log there is:

353 15/03/30 09:51:17 INFO AppClient$ClientActor: Executor updated: app-20150330095112-25033/62 is now EXITED (Command exited with code 1)

Using the task number, find the corresponding server in the log:

345 15/03/30 09:51:16 INFO AppClient$ClientActor: Executor added: app-20150330095112-25033/62 on worker-20141125192713-nmg01-spark-a0062.nmg01.xxx.com-11543 (nmg01-spark-a0062.nmg01.xxx.com:11543) with 32 cores

On the Spark cluster monitoring page, find the corresponding worker. Under Finished Executors there are many failed tasks; locate my task and look at its stdout log:

Error occurred during initialization of VM
Could not reserve enough space for object heap

So these are probably OOMs on some nodes, but in most cases Spark can avoid the problem by re-running or retrying.

Out of Memory

When calling takeSample on a large list and collecting the result back, the following error occurred. The two highlighted nodes are a worker and the server running the application, respectively.

15/03/30 17:35:15 WARN DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x22dfe421, /10.75.65.12:12237 => /10.48.23.31:30001] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)

java.lang.OutOfMemoryError: Java heap space

at java.lang.Object.clone(Native Method)

at akka.util.CompactByteString$.apply(ByteString.scala:410)

at akka.util.ByteString$.apply(ByteString.scala:22)

at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)

at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)

 

Broadcasting large data

The file is about 1GB; after textFile().collect() it occupies xxxx of memory.

The configuration is:

 

Problem 1:

The job submitted with spark-submit exited without any error message; in the history UI list the application shows as finished, but clicking into it reveals there are still active jobs.

Going to the executor list page in the UI and finding the app's stderr shows:

15/04/07 13:07:47 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@nmg01-taihang-d10538.nmg01.baidu.com:54765] -> [akka.tcp://sparkDriver@cq01-rdqa-dev006.cq01.baidu.com:16313] disassociated! Shutting down.
15/04/07 13:07:47 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@cq01-rdqa-dev006.cq01.baidu.com:16313] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

Following http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-3106-fixed-td16264.html#a16316, changing cf.set('spark.akka.heartbeat.interval', 10000) had no effect.

Problems on the local driver machine:

$dmesg

lowest memcgroup 19

[chengyi02@cq01-rdqa-dev006.cq01.baidu.com test_perf]$ strace -p 19970
Process 19970 attached - interrupt to quit
futex(0x40e4e9f0, FUTEX_WAIT, 19974, NULL

Running Python unittests

Looking at spark/python/run-tests, you can see:

SPARK_TESTING=1  /path/to/spark-*-client/bin/pyspark tests/schedule_frame_test.py
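A minimal sketch of what such a test file might look like (the file, class, and test names are made up):

# tests/schedule_frame_test.py -- hypothetical module run by the command above
import unittest
from pyspark import SparkContext

class ScheduleFrameTest(unittest.TestCase):
    def setUp(self):
        self.sc = SparkContext("local[2]", "unit-test")

    def tearDown(self):
        self.sc.stop()

    def test_word_count(self):
        counts = (self.sc.parallelize(["a b", "a"])
                  .flatMap(lambda line: line.split(" "))
                  .map(lambda w: (w, 1))
                  .reduceByKey(lambda a, b: a + b)
                  .collectAsMap())
        self.assertEqual(2, counts["a"])

if __name__ == "__main__":
    unittest.main()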

 

Installing SWIG

On a Mac it is very simple: brew install swig. On other platforms you can download it from http://www.swig.org/ and installation should not be hard either.

Calling C functions from Python

1. Prepare a simple C file, palindrome.c:

#include <string.h>

/**
 * return: 0 -- not a palindrome
 *         1 -- is a palindrome
 */
int is_palindrome(char* text)
{
    int i, n = strlen(text);

    for (i = 0; i <= n / 2; i++) {
        if (text[i] != text[n - i - 1]) {
            return 0;
        }
    }

    return 1;
}

2. Then, as SWIG requires, prepare a .i file (similar to a .h), palindrome.i:

%module palindrome

%{
#include <string.h>
%}

extern int is_palindrome(char* text);

3. Run SWIG to generate the wrapper; this produces two new files, palindrome.py and palindrome_wrap.c

$ swig -python palindrome.i

4. Compile into a .so library

gcc -shared -I/System/Library/Frameworks/Python.framework/Versions/2.7/include/python2.7 -L/System/Library/Frameworks/Python.framework/Versions/2.7/ *.c -lpython2.7 -o _palindrome.so

(If you do not know where Python is installed, you can check with python -c "import sys; import pprint; pprint.pprint(sys.path)".)

5. Call it from Python, palindrome_test.py:

import _palindrome

print dir(_palindrome)

mystr1 = "nopalindrome"
mystr2 = "ipreferpi"

print _palindrome.is_palindrome(mystr1)
print _palindrome.is_palindrome(mystr2)

 

Execution

1. Interactive

/path/to/spark/bin/pyspark

2. Batch:

/path/to/spark/bin/spark-submit ~/spark-resource/spark-training/spark/examples/src/main/python/pi.py

 

Basic RDDs

element-wise transformations

  • map
  • filter
  • flatMap

pseudo set operations transformations

These require the RDDs involved to contain elements of the same type.

  • distinct, expensive!
  • union, does not deduplicate
  • intersection, deduplicates, expensive!
  • subtract, expensive!
  • cartesian, expensive!

actions

  • reduce
  • fold
  • aggregate
  • collect, returns all results to the driver, so the result set must fit in the memory of a single server. It can be applied to an RDD built from a list (returning a list) or from a dict (returning a list of its keys).
  • take
  • top
  • takeSample
  • foreach, applies func to every element and returns nothing
  • count
  • countByValue
  • takeOrdered
Note: unlike Scala and Java, which besides the base RDD also have DoubleRDD and the like, Python has only the base RDD. All RDD methods operate on the base RDD, but if the data it contains has the wrong type, the job simply crashes.

 persistence

  • persist = cache. The corresponding levels are pyspark.StorageLevel.MEMORY_ONLY_SER and so on (see the sketch after this list)
  • unpersist
  • is_cached, an attribute, not a method
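A short hedged sketch of these calls (Spark 1.x, Python 2; the input path is illustrative):

from pyspark import StorageLevel

rdd = sc.textFile("/path/to/input").map(lambda line: line.split("\t"))
rdd.persist(StorageLevel.MEMORY_ONLY_SER)   # same idea as rdd.cache()

print rdd.is_cached       # attribute, not a method
rdd.count()               # first action materializes and caches the partitions
rdd.unpersist()           # drop the cached blocks when done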

 Key/Value pairs = pair RDDs

Note that in Python Spark a pair RDD means a list of tuples, not a dict handled directly, e.g. [ ('k1', 'v1'), ('k2', 'v2') ].

create pair RDDs

  • pairs = lines.map(lambda line: (line.split(" ")[0], line)), built from a text file, using the first word as the key
  • data = sc.parallelize([ ('k1', 'v1'), ('k2', 'v2') ]), created by parallelizing in-memory data
  • read directly from a file (TODO)

transformations on pair RDDs

  • All the transformations listed above for base RDDs can also be used on pair RDDs. Each item of a pair RDD is a tuple (key, val), so t[0] is the key and t[1] is the value.
transformations on one pair RDD
  • reduceByKey
  • groupByKey
  • combineByKey: createCombiner is called the first time a key is encountered in each partition, so over the whole data set it may be called multiple times per key (see the sketch after this list)
  • mapValues
  • flatMapValues
  • keys
  • values
  • sortByKey
  • foldByKey
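A hedged sketch of combineByKey computing per-key averages (the data is illustrative):

pairs = sc.parallelize([("a", 1), ("b", 4), ("a", 3)])

# createCombiner runs the first time a key is seen in each partition,
# so it may run several times per key across the whole data set.
sum_count = pairs.combineByKey(
    lambda v: (v, 1),                          # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue, within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # mergeCombiners, across partitions

averages = sum_count.mapValues(lambda (s, c): float(s) / c).collectAsMap()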

transformations on two pair RDDs

  • subtractByKey
  • join, inner join, only keys that are present in both pair RDDs are output
  • rightOuterJoin
  • leftOuterJoin 
  • cogroup

 actions on pair RDDs

  • The actions listed above for base RDDs can also be applied to pair RDDs
  • countByKey
  • collectAsMap, works on a list of tuples and returns a dict. It does not work on a dict or on a list of non-tuples
  • lookup

Tuning joins on pair RDDs

  • partitionBy partitions the large data set onto "fixed" servers, so that when it is later joined with a small data set the large one no longer has to be shuffled; Spark distributes the small data set the same way. It is a transformation, and its second parameter, partitionFunc, controls how the data is partitioned (see the sketch after this list).
  • For pair RDDs, when the key is not changed, prefer mapValues and flatMapValues in order to preserve the partitioning (these operations could be emulated with map, but Spark does not analyze map's func, so the partitioning could not be preserved)
  • Partitioning has a huge impact on performance; I will add more once I gain practical experience (TODO)
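A hedged sketch of the partitionBy pattern (RDD names, path, and partition count are illustrative):

# Partition and persist the large pair RDD once; later joins reuse its partitioning.
big = sc.textFile("/path/to/big").map(lambda line: tuple(line.split("\t", 1)))
big = big.partitionBy(100).persist()     # partitionFunc defaults to hashing the key

small = sc.parallelize([("k1", "x"), ("k2", "y")])
joined = big.join(small)                 # only the small RDD is shuffled to big's partitions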

 File reading and writing

  • Python's saveAsSequenceFile can handle a list of tuples, e.g. sc.parallelize([ ('k1', 'v1'), ('k2', 'v2') ]).saveAsSequenceFile('…'). Given a dict or a list of non-tuples it fails with: RDD element of type java.lang.String cannot be used. Also, if the keyClass/valueClass passed in do not match the actual types, the values are treated as strings by default.
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system
……
  1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
  2. Serialization is attempted via Pyrolite pickling
  3. If this fails, the fallback is to call ‘toString’ on each key and value
  4. PickleSerializer is used to deserialize pickled objects on the Python side
  • saveAsNewAPIHadoopFile also handles a list of tuples. It can be used for protobuf-format data.

shared variables

  • Data such as lists and dicts can be broadcast; read the value with b_var.value
  • Accumulators updated in the action phase are guaranteed to be counted only once under fault tolerance; those updated in transformations have no such guarantee (see the sketch below)
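A hedged sketch of an accumulator updated inside an action (the input path and the predicate are illustrative):

bad_lines = sc.accumulator(0)

def check(line):
    if not line.strip():
        bad_lines.add(1)    # updated in an action (foreach), so each record is counted once

sc.textFile("/path/to/input").foreach(check)
print bad_lines.value       # read back on the driver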

spark transformations:

  • map(func) , and func is:  mixed func(x)

example: distData.map(lambda x: (x, 1)) uses a Python lambda to build an anonymous function that takes each element x as input and returns (x, 1). So this statement turns every element x into (x, 1).

  • filter(func), and func is: bool func(x)

example: f.filter(lambda x: x.find("spark") != -1) finds, in the RDD f, the lines that contain the word spark.

  • flatMap(func), and func is: Seq func(x)

example: f.flatMap(lambda x: x.split(" ")).collect() splits each input line on spaces, so one flatMap input line maps to multiple output elements, which is why func returns a Seq. Note that each output element of the statement above is a single word; if you change it to f.map(lambda x: x.split(" ")).collect(), each output element is a list of words instead.

  • sample(withReplacement, fraction, seed): this method takes 3 parameters, of types bool, float, and int respectively.

example: f.flatMap(lambda x: x.split(" ")).sample(withReplacement = True, fraction = 0.01, seed = 181341).collect()

  • union(otherDataSet); it seems otherDataSet also has to be an RDD, not a plain array

example:

new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
tc = tc.union(new_edges).distinct().cache()

  • distinct([numTasks]), deduplicates

example: f.flatMap(lambda x: x.split(" ")).distinct()

  • groupByKey([numTasks]), similar to SQL's GROUP BY: turns (k, v) => (k, seq(v))
  • reduceByKey(func, [numTasks]), similar to the reduce in MapReduce: func is applied repeatedly over all values of the same key

example:

contribs.reduceByKey(add)  # uses Python's built-in add directly as func

f.reduceByKey(lambda x, _: x)  # builds an anonymous function to use as func

pointStats = closest.reduceByKey(lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))  # builds an anonymous function to use as func

  • sortByKey([ascending], [numTasks])
  • join(otherDataSet, [numTasks])
  • cogroup(otherDataSet, [numTasks])
  • cartesian(otherDataSet), similar to PHP's array_combine

Spark Actions:

  • reduce(func), func is: mixed  func(mixed first, mixed second)
  • collect()
  • count()
  • first(), equivalent to calling take(1)
  • take(n); note that this is currently not parallelized but computed by the driver program itself
  • takeSample(withReplacement, fraction, seed)
  • saveAsTextFile(path), where path is a directory; each element is converted to a string via toString and written out
  • saveAsSequenceFile(path), where path is a directory. Only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
  • countByKey()
  • foreach(func)

pyspark package

  • parallelize(c, numSlices=None), where numSlices controls how many parallel tasks are launched for the computation

example:count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)

Python help

>>> help('operator')

>>> help('operator.add')

 

Software like PHP and Nginx, which frequently allocates and frees memory, generally provides its own user-space memory management. The reason is that calling malloc/free directly can cause switches between user mode and kernel mode, which is relatively expensive. So what exactly is the difference between the two modes?

Definitions of kernel mode and user mode

The CPU only ever runs in one of the following two states:

  1. Kernel Mode: In kernel mode, the executing code has complete and unrestricted access to the underlying hardware. It can execute any CPU instruction and reference any memory address. Kernel mode is generally reserved for the lowest-level, most trusted functions of the operating system. Crashes in kernel mode are catastrophic; they will halt the entire PC.
  2. User Mode: In user mode, the executing code has no ability to directly access hardware or reference memory. Code running in user mode must delegate to system APIs to access hardware or memory. Due to the protection afforded by this sort of isolation, crashes in user mode are always recoverable. Most of the code running on your computer will execute in user mode.

Different hardware may implement this differently; x86 divides privileges with four hardware protection rings, 0 through 3. Reportedly Linux uses only ring 0 for kernel mode and ring 3 for user mode, leaving rings 1-2 unused, while some drivers on Windows make use of rings 1-2.

When switches happen

Typically, there are 2 points of switching:

  1. When calling a System Call: after calling a System Call, the task voluntarily calls pieces of code living in Kernel Mode
  2. When an IRQ (or exception) comes: after the IRQ, an IRQ handler (or exception handler) is called, then control returns back to the task that was interrupted as if nothing had happened.

IRQ stands for Interrupt Request; its role in our computers is to carry out hardware interrupt requests.

Generally speaking, system calls are initiated actively by the user: calling fork, for example, indirectly invokes the system function sys_fork and traps into kernel mode. IRQs can occur in both an "active" and a "passive" way from the user's point of view: for instance, calling malloc to request memory may cause a page fault, which raises an interrupt and traps into kernel mode; or, when we need to read a chunk of data from disk, once the read is finished the disk uses an IRQ to notify the system that the data has been written into the designated memory.

Cost of switching

Switching from user mode to kernel mode is essentially responding to an interrupt, because system calls are ultimately implemented via the interrupt mechanism, and exceptions and interrupts are handled in basically the same way. The switch proceeds as follows:

  1. Extract the kernel stack's ss0 and esp0 from the descriptor of the current process.
  2. Using the kernel stack pointed to by ss0 and esp0, save the current process's cs, eip, eflags, ss, and esp. This step also completes the switch from the user stack to the kernel stack, and saves the next instruction of the program being suspended.
  3. Load the cs and eip of the interrupt handler, previously obtained from the interrupt vector, into the corresponding registers and start executing the handler; at this point execution has moved into kernel-mode code.

Returning from kernel mode to user mode requires restoring the saved process state.

As the steps above show, a mode switch involves copying a fair amount of data and requires hardware cooperation, so it is relatively expensive, whereas when no mode switch happens the CPU just executes instructions sequentially. So mode switches should be minimized!

References

http://blog.codinghorror.com/understanding-user-and-kernel-mode/

http://www.linfo.org/kernel_mode.html

http://www.tldp.org/HOWTO/KernelAnalysis-HOWTO-3.html

http://jakielong.iteye.com/blog/771663

http://os.ibds.kit.edu/downloads/publ_1995_liedtke_ukernel-construction.pdf

http://os.inf.tu-dresden.de/pubs/sosp97/

On Mac 10.10, printing always fails with a "Hold for Authentication" error.

Assuming the driver, username, and password are all correct, I found a rather odd workaround:

1. In Finder's "Go" menu, enter smb://172.22.2.xx (the printer's server address) to mount a share

2. Configure the printer as usual

3. Print, enter the username and password, and it works

In "HBase: The Definitive Guide", Lars George first discusses the similarities and differences between NoSQL and traditional RDBMSs along several dimensions. So along these dimensions, what choices does HBase make and how does it implement them?

Data Model

HBase is generally considered column-oriented, but in the same book the author also stresses that HBase's column orientation differs from that of column-oriented RDBMSs: HBase does store data per column family, but queries are key-based.

Note, though,that HBase is not a column-oriented database in the typical RDBMS sense, but utilizes an on-disk column storage format. This is also where the majority of similarities end, because although HBase store data on disk in a column-oriented format, it is distinctly different from traditional columnar databases: whereas columnar databases excel at providing real-time analytical access to data, HBase excels at providing key-based access to a specific cell of data, or a sequential range of cells.

Storage Model

HBase is persistent, but it is really a combination of memory and disk. On inserts, deletes, and updates, data is first written to the WAL (configurable) and then into the memstore; only when the memstore is full is it flushed into an HFile on the persistent storage medium (usually HDFS with 3 replicas, all configurable). Queries merge data from the memstore and multiple HFiles. Moreover, to avoid disk I/O latency, the region server caches hot data in blocks, which means that in the best case your queries may be served entirely from memory.

Consistency model

Consistency here, as I understand it, refers to the consistency in CAP (consistency, availability, partition tolerance), covering both atomicity and isolation. HBase guarantees that writes to multiple column families or columns under a single row key are atomic, but writes across row keys are not. This also means that if two clients A and B access HBase and A batch-writes data spanning multiple row keys, B may observe intermediate states between rows while reading, but never an intermediate state within a single row.

Physical model

HBase is distributed by nature (although for learning purposes all the processes can be configured on a single server). It relies on ZooKeeper to coordinate the various process roles, e.g. leader election between the master and backup masters and liveness checks of the region servers, and to store critical data such as the root and meta tables.

Adding new region servers is essentially transparent to users; the master assigns traffic to the new servers at appropriate moments (e.g. during load balancing or splitting).

Read/write performance

HBase supports random access, range scans, sequential access, and so on. Because the data is stored sorted by key and there is a block cache, range scans and sequential access are the more efficient patterns. Heavy random access renders the block cache ineffective and causes frequent disk I/O, and even Java heap pressure and GC, so performance suffers.

Writes first append to the WAL and then go to the memstore. The latter is an in-memory operation and is cheap. The WAL can be configured to flush immediately or periodically/by threshold; since its underlying storage is also HDFS, flushing is expensive, so durability has to be balanced against write performance.

Secondary indexes

HBase does not support secondary indexes, but some secondary-index scenarios can be simulated through the design of row keys, column families, versions, and so on. Moreover, HBase's design philosophy also allows denormalization, e.g. redundant storage, to improve query performance, so whether secondary indexes are really necessary is itself debatable.

Failure handling

First, in the HBase architecture the master is a single point, but because clients' data operations do not have to go through the master (they talk directly to ZooKeeper and the region servers), the service remains available when the master fails.

However, until the master recovers or fails over, its failure means table schemas and cluster configuration cannot be modified, and operations such as load balancing and splitting cannot run; and if a region server fails at the same time, it cannot be recovered either. So master failures still deserve serious attention.

Its fault tolerance is achieved by leader election between the backup masters and the ZooKeeper ensemble. Since the master holds no unrecoverable data, e.g. the region server configuration lives in ZooKeeper and each region server maintains and reports which regions it serves, a master switch generally causes no loss of data or metadata.

Second, if a region server fails, since it is logically a single point, the regions it serves are briefly inaccessible (??), but no data is lost.

The master watches changes to ZooKeeper ephemeral nodes, and on detecting a region server failure (whether caused by hardware or by the network) it picks new region servers to take over its former duties. Before the switch, the failed server's WAL must be split (done by the master together with several region servers) and replayed (done by the new region server, which first needs to load the persisted HFiles of each region from HDFS). After that, the new region server is ready.

At the underlying storage layer, if HDFS is used, failure handling is guaranteed by HDFS.

Compression

If your data is not big, HBase is overkill. And since it is big data, compression becomes an important issue, as we are easily talking terabytes! HBase supports multiple compression algorithms, e.g. gzip, and is extensible. The compression referred to here is at the KV level; the underlying HDFS may have its own compression as well (??).

Load balancing

The HMaster periodically runs load balancing (every 5 min by default), checking for imbalance and acting on it. The idea is simple: distribute regions evenly across the region servers.

Updating the metadata should be straightforward and cheap. But suppose region server A is heavily loaded and some regions need to be moved off it: before that it has to flush its WAL and memstore, and doing this while already under heavy load sounds painful. HBase has a dedicated config that limits how long balancing may take, 2.5 min by default.

Not having experienced this for real, I am quite curious how it is handled in actual operations. Trigger it manually during off-peak hours?

Atomic read-modify-write

The consistency section above already covered HBase's row-level transactions. In addition, HBase provides compare-and-set (CAS) operations, e.g. checkAndPut and checkAndDelete: for the same row key, first check whether a cell equals the given value, then put or delete.

As mentioned above, HBase's insert/delete/update operations all write the (optional) WAL first and then the memstore. I guess the memstore can use row-level locks to avoid contention, but what about the WAL? If a write touches multiple columns of one row and is logged as multiple KV pairs, what happens if it fails halfway? Replay would then break atomicity! To solve this, HBase wraps the data of one write (same row key, multiple columns) into a single WALEdit object and writes it to the log in one go, which guarantees atomicity on failure. (What if the asynchronous flush of this WALEdit fails? By then the client has already received a return value of true and considers the write successful, so the data is lost! That topic belongs to the durability discussion.)

Locking, waits and deadlocks

HBase's data-related locks should mostly be row-level. Writes take a row-level lock; get/scan operations rely on multiversion concurrency control, so they need no locks.

Take put as an example: by default, locks are managed automatically on the server side. There is also an API that lets the client construct a RowLock instance itself and pass it to put, but this increases the chance of deadlock and lowers HBase's throughput! So do not play with it unless you absolutely have to.

get likewise accepts a RowLock parameter, and for the same deadlock and performance reasons it is not recommended either.

Some open questions about locking:

Each region server shares a single HLog instance across its regions, so where is the lock taken? HRegion writes to the HLog synchronously, so could it be a mutex that guarantees the HLog instance is used by only one HRegion at a time?

What kind of structure is the memstore? A sorted array? A tree? Inserting into an array is too inefficient, so that is unlikely. If it is a tree, would adjusting node relationships affect the whole tree, or just a path? That depends on the choice of data structure.

When a write spans column families it involves multiple stores, i.e. multiple memstores, so is the lock relatively independent per store?