Archive for the ‘spark’ Category

How-to: Tune Your Apache Spark Jobs (Part 1)

Learn techniques for tuning your Apache Spark jobs for optimal efficiency.

When you write Apache Spark code and page through the public APIs, you come across words like transformation, action, and RDD. Understanding Spark at this level is vital for writing Spark programs. Similarly, when things start to fail, or when you venture into the web UI to try to understand why your application is taking so long, you’re confronted with a new vocabulary of words like job, stage, and task. Understanding Spark at this level is vital for writing good Spark programs, and of course by good, I mean fast. To write a Spark program that will execute efficiently, it is very, very helpful to understand Spark’s underlying execution model.

In this post, you’ll learn the basics of how Spark programs are actually executed on a cluster. Then, you’ll get some practical recommendations about what Spark’s execution model means for writing efficient programs.

How Spark Executes Your Program

A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster.

The driver is the process that is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running, although dynamic resource allocation changes that for the latter. A single executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime. Deploying these processes on the cluster is up to the cluster manager in use (YARN, Mesos, or Spark Standalone), but the driver and executor themselves exist in every Spark application.

At the top of the execution hierarchy are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it. To decide what this job looks like, Spark examines the graph of RDDs on which that action depends and formulates an execution plan. This plan starts with the farthest-back RDDs—that is, those that depend on no other RDDs or reference already-cached data—and culminates in the final RDD required to produce the action’s results.

The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.

What determines whether data needs to be shuffled? Recall that an RDD comprises a fixed number of partitions, each of which comprises a number of records. For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD. Each object is only dependent on a single object in the parent. Operations like coalesce can result in a task processing multiple input partitions, but the transformation is still considered narrow because the input records used to compute any single output record can still only reside in a limited subset of the partitions.

However, Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.

For example, consider the following code:
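
The snippet from the original post is not reproduced here; a minimal sketch of the kind of code it describes (mapFunc, flatMapFunc, and filterFunc stand in for arbitrary user functions) would be:

sc.textFile("someFile.txt")
  .map(mapFunc)
  .flatMap(flatMapFunc)
  .filter(filterFunc)
  .count()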

It executes a single action, which depends on a sequence of transformations on an RDD derived from a text file. This code would execute in a single stage, because none of the outputs of these three operations depend on data that can come from different partitions than their inputs.

In contrast, this code finds how many times each character appears in all the words that appear more than 1,000 times in a text file.
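
Again, the original snippet is not reproduced here; a sketch consistent with the description might look like:

val tokenized = sc.textFile("someFile.txt").flatMap(_.split(" "))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 > 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()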

This process would break down into three stages. The reduceByKey operations result in stage boundaries, because computing their outputs requires repartitioning the data by keys.

Here is a more complicated transformation graph including a join transformation with multiple dependencies.

The pink boxes show the resulting stage graph used to execute it.

At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. Because they incur heavy disk and network I/O, stage boundaries can be expensive and should be avoided when possible. The number of data partitions in the parent stage may be different than the number of partitions in the child stage. Transformations that may trigger a stage boundary typically accept a numPartitions argument that determines how many partitions to split the data into in the child stage.

Just as the number of reducers is an important parameter in tuning MapReduce jobs, tuning the number of partitions at stage boundaries can often make or break an application’s performance. We’ll delve deeper into how to tune this number in a later section.

Picking the Right Operators

When trying to accomplish something with Spark, a developer can usually choose from many arrangements of actions and transformations that will produce the same results. However, not all these arrangements will result in the same performance: avoiding common pitfalls and picking the right arrangement can make a world of difference in an application’s performance. A few rules and insights will help you orient yourself when these choices come up.

Recent work in SPARK-5097 began stabilizing SchemaRDD, which will open up Spark’s Catalyst optimizer to programmers using Spark’s core APIs, allowing Spark to make some higher-level choices about which operators to use. When SchemaRDD becomes a stable component, users will be shielded from needing to make some of these decisions.

The primary goal when choosing an arrangement of operators is to reduce the number of shuffles and the amount of data shuffled. This is because shuffles are fairly expensive operations; all shuffle data must be written to disk and then transferred over the network. repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. Not all these operations are equal, however, and a few of the most common performance pitfalls for novice Spark developers arise from picking the wrong one:

  • Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) will produce the same results as rdd.reduceByKey(_ + _). However, the former will transfer the entire dataset across the network, while the latter will compute local sums for each key in each partition and combine those local sums into larger sums after shuffling.
  • Avoid reduceByKey when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. One way would be to use map to transform each element into a Set and then combine the Sets with reduceByKey.

    That approach results in a ton of unnecessary object creation, because a new set must be allocated for each record. It’s better to use aggregateByKey, which performs the map-side aggregation more efficiently (see the sketch after this list).

  • Avoid the flatMap-join-groupBy pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
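
To illustrate the second point above, here is a hedged sketch of the two approaches (rdd is a hypothetical RDD[(String, String)]; the exact code from the original post is not reproduced):

// Wasteful: allocates a new Set for every record before reducing
val viaReduce = rdd.map { case (k, v) => (k, Set(v)) }
                   .reduceByKey(_ ++ _)

// Better: aggregateByKey builds up one mutable set per key per partition
val viaAggregate = rdd.aggregateByKey(scala.collection.mutable.HashSet.empty[String])(
  (set, v) => set += v,           // merge a value into the partition-local set
  (set1, set2) => set1 ++= set2)  // merge sets across partitions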

When Shuffles Don’t Happen

It’s also useful to be aware of the cases in which the above transformations will not result in shuffles. Spark knows to avoid a shuffle when a previous transformation has already partitioned the data according to the same partitioner. Consider the following flow:
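
The original snippet is not reproduced; a sketch consistent with the surrounding discussion (someRdd and someOtherRdd are key-value RDDs):

val rdd1 = someRdd.reduceByKey(_ + _)
val rdd2 = someOtherRdd.reduceByKey(_ + _)
val rdd3 = rdd1.join(rdd2)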

Because no partitioner is passed to reduceByKey, the default partitioner will be used, resulting in rdd1 and rdd2 both being hash-partitioned. These two reduceByKeys will result in two shuffles. If the RDDs have the same number of partitions, the join will require no additional shuffling. Because the RDDs are partitioned identically, the set of keys in any single partition of rdd1 can only show up in a single partition of rdd2. Therefore, the contents of any single output partition of rdd3 will depend only on the contents of a single partition in rdd1 and single partition in rdd2, and a third shuffle is not required.

For example, if someRdd has four partitions, someOtherRdd has two partitions, and both the reduceByKeys use three partitions, the set of tasks that execute would look like:

What if rdd1 and rdd2 use different partitioners or use the default (hash) partitioner with different numbers of partitions? In that case, only one of the RDDs (the one with fewer partitions) will need to be reshuffled for the join.

Same transformations, same inputs, different number of partitions:

One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor. A map transformation can then reference the hash table to do lookups.
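
A hedged sketch of the pattern, assuming a small RDD[(K, V)] named smallRdd and a large RDD[(K, W)] named largeRdd:

val smallLookup = sc.broadcast(smallRdd.collectAsMap())
val joined = largeRdd.flatMap { case (k, w) =>
  // keeps only keys present on both sides, like an inner join
  smallLookup.value.get(k) match {
    case Some(v) => Seq((k, (w, v)))
    case None    => Seq.empty
  }
}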

When More Shuffles are Better

There is an occasional exception to the rule of minimizing the number of shuffles. An extra shuffle can be advantageous to performance when it increases parallelism. For example, if your data arrives in a few large unsplittable files, the partitioning dictated by the InputFormat might place large numbers of records in each partition, while not generating enough partitions to take advantage of all the available cores. In this case, invoking repartition with a high number of partitions (which will trigger a shuffle) after loading the data will allow the operations that come after it to leverage more of the cluster’s CPU.
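
For instance, a minimal sketch (the path and partition count are made up for illustration):

val records = sc.textFile("hdfs:///data/big-unsplittable.gz").repartition(500)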

Another instance of this exception can arise when using the reduce or aggregate action to aggregate data into the driver. When aggregating over a high number of partitions, the computation can quickly become bottlenecked on a single thread in the driver merging all the results together. To loosen the load on the driver, one can first use reduceByKey or aggregateByKey to carry out a round of distributed aggregation that divides the dataset into a smaller number of partitions. The values within each partition are merged with each other in parallel, before sending their results to the driver for a final round of aggregation. Take a look at treeReduce and treeAggregate for examples of how to do that. (Note that in 1.2, the most recent version at the time of this writing, these are marked as developer APIs, but SPARK-5430 seeks to add stable versions of them in core.)

This trick is especially useful when the aggregation is already grouped by a key. For example, consider an app that wants to count the occurrences of each word in a corpus and pull the results into the driver as a map.  One approach, which can be accomplished with the aggregate action, is to compute a local map at each partition and then merge the maps at the driver. The alternative approach, which can be accomplished with aggregateByKey, is to perform the count in a fully distributed way, and then simply collectAsMap the results to the driver.
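
A hedged sketch of the two approaches (words is a hypothetical RDD[String]):

// Per-partition maps merged at the driver: the final merge runs on a single thread
val viaAggregate = words.aggregate(Map.empty[String, Int])(
  (m, w) => m + (w -> (m.getOrElse(w, 0) + 1)),
  (m1, m2) => m2.foldLeft(m1) { case (m, (w, c)) => m + (w -> (m.getOrElse(w, 0) + c)) })

// Fully distributed count, then a cheap collect of the already-reduced result
val viaByKey = words.map((_, 1)).aggregateByKey(0)(_ + _, _ + _).collectAsMap()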

Secondary Sort

Another important capability to be aware of is the repartitionAndSortWithinPartitions transformation. It’s a transformation that sounds arcane, but seems to come up in all sorts of strange situations. This transformation pushes sorting down into the shuffle machinery, where large amounts of data can be spilled efficiently and sorting can be combined with other operations.

For example, Apache Hive on Spark uses this transformation inside its join implementation. It also acts as a vital building block in the secondary sort pattern, in which you want to both group records by key and then, when iterating over the values that correspond to a key, have them show up in a particular order. This issue comes up in algorithms that need to group events by user and then analyze the events for each user based on the order they occurred in time. Taking advantage of repartitionAndSortWithinPartitions to do secondary sort currently requires a bit of legwork on the part of the user, but SPARK-3655 will simplify things vastly.
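
A minimal sketch of the secondary sort pattern, assuming events is a hypothetical RDD[((Long, Long), String)] of ((userId, timestamp), payload) pairs:

import org.apache.spark.Partitioner

// Partition on userId only, so every event for a user lands in the same partition,
// while the shuffle sorts each partition by the full (userId, timestamp) key.
class UserPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (userId: Long, _) => (math.abs(userId) % partitions).toInt
  }
}

val sortedWithinUsers = events.repartitionAndSortWithinPartitions(new UserPartitioner(100))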

Conclusion

You should now have a good understanding of the basic factors involved in creating a performance-efficient Spark program! In Part 2, we’ll cover tuning resource requests, parallelism, and data structures.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Spark committer, and an Apache Hadoop PMC member. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

How-to: Tune Your Apache Spark Jobs (Part 2)

In the conclusion to this series, learn how resource tuning, parallelism, and data representation affect Spark job performance.

In this post, we’ll finish what we started in “How to Tune Your Apache Spark Jobs (Part 1)”. I’ll try to cover pretty much everything you could care to know about making a Spark program run fast. In particular, you’ll learn about resource tuning, or configuring Spark to take advantage of everything the cluster has to offer. Then we’ll move to tuning parallelism, the most difficult as well as most important parameter in job performance. Finally, you’ll learn about representing the data itself, in the on-disk form which Spark will read (spoiler alert: use Apache Avro or Apache Parquet) as well as the in-memory format it takes as it’s cached or moves through the system.

Tuning Resource Allocation

The Spark user list is a litany of questions to the effect of “I have a 500-node cluster, but when I run my application, I see only two tasks executing at a time. HALP.” Given the number of parameters that control Spark’s resource utilization, these questions aren’t unfair, but in this section you’ll learn how to squeeze every last bit of juice out of your cluster. The recommendations and configurations here differ a little bit between Spark’s cluster managers (YARN, Mesos, and Spark Standalone), but we’re going to focus only on YARN, which Cloudera recommends to all users.

For some background on what it looks like to run Spark on YARN, check out my post on this topic.

The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently does anything to actively manage them.

Every Spark executor in an application has the same fixed number of cores and same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell, and pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file or on a SparkConf object. Similarly, the heap size can be controlled with the --executor-memory flag or the spark.executor.memory property. The cores property controls the number of concurrent tasks an executor can run. --executor-cores 5 means that each executor can run a maximum of five tasks at the same time. The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins.

The --num-executors command-line flag or spark.executor.instances configuration property controls the number of executors requested. Starting in CDH 5.4/Spark 1.3, you will be able to avoid setting this property by turning on dynamic allocation with the spark.dynamicAllocation.enabled property. Dynamic allocation enables a Spark application to request executors when there is a backlog of pending tasks and free up executors when idle.
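
A hedged sketch of setting the same properties on a SparkConf (the values are illustrative only):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.cores", "5")        // or --executor-cores 5 on the command line
  .set("spark.executor.memory", "19g")     // or --executor-memory 19G
  .set("spark.executor.instances", "17")   // or --num-executors 17
  // .set("spark.dynamicAllocation.enabled", "true")  // alternative to a fixed executor count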

It’s also important to think about how the resources requested by Spark will fit into what YARN has available. The relevant YARN properties are:

  • yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.

Asking for five executor cores will result in a request to YARN for five virtual cores. The memory requested from YARN is a little more complex for a couple reasons:

  • --executor-memory/spark.executor.memory controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384, .07 * spark.executor.memory).
  • YARN may round the requested memory up a little. YARN’s yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values respectively.

The following (not to scale with defaults) shows the hierarchy of memory properties in Spark and YARN:

And if that weren’t enough to think about, a few final concerns when sizing Spark executors:

  • The application master, which is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it’s often useful to bolster its resources with the --driver-memory and --driver-cores properties.
  • Running executors with too much memory often results in excessive garbage collection delays. 64GB is a rough guess at a good upper limit for a single executor.
  • I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number.
  • Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once on each executor, so many small executors will result in many more copies of the data.

To hopefully make all of this a little more concrete, here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.

The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:

  • 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.

A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?

  • This config results in three executors on all nodes except for the one with the AM, which will have two executors.
  • --executor-memory was derived as (63/3 executors per node) = 21.  21 * 0.07 = 1.47.  21 – 1.47 ~ 19.

Tuning Parallelism

Spark, as you have likely figured out by this point, is a parallel processing engine. What is maybe less obvious is that Spark is not a “magic” parallel processing engine, and is limited in its ability to figure out the optimal amount of parallelism. Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.

How is this number determined? The way Spark groups RDDs into stages is described in the previous post. (As a quick reminder, transformations like repartition and reduceByKey induce stage boundaries.) The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian creates an RDD with their product.

What about RDDs with no parents? RDDs produced by textFile or hadoopFile have their partitions determined by the underlying MapReduce InputFormat that’s used. Typically there will be a partition for each HDFS block being read. Partitions for RDDs produced by parallelize come from the parameter given by the user, or spark.default.parallelism if none is given.

To determine the number of partitions in an RDD, you can always call rdd.partitions().size().

The primary concern is that the number of tasks will be too small. If there are fewer tasks than slots available to run them in, the stage won’t be taking advantage of all the CPU available.

A small number of tasks also mean that more memory pressure is placed on any aggregation operations that occur in each task. Any join, cogroup, or *ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages that are on the fetching side of the shuffles they trigger. reduceByKey and aggregateByKey use data structures in the tasks for the stages on both sides of the shuffles they trigger.

When the records destined for these aggregation operations do not easily fit in memory, some mayhem can ensue. First, holding many records in these data structures puts pressure on garbage collection, which can lead to pauses down the line. Second, when the records do not fit in memory, Spark will spill them to disk, which causes disk I/O and sorting. This overhead during large shuffles is probably the number one cause of job stalls I have seen at Cloudera customers.

So how do you increase the number of partitions? If the stage in question is reading from Hadoop, your options are:

  • Use the repartition transformation, which will trigger a shuffle.
  • Configure your InputFormat to create more splits.
  • Write the input data out to HDFS with a smaller block size.

If the stage is getting its input from another stage, the transformation that triggered the stage boundary will accept a numPartitions argument, such as in the sketch below.
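
A sketch (someRdd and X are placeholders):

val result = someRdd.reduceByKey(_ + _, numPartitions = X)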

What should “X” be? The most straightforward way to tune the number of partitions is experimentation: Look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving.

There is also a more principled way of calculating X, but it’s difficult to apply a priori because some of the quantities are difficult to calculate. I’m including it here not because it’s recommended for daily use, but because it helps with understanding what’s going on. The main goal is to run enough tasks so that the data destined for each task fits in the memory available to that task.

The memory available to each task is (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores. Memory fraction and safety fraction default to 0.2 and 0.8 respectively.
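
As a rough worked example under those defaults (the numbers are illustrative, not from the original post): with spark.executor.memory = 20 GB and spark.executor.cores = 5, each task gets roughly 20 GB * 0.2 * 0.8 / 5 ≈ 0.64 GB for its shuffle data structures.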

The in-memory size of the total shuffle data is harder to determine. The closest heuristic is to find the ratio between the Shuffle Spill (Memory) metric and the Shuffle Spill (Disk) metric for a stage that ran, and then multiply the total shuffle write by this number. However, this can be somewhat off if the stage is doing a reduction.

Then round up a bit because too many partitions is usually better than too few partitions.

In fact, when in doubt, it’s almost always better to err on the side of a larger number of tasks (and thus partitions). This advice is in contrast to recommendations for MapReduce, which requires you to be more conservative with the number of tasks. The difference stems from the fact that MapReduce has a high startup overhead for tasks, while Spark does not.

Slimming Down Your Data Structures

Data flows through Spark in the form of records. A record has two representations: a deserialized Java object representation and a serialized binary representation. In general, Spark uses the deserialized representation for records in memory and the serialized representation for records stored on disk or being transferred over the network. There is work planned to store some in-memory shuffle data in serialized form.

The spark.serializer property controls the serializer that’s used to convert between these two representations. The Kryo serializer, org.apache.spark.serializer.KryoSerializer, is the preferred option. It is unfortunately not the default, because of some instabilities in Kryo during earlier versions of Spark and a desire not to break compatibility, but the Kryo serializer should always be used.

The footprint of your records in these two representations has a massive impact on Spark performance. It’s worthwhile to review the data types that get passed around and look for places to trim some fat.

Bloated deserialized objects will result in Spark spilling data to disk more often and reduce the number of deserialized records Spark can cache (e.g. at the MEMORY storage level). The Spark tuning guide has a great section on slimming these down.

Bloated serialized objects will result in greater disk and network I/O, as well as reduce the number of serialized records Spark can cache (e.g. at the MEMORY_SER storage level). The main action item here is to make sure to register any custom classes you define and pass around using the SparkConf#registerKryoClasses API.
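
A minimal sketch of turning this on (MyCustomClass stands in for your own types):

import org.apache.spark.SparkConf

case class MyCustomClass(id: Long, name: String)

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyCustomClass]))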

Data Formats

Whenever you have the power to make the decision about how data is stored on disk, use an extensible binary format like Avro, Parquet, Thrift, or Protobuf. Pick one of these formats and stick to it. To be clear, when one talks about using Avro, Thrift, or Protobuf on Hadoop, they mean that each record is an Avro/Thrift/Protobuf struct stored in a sequence file. JSON is just not worth it.

Every time you consider storing lots of data in JSON, think about the conflicts that will be started in the Middle East, the beautiful rivers that will be dammed in Canada, or the radioactive fallout from the nuclear plants that will be built in the American heartland to power the CPU cycles spent parsing your files over and over and over again. Also, try to learn people skills so that you can convince your peers and superiors to do this, too.

Sandy Ryza is a Data Scientist at Cloudera, an Apache Spark committer, and an Apache Hadoop PMC member. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.

 

Reposted from:

  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
  • http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

Spark scheduling splits into resource scheduling and task scheduling. The former currently supports standalone, YARN, Mesos, and so on; the latter is the counterpart of Hadoop MapReduce. This post walks through the resource scheduling flow; given limited scope it only covers the simplest case, standalone mode, and does not compare the strengths and weaknesses of the various cluster managers.

The roles involved in Spark resource scheduling are the driver, master, worker, and executor (the executor is mostly a task scheduling concern). Their interactions are shown in the figure below; the main flows are:

  • driver -> master: submit the Application and state how many resources (cores) it needs
  • master -> driver: acknowledge that the Application was submitted successfully (we only consider the happy path)
  • driver -> workers: send the appDescription and launch the executor child processes
  • executor -> driver: register the executor
  • driver -> executor: submit Tasks, which the executor then runs

[Figure: task submission flow]

The dashed lines in the figure above represent actor interactions: Spark uses Akka's actor model for communication between processes, threads, and machines, which avoids synchronization and deadlock issues and keeps the code relatively simple. Note that the worker main process never talks to the driver; only the executor child processes do.

Driver & Master

When the SparkContext is initialized, application registration is completed inside a large try/catch in the SparkContext class. In standalone mode, it mainly does the following:

  • starts the heartbeat receiver actor, since the executors need to maintain heartbeats with the driver
  • starts the job progress listener actor, so that progress shows up in the UI and the logs
  • starts the mapOutputTracker, because the executors acting as reducers ask the driver for the locations of map outputs
  • initializes the task pool and picks FIFO or FAIR scheduling
  • picks the schedulerBackend and taskScheduler implementations based on the format of the masters URI; the schedulerBackend talks to the executorBackend on the executor side
  • delegates to AppClient/ClientActor to talk to the masters, determine which master is available, and have it acquire and launch the available executor resources on the workers

The SparkContext.createTaskScheduler method decides which backend and scheduler to use and calls _taskScheduler.initialize(), which initializes the pool and picks the schedulableBuilder (FIFO or FAIR).

master                                                       | schedulerBackend                                                | taskScheduler
local                                                        | LocalBackend                                                    | TaskSchedulerImpl
local\[([0-9]+|\*)\]                                         | LocalBackend                                                    | TaskSchedulerImpl
local\[([0-9]+|\*)\s*,\s*([0-9]+)\]                          | LocalBackend                                                    | TaskSchedulerImpl
spark://(.*)                                                 | SparkDeploySchedulerBackend                                     | TaskSchedulerImpl
local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*] | SparkDeploySchedulerBackend                                     | TaskSchedulerImpl
"yarn-standalone" or "yarn-cluster"                          | org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend | org.apache.spark.scheduler.cluster.YarnClusterScheduler
"yarn-client"                                                | org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend  | org.apache.spark.scheduler.cluster.YarnScheduler
mesosUrl @ MESOS_REGEX(_)                                    | CoarseMesosSchedulerBackend or MesosSchedulerBackend            | TaskSchedulerImpl
simr://(.*)                                                  | SimrSchedulerBackend                                            | TaskSchedulerImpl
Once both are determined, interaction with the master is triggered immediately:

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()  // in standalone mode this indirectly calls _schedulerBackend.start() and blocks until AppClient initialization completes (i.e. the RegisteredApplication message comes back from the master)

AppClient is just a wrapper; it initializes:

actor = actorSystem.actorOf(Props(new ClientActor))

ClientActor proxies the interaction with the master: in preStart() it sends a RegisterApplication message to every master URI, and it registers several handlers for the messages the master sends back.

Worker & Driver

deploy/worker/Worker.scala is the worker main process; it receives messages from the master, such as LaunchExecutor, in actor style. In that message handler, a child thread launches the real executor child process.
deploy/worker/ExecutorRunner.scala -> fetchAndRunExecutor() is where the child process is started.
executor/CoarseGrainedExecutorBackend.scala -> onStart() sends a RegisterExecutor message to the driver, which the driver's schedulerBackend receives and handles. The message carries the executor's details; the driver stores them in the executorDataMap object and triggers makeOffers to assign pending tasks.

The driver-side message handler looks like this:

case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorDataMap.contains(executorId)) {
      context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    } else {
      logInfo("Registered executor: " + executorRef + " with ID " + executorId)
      context.reply(RegisteredExecutor)
      addressToExecutorId(executorRef.address) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val (host, _) = Utils.parseHostPort(hostPort)
      val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      CoarseGrainedSchedulerBackend.this.synchronized {
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      listenerBus.post(  // notify, through the bus, every thread listening for SparkListenerExecutorAdded; nothing here seems to consume it
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      makeOffers()  // Make fake resource offers on all executors
    }

The makeOffers method is what assigns tasks; it is where resource scheduling and task scheduling come together. The places it is called from include:

  • case StatusUpdate
  • case ReviveOffers — called on submitTasks, task retries, and similar occasions
  • case RegisterExecutor — i.e. as soon as an executor registers, check whether any tasks are waiting to be assigned

The method calls scheduler.resourceOffers, which tries to schedule as many tasks as possible on each call, mainly taking into account:

  • locality (recomputeLocality): PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  • load balancing (round robin)
  • task re-execution
  • the task + executorId blacklist
  • the task lists for each state

The key code of the scheduler.resourceOffers call is shown below: for each TaskSet it schedules as many tasks as possible, and when assigning them it prefers tasks with better locality:

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}

Inside the helper methods called by scheduler.resourceOffers, the driver is also told about task status changes, which can be passed on to listeners through the listenerBus:

sched.dagScheduler.taskStarted(task, info)
    eventProcessLoop.post(BeginEvent(task, taskInfo))

At the end of makeOffers, launchTasks is called; only then is the task actually dispatched. The worker-side CoarseGrainedExecutorBackend.scala receives the message and carries on from there.

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

Summary:
  • After the worker main process launches the executor child process (a separate JVM), the latter registers itself with the driver via an actor message
  • When the driver receives the RegisterExecutor message, it checks whether there are pending tasks whose localityLevel matches the executorId
  • If suitable tasks exist (possibly several), it sends LaunchTask messages to the executor, again via actors, and the executor actually runs them

Questions:

  • Which child threads and child processes exist in the driver, master, and worker?
    • The driver has at least the main work process, the child threads that talk to the master through ClientActor, the child threads that talk to the executors through the SchedulerBackend, the listenerBus threads that track the various kinds of progress, the threads behind MapOutputTrackerMasterEndpoint that manage map output locations, and the heartbeat thread; there should also be block, HTTP, and similar services
    • A standalone master uses actor child threads to maintain the list of available workers, including their resource usage, and to receive app registration requests from drivers. If ZooKeeper is used for master election, it also has to keep its connection to ZK alive. There is also the UI.
    • The worker main process has to maintain a heartbeat with the master and report resource usage; the executor child processes maintain heartbeats with the driver. The main process launches executor child processes through a child thread. There are also block and shuffle output services (unclear whether these directly reuse the block threads or processes).
  • How do modules, processes, and threads communicate? What do the data structures look like?
    • In the Scala code, network, inter-process, and inter-thread interaction mostly goes through actors; resource scheduling mainly follows this path
    • Python calls the Scala code through the Java gateway provided by py4j
    • Shuffle output and similar data are exchanged over Netty
    • Broadcasts are exchanged peer-to-peer
  • How does the master know which workers are available?
    • When a worker starts, it sends a RegisterWorker message to the master, which the Master class handles in actor style
  • What is the task scheduling algorithm, and what are its pros and cons? trait SchedulerBuilder, FairSchedulingAlgorithm/FIFOSchedulingAlgorithm
    • FIFO:
    • Fair:
  • The basis for resource scheduling: the available executors are shuffled randomly, locality is respected, each worker has a core limit, and currently each task occupies exactly one core
  • Are jars, files, and libraries sent to the executor at this point? By the driver or by the master?
    • What the driver submits to the master is the appDescription, whose command only names the executorBackend class; it does not include the app's own code
    • Only after an executor registers does the driver send it the launchTask message that carries the real task information (file and jar names), which the TaskRunner deserializes before pulling the files
  • What does switching between standalone, YARN, and Mesos change for the driver, master, and worker? Which classes are involved?
    • So far we can only see that different backend classes implement the functionality; there is no comparison of their merits.
  • How far does resource isolation go on a single worker? The JVM can cap memory, but what about CPU, IO, and disk?
    • With our current configuration, a worker hosts only one executor, which is created along the driver -> master -> Worker path
    • During scheduling, each executor may be assigned multiple tasks, as long as availableCpus(i) >= CPUS_PER_TASK
    • When the executorBackend receives a launchTask request, it creates a TaskRunner object and schedules it on a thread pool, so if multiple tasks arrive they run concurrently on multiple threads
    • The tasks belonging to the same executor share one JVM, so they share the executor.memory limit and similar constraints
  • How is JVM and Python process reuse achieved?
  • With the standalone cluster manager type, how does it cooperate with ZooKeeper?
    • ZooKeeperPersistenceEngine and ZooKeeperLeaderElectionAgent are initialized in the Master class. The latter is started right when it is constructed; it is built on Apache Curator (curator.apache.org), and its isLeader and notLeader methods are invoked by Curator callbacks.

Finally, a simplified sequence diagram:

[Figure: spark-resource-schedule]

This project runs on the PySpark core and processes 1.5TB of input, involving rightOuterJoin, reduceBy operations, and multimedia processing. Its running time went from over an hour initially to roughly ten minutes. This post records some of the optimizations.

Tuning generally takes three steps:

  • Find the bottleneck: use the various logs and metrics to locate it, drilling down from job to stage to the detailed interactions and individual functions
  • Set a target: based on your understanding of Spark, the business logic, and the code, set a realistic goal
  • Optimize
    • For stages, we tuned Java GC, data skew, and the degree of parallelism
    • For the details and individual functions, we adjusted shuffle and other configuration and structured the code to avoid excessive network IO and cross-language interaction

Data Locality

The job's input comes from HDFS, but because the company's HDFS and Spark clusters currently share no machines, node-local or rack-local reads are impossible. The best we can do is pick an HDFS data center that is close to the Spark data center and connected by a dedicated line, to keep network latency from interfering too much.

How to Locate Bottlenecks

  1. In the driver-side application log, print the total elapsed time; an optimization only counts as effective if it clearly moves this number
  2. Turn on spark.python.profile and analyze, per RDD, where time goes inside the Python method calls
  3. In the master UI (we still run standalone mode), find the most expensive stages, imbalanced tasks, and so on
  4. In the executor stderr logs, set the log4j level to DEBUG to see many details of what the Scala process is doing internally

Increasing Parallelism

Initially our input was the raw data provided by the upstream team, with an HDFS block size of 256MB and a total file size of only 3.5GB, so this data source yielded only 14 concurrent tasks, each taking several minutes.

By splitting the file into 1,000 small parts we raised the parallelism to 1,000, with each task taking about 20s. (20s is actually on the short side for a task; making them slightly larger might even perform better.)

At the same time we set spark.cores.max=1000 so that we could get up to 1,000 usable cores, spread across 64 executors (16 cores each).

Because there is a rightOuterJoin in the middle, we also set its numPartitions argument to 1000 (this value is tuned further below).

Memory Usage

In a PySpark setup, the driver Java process, the driver Python process, the executor Java process, and the executor Python process all consume memory. The driver side is fine; in our current scenario it does not need much. On the executor side, however, both Python and Scala/Java have shuffle work to do, so enough memory must be allocated to each.

Our configuration is:

spark.python.worker.memory 40g
spark.driver.memory 10G
spark.executor.memory 100G

Here executor.memory is the Java process memory. We did not change the memory fractions, keeping the defaults of 60% for storage, 20% for shuffle, and 20% for code.

Through the UI, however, we noticed some tasks failing and then succeeding after speculative retries. The error log contained the exception below; after several failed retries the task gave up:

OneForOneBlockFetcher: Failed while starting block fetches
     java.io.IOException: Connection from other-executor-ip closed

On the server hosting other-executor-ip we saw the following exception, although it resumed serving normally a little later:

TransprotChannelHandler: Exception in connect from ip:port
    java.lang.OutOfMemory: requested array size exceeds VM limit

Our guess is that other-executor-ip ran short of memory while serving shuffle output and could only serve again once GC had freed enough, so we adjusted the GC-related parameters as follows:

--conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC -XX:+UseParallelOldGC -XX:ParallelGCThreads=64 -XX:NewRatio=3 -XX:SurvivorRatio=3 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/home/spark/heapdump.bin -XX:+PrintHeapAtGC -XX:PermSize=256m -XX:MaxPermSize=256m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -XX:-OmitStackTraceInFastThrow -verbose:gc -Xloggc:/tmp/spark.executor.gc.log -Dlog4j.configuration=ftp://.../tmp/chengyi02/log4j.properties"

After this adjustment the error no longer appeared.

Data Skew

Initially, one of the input sources had a large number of file splits (>1000). We tried repartitioning it into 1,000 partitions with the default partition function, which produced imbalanced partitions: some were 256MB while others were only a few MB, which is tied to our data keys. Not repartitioning turned out to be better.

In addition, the earlier post on Spark task tuning also covers what to do when a handful of slow tasks appear.

Serialization and Compression

Optimizations such as Kryo only apply to Scala and Java. PySpark data is serialized inside the Python process and handed to the Scala process as an opaque binary stream, so enabling Kryo has no effect.

By default, compression for spill, broadcast, shuffle, and so on is already enabled. We also set spark.rdd.compress to true.

Tuning the numPartitions Argument of rightOuterJoin

The right outer join combines the input from several data sources into the data that is ultimately processed (2TB+), and it is the parent of the map-filter-reduceByKey stage. Lowering the rightOuterJoin numPartitions argument from 1000 to 400 brought the reduceByKey stage's running time down from about 7 minutes to about 4.

A tentative conclusion: with PySpark, when an operation triggers a shuffle, it is worth trying a numPartitions of about half of cores.max. (Our cluster is shared with other workloads, so the measurements are noisy and we cannot draw a firm conclusion.)

The executor debug logs show that before the change each machine ran 16 tasks concurrently and afterwards 7. The time each Scala task spent fetching shuffle data dropped from about 2.3s to about 500ms, but that reduction is nowhere near minutes, so it cannot be the decisive factor.

Taking the post-optimization task with index=0, TID=7802 as an example, its time breaks down as follows:

15/06/01 13:52:50 INFO CoarseGrainedExecutorBackend: Got assigned task 7802
15/06/01 13:52:50 INFO Executor: Running task 0.0 in stage 6.0 (TID 7802)
15/06/01 13:52:54 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 0
15/06/01 13:52:54 DEBUG BlockStoreShuffleFetcher: Fetching map output location for shuffle 0, reduce 0 took 514 ms
15/06/01 13:55:40 INFO PythonRDD: Times: total = 170763, boot = -43996, init = 48943, finish = 165816
15/06/01 13:55:40 INFO Executor: Finished task 0.0 in stage 6.0 (TID 7802). 33362 bytes result sent to driver

You can see the Scala-side shuffle read took only 514ms, so the shuffle read itself is not expensive inside Scala. Digging further into the Python pstats output: before the change, rdd_23.pstats shows 479039.008 seconds in total, an average of 7.9 min per task; afterwards 66494.758 seconds, an average of 2.7 min.

[Screenshot 2015-06-01 17.44.33]

The figure above shows that the dramatic change is in {method 'read' of 'file' objects}, a difference of roughly 6x! Our guess is that at a concurrency of 16 (really 16 Scala plus 16 Python processes), with only 16 cores and a bit over 100GB of physical memory per machine, the tasks compete heavily with each other; at a concurrency of 7 the contention drops and inter-process communication becomes more efficient.

One possible objection to the figure: by the code logic, every item calls methods such as feature and policy, about 50 million calls, yet cPickle.loads is called only about 10 million times, which does not match. If you change numPartitions you will see the loads count change as well. The reason is that pyspark.SparkContext creates a batching serializer when it is initialized:

def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
             conf, jsc):
    self.environment = environment or {}
    self._conf = conf or SparkConf(_jvm=self._jvm)
    self._batchSize = batchSize  # -1 represents an unlimited batch size
    self._unbatched_serializer = serializer
    if batchSize == 0:
        self.serializer = AutoBatchedSerializer(self._unbatched_serializer)
    else:
        self.serializer = BatchedSerializer(self._unbatched_serializer,
                                            batchSize)

Comparing the total time cPickle.loads spends in this stage across different numPartitions values, it barely changes, so there is no tuning headroom there and it needs no attention. (The per-call time varies a lot, but that is just an artifact of batching.)

Below are some performance-related lines worth looking for in the executor log:

15/06/01 10:55:44 INFO MemoryStore: ensureFreeSpace(62854) called with curMem=9319, maxMem=55082955571
15/06/01 10:58:20 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
15/06/01 10:58:20 INFO ShuffleBlockFetcherIterator: Getting 7725 non-empty blocks out of 7747 blocks
15/06/01 11:04:59 INFO MemoryStore: Block rdd_27_89 stored as values in memory (estimated size 3.4 MB, free 51.3 GB)
15/06/01 10:56:00 INFO TorrentBroadcast: Reading broadcast variable 5 took 361 ms
15/06/01 10:57:56 INFO PythonRDD: Times: total = 9401, boot = -3692, init = 3791, finish = 9302
15/06/01 10:58:20 DEBUG BlockStoreShuffleFetcher: Fetching map output location for shuffle 0, reduce 627 took 2338 ms

In one of our projects we use PySpark and call a C++ .so package through SWIG to join and parse a pile of textFiles and sequenceFiles. On certain inputs, however, the job fails with a "Python worker exited unexpectedly" exception. Our Python and C++ code both pass their unit tests and run correctly in local mode, but the production cluster environment and the input data differ from the test environment. Below is how we located and fixed the problem in production.

Problem Description

When a Spark job fails, the driver prints some error logs, but they are largely unhelpful. It is better to start from the UI, find the failed stage, and form a rough idea of which phase the problem may be in:

[Screenshot 2015-05-29 13.21.29]

Then find the stderr of the executor that ran the failed task and search for the exception keyword:

15/05/29 11:35:03 ERROR Executor: Exception in task 237.3 in stage 5.0 (TID 2568)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:170)
 at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.EOFException
 at java.io.DataInputStream.readInt(DataInputStream.java:392)
 at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
 ... 14 more

Locating the Problem

What does this exception mean? The Spark code shows:

private def read(): Array[Byte] = {
  if (writerThread.exception.isDefined) {
    throw writerThread.exception.get
  }
  try {
    stream.readInt() match {
      case length if length > 0 =>
        val obj = new Array[Byte](length)
        stream.readFully(obj)
        obj
      case 0 => Array.empty[Byte]
      case SpecialLengths.TIMING_DATA =>
        // Timing data from worker
        val bootTime = stream.readLong()
        // ...
        read()
      case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
        // Signals that an exception has been thrown in python
        val exLength = stream.readInt()
        val obj = new Array[Byte](exLength)
        stream.readFully(obj)
        throw new PythonException(new String(obj, UTF_8),
          writerThread.exception.getOrElse(null))
      case SpecialLengths.END_OF_DATA_SECTION =>
        // We've finished the data section of the output, but we can still
        // read some accumulator updates:
        val numAccumulatorUpdates = stream.readInt()
        // ...
        null
    }
  } catch {

    case e: Exception if context.isInterrupted =>
      logDebug("Exception thrown after task interruption", e)
      throw new TaskKilledException

    case e: Exception if env.isStopped =>
      logDebug("Exception thrown after context is stopped", e)
      null  // exit silently

    case e: Exception if writerThread.exception.isDefined =>
      logError("Python worker exited unexpectedly (crashed)", e)
      logError("This may have been caused by a prior exception:", writerThread.exception.get)
      throw writerThread.exception.get

    case eof: EOFException =>
      throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
  }
}

The exception must be thrown by the last case. If you are unclear about how executors work in PySpark mode, see the post on how PySpark integrates with Spark. At this point the Scala process is waiting for len+data formatted output from Python, but while readInt is waiting for the length, Python closes the pipe, so Scala hits EOF and throws EOFException.

So there is no doubt the problem lies in the Python process. Could Python have run out of memory? By calling get_used_memory() from python/pyspark/shuffle.py and periodically printing memory usage on the workers, we saw only about 300MB, so that was ruled out.

We then asked the ops team for the Spark cluster to check dmesg on the server hosting the problematic executor, and found:

python[39160]: segfault at 0 ip 00007ff99a9671d0 sp 00007fff9b806880 error 6 in _basic_image_lib.so[7ff99a8f1000+126000]

But the production servers did not produce a core file, so we still could not pin the problem down, and we had no permission to log in to or modify the cluster environment.

If we could run in local mode this would be easy, but the data was too large, so we took a brute-force approach: since the stage analysis suggested the problem was caused by some malformed records within a group of input files, we used binary search to narrow the bad case down to a single input file.

To simplify further, we extracted the call to _basic_image_lib.so into a test script, fed it the bad-case input file, and ran it locally with setMaster('local[*]'), which did produce a core file. Running gdb python core.xxxx then narrowed the problem down to a specific C++ function:

[Screenshot 2015-05-29 13.39.12]

From there the problem was relatively simple and could be handed to the developers of the C++ .so package to follow up on.

PySpark is really a language shell on top of the Spark Scala core: the code to execute is shipped to the worker nodes via Scala/Java and run there by Python processes. That raises the question of how the user's Python code gets distributed, and since distribution necessarily involves serialization, how that serialization is implemented.

This post explains how Python code is serialized. In one sentence: PySpark relies on pickle, with a fair amount of customization built on top of it.

Callback Type               | Global vars pickled | Instance vars pickled | Class vars pickled | Key methods in cloudpickle.py
__main__.function           | Y                   | -                     | -                  | save_function -> save_function_tuple -> extract_func_data
other_module.function       | N                   | -                     | -                  | save_function -> save_global and sendRef = True
__main__.instancemethod     | Y                   | Y                     | Y                  | save_instancemethod; save_function for im_func, default __reduce_ex__ for im_self, save_global for im_class and sendRef = False
other_module.instancemethod | Y                   | Y                     | N                  | same as __main__.instancemethod, but sendRef = True
__main__.classmethod        | Y                   | -                     | Y                  | same as __main__.instancemethod, but im_self is the class object and im_class is TypeType
__main__.staticmethod       | Y                   | -                     | Y                  | save_function; the class is one element of f_globals
other_module.classmethod    | Y                   | -                     | N                  | save_instancemethod, sendRef = True
other_module.staticmethod   | Y                   | -                     | N                  | save_function, sendRef = True

The table above summarizes the behavior for the different callback function types, based on Spark 1.2 and Python 2.7. The rest of this post is in three parts:

  1. Spark's serialization flow
  2. Python function/method types
  3. A walk through the pickle process

For concreteness, we use sc.textFile(...).reduce(callback) as the running example.

Spark's Serialization Flow

Serialization and the interaction with Scala both start from pyspark.SparkContext, whose __init__ method does two things:

  1. Calls _ensure_initialized, which launches the gateway and the JVM and, via py4j over the local loopback interface, establishes the communication channel with Java/Scala
  2. Calls _do_init, which by default sets self.serializer = AutoBatchedSerializer(self._unbatched_serializer)

In PySpark both code and data need to be serialized, and the serializer here is not the one used to pickle code, so don't be misled! PySpark's serializer class hierarchy is shown below; the one involved in code serialization is CloudPickleSerializer.

[Figure: pyspark-serilizer]

With that background, let's trace how the example code executes.

1. textFile() creates and returns an RDD:

 

RDD(
    self._jsc.textFile(name, minPartitions),  # jrdd
    self,                                     # python sparkcontext
    UTF8Deserializer(use_unicode))            # jrdd_deserializer

Here self._jsc is an instance of JavaSparkContext.

2. reduce(callback) forces the DAG to produce a stage and ships the callback to the cluster.

Note that reduce and friends wrap our callback in layers of nested functions; the type of the wrapped function is "function", whereas the callback we pass in may be a function, instancemethod, lambda, staticmethod, classmethod, and so on.

reduce indirectly calls mapPartitionsWithIndex, producing a PipelinedRDD:

PipelinedRDD(
    self,                    # prev rdd, i.e. the RDD created by textFile
    f,                       # the function wrapped by reduce
    preservesPartitioning)

Our example is simple, so the prev RDD here is a plain RDD rather than a PipelinedRDD, and the new PipelinedRDD's main attributes are set directly:

  • self.func = func, i.e. the function wrapped by reduce
  • self._prev_jrdd = prev._jrdd
  • self._prev_jrdd_deserializer = prev._jrdd_deserializer
  • self._jrdd_val = None; it is generated the first time _jrdd is accessed
  • self._jrdd_deserializer = self.ctx.serializer, which defaults to PickleSerializer

reduce then triggers collect, which launches the tasks through self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()). Since this is the first access to self._jrdd, the _jrdd_val object gets computed, and that path contains some key code:

command = (self.func, profileStats, self._prev_jrdd_deserializer,
           self._jrdd_deserializer)
# the serialized command will be compressed by broadcast
ser = CloudPickleSerializer()
pickled_command = ser.dumps(command)
if len(pickled_command) > (1 << 20):  # 1M
    self._broadcast = self.ctx.broadcast(pickled_command)
    pickled_command = ser.dumps(self._broadcast)

This command is what actually gets sent to the workers; its self.func calls our callback through the layers of wrappers. Serialization is done by CloudPickleSerializer, and if the serialized result exceeds 1MB it is shipped as a broadcast instead. pickled_command finally goes through py4j to build a JavaRDD object, which then follows Scala's normal DAG path.

That gives a rough picture of how our own callback becomes the final command.

Python Function/Method Types

Before looking at the pickling itself, we need to discuss Python function types, because pickle treats different object types differently. Although everything in Python is an object, every object still has a type.

def my_function():
    pass

class AClass(object):
    def my_method(self):
        pass

    @staticmethod
    def my_static_method():
        pass

    @classmethod
    def my_class_method(cls):
        pass

print "my_function: %s" % type(my_function)                   # function
print "my_method: %s" % type(AClass().my_method)              # instancemethod
print "my_static_method: %s" % type(AClass.my_static_method)  # function
print "my_class_method: %s" % type(AClass.my_class_method)    # instancemethod

A Walk Through the Pickle Process

We saw above that serialization is done by calling CloudPickleSerializer().dumps(command), where command is a tuple. That method wraps cloudpickle.dumps(obj, 2), which is the real key to PySpark serialization: it overrides several methods of Python's pickle class, in particular the save_[function|instancemethod|...] serialization callbacks.

Calling pickle.Pickler.dump ends up in the self.save(obj) method in Python's Lib/pickle.py (or Modules/_pickle.c). That method has several important branches that decide which callback serializes obj:

  1. f = self.dispatch.get(t), where t = type(obj); the dispatch table stores the save_* methods, so if cloudpickle has a handler for the type, that handler is used
  2. Otherwise, issc = issubclass(t, TypeType); if so, self.save_global(obj) handles it (note that this method is also overridden by cloudpickle)
  3. Otherwise, reduce = dispatch_table.get(t) is tried (copy_reg.dispatch_table)
  4. Otherwise, reduce = getattr(obj, "__reduce_ex__", None); objects have a default __reduce_ex__
  5. Otherwise, reduce = getattr(obj, "__reduce__", None); objects have a default __reduce__

If you want to understand the pickle process in detail, it helps to add the following print statement to Lib/pickle.py so you can see which branch is taken:

f = self.dispatch.get(t)
print "###### issc: %s, obj: %s, type: %s #######" % (issubclass(t, TypeType), obj, t)

So, for a function, what do we actually want to ship? Functionally, the callable code comes first, but the data matters even more: the global module-level variables it uses, its arguments, the vars of the enclosing object, the vars of its class, and so on. For performance, we naturally want to avoid shipping things repeatedly or shipping data that is not needed. With that principle in mind, you can follow the key flow charts below and run the pickle process in your head. (A note on how command is unwrapped down to the callback: save_tuple - save(func wrapper) - save_function - extract_func_data - save(..., closure, ...); the callback is the func wrapper's closure.)

[Flow charts: __main__.function, __main__.classmethod, __main__.instancemethod, __main__.staticmethod, other_module.classmethod, other_module.function, other_module.instancemethod, other_module.staticmethod]

Finally, one more note: the f_globals collected by extract_func_data contains every module-level "object" the function uses (everything in Python is an object), such as global vars, imported modules, and classes.

Analyzing the pickle process also shows that a Python instancemethod really consists of type information plus a translated function, the object that wraps the method, and the class that object belongs to; the translated function is not fundamentally different from an ordinary one.

Possible Improvements

Across multiple stages, the wrappers, serializers, and so on are pickled and shipped again and again, consuming CPU, memory, and IO on the driver and the workers. This is negligible with few nodes and little code, but as the node count grows it is still waste.

Could this duplication be reduced? After all, we already package the whole Python app as an egg and distribute it to the workers.

Many of Spark's transformations are built on combineByKey, which triggers a shuffle and is therefore generally considered expensive. What does the process actually look like? Which Spark settings are involved, and how should they be tuned? And how does data skew affect it?

This post does not go into tuning; it only looks at the code, analyzing how PySpark implements combineByKey and the shuffle inside it, in the hope of prompting further discussion.

The interface is defined as:

def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                 numPartitions=None)

We use Combiner and Value with the following meanings:

  • Value is the value in the input PairRDD
  • Combiner is the value of the PairRDD produced by combineByKey; its type may differ from Value

The first three arguments are callbacks:

  • Combiner createCombiner(Value): builds a Combiner from a single Value; called many times
  • Combiner mergeValue(Combiner, Value): merges a Value into a Combiner; called many times
  • Combiner mergeCombiners(Combiner, Combiner): merges two Combiners; also called many times

The combineByKey code in python/pyspark/rdd.py shows that processing happens in three steps:

  1. locally_combined = self.mapPartitions(combineLocally): combine inside the Python process
  2. shuffled = locally_combined.partitionBy(numPartitions): shuffle within the Python process and across worker nodes via Scala
  3. return shuffled.mapPartitions(_mergeCombiners, True): like an MR reducer, aggregate the shuffled results

Let's look at each step in detail.

Step 1: locally_combined = self.mapPartitions(combineLocally).

def combineLocally(iterator):
    merger = ExternalMerger(agg, memory * 0.9, serializer) \
        if spill else InMemoryMerger(agg)
    merger.mergeValues(iterator)
    return merger.iteritems()

This method uses the spark.shuffle.spill setting to choose between ExternalMerger and InMemoryMerger; the memory available to ExternalMerger is capped by spark.python.worker.memory. Below we focus on ExternalMerger, the disk-plus-memory external merge. Note also that merger.mergeValues may end up calling all three callbacks, not just the mergeValue callback; don't be misled by its name.

ExternalMerger's key member variables:

  • data: a dict of {K: V}, the unpartitioned merged data; before any spill has happened, all data lives in this dict
  • pdata: a list of dicts whose length is self.partitions (not the same value as numPartitions), the partitioned merged data; once a spill has happened, subsequent data is read into pdata and hashed into the different dict slots

ExternalMerger's on-disk files live under /path/to/localdir/<spill_num>/<partition_num>. There can be several localdir paths; putting them on different disks improves concurrent write throughput. spill_num is the spill round and partition_num is the partition index out of self.partitions. Each line in a file is a serialized list of key-Combiner pairs.

Now the body of mergeValues:

def mergeValues(self, iterator):
    """ Combine the items by creator and combiner """
    iterator = iter(iterator)
    # speedup attribute lookup
    creator, comb = self.agg.createCombiner, self.agg.mergeValue
    d, c, batch = self.data, 0, self.batch

    for k, v in iterator:
        d[k] = comb(d[k], v) if k in d else creator(v)

        c += 1
        if c % batch == 0 and get_used_memory() > self.memory_limit:
            # flush self.data to disk, bucketed by the internally computed partition id;
            # to stay compatible with how pdata is flushed, each row written here is a single-element dict
            self._spill()
            self._partitioned_mergeValues(iterator, self._next_limit())
            # the iterator has already been handed over to _partitioned_mergeValues, so just break out
            break

For each element in the current mapPartition, it calls the createCombiner or mergeValue callback to produce a new Combiner and stores it in self.data[k]. Memory is checked periodically (every c % batch records; note that batch is adjusted dynamically); as long as the limit is not exceeded it keeps reading. Once memory runs short it first calls _spill to flush self.data, then recomputes the memory limit and hands control over to _partitioned_mergeValues.

def _partitioned_mergeValues(self, iterator, limit=0):
    """ Partition the items by key, then combine them """
    # speedup attribute lookup
    creator, comb = self.agg.createCombiner, self.agg.mergeValue
    c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch

    for k, v in iterator:
        d = pdata[hfun(k)]
        d[k] = comb(d[k], v) if k in d else creator(v)
        if not limit:
            continue

        c += 1
        if c % batch == 0 and get_used_memory() > limit:
            self._spill()
            limit = self._next_limit()

This method also keeps consuming elements, but now the newly produced Combiner is stored in self.pdata[hfun(k)][k], i.e. records are bucketed by the internal partitioning function. When memory runs short, _spill writes the buckets to disk and the limit is adjusted.

_spill is called from two places above. Only the first call operates on self.data, so only the partition files of spill round 1 contain many lines (one single-element dict per line); the partition files of later spill rounds each consist of one large serialized batch.
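
The two cases can be captured in a hedged sketch of _spill (a simplification written against the member names used above, not the verbatim shuffle.py source; _get_spill_dir and dump_stream are taken from the surrounding code):

Python:

    # Hedged, simplified sketch of _spill (not the verbatim shuffle.py source).
    import gc
    import os

    def _spill(self):
        path = self._get_spill_dir(self.spills)          # /path/to/localdir/<spill_num>
        if not os.path.exists(path):
            os.makedirs(path)
        if not self.pdata:
            # first spill: hash self.data into partition files on the fly;
            # each line is a single-element dict, matching later pdata spills
            streams = [open(os.path.join(path, str(i)), "wb")
                       for i in range(self.partitions)]
            for k, v in self.data.items():
                self.serializer.dump_stream([(k, v)], streams[self._partition(k)])
            for s in streams:
                s.close()
            self.data.clear()
            self.pdata = [{} for _ in range(self.partitions)]
        else:
            # later spills: dump each pdata bucket as one big batch into file <i>
            for i in range(self.partitions):
                with open(os.path.join(path, str(i)), "wb") as f:
                    self.serializer.dump_stream(self.pdata[i].items(), f)
                self.pdata[i].clear()
        self.spills += 1
        gc.collect()   # as noted below, freed blocks are released eagerly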

mergeValues has now merged all values sharing a key within the current partition into one Combiner each, but that data may be spread across memory and disk, so iteritems() is needed to merge everything and hand it back to the caller. If nothing was ever spilled, all data is in self.data and its iterator can be returned directly; otherwise _external_items() does the merging.
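
In code, that decision is roughly the following (a hedged sketch against the member names above, not the verbatim source):

Python:

    # Hedged sketch of iteritems (not the verbatim shuffle.py source).
    def iteritems(self):
        """ Return all merged (key, Combiner) pairs as an iterator """
        if not self.pdata and not self.spills:
            return iter(self.data.items())   # never spilled: everything is in memory
        return self._external_items()        # merge in-memory and on-disk pieces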

_external_items() works partition by partition: each pass handles the files that all spill rounds produced for one partition id. To make the best use of memory and keep things simple, the data still sitting in pdata is spilled first.

Python:

    try:
        for i in range(self.partitions):
            self.data = {}
            for j in range(self.spills):
                path = self._get_spill_dir(j)
                p = os.path.join(path, str(i))
                # do not check memory during merging
                self.mergeCombiners(self.serializer.load_stream(open(p)),
                                    False)

                # limit the total partitions
                if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
                        and j < self.spills - 1
                        and get_used_memory() > hard_limit):
                    self.data.clear()  # will read from disk again
                    gc.collect()  # release the memory as much as possible
                    # chengyi02: the values yielded so far also consume memory
                    # (in the caller), so if we are over the limit now, the
                    # remaining partitions will almost certainly exceed it too;
                    # therefore recursively merge all the remaining partitions.
                    for v in self._recursive_merged_items(i):
                        yield v
                    return

            for v in self.data.iteritems():
                yield v
            self.data.clear()

            # remove the merged partition
            for j in range(self.spills):
                path = self._get_spill_dir(j)
                os.remove(os.path.join(path, str(i)))

While self.mergeCombiners is processing one <spill_num>/<partition_num> file, it deliberately does not check the memory limit; otherwise it might flush more data under the current /path/to/localdir/ and corrupt the file layout. Instead, after a whole file has been processed, if memory is over the limit and the total number of partitions is still manageable, _recursive_merged_items is called to merge recursively.

Note that _recursive_merged_items processes the data of every partition >= i (the current partition id), not just the current one. The reason: current memory consumption has two main components, the values already yielded and retained by the caller in rdd.py, and the data read in for the current partition. Assuming partitions are roughly even in size, the latter stays fairly stable while the former keeps growing, so later partitions are even more likely to exceed the limit.

_recursive_merged_items creates a new ExternalMerger object m, reads and merges the file data piece by piece, spilling when needed (this time into other localdir directories, with spill_num restarting from zero, so it can be seen as a separate data space), and finally returns the merged data via m._external_items. Could recursion strike again inside m._external_items? It is unlikely: the memory used there is roughly the size of one of m's partitions, and all of m's data equals just one of self's partitions, so each of m's partitions is very small. Even if recursion does happen again, the partition size shrinks at every level, further lowering the chance of yet another spill.
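
A hedged sketch of that recursion, written against the names used above (not the verbatim source; the exact ExternalMerger constructor arguments and spill directories are assumptions):

Python:

    # Hedged, simplified sketch of the recursive merge (not the verbatim source).
    import os

    def _recursive_merged_items(self, start):
        for i in range(start, self.partitions):
            # a fresh sub-merger; assumed to spill into different local dirs,
            # so its spill rounds form an independent "data space"
            m = ExternalMerger(self.agg, self.memory_limit, self.serializer)
            for j in range(self.spills):
                p = os.path.join(self._get_spill_dir(j), str(i))
                # feed one of self's spilled partition files into the sub-merger
                m.mergeCombiners(self.serializer.load_stream(open(p)))
            # all of m's data equals a single partition of self, so another level
            # of recursion inside m._external_items is unlikely
            for v in m._external_items():
                yield v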

The description above is basically an external sort, but in the real world GC also matters. At the end of _spill and inside _external_items, Python's gc.collect() is called to synchronously free blocks whose reference count has dropped to zero. Collection is never perfect, though: some still-reachable objects cannot be freed, and if that residue is large (say, close to the current limit), in the extreme case a single extra element could trigger another spill, which costs time and achieves nothing. So ExternalMerger adjusts the limit dynamically to max(self.memory_limit, get_used_memory() * 1.05). From this it is clear that the Python process may actually consume more memory than spark.python.worker.memory.
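
Expressed as code, that adjustment is just the _next_limit helper already seen in mergeValues (matching the formula in the text):

Python:

    # The dynamic limit adjustment described above, as used by mergeValues.
    def _next_limit(self):
        return max(self.memory_limit, get_used_memory() * 1.05)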

That concludes Step 1. The returned locally_combined RDD has the shape [(k, Combiner), ...]: k is still the original key, but all values sharing that key within the current worker task have been aggregated into one Combiner.

Step 2: shuffled = locally_combined.partitionBy(numPartitions)

A natural question: why not hand the RDD straight to Scala's partitionBy instead of implementing part of the shuffle logic in Python? First, the partitioning callback of partitionBy is customizable; in Python the default is portable_hash, and if the shuffle lived entirely in Scala, how would it call back into a Python partition function? Second, communication between Python and Scala requires serialization, and doing it record by record is expensive, so Python also batches records after its own shuffle step. Details below.
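
To make the first point concrete (a hypothetical usage sketch, reusing the SparkContext sc from the earlier example):

Python:

    from pyspark.rdd import portable_hash   # the default partitionFunc in Python

    pairs = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
    by_default = pairs.partitionBy(4)                                    # portable_hash
    by_custom = pairs.partitionBy(4, partitionFunc=lambda k: ord(k[0]))  # arbitrary Python callable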

(Note: partitionBy does not change the amount of data; the k/v changes above merely indicate that after the shuffle, each task only holds the keys belonging to its own partition.)

The code shows that the partition_id is computed in Python, so Scala only shuffles according to a partitioning that has already been decided. Before diving deeper, look at the RDDs involved in Python's partitionBy:

  • itself, i.e. self, a PairRDD of (k, v), transformed downward via mapPartitions
  • keyed, a PairRDD of (partition_id, [(k, v), (k, v), ...]), transformed downward via a copy constructor
  • pairRDD, a JavaPairRDD; Java adapts the call to Scala, which transforms it downward via Scala's partitionBy
  • jrdd, the Java-proxied Scala ShuffledRDD, transformed downward via a copy constructor
  • rdd, a PairRDD of (k, v), with the partitioning complete

Generating keyed is fairly simple; the inner function add_shuffle_key computes each key's partition_id, then packs and serializes a batch of (k, v) pairs as the value of the keyed RDD. Every 1,000 items it checks used memory once, and it also flushes once the number of buffered items exceeds batch; the value of batch is adjusted dynamically to keep the serialized batch size roughly between 1MB and 10MB.
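
The adaptive batch sizing can be sketched like this (a simplification of the logic described above, not the verbatim add_shuffle_key source; the helper name is hypothetical, and the 1MB/10MB targets come from the text):

Python:

    # Simplified sketch of the adaptive batch sizing (hypothetical helper).
    def adjust_batch(batch, serialized_bytes, num_buckets):
        avg_mb = (serialized_bytes // num_buckets) / (1 << 20)  # average batch size in MB
        if avg_mb < 1:
            return batch * 1.5           # batches too small: pack more items per batch
        if avg_mb > 10:
            return max(batch / 1.5, 1)   # batches too large: pack fewer items per batch
        return batch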

While jrdd is being constructed, a shuffleDependency is created. When the stage is submitted, the DAG scheduler sees this dependency and launches ShuffleMapTasks to produce the shuffle data (the equivalent of the map-side shuffle in MapReduce):

ExternalSorter is the heart of the shuffle. In this branch, the ExternalSorter is built with the following arguments:

Scala:

    sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)

That is, the aggregator is None; dep.partitioner here is PythonPartitioner, which uses the RDD key directly as the partition_id, in this case the partition_id already computed in Python; and the ordering is None. The aggregator and ordering being None means no further combine or sort is needed, just a plain shuffle. Only this branch is discussed below; it is handled jointly by insertAll and writePartitionedFile.

One possible branch inside insertAll is bypassMergeSort == true: if the number of partitions is small and neither aggregation nor ordering is needed, it writes numPartitions files directly and writePartitionedFile later simply concatenates them. The downside is that each worker node keeps numPartitions files open at once, which costs extra memory and risks "too many open files".

The other branch is the common large-data case (no combine, lots of writes); for each record:

Scala:

    buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
    maybeSpillCollection(usingMap = false)

Because Python already packed records into batches, the value of a single record here corresponds to a whole batch of Python (k, v) pairs. After every insert it checks whether a spill is needed. The approach mirrors Python's ExternalMerger: insertAll's maybeSpillCollection eventually calls spillToMergeableFiles, which sorts the contents by partition_id and key (here partition_id == key), writes each spill to a temporary file, and records the file's metadata in the spills array.

writePartitionedFile then merges the data scattered across those temporary files into the final per-partition output file. In the branch corresponding to spilled bypassMergeSort, each partition's data already sits in a single partition file, so a simple concatenation suffices. Otherwise, the data is spread across memory and several files, and the merge is driven by the PartitionedIterator; the iterator it returns is written out partition by partition.

The above writes the shuffled data to disk, but how does the next computation stage consume it?

Step 3: return shuffled.mapPartitions(_mergeCombiners, True)

Although sort-based shuffle has been the default since Spark 1.2, the sort shuffle still reads data through HashShuffleReader: SortShuffleManager -> HashShuffleReader -> BlockStoreShuffleFetcher.fetch(). Its job is to collect the shuffled data produced by multiple upstream worker nodes, so network I/O is unavoidable.

Inside BlockStoreShuffleFetcher.fetch, the upstream block locations for the given shuffleId and reduceId are fetched first; each entry has the form ((address, size), index), where address is a BlockManagerId. ShuffleBlockFetcherIterator is then used to fetch block data from the local block manager and from remote BlockTransferService instances.

ShuffleBlockFetcherIterator's initialize method calls splitLocalRemoteBlocks, which groups blocks by address.executorId into remoteRequests; multiple blocks from the same address are folded into one request as long as its size stays below maxBytesInFlight / 5, which caps the fetch concurrency at roughly five requests in flight. It also follows that a very large block can push a single request beyond maxBytesInFlight. The remoteRequests list is then shuffled randomly so the load is spread evenly across nodes.
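
The grouping idea can be illustrated with a small sketch (written in Python for consistency with the rest of this post; the real logic lives in Scala's splitLocalRemoteBlocks, and the function and parameter names here are hypothetical):

Python:

    import random

    def split_into_requests(blocks_by_address, max_bytes_in_flight):
        """blocks_by_address: {address: [(block_id, size), ...]}"""
        target = max_bytes_in_flight / 5.0   # keep roughly 5 requests in flight
        requests = []
        for address, blocks in blocks_by_address.items():
            cur, cur_size = [], 0
            for block_id, size in blocks:
                cur.append((block_id, size))
                cur_size += size
                if cur_size >= target:
                    # one huge block can push a request past maxBytesInFlight
                    requests.append((address, cur))
                    cur, cur_size = [], 0
            if cur:
                requests.append((address, cur))
        random.shuffle(requests)             # spread requests across remote nodes
        return requests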

initialize then immediately calls sendRequest to fire off several requests and logs: logInfo("Started " + numFetches + " remote fetches in " + Utils.getUsedTimeMs(startTime)). sendRequest calls shuffleClient.fetchBlocks() to read remote data and registers a BlockFetchingListener, whose onBlockFetchSuccess method receives the buffer and appends it to the results queue.

ShuffleBlockFetcherIterator's next() method first "releases" memory and, if enough is available, issues further sendRequest calls; only then does it take data from the results queue, decompress and deserialize it, and return an iterator. Two small optimizations are at work here. First, "releasing" memory does not actually trigger GC; it just subtracts the size of the data already moved into the results queue from bytesInFlight, since that data is about to be consumed and bytesInFlight only bounds the in-flight network buffer. Second, sendRequest is issued before the data in results is processed, because it is an asynchronous call and may take a while.

Also note that the shuffleClient used by sendRequest is an instance of BlockTransferService, which has both netty and nio implementations.

That completes the reception of the shuffled data. The iterator returned by next eventually flows back into the Python code for the remainder of Step 3, which is now straightforward: only at this point does shuffled.mapPartitions(_mergeCombiners, True) actually run.

Through these three steps, and the interplay of Python, Java, and Scala, PySpark's combineByKey is complete. My personal take: since Scala's partitionBy and shuffle already existed, what PySpark mainly solves is how to make the multi-language interaction as good as possible, balancing performance, extensibility, and code reuse.

Although there is plenty of Spark material around, its internals are not covered much, and PySpark's internals even less, so problems hit in a real project have to be analyzed on your own. One issue I have been looking at recently: when broadcasting a large dictionary, memory consumption is enormous, and the driver-side Java process uses more than twice as much memory as the Python process. Why? To answer that, you first have to understand how PySpark integrates with Spark, because a seemingly simple Spark map, reduce, or broadcast involves a great deal of inter-process, network, and thread interaction, plus serialization, deserialization, compression, and more.

First, look at the Spark runtime architecture once PySpark is integrated; but before that, recall the simpler picture without Python:

The tasks of a stage are sent from the driver to the master, which picks suitable worker nodes and dispatches the tasks to their Executors; the results are returned directly to the Driver Program. (The details vary with the deploy mode of the master.)

What changes once Python is added? Probably to keep the Spark core lean and to follow a uniform pattern that could later be adapted to more languages, the PySpark implementers chose to wrap around the existing protocols rather than change them, which leads to a large amount of interaction between the Python and Java processes. Here is the modified architecture:

Both the driver and the worker side are modified; the white boxes are the newly added Python processes, which talk to the Java processes via the py4j library, plain sockets, and the local file system. Each channel serves a different scenario: py4j lets Python call Scala methods; direct sockets are used when Java initiates interaction with Python; and the file system is used to exchange large amounts of data, such as broadcast values.

With that background, look at the interaction sequence:

 

  • spark-submit your-app.py starts the Spark job; because it is a Python script, it is handled by PythonRunner (note: this is Scala code), which first starts the py4j socket server and then forks a child process to run python your-app.py
  • Python parses your-app.py, works out the RDD lineage to execute, and calls into Scala over the py4j socket to build a PythonRDD
  • The Scala process on the driver handles the PythonRDD and submits the job to the master
  • The master, according to its scheduling policy, picks worker nodes and sends them launchExecutor commands
  • The Java slave process on each worker node handles launchExecutor; most importantly, it reuses or forks a new Python process (ignore the interaction with daemon.py for now; more on that below) and establishes a socket connection with it
  • The Python worker process on the worker node receives the job details over the socket, executes them, and sends the result back to the Java worker process over the socket
  • The Java worker process returns the result to the driver's Java process over the network
  • The driver's Java process hands it back to the driver's Python process

Many details of fault tolerance, monitoring, and metrics are omitted above, yet the picture is already quite complex.

Now consider daemon.py. Because a forked child shares its parent's memory copy-on-write, forking a Python child straight from a Java process that has already allocated a lot of memory can easily lead to OOM. So Spark forks a pyspark/daemon.py child as early as possible, while the Java process still uses little memory, and that daemon then forks Python worker processes on demand. This works on unix-like systems (including Linux); Windows cannot take advantage of it.
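
The fork-early pattern can be illustrated with a minimal, hypothetical sketch (this is not the actual daemon.py; run_worker merely stands in for the real worker loop in pyspark/worker.py):

Python:

    # Minimal, hypothetical illustration of the fork-early daemon pattern.
    import os

    def run_worker(conn):
        # placeholder for the real worker loop (pyspark/worker.py)
        conn.close()

    def serve(listen_sock):
        """Fork worker processes on demand while the daemon's footprint is still small."""
        while True:
            conn, _ = listen_sock.accept()   # the JVM asks for a new worker
            if os.fork() == 0:               # cheap thanks to copy-on-write
                run_worker(conn)             # child becomes a Python worker
                os._exit(0)
            conn.close()                     # parent just keeps accepting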

There are many details here that only reading the code will truly reveal; the key code paths are listed below (the master side is omitted, since I did not read it):

  • Driver side
    • core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
    • python/pyspark/rdd.py
    • core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
    • core/src/main/scala/org/apache/spark/rdd/RDD.scala
    • core/src/main/scala/org/apache/spark/SparkContext.scala
    • core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
  • Worker side
    • core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
    • core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala->compute()
    • core/src/main/scala/org/apache/spark/SparkEnv.scala->createPythonWorker()
    • python/pyspark/daemon.py
    • python/pyspark/worker.py

As for why broadcast consumes so much memory, do you have an answer yet? A follow-up post will cover it separately, haha!