Archive for the ‘android’ Category

Spark的很多transformation方法是基于combineBy的,会导致shuffle过程,所以一般认为成本较大。那么具体过程如何?涉及到哪些spark配置,又该如何调整呢?数据倾斜问题又会对此造成什么影响呢?

本文暂不涉及调优分析,仅从代码层面基于pyspark分析combineBy的实现原理和其中的shuffle过程,期望抛砖引玉。

接口定义如下:

 Python |  copy code |? 
1
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
2
                     numPartitions=None)

约定Combiner和value含义为:

  • Value 是PariRDD中的value值
  • Combiner是combineByKey完成后PairRDD的value值,可以与Value类型不一样

前3个参数都是callback方法:

  • Combiner createCombiner(Value),通过一个Value元素生成Combiner元素,会被多次调用
  • Combiner mergeValue(Combiner, Value),将一个Value元素合并到Combiner里,会被多次调用
  • Combiner mergeCombiners(Combiner, Combiner),将两个Combiners合并,也会被多次调用

从python/pyspark/rdd.py的combineBy代码看到,其处理过程分为3步:

  1. locally_combined = self.mapPartitions(combineLocally),在python进程进行combine
  2. shuffled = locally_combined.partitionBy(numPartitions) ,进行python进程内部的shuffle和基于scala的worker nodes间shuffle
  3. return shuffled.mapPartitions(_mergeCombiners, True) ,类似MR的reducer,聚合shuffle的结果

下面来详细看每一个steps。

Step1,locally_combined = self.mapPartitions(combineLocally)。

 Python |  copy code |? 
1
        def combineLocally(iterator):
2
            merger = ExternalMerger(agg, memory * 0.9, serializer) \
3
                if spill else InMemoryMerger(agg)
4
            merger.mergeValues(iterator)
5
            return merger.iteritems()

该方法根据spark.shuffle.spill方法决定是使用ExternalMerger还是InMemoryMerger,其中ExternalMerger的内存是由spark.python.worker.memory限定的。以下主要关注ExternalMerger,磁盘+内存的外部排序。另外,merger.mergeValues也是有可能调用全部3个callbacks方法的,不仅仅是mergeValue callback,不要被它的名称迷惑了。

ExternalMerger的关键成员变量:

  • data: dict of {K: V},unpartitioned merged data,还没有spill前的数据都是放在该dict里的
  • pdata:list of dicts,list长度由self.partitions数目决定,该值与numPartitions不一样。partitioned merged data,如果已经发生过spill,则后续的数据都读入pdata并hash到不同的dict槽位里。

ExternalMerger的外存文件路径是/path/to/localdir// 。其中localdir路径可以有多个,当位于不同的磁盘时,可以提高并发写入速度。spill_num是spill的轮次,partition_num是self.partitions对应的分片数。文件中的每一行是序列化后的Key-Combiner list。

再来看mergeValues的方法主体:

 Python |  copy code |? 
01
def mergeValues(self, iterator):
02
 
03
    """ Combine the items by creator and combiner """
04
 
05
    iterator = iter(iterator)
06
 
07
    # speedup attribute lookup
08
 
09
    creator, comb = self.agg.createCombiner, self.agg.mergeValue
10
 
11
    d, c, batch = self.data, 0, self.batch
12
 
13
 
14
    for k, v in iterator:
15
 
16
         d[k] = comb(d[k], v) if k in d else creator(v)
17
 
18
    c += 1
19
 
20
 
21
    if c % batch == 0 and get_used_memory() > self.memory_limit:
22
 
23
        self._spill() // 按实时计算的内部partition idself.data中的数据 flush到disk,为了兼容pdata的flush,这里每行一个单元素的dict
24
 
25
        self._partitioned_mergeValues(iterator, self._next_limit())
26
 
27
        break // iterator指针已经交给_partitioned_mergeValues了,这里就直接跳出了

针对当前mapPartition里的每一个元素,调用createCombiner或mergeValue callback生成新的Combiner对象,并存入self.data[k]里。定期(根据c%batch,注意batch的值会动态调整)检查内存,如果不超限就继续读入。直到内存不足时,首先调用_spill方法把self.data flush,然后重新计算memory_limit并把处理权交给_partitioned_mergeValues。

 Python |  copy code |? 
01
    def _partitioned_mergeValues(self, iterator, limit=0):
02
        """ Partition the items by key, then combine them """
03
        # speedup attribute lookup
04
        creator, comb = self.agg.createCombiner, self.agg.mergeValue
05
        c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch
06
 
07
        for k, v in iterator:
08
            d = pdata[hfun(k)]
09
            d[k] = comb(d[k], v) if k in d else creator(v)
10
            if not limit:
11
                continue
12
 
13
            c += 1                                                                                                       
14
            if c % batch == 0 and get_used_memory() > limit:                                                             
15
                self._spill()                                                                                            
16
                limit = self._next_limit() 

该方法也是继续读入元素,但不同的是,将新生成的Combiner存入self.pdata[hfun(k)][k]里,即根据内部partitions方法进行分片。如果内存不足,则调用_spill写入磁盘,并调整limit值。

上面两次调用了_spill方法,由于只有第一次是针对self.data的,故应只有spill_num=1时的partition file里,才有多行;后续spill_num的partition file都是一个大行。

mergeValues把当前partition里具有相同key的values合并到一个Combiner里了,但这些数据可能在内存和磁盘上,这就需要iteritems()来把它们归并起来返回给上层调用者。如果没用spill过,证明数据都在self.data里,就直接返回其迭代器即可。否则就要调用_external_items()来归并了。

_external_items()按partition处理文件,即每次处理多轮spill生成的一个partition id对应的文件。为了最大程度利用内存并降低复杂度,首先会把pdata里的数据也spill掉。

 Python |  copy code |? 
01
        try:
02
            for i in range(self.partitions):
03
                self.data = {}
04
                for j in range(self.spills):
05
                    path = self._get_spill_dir(j)
06
                    p = os.path.join(path, str(i))
07
                    # do not check memory during merging
08
                    self.mergeCombiners(self.serializer.load_stream(open(p)),
09
                                        False)                                                                           
10
 
11
                    # limit the total partitions                                                                         
12
                    if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS                                         
13
                            and j < self.spills - 1                                                                      
14
                            and get_used_memory() > hard_limit):                                                         
15
                        self.data.clear()  # will read from disk again                                                   
16
                        gc.collect()  # release the memory as much as possible                                           
17
                        """ chengyi02: Because the yield values before are also using memory(in the caller),             
18
                        so if now more than limit, the following partitions will almostly exceed too.                    
19
                        So recursive merged all the remaining partitions.                                                
20
                         """                                                                                             
21
                        for v in self._recursive_merged_items(i):                                                        
22
                            yield v                                                                                      
23
                        return                                                                                           
24
 
25
                for v in self.data.iteritems():                                                                          
26
                    yield v                                                                                              
27
                self.data.clear()                                                                                        
28
 
29
                # remove the merged partition                                                                            
30
                for j in range(self.spills):                                                                             
31
                    path = self._get_spill_dir(j)                                                                        
32
                    os.remove(os.path.join(path, str(i))) 

在调用self.mergeCombiner处理一个/ 文件时,不检查内存是否超限,否则可能会继续向当前/path/to/localdir/下flush数据,就破坏文件数据结构了。而是在一个文件处理完成之后,检查内存若超限且分片数可控,则调用_recursive_merged_items进行递归的合并。

这里需要注意的是,_recursive_merged_items会所有>=i(当前partition id)的数据,而非仅处理当前partition。这是因为,当前内存消耗主要包含两块:rdd.py里调用者保存的yield返回内存块,以及当前partition已经读入的数据。假设partition比较平均,则后者数据量相对稳定;而前者不断增长。所以后续内存超限的几率会更大。

_recursive_merged_items会new 一个ExternalMerger对象m,将文件里的数据依次读入、merge,并按需spill(这时会spill到其他localdir目录,spill_num也重新开始计数,可以视为在不同的数据空间),最后通过m._external_items返回合并后的数据。在m._external_items里还会不会再次发生递归调用呢?几率很小,因为这个方法里的内存消耗基本等同于m的一个partition分片数据大小。而m的所有数据 == self的一个partition,故m的一个partition数据量非常小。即使再次发生递归调用,m的子m 分片数据量会依次递减,故会再次降低spill几率。

上面的描述基本上符合外部排序算法,但工程的世界里还需要考虑GC问题。在_spill的最后和_extern_items里都调用了python的gc.collect()方法,同步释放引用计数为0的内存块。但由于collect不一定能那么完美的释放,一些reachabled还是无法释放的,如果这部分存量较大(例如很逼近本次limit),那极端情况下一个元素就又会触发spill了,这显然逼近耗时也没用意义。所以ExternalMerger会动态调整limit值,max(self.memory_limit, get_used_memory() * 1.05)。所以,这能看出来了,python进程实际消耗内存可能会大于python.worker.memory值。

以上step1结束,返回的locally_combined_rdd的结构为:[(k, Combiner), …]。k还是原来的k,但当前worker task下所有相同k的values都聚合到了Combiner中。

step2,shuffled = locally_combined.partitionBy(numPartitions)

这里有一个问题,为什么不直接把rdd给scala的partitionBy,而在python代码里实现了一些内部shuffle的逻辑呢?首先,partitionBy的回调方法是可定制的,python里默认是portable_hash,如果用scala实现,如何回调python的分片函数呢?其次,python和scala间的通信是需要序列化的,一条一条的成本有点大,所以python shuffle后也做了batch。具体如下。

(注:partitionBy完之后,数据量不会有变化,以上kv变化仅为了代表shuffle后一个task里只会有属于自己分片的key了。)

从代码可以看到python里计算了partition_id,所以scala仅根据确定的分片情况,进行shuffle。在深入之前,先看下python partitionBy里涉及到的几个rdds:

  • 自身,即self,PairRDD(k,v),通过mapPartitions向下转化
  • keyed,PairRDD(paritition_id, [(k,v),(k,v),…]),通过copy constructor向下转化
  • pairRDD,JavaPairRDD,java会调用适配到scala,并通过scala的partitionBy向下转化
  • jrdd,java代理的scala的ShuffledRDD,通过copy constructor向下转化
  • rdd,PairRDD(k,v),分片完成

keyed的生成过程比较简单,由内部方法add_shuffle_key实现,完成k到partition_id的计算,并将一批kv打包、序列化作为keyed rdd的v。Every 1k items, will check used_memroy once, and if equal to batch+1, will also flush。并且batch的值会动态调整,尽量使生成的批量包大小在1M和10M之间。

在jrdd的构造过程中,会生成shuffleDependency,当stage提交时,DAG调度发现该dependency,就会发起ShuffleMapTask,生成shuffle数据(相当于MR里map端的shuffle工作):

ExternalSorter是shuffle的关键,在该分支中生成ExternalSorter的参数:

 Scala |  copy code |? 
1
sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)

即将aggregate=None;dep.partitioner此处是PythonPartitioner,直接使用rdd的key作为paritition_id,在该场景就是python里计算好的partition_id;ordering=None。其中aggregate和ordering为None即代表无需再combine和sort,只单纯shuffle就好,以下仅关注这个分支,由insertAll和writePartitionedFile合作完成。

insertAll的一个可能分支是bypassMergeSort == True,即如果partitions数目较少,且无需aggregate和ordering,则直接写numPartitions个文件,随后再在writePartitionedFile里简单concate。缺点是每个worker node上都同时打开numPartitions个文件,有额外内存消耗,且可能too many open files。

另一个可能分支是大数据时的常见场景(无需combine且大量写),针对每条数据:

 Scala |  copy code |? 
1
buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
2
maybeSpillCollection(usingMap = false)

由于python里做了批量打包,故这儿的一条数据的v对应python的一批数据的kv了。每插入一条数据,检查是否需要spill。这儿的实现思路与python的externalMerger也类似。简单介绍下,insertAll的maybeSpillCollection最终调用spillToMergeableFiles,根据partition_id和key对内容排序(这里partition_id==key),每次spill写入一个临时文件,并且把file信息记录在spills数组里。

随后writePartitionedFile得把之前分布在多个临时文件里的数据归并为最终输出的partition文件。与spilled bypassMergeSort对应的分支,因为一个partition的数据都在一个partition文件里,所以简单的concate即可。否则,数据分散在内存和多个文件里,整合的过程由PartitionedIterator迭代器触发完成,返回的iter依次写文件。

以上过程,将shuffled数据写入disk了,但怎样被下一个计算单元使用呢?

step3,return shuffled.mapPartitions(_mergeCombiners, True) 

虽然spark1.2之后已默认使用sort based shuffle,但sort shuffle还是使用HashShuffleReader读取数据:SortShuffleManager -> HashShuffleReader -> BlockStoreShuffleFetcher.fetch() 。其目的是收集上游多个worker node产生的shuffled数据,所以必然有network I/O。

BlockStoreShuffleFetcher.fetch里,先获取shuffleId和reduceId对应的blocks上游节点信息,信息包括((address, size), index),其中address是BlockManageId类型。之后调用ShuffleBlockFetcherIterator从local block manager和remote BlockTransferService处获取blocks数据。

ShuffleBlockFetcherIterator的initialize方法调用splitLocalRemoteBlocks方法根据address.executeId生成出remoteRequests 对象,针对同一address的多个block,若size < maxBytesInFlight/5 则合并为一个request。从而通过这种方式,确保最大并发度为5。也可以看出,如果有大的block,则request size可能大于maxBytesInFlight。随后会对remoteRequests列表随机化,以保证请求尽量均衡。

initialize方法随后立即调用sendRequest发送多个请求,并打印日志:logInfo(“Started ” + numFetches + ” remote fetches in” + Utils.getUsedTimeMs(startTime))。sendRequest调用shuffleClient.fetchBlocks()读取远端数据,并注册BlockFetchingListener,后者的onBlockFetchSuccess方法会接收buf数据并添加到results队列里。

ShuffleBlockFetcherIterator的next()方法先“释放”内存,并在内存充足的情况下再发出sendRequest请求。随后再读取results队列里的数据,并解压、反序列化,返回迭代器iterator。这里有两个小的优化点,“释放”内存并不是真的触发gc,而是从bytesInFlight里减去已接收到results队列的数据长度,因为后者随后将被读取,而bytesInFlight限制的是网络缓存。另一个是先sendRequest再处理results中的数据,原因是前者是异步调用,且耗时可能较长。

还需要注意的是,sendRequest调用的shuffleClient实际是BlockTransferService的一个实例,有netty和nio两种实现方式。

以上完成了shuffle数据的接收,next返回的iterator最终回到python代码进入step3的剩余阶段,这就很简单了,实际上此时才真正执行shuffled.mapPartitions(_mergeCombiners, True)。

通过以上3个步骤,python、java和scala多语言的交互,最终完成了pyspark的combineByKey。个人理解,由于scala的partitionBy和shuffle过程早已实现,所以pyspark主要解决的是 如何更优的进行多语言交互,以达到较优的性能、扩展性、复用度。

1.   Can’t connect to https://dl-ssl.google.com/android/repository/repository.xml

WorkArounds:

Add proxy in “Settings”

Select “Force https://… sources to be fetched using http://…” in “Misc” tab.

android sdk config

2.   Can’t install Update 7. Error prompt:

Folder failed to be renamed or moved on SDK install

Solution available on http://code.google.com/p/android/issues/detail?id=4410, which is a useful link to query the android open issues. Also abstract the useful one on my PC.

WorkArounds:

I had the same problem: the latest update failed to install because it couldn’t rename the tools folder in android-sdk-windows.  I’m using Avira antivirus and disabling it didn’t help, but I don’t think it had anything to do with the AV program anyway.

Fact is, running the Android SDK setup apparently uses items in the “android-sdk-windows\tools” directory.  I’m on Win 7 x64 so maybe that causes some unique situation – I’m not sure.

Solution:

– I made a copy of the tools folder itself (keeping it at the same directory tree level, thus “tools” and “tools-copy” were both in the “android-sdk-windows” folder).

– I ran Android.bat from that copy

– I ran the update without problems (it updated the original, not-being-used-at-the-moment tools folder, among whatever other items it needed to).

– I closed the SDK, deleted the folder (I had to kill the adb.exe process first – not sure why that always persists but you can’t delete the folder without doing that).

– I restarted the SDK from the normal (now-updated) tools folder.  Worked like a charm!

Note that simply killing adb.exe was NOT sufficient to get around the original issue… only by copying the tools folder and using the copy to run Android for the duration of the update process was enough to rectify the problem.

Tips for SDK configuration:

  1. Add the following environment path and restart:

ANDROID_SDK_HOME  E:\

This very helpful. First, save our C:\ space. SDK will save the created AVD and configurations to C:\Documents and Settings\<user>\.Android by default.  After add the environment path, these will be saved under E:\.Android . Second, reinstall system will not impact the SDK configuration.

TO BE ADD MORE.

Just collection and add some notes.

Abstract from http://androidappdocs.appspot.com/sdk/index.html

Eclipse IDE

  • Recommended versions: Eclipse 3.4 (Ganymede) or 3.5 (Galileo)

Caution: There are known issues with the ADT plugin running with Eclipse 3.6. Please stay on 3.5 until further notice.

  • Eclipse JDT plugin (included in most Eclipse IDE packages)
  • Several types of Eclipse packages are available for each platform. For developing Android applications, we recommend that you install one of these packages:
    • Eclipse IDE for Java EE Developers
    • Eclipse IDE for Java Developers
    • Eclipse for RCP/Plug-in Developers
    • Eclipse Classic (versions 3.5.1 and higher)
  • JDK 5 or JDK 6 (JRE alone is not sufficient)
  • Android Development Tools plugin (optional)
  • Not compatible with Gnu Compiler for Java (gcj)

My workspace: Eclipse 3.5.1 Galileo (include JDT plugin)+ JDK 1.5.0_15 + SDK (update to version 7)

Android的底层是基于C/C++的,但是做应用开发,需要使用java语言。而google官方推荐使用的开发工具是Eclipse。所以我们需要安装java开发必备的jre/jdk、android的库(SDK)、Eclipse以及Eclipse的插件(ADT)。

1、 一般,linux下已经安装了java,使用java -version看下版本,如果低于1.6的话,就需要重新安装了。

2、 下载SDK的管理包:sudo wget “http://dl.google.com/android/android-sdk_r06-linux_86.tgz”。注意,这里并不是直接把SDK安装了,解压之后,看到它“SDK\ Readme.txt”,需要执行“tools/android update sdk”,才会真正安装SDK。这里,需要有桌面系统,所以远程登录的同志们注意了。最好也使用root权限安装。

3、 下载Eclipse,从官网即可,需要对应的版本(linux、32/64位机)。解压即可。如果启动的时候出现问题,请查看/etc/sysconfig/selinux,关闭selinux的支持:

#SELINUX=enforcing
SELINUX=disabled

4、 安装Eclipse的插件ADT。可以通过Eclipse->help->install new software,使用ADT链接安装,也可以下载了ADT之后,解压,把plugins、features里的内容copy到Eclipse路径下的plugins、features下,并且删除eclipse/configuration/org.eclipse.update,重启Eclipse即可。