该项目基于Pyspark core,处理1.5TB input,期间涉及rightOuterJoin、reduceBy以及多媒体处理,执行耗时从最初的1hour+,到最终的10min+-。这里记录一些优化的方法。

调优一般3步走:

  • 找瓶颈,从各种log和统计数据,定位瓶颈在哪里,Job、stage、细节交互、function层层递推
  • 制定目标,通过对spark、业务、代码的理解,给出一个靠谱的目标
  • 优化
    • 针对stage,我们优化了Java GC、数据倾斜、并发度
    • 针对细节和function,我们调整shuffle等配置,合理设计代码逻辑,避免过多的网络IO和跨语言交互

数据本地化

Job的input来自HDFS,但由于公司的HDFS与Spark集群目前没有共享资源,所以无法做到local或rack local,只能尽量选择离Spark机房较近且专线的HDFS机房,避免网络延迟的过度干扰。

瓶颈定位方法

  1. driver端业务日志里,打印总耗时,所有的优化手段只有较大影响到该值,才认为是有效的
  2. 打开spark.python.profile,针对每个RDD分析python内部方法调用耗时
  3. 通过master UI界面(我们还是standalone mode),找到最耗时的stage、不均衡的task等
  4. executor stderr log,通过打开log4j的level = DEBUG,可以看到scala进程内部很多运行细节

提高并发度

一开始我们的输入是业务方提供的原始数据,HDFS block size=256MB,而整个文件才3.5GB,所以针对这个数据源只有14个并发任务,每个耗时数分钟。

通过将文件拆分为1000个小part,将并发提升至1000,每个耗时20s左右。(20s其实算短task,有可能稍微增大些性能反而更好)

同时,修改spark.cores.max=1000,使我们最多可以获得1000个可用cores,分布在64个executors上面(每个是16cores)。

由于中间有rightOuterJoin过程,我们也设置了numPartitions参数=1000(该值后续会优化)。

内存使用

pyspark场景,driver java进程、driver python进程、executor java进程、executor python进程都需要消耗内存。driver端还好,在我们当前的场景下,并没有使用太大内存。但executor端,python和scala/java都有shuffle需求,故需要分配足够内存。

我们的配置是:

 Python |  copy code |? 
1
spark.python.worker.memory 40g
2
spark.driver.memory 10G
3
spark.executor.memory 100G

其中,executor.memory代表java进程内存,fraction没有修改,使用默认的storage占60%、shuffle 20%、code 20%的比例。

但通过UI界面,发现某些tasks失败,通过speculation重试后又可以成功,出错日志里包含如下异常,重试几次失败后,task就退出了:

 Scala |  copy code |? 
1
OneForOneBlockFetcher: Failed while starting block fetches
2
     java.io.IOException: Connection from other−executor−ip closed

在other-executor-ip所在服务器看到如下异常,但稍后又可以正常提供服务:

 Scala |  copy code |? 
1
TransprotChannelHandler: Exception in connect from ip:port
2
    java.lang.OutOfMemory: requested array size exceeds VM limit

猜测other-executor-ip在提供shuffle output时内存不足,需GC释放足够后,才可以正常提供服务。故调整GC相关参数如下:

 Scala |  copy code |? 
1
          −−conf "spark.executor.extraJavaOptions=−XX:+UseParallelGC −XX:+UseParallelOldGC −XX:ParallelGCThreads=64 −XX:NewRatio=3 −XX:SurvivorRatio=3 −XX:+HeapDumpOnOutOfMemoryError −XX:HeapDumpPath=/home/spark/heapdump.bin −XX:+PrintHeapAtGC −XX:PermSize=256m −XX:MaxPermSize=256m  −XX:+PrintGCDetails −XX:+PrintGCTimeStamps −XX:+PrintGCDateStamps −XX:+PrintTenuringDistribution −XX:+PrintGCApplicationStoppedTime −XX:−OmitStackTraceInFastThrow  −verbose:gc −Xloggc:/tmp/spark.executor.gc.log −Dlog4j.configuration=ftp://.../tmp/chengyi02/log4j.properties"

调整后没有再看到相关错误。

数据倾斜

开始的时候,有一个数据输入源的文件分片较多(>1000),尝试过用默认的partition func对其repartition为1000个分片,结果导致了分片不均衡,有的分片256MB,有的只有几MB,这跟我们的数据key有关系。不repartition反而更好。

另外,在《spark 任务调优》也提到当出现少数慢tasks时的处理方法。

序列化方法与压缩

Kryo等优化方式,都是针对scala、java语言的。pyspark的数据本身就是在python进程里序列化后,才作为二进制流传递给scala进程的,所以开启kryo没有效果。

默认情况,spill、broadcast、shuffle等的compress都是开启的。另外,我们也将spark.rdd.compress设置为true了。

rightOuterJoin的numPartitions参数调整

rightjoin操作用来整合多个数据源的输入,产生最终被处理的数据(2TB+),并作为map-filter-reduceByKey stage的parent。将rightOuterJoin的numPartitions参数从1000下降为400,则reduceByKey stage的执行时间从7min下降到4min左右。

结论先放着:如果使用pyspark,产生shuffle操作时,可以尝试下numPartitions数量设置为cores.max的一半。(但由于我们的集群环境与其他任务混布,测试数据不稳定,无法给出确切结论)

查看executor的debug log发现,调整numPartitions前,单机并发16tasks;调整后,单机并发7tasks。每个scala task获取shuffle的耗时从2.3s左右,下降到500ms左右。但这个降幅,离分钟差太远,应该不是决定性的影响因素。

以优化后的index=0,TID=7802为例,分析其耗时:

 Javascript |  copy code |? 
1
15/06/01 13:52:50 INFO CoarseGrainedExecutorBackend: Got assigned task 7802
2
15/06/01 13:52:50 INFO Executor: Running task 0.0 in stage 6.0 (TID 7802)
3
15/06/01 13:52:54 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 0
4
15/06/01 13:52:54 DEBUG BlockStoreShuffleFetcher: Fetching map output location for shuffle 0, reduce 0 took 514 ms
5
15/06/01 13:55:40 INFO PythonRDD: Times: total = 170763, boot = −43996, init = 48943, finish = 165816
6
15/06/01 13:55:40 INFO Executor: Finished task 0.0 in stage 6.0 (TID 7802)33362 bytes result sent to driver

可以看到scala的shuffle read耗时只有514ms,说明shuffle read本身在scala内部其实没有太大耗时。进一步分析python pstats输出,优化前,rdd_23.pstats耗时479039.008 seconds,平均tasks耗时=7.9min;优化后,66494.758 seconds,平均耗时=2.7min。

屏幕快照 2015-06-01 17.44.33

从上图可以看到,发生剧烈变化的就是{method ‘read’ of ‘file’ objects},相差约6倍!猜测的原因是,当并发度16时(实际有16个scala和16个python进程),由于单机cores=16,且物理内存也就100G+,故tasks间相互产生较大的竞争关系。而并发度7时,竞争降低,进程间通信的效率提高。

上图一个可能的疑问是,根据代码逻辑,每一条item都会调用feature、policy等方法,其调用次数是5kw,而cPickle.loads的调用次数只有1kw,不相符。如果调整numPartitions数目,会看到loads次数随之改变。这是由于pyspark.SparkContext初始化时,会生成一个批量序列化对象:

 Python |  copy code |? 
01
    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
02
                 conf, jsc):
03
        self.environment = environment or {}
04
        self._conf = conf or SparkConf(_jvm=self._jvm)
05
        self._batchSize = batchSize  # −1 represents an unlimited batch size
06
        self._unbatched_serializer = serializer
07
        if batchSize == 0:
08
            self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
09
        else:
10
            self.serializer = BatchedSerializer(self._unbatched_serializer,
11
                                                batchSize)

通过查看不同numPartitions cPickle.loads 在该stage的总耗时,波动不大,故没有调优余地,无需关注。(虽然percall变化大,但那是由于batch造成的)

以下列出executor log中一些与性能相关的关键词:

 Python |  copy code |? 
1
15/06/01 10:55:44 INFO MemoryStore: ensureFreeSpace(62854) called with curMem=9319, maxMem=55082955571
2
15/06/01 10:58:20 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
3
15/06/01 10:58:20 INFO ShuffleBlockFetcherIterator: Getting 7725 non−empty blocks out of 7747 blocks
4
15/06/01 11:04:59 INFO MemoryStore: Block rdd_27_89 stored as values in memory (estimated size 3.4 MB, free 51.3 GB)
5
15/06/01 10:56:00 INFO TorrentBroadcast: Reading broadcast variable 5 took 361 ms
6
15/06/01 10:57:56 INFO PythonRDD: Times: total = 9401, boot = −3692, init = 3791, finish = 9302
7
15/06/01 10:58:20 DEBUG BlockStoreShuffleFetcher: Fetching map output location for shuffle 0, reduce 627 took 2338 ms

Leave a Reply