虽然spark的资料不少,但其内核与原理介绍不多,而pyspark原理说明的就更少了。所以项目里遇到的问题,只能自己分析解决了。最近比较关注的一个问题是在broadcast大词典时,内存消耗非常大,而driver端java进程的内存用量更是达到python的2倍+。为什么呢?为了回答这个问题,还是得先搞清楚pyspark与spark的集成方式,因为一个看起来简单的spark map、reduce、broadcast,其中包含了非常多的进程、网络、线程交互,以及序列化、反序列化、压缩等等。

首先,看一下整合了pyspark后的spark runtime architecure,但在此之前,得先回顾一下没有python时简单的情况:

一个stage对应的tasks,从driver端传送至master,尤其选择合适的worker node,将tasks下发给Executor执行,结果直接返回给Driver Program。(根据不同的master方式,实现细节可能有区别)

那么当添加了python以后呢?可能是为了保持spark核心代码的精简以及用统一的模式未来适配到更多语言,pyspark的实现者选择了尽量不改变交互协议下的外围封装,这也造成非常多的python与java进程间的交互。看一下修改后的架构图:

driver与worker端都有所修改,其中白色是新增的python进程,其通过py4j开源包、socket、local file system等多种方式,与java进程交互。之所以有多种交互方式,分别是为了应对不同的使用场景。例如py4j是为了python调用scala方法;直接的socket用于java主动发起与python的交互;file system用于交互大量数据,例如broadcast的值。

有了以上的基本知识后,再来看交互序列图:

 

  • spark-submit your-app.py 启动spark job,由于是python脚本,故由PythonRunner处理(注意,它是scala代码),它首先启动py4j的socket server,然后fork 子进程执行python your-app.py
  • python解析your-app.py,计算执行的rdd族谱关系等,并通过py4j socket调用scala,生成PythonRDD
  • driver上的scala进程,处理PythonRDD,并将任务提交给master
  • master根据不同的策略,选择worker node,发起launchExecutor命令给后者
  • worker node上的java slave程序,处理launchExecutor命令,最主要的是复用或fork出新的python进程(先忽略其中与daemon.py的交互,下面再说),并与python进程间建立socket连接
  • worker node上的python worker进程,从socket接收job信息,开始执行,并将结果通过socket返回给java worker进程
  • java worker进程通过网络将结果返回给driver上的java进程
  • driver的java 进程,再返回给driver的python进程

以上忽略了很多容错、监控、数据统计的细节,但可以看到已经比较复杂了。

再来看一下daemon.py的作用,由于父子进程会COW复制内存,如果在一个已经分配了大内存的java进程上fork子python进程的话,会很容易造成OOM。所以spark的做法是尽量在java进程还没有消耗很多内存的开始阶段,就fork出一个pyspark/daemon.py子进程,后续由它再按需fork python worker进程。当然这是对unix-like(包括linux)系统有效,windows无法享受该红利。

以上涉及的细节很多,得看代码才可以真正了解,列举关键性代码路径如下,由于master没有看,所以欠奉:

  • Driver端
    • core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
    • python/pyspark/rdd.py
    • core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
    • core/src/main/scala/org/apache/spark/rdd/RDD.scala
    • core/src/main/scala/org/apache/spark/SparkContext.scala
    • core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
  • Worker端
    • core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
    • core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala->compute()
    • core/src/main/scala/org/apache/spark/SparkEnv.scala->createPythonWorker()
    • python/pyspark/daemon.py
    • python/pyspark/worker.py

关于broadcast时为什么消耗内存过多的问题,您有答案了嘛?后续将单独成文予以介绍 haha!

2 Comments

  1. Hui says:

    最近试图理解pyspark在executor上是如何工作的,看了您的blog,感觉清楚了好多,十分感谢!其中还有一个问题不是很清楚,如果在pyspark里调用cache的方法,RDD会存在both jvm和python端吗?看您画的最后一幅图,我感觉python worker只管计算然后返回结果给JVM,并不会存储RDD。

    Thanks,
    Hui

  2. flykobe says:

    cache的由java进程里的blockManager来管理的,所以不会持久化在python进程里。

Leave a Reply