在我们的一个项目中,使用pyspark,并通过swig调用了C++ so包,对一堆textFile和sequenceFile进行join、parser等处理。但在某些特定的输入下,会报Python worker exited unexpectedly的Exception,导致任务失败。虽然我们的python和C++代码都通过了unittest,并且本地模式运行正确,但线上cluster环境、输入数据源都与测试环境存在差异,以下记录线上环境定位与解决问题的方法。

问题描述

spark任务失败时,driver端会打印一些error log,但基本上没有太大作用。建议先去UI界面,找到失败的stage,大概心里明白问题可能出现在哪些阶段:

屏幕快照 2015-05-29 13.21.29

然后找到fail task对应executor的stderr,搜exception关键词:

 Python |  copy code |? 
01
15/05/29 11:35:03 ERROR Executor: Exception in task 237.3 in stage 5.0 (TID 2568)
02
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
03
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:170)
04
 at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
05
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
06
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
07
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
08
 at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
09
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
10
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
11
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
12
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
13
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
14
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
15
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
16
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
17
 at java.lang.Thread.run(Thread.java:744)
18
Caused by: java.io.EOFException
19
 at java.io.DataInputStream.readInt(DataInputStream.java:392)
20
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
21
 ... 14 more

问题定位

这个exception是什么意思呢?从spark代码可以看到:

 Scala |  copy code |? 
01
      private def read(): Array[Byte] = {
02
        if (writerThread.exception.isDefined) {
03
          throw writerThread.exception.get
04
        }
05
        try {
06
          stream.readInt() match {
07
            case length if length > 0 =>
08
              val obj = new Array[Byte](length)
09
              stream.readFully(obj)
10
              obj
11
            case 0 => Array.empty[Byte]
12
            case SpecialLengths.TIMING_DATA =>
13
              // Timing data from worker
14
              val bootTime = stream.readLong()
15
              ……
16
              read()
17
            case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
18
              // Signals that an exception has been thrown in python
19
              val exLength = stream.readInt()
20
              val obj = new Array[Byte](exLength)
21
              stream.readFully(obj)
22
              throw new PythonException(new String(obj, UTF_8),
23
                writerThread.exception.getOrElse(null))
24
            case SpecialLengths.END_OF_DATA_SECTION =>
25
              // We've finished the data section of the output, but we can still
26
              // read some accumulator updates:
27
              val numAccumulatorUpdates = stream.readInt()
28
             ……
29
              }
30
              null
31
          }
32
        } catch {
33
 
34
          case e: Exception if context.isInterrupted =>
35
            logDebug("Exception thrown after task interruption", e)
36
            throw new TaskKilledException
37
 
38
          case e: Exception if env.isStopped =>
39
            logDebug("Exception thrown after context is stopped", e)
40
            null  // exit silently
41
 
42
          case e: Exception if writerThread.exception.isDefined =>
43
            logError("Python worker exited unexpectedly (crashed)", e)
44
            logError("This may have been caused by a prior exception:", writerThread.exception.get)
45
            throw writerThread.exception.get
46
 
47
         <strong> case eof: EOFException =&gt;</strong>
48
<strong>            throw new SparkException("Python worker exited unexpectedly (crashed)", eof)</strong>
49
        }
50
      }

exception应该是由最后那个case抛出的。如果对pyspark模式下,executor工作原理不清楚的,可以参考pyspark与spark的集成方式。scala进程这时会等待python的len+data格式的输出,但在使用readInt等待len的时候,python关闭了管道,于是scala接收到EOF,抛出EOFException。

所以,问题出在python进程里是没有疑问了。那会不会是python内存爆了呢?我们通过调用python/pyspark/shuffle.py的get_used_memory()方法,在worker端定期打印内存使用,发现也就300M+,所以排除之。

这时,拜托spark集群的OP帮忙看了下问题executor所在服务器的dmesg,发现有:

 Python |  copy code |? 
1
python[39160]: segfault at 0 ip 00007ff99a9671d0 sp 00007fff9b806880 error 6 in _basic_image_lib.so[7ff99a8f1000+126000]

但线上服务器没有产生core文件,所以仍然无法具体定位。而cluster环境我们也没有权限登录、修改。

如果能用local模式跑,就很方便了。但由于数据量太大,所以我们采用了笨方法,由于通过对stage的分析,可以知道问题应该是由于一组input files里的某些异常输入格式导致的,所以我们用二分法逐步将bad case定位到一个input file上。

为了简化问题,我们把对_basic_image_lib.so的调用提取为一个test脚本,以bad case input file为输入,setMaster(‘local[*]’)在本地模式运行,成功的产生了core file。这时用gdb python core.xxxx可以将问题缩小到具体的C++函数:

屏幕快照 2015-05-29 13.39.12

这时问题就相对简单了,可以由该c++ so包的开发同学具体跟进了。

Leave a Reply