本文尝试用spark自带example streaming.NetworkWordCount为例,解释spark streaming 1.6.0的执行流程。

例子代码如下:

 Scala |  copy code |? 
01
    // Create the context with a 1 second batch size
02
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
03
    val ssc = new StreamingContext(sparkConf, Minutes(1))
04
 
05
    // Create a socket stream on target ip:port and count the
06
    // words in input stream of \n delimited text (eg. generated by 'nc')
07
    // Note that no duplication in storage level only for running locally.
08
    // Replication necessary in distributed scenario for fault tolerance.
09
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
10
    val words = lines.flatMap(_.split(" "))
11
    val wordCounts = words.map(=> (x, 1)).reduceByKey(_ + _)
12
    wordCounts.print()
13
    ssc.start()
14
    ssc.awaitTermination()

Streaming的核心是DStream类,即discretized streams。以上的socketTextStream、flatMap、map、reduceByKey、print方法分别生成了5个子类实例:SocketInputDStream<-FlatMappedDStream<-MappedDStream<-ShuffledDStream<-ForEachDStream,前面是后面的parent,可以类比RDD的依赖关系。另外,首尾分别是input和output stream。

但截至到wordCounts.print(),spark集群并没有任何实质性的操作呢,直到ssc.start(),它会触发一系列的初始化,其中包括input stream的数据输入,并使用类似RDD的方式,以outputStream为入口依次触发parent的执行。将分如下几个部分详细说明:

  1. start触发的初始化过程,即input stream任务如何提交至executors?
  2. input stream blocks的产生过程,即数据是如何被读入和缓存的?
  3. input stream如何触发后续的transform、output操作的?

Start触发的初始化过程

SparkStreaming

 

当运行在集群模式下时,driver仅仅初始化job,input的真正读取者是worker节点。如上所示,StreamingContext.start()启动streaming-start子线程,触发JobScheduler.start(),进而触发两个关键实例的start方法:ReceiverTracker.start()和JobGenerator.start()。前者会生成并异步提交input Receiver job,后者以定时器监听duration超时、生成的后续处理任务。先仅关注前者。

核心方法是ReceiverTrackerEndpoint.startReceiver(receiver, scheduledLocations)方法,它调用ssc.sparkContext.submitJob提交Job,传入的RDD是仅包含receiver信息的单元素RDD,processPartition回调方法如下,在worker端取出receiver信息,并初始化ReceiverSupervisorImpl对象,开始读取数据。

 Scala |  copy code |? 
01
      // Function to start the receiver on the worker node
02
      val startReceiverFunc: Iterator[Receiver[_]] =&gt; Unit =
03
        (iterator: Iterator[Receiver[_]]) =&gt; {
04
          if (!iterator.hasNext) {
05
            throw new SparkException(
06
              "Could not start receiver as object not found.")
07
          }
08
          // flykobe: worker拿到RDD,里面仅包含一个元素,就是receiver
09
          // 包装并且启动数据接收
10
          if (TaskContext.get().attemptNumber() == 0) {
11
            val receiver = iterator.next()
12
            assert(iterator.hasNext == false)
13
            val supervisor = new ReceiverSupervisorImpl(
14
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
15
            supervisor.start()
16
            supervisor.awaitTermination()
17
          } else {
18
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
19
          }
20
        }

注意,一个app可以有多个input streams,所以可以在该阶段触发生成多个Jobs(如何调度这些Jobs,就是后面JobScheduler等的事情了)。

Input stream blocks的产生过程

Spark Streaming的处理模型是小批量、伪实时,所以需要对读入的数据进行缓存,每隔一段时间生成一个blocks,再每隔duration时间,由这些blocks组成RDDs。

SparkStreaming-1

 

有两种方式blocks,一种是Receiver单条写入,框架处理底层细节,例如SocketInputDStream、KafkaInputDStream;另一种是Receiver直接形成block,自行控制缓存等。

以SocketInputDStream为例,它的receiver阻塞读取socket数据,每接收到一行,就调用Receiver.store()方法进行存储,进而触发上图左边的逻辑。

首先将数据暂存在BlockGenerator.currentBuffer里,BlockGenerator定时器子线程每隔spark.streaming.blockInterval时间,就调用updateCurrentBuffer()将buffer里的数据形成block,并put进blocksForPushing队列。以上可视为queue的生产过程。

BlockGenerator还有一个blockPushingThread子线程,使用keepPushingBlocks()方法阻塞监听该queue,一旦有block产生,就调用ReceiverSupervisorImpl.pushAndReportBlock()方法,按照storage level交由BlockManager.doPut存储在内存 and/or 磁盘上,在此期间可能会写WAL日志。随后通过AKKA向master的ReceiverTracker发送AddBlock消息,由ReceivedBlockTracker.addBlock()方法建立block与input streaming的关联关系。

这里需要注意的是,block的产生间隔是由spark.streaming.blockInterval控制的,默认是200ms,而非duration参数。

完成以上步骤后,worker节点里保存了blocks的数据,而master节点存储了blocks列表。

Input stream如何触发后续操作

示例里没有window操作,处理间隔仅有duration控制,相对简单。

SparkStreaming-2

 

前面提到JobGenerator的定时器线程,它每隔ssc.graph.batchDuration.milliseconds执行一次,简单生成GenerateJobs消息,由JobGenerator EventLoop子线程处理,触发JobGenerator.generateJobs()。

它首先调用ReceivedBlockTracker.allocateBlocksToBatch()方法,按需写入master端的WAL日志,并组装起(time => (streamID=>blocks, …) ),保存到实例属性timeToAllocatedBlocks HashMap里。

随后进入关键步骤,由于一个app可以有多个output streams,故调用DStreamGraph.generateJobs()依次触发。以这儿的ForEachDStream为例,它的generateJob方法触发生成parent RDD,继续触发parent.compute()方法,最终导致SocketInputDStream按需将duration里的blocks形成RDDs:

 Scala |  copy code |? 
01
 /**
02
   * Generates RDDs with blocks received by the receiver of this stream. */
03
  override def compute(validTime: Time): Option[RDD[T]] = {
04
    val blockRDD = {
05
 
06
      if (validTime &lt; graph.startTime) {
07
        // If this is called for any time before the start time of the context,
08
        // then this returns an empty RDD. This may happen when recovering from a
09
        // driver failure without any write ahead log to recover pre−failure data.
10
        new BlockRDD[T](ssc.sc, Array.empty)
11
      } else {
12
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
13
        // for this batch
14
        val receiverTracker = ssc.scheduler.receiverTracker
15
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
16
 
17
        // Register the input blocks information into InputInfoTracker
18
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
19
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
20
 
21
        // Create the BlockRDD
22
        createBlockRDD(validTime, blockInfos)
23
      }
24
    }
25
    Some(blockRDD)
26
  }

截止到这儿,已经把blocks关联到RDD了。随后调用JobScheduler.submitJobSet()异步提交Job即可。

以上大体描述了spark streaming的job触发过程,但如何对input replication、怎么触发checkpoint、WAL日志如何使用等尚未涉及。

One Comment

  1. Aegeaner says:

    我和楼主的分析差不多,不过我也没注意到文末提到的关于input replication、WAL日志如何使用的细节,分析的还是不够细致啊。http://blog.csdn.net/aegeaner/article/details/53406471

Leave a Reply