spark的调度分为资源的调度和任务的调度。前者目前支持standalone、yarn、mesos等,后者对标hadoop的MapReduce。本文介绍资源调度的流程,能力所限仅cover到最简单的standalone,无法对比各种调度框架的优劣。

Spark中与资源调度相关的角色包括driver、master、worker和executor,其中executor更多是与任务调度有关。其交互关系如下图所示,主要交互流如下:

  • driver->master:提交Application,并告知需要多少资源(cores)
  • master->driver:告知Application提交成功(我们只考虑最理想的情况)
  • driver->workers:发送appDescription,启动executor子进程
  • executor->driver:注册executor
  • driver->executor:提交Tasks,由后者执行任务

任务提交流程

上图的虚线代表Actor交互,spark基于Actor模式的Akka包进行进程、线程、网络间的交互,以避免同步、死锁等问题,也使代码相对简单。可以注意到,worker主进程是不跟driver交互的,只有executor子进程才跟driver交互。

Driver & Master

在SparkContext初始化的时候,在SparkContext class的一个大try catch中,就会完成Application注册,在standalone mode下,主要做了以下事情:

  • 启动心跳维持监听actor,因为executors需要同driver维持心跳
  • 启动Job进度监听actor,在UI和LOG里需要能够看到进度
  • 启动mapOutputTracker,因为作为reducer的executors会向driver询问map output的地址
  • 初始化task pool,确定FIFO or FAIR的调度方式
  • 根据masters uri的格式,确定schedulerBackend和taskScheduler的取值,其中schedulerBackend与executor端的executorBackend交互
  • 由AppClient/ClientActor代理,同masters交互,确定可用master,由后者代理获取并启动workers上的可用executors资源

SparkContext.createTaskScheduler方法会确定backend和scheduler的取值,并调用_taskScheduler.initialize(),初始化pool并确定schedulableBuilder:FIFO、FAIR。

masterschedulerBackendtaskScheduler
localLocalBackendTaskSchedulerImpl
local\[([0-9]+|\*)\]LocalBackendTaskSchedulerImpl
local\[([0-9]+|\*)\s*,\s*([0-9]+)\]LocalBackendTaskSchedulerImpl
spark://(.*)SparkDeploySchedulerBackendTaskSchedulerImpl
local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]SparkDeploySchedulerBackendTaskSchedulerImpl
"yarn-standalone" | "yarn-cluster"org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackendorg.apache.spark.scheduler.cluster.YarnClusterScheduler
"yarn-client"org.apache.spark.scheduler.cluster.YarnClientSchedulerBackendorg.apache.spark.scheduler.cluster.YarnScheduler
mesosUrl @ MESOS_REGEX(_)CoarseMesosSchedulerBackend or MesosSchedulerBackendTaskSchedulerImpl
simr://(.*)SimrSchedulerBackendTaskSchedulerImpl
确定两者取值之后,会立即触发与master的交互:

 XML |  copy code |? 
1
 // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
2
// constructor
3
_taskScheduler.start()   //  以standalone模式为例,间接调用 _schedulerBackend.start(),并阻塞等待初始化AppClient完成(即接收到master返回的RegisteredApplication消息)

AppClient就是一个wrapper,初始化了:

     

 Scala |  copy code |? 
1
actor = actorSystem.actorOf(Props(new ClientActor))

ClientActor代理与master的交互,在preStart()里向每一个master uri发送RegisterApplication消息,并注册了接收处理master发送消息的多个处理方法。

Worker & Driver

deploy/worker/Worker.scala 是worker主进程,以actor方式接收master发送的消息,例如LaunchExecutor。在该消息处理方法中,通过一个子线程启动真正的executor子进程。
deploy/worker/ExecutorRunner.scala -> fetchAndRunExecutor()是启动子进程的地方。
executor/CoarseGrainedExecutorBackend.scala -> onStart() 里会向driver发送RegisterExecutor消息,由driver的schedulerBackend接收并处理。在该消息里提交了executor的相关信息,driver会将executor信息存储在executorDataMap对象里,并触发makeOffers方法,分配pending的tasks。

Driver端的消息处理方法如下:

 Scala |  copy code |? 
01
    case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
02
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
03
        if (executorDataMap.contains(executorId)) {
04
          context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
05
        } else {
06
          logInfo("Registered executor: " + executorRef + " with ID " + executorId)
07
          context.reply(RegisteredExecutor)
08
          addressToExecutorId(executorRef.address) = executorId
09
          totalCoreCount.addAndGet(cores)
10
          totalRegisteredExecutors.addAndGet(1)
11
          val (host, _) = Utils.parseHostPort(hostPort)
12
          val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
13
          // This must be synchronized because variables mutated
14
          // in this block are read when requesting executors
15
          CoarseGrainedSchedulerBackend.this.synchronized {
16
            executorDataMap.put(executorId, data)
17
            if (numPendingExecutors > 0) {
18
              numPendingExecutors −= 1
19
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
20
            }
21
          }
22
          listenerBus.post(  // 通过bus,向所有监听SparkListenerExecutorAdded的线程发送通知,这里好像没人关注。
23
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) 
24
          makeOffers()  // Make fake resource offers on all executors ,
25
        }

其中makerOffers方法会分配tasks,是把资源调度和Task调度融合的地方,调用该方法的地方还有:

  •      case StatusUpdate
  •      case ReviveOffers    submitTasks以及任务重试等时机时,会调用
  •      case RegisterExecutor   即一旦有executor注册,就看看有没有需要分配的任务
该方法调用scheduler.resourceOffers ,每次尽量调度更多的tasks,主要考虑的是:
  • 本地化 recomputeLocality,PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  • 负载均衡 round robin
  • 任务的重跑
  • 任务 + executorId的黑名单
  • 各个状态的任务列表

其中调用的scheduler.resourceOffers关键代码如下,针对每个taskSet,尽量多的调度task,在分配时,尽量分配locality的task:

 Scala |  copy code |? 
01
    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
02
    // of locality levels so that it gets a chance to launch local tasks on all of them.
03
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
04
    var launchedTask = false
05
    for (taskSet <− sortedTaskSets; maxLocality <− taskSet.myLocalityLevels) {
06
      do {
07
        launchedTask = resourceOfferSingleTaskSet(
08
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
09
      } while (launchedTask)
10
    }

在scheduler.resourceOffers子方法中,也会告知driver,有task状态的改变,可用通过listenerBus告知监听者:

 Scala |  copy code |? 
1
          sched.dagScheduler.taskStarted(task, info) 
2
               eventProcessLoop.post(BeginEvent(task, taskInfo))

在makeOffers的最后,调用launchTasks,这时才是真正发起任务。workder端的CoarseGrainedExecutorBackend.scala接收到消息后,会继续执行。

 Scala |  copy code |? 
1
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

     总结:
  • Worker主进程启动executor子进程后(独立JVM),后者通过actor方式向driver注册自己
  • driver接收到RegisterExecutor消息,会检查有没有pending的tasks,并且task的localityLevel与executorId匹配
  • 如果有合适的tasks(可能多个),则会再通过actor方式发送LaunchTask消息给executor,由后者真正执行任务

问题:

  • driver、master、worker里有哪些相关的子线程、子进程?
    • driver里至少有工作主进程、通过ClientActor与master通信的子线程们、通过SchedulerBackend与executors通信的子线程们、关注各种进度的listenerBus相关子线程、通过MapOutputTrackerMasterEndpoint管理mapOutput位置的子线程们、心跳维持的子线程,并且应该还有block、http等
    • standalone的master采用actor子线程维护可用workers列表,包括资源使用情况;并且接收driver发起的app注册请求等。如果是使用zk进行master选主,还得维持与zk之间的连接。另外,还有UI界面。
    • worker主进程需要维持与master之间的心跳,汇报资源使用情况。executors子进程需要同driver间维持心跳。主进程通过子线程启动executor子进程。另外还有block、shuffle output(不确定是否直接复用了block线程or进程)等。
  • 模块、进程、线程间如何通信的?数据结构体如何?
    • scala程序,网络、进程、线程间主要通过Actor方式进行交互,资源调度主要走这条流
    • python通过py4j调用java代理的scala程序
    • shuffle output等通过netty交互数据
    • broadcast通过p2p交互数据
  • master怎么知道有哪些可用worker的?
    • worker启动时,向master发送RegisterWorker消息,由Master类通过Actor方式处理
  • 任务调度的算法是什么?有没有优缺点?trait SchedulerBuilder , FairSchedulingAlgorithm/FIFOSchedulingAlgorithm
    • FIFO:
    • Fair:
  • 资源调度的依据,对可用executor 随机化,locality的原则,每个worker有cores限制,当前每个task只能占用1个core
  • jar、files、library是这时发送给executor的吗?driver还是master发送的呢?
    • driver提交给master的是appDescription,其中包含的command只是指定了executorBackend类,未涉及app具体的代码
    • driver在executor注册后,向其发送的launchTask消息才包含真正的task信息(files、jars名称),由TaskRunner中反序列化并拉取文件
  • standalone、yarn、mesos切换对driver、master、worker都有什么影响呢?使用到哪些类?
    • 目前只能看出有不同的backend类,实现功能,没有比较优劣。
  • 同一worker上的资源隔离做到哪一步?JVM可以控制memory,CPU、IO、DISK呢?
    • 我们当前的配置,一台worker上只能有一个executor,后者是在driver->master->Worker时生成的
    • 每个executor在调度时,可能会被分配多个tasks,只要availableCpus(i) >= CPUS_PER_TASK
    • executorBackend在接收到launchTask请求时,会生成TaskRunner对象,并由threadpool调度执行,即如果有多个tasks到达,就会多线程并发执行了
    • 这些属于同一个executor的tasks共享一个JVM,所以共享executor.memory等限制
  • JVM、python进程复用 是如何做到的?
  • standalone cluster manager type时,如何与zk配合?
    • ZooKeeperPersistenceEngine和ZooKeeperLeaderElectionAgent 在Master类里被初始化。后者在new时直接start了,基于apache curator.apache.org实现,其isLeader、notLeader方法会被curator的方法调用。

最后,放一张简化后的时序图:

spark-resource-schedule

Leave a Reply