
Spark Source Code Walkthrough (1): Action Submission, Part 1


1. Overview

Action operators are the entry point that triggers Spark computation and are part of Spark's core logic. This article walks through the full data flow of an Action being triggered, computed, and returning its result.

 

The code discussed here lives mainly in the org.apache.spark scheduler, rdd and executor packages. Core classes: RDD, SparkContext, DAGScheduler, TaskSchedulerImpl, CoarseGrainedSchedulerBackend, CoarseGrainedExecutorBackend, TaskSetManager, Executor, ResultTask, ShuffleMapTask.

 

BTW: a small gripe about the Iteye editor: it is really clunky. Inserting images is a pain, and pasting from Word for Mac brings garbled formatting along with it.

 

2. Overall Call Flow Diagram

Inside the solid boxes are method names; the labels on the connecting lines are the message entities being passed.

Red lines: the main call flow for submitting the computation. Blue lines: the main call flow for returning data. Light-blue lines: event-bus message dispatching. Green lines: the stage-submission callback loop.

Yellow background: call stacks of the key logic. Green background: related calls in other modules. Hollow arrows: asynchronous calls / remote communication.

 

3. Source Code Walkthrough

Step 1-RDD

// Utils.getIteratorSize _ is the computation function being passed in; here it computes the size of each partition, and runJob returns an Array[Long]. Different Actions simply pass in different funcs, so the scheduling layer (DAGScheduler/TaskScheduler) never needs to know what the operation actually does. RDD.foreach() passes the user-defined function in directly.

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
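For reference, the per-partition function is an ordinary Iterator-to-value function. Below is a minimal sketch of a getIteratorSize-style counter and of how an action such as foreach plugs its own function into the same runJob entry point; the names iteratorSize and foreachSketch are invented for the example (the real code lives in Utils and RDD), and the defs assume a spark-shell-like context.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Counts the elements of one partition, in the spirit of Utils.getIteratorSize (sketch)
def iteratorSize[T](iterator: Iterator[T]): Long = {
  var count = 0L
  while (iterator.hasNext) {
    count += 1L
    iterator.next()
  }
  count
}

// A foreach-style action: wrap the user function so it consumes one partition's Iterator,
// then hand it to sc.runJob, exactly as count() does above (illustrative sketch)
def foreachSketch[T](sc: SparkContext, rdd: RDD[T], f: T => Unit): Unit = {
  sc.runJob(rdd, (iter: Iterator[T]) => iter.foreach(f))
}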

 

Step 2-SparkContext

// getIteratorSize is executed for each partition. Physically an RDD's content is just a Seq; Spark passes references to RDD contents around in the form of iterators, so logically you can treat Iterator[T] as the RDD itself. func carries the Action's operation. The computation ultimately lands in RDD.compute(), covered later.

def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {

  runJob(rdd, func, 0 until rdd.partitions.length)

}

 

// cleanedFunc is the cleaned closure. clean() uses ASM to parse the class files, roughly stripping out references (parent/outer classes, nested classes, transient fields and so on) that have no effect on the closure, and verifying that the closure is serializable. A later article will dig into this method. In the end the getIteratorSize function is passed on to runJob.

def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = {

  val cleanedFunc = clean(func)

  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)

}
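The serializability check matters because a closure can accidentally capture its enclosing object. Below is a hedged illustration of the classic pitfall and the usual fix; the class and field names are invented for the example and are not from Spark.

import org.apache.spark.rdd.RDD

class ReportJob {
  val connection = new Object   // stands in for a non-serializable resource
  val factor = 2

  // References this.factor, so the closure drags in the whole ReportJob instance
  // (including `connection`); the serializability check inside clean() rejects it.
  def doubledCountUnsafe(rdd: RDD[Int]): Long =
    rdd.map(_ * factor).count()

  // Copy the field into a local val first: only the local value is captured,
  // and the closure passes the check performed in clean().
  def doubledCountSafe(rdd: RDD[Int]): Long = {
    val localFactor = factor
    rdd.map(_ * localFactor).count()
  }
}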

 

def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = {

  val callSite = getCallSite // callSite captures the caller's code location from the stack trace, used for logging

  val cleanedFunc = clean(func)

  logInfo("Starting job: " + callSite.shortForm)

  if (conf.getBoolean("spark.logLineage", false)) { // lineage logging; recommended to turn on

    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)

  }

  // Hand off to the DAGScheduler; the Action's logic travels in the cleanedFunc parameter

  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

  progressBar.foreach(_.finishAll()) // update the progress bar, i.e. the progress output shown on the console

  rdd.doCheckpoint() // record the checkpoint if one was requested

}
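A usage note on the lineage logging checked above: spark.logLineage is set in the SparkConf, and rdd.toDebugString can also be called directly for ad-hoc inspection. A minimal sketch; the app name, master and data are made up.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("lineage-demo")        // hypothetical app name
  .setMaster("local[2]")             // hypothetical master, for a local test
  .set("spark.logLineage", "true")   // makes runJob log the RDD's recursive dependencies
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 100, 4).map(_ * 2)
println(rdd.toDebugString)           // prints the lineage (dependency chain) directly
println(rdd.count())                 // the Action; with logLineage on, the same info is logged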

 

Step 3-DAGScheduler

// This step calls submitJob asynchronously and then blocks on the result

def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = {

val start = System.nanoTime

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

waiter.awaitResult() match { // block waiting for the job result

  case JobSucceeded =>

    logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))

  case JobFailed(exception: Exception) =>

    logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))

    // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.

    val callerStackTrace = Thread.currentThread().getStackTrace.tail

    exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)

    throw exception

  }

}

 

def submitJob[T, U](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = {

  // Validate that all requested partitions exist

  val maxPartitions = rdd.partitions.length

  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>

    throw new IllegalArgumentException(

      "Attempting to access a non-existent partition: " + p + ". " +"Total number of partitions: " + maxPartitions)

  }

 

  val jobId = nextJobId.getAndIncrement() // allocate a jobId

  if (partitions.size == 0) { // no partitions, nothing to do

    return new JobWaiter[U](this, jobId, 0, resultHandler)

  }

 

  assert(partitions.size > 0)

  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]

  // JobWaiter is a Future-like wrapper around the asynchronous job

  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

  // Wrap the submission into a message: JobSubmitted implements DAGSchedulerEvent, and EventLoop is a message queue whose concrete implementation here is DAGSchedulerEventProcessLoop

  eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))

  waiter

}
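JobWaiter itself is compact: it forwards each finished partition's result to resultHandler, counts completions, and releases the caller blocked in awaitResult once every task has finished or the job has failed. Below is a simplified, illustrative sketch of that pattern, not the real class (which also supports cancellation and returns a JobResult).

import java.util.concurrent.CountDownLatch

// Illustrative JobWaiter-style latch; the names are made up for the sketch.
class SimpleJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
  private val latch = new CountDownLatch(totalTasks)
  @volatile private var failure: Option[Exception] = None

  def taskSucceeded(index: Int, result: U): Unit = {
    resultHandler(index, result)     // e.g. write the value into results(index)
    latch.countDown()
  }

  def jobFailed(exception: Exception): Unit = {
    failure = Some(exception)
    while (latch.getCount > 0) latch.countDown()   // unblock the waiter
  }

  def awaitResult(): Unit = {        // the real JobWaiter reports JobSucceeded/JobFailed
    latch.await()
    failure.foreach(e => throw e)
  }
}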

 

EventLoop (base class of DAGSchedulerEventProcessLoop)

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {

    override def run(): Unit = {

        while (!stopped.get) {

          val event = eventQueue.take()

          onReceive(event)

……

}

 

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {

  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>

    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    …… // other event types handled here
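The post/take mechanics behind eventProcessLoop are just a single consumer thread draining a blocking queue. Below is a self-contained sketch of the same idea; the names are made up, and Spark's real EventLoop adds error handling and lifecycle hooks.

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        try onReceive(queue.take())                // take() blocks until someone calls post()
        catch { case _: InterruptedException => }  // stop() interrupts a blocked take()
      }
    }
  }

  def onReceive(event: E): Unit                    // subclasses dispatch here, like doOnReceive above

  def post(event: E): Unit = queue.put(event)
  def start(): Unit = thread.start()
  def stop(): Unit = { stopped.set(true); thread.interrupt() }
}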

 

// Back in the DAGScheduler: the event queue above merely smooths out bursts of submissions. listener is the JobWaiter callback object

private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) {

  var finalStage: ResultStage = null

  try {

// Split the computation into stages based on the dependency relationships. A Stage is a set of tasks that can run concurrently; stage boundaries sit at the RDDs' wide (shuffle) dependencies

// The implementation is a breadth-first traversal; see §4.2.3 of 《内幕》 for the code, omitted here

    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)

  } catch {

    case e: Exception =>

      logWarning("Creating new stage failed due to exception - job: " + jobId, e)

      listener.jobFailed(e)

      return

  }

 

  // Create an ActiveJob ready for execution. The func has been wrapped into the Stage object here, and the func/RDD information for the whole DAG is linked together through the stages' parent-stage relationships

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) 

  clearCacheLocs() // clear the locally cached partition-location info

  logInfo("Got job %s (%s) with %d output partitions".format(job.jobId, callSite.shortForm, partitions.length))

  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")

  logInfo("Parents of final stage: " + finalStage.parents)

  logInfo("Missing parents: " + getMissingParentStages(finalStage))

 

  // Register the job

  val jobSubmissionTime = clock.getTimeMillis()

  jobIdToActiveJob(jobId) = job

  activeJobs += job

  finalStage.setActiveJob(job)

  val stageIds = jobIdToStageIds(jobId).toArray

  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

 

  // Post a SparkListenerJobStart event (a SparkListenerEvent); listenerBus is implemented by AsynchronousListenerBus, another asynchronous message queue

  listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

  

  submitStage(finalStage)  // submit the final stage

  submitWaitingStages()  // After a stage completes, try to submit the waiting stages; submitWaitingStages is effectively a callback invoked from many places, forming the callback loop

}

 

// Recursively submit stages

private def submitStage(stage: Stage) {

  val jobId = activeJobForStage(stage)

  if (jobId.isDefined) {

    logDebug("submitStage(" + stage + ")")

    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { // stages that are already running, waiting or failed should not be triggered again

      val missing = getMissingParentStages(stage).sortBy(_.id) // if there are unfinished parent stages, submit them first and make this stage wait

      logDebug("missing: " + missing)

      if (missing.isEmpty) {

        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")

        submitMissingTasks(stage, jobId.get)

      } else {

        for (parent <- missing) {

          submitStage(parent)

        }

        waitingStages += stage

      }

    }

  } else {

    abortStage(stage, "No active job for stage " + stage.id, None) // if the job no longer exists there is nothing to compute

  }

}
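To make the recursion concrete: a job with a single shuffle yields one ShuffleMapStage feeding one ResultStage, so submitStage(finalStage) finds exactly one missing parent, submits it first, and parks the final stage in waitingStages until the map output is available. A hedged example job with that shape; the input path is hypothetical and sc is an existing SparkContext.

// reduceByKey introduces a wide (shuffle) dependency, i.e. a stage boundary.
val wordCounts = sc.textFile("hdfs:///tmp/input.txt")   // hypothetical path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// count() is the Action: the ResultStage over wordCounts has the ShuffleMapStage
// created by reduceByKey as its missing parent, so that stage gets submitted first.
println(wordCounts.count())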

 

Step 4-DAGScheduler

/** Submit the not-yet-computed tasks of this stage; at this point the results of the parent stages are already available. */

private def submitMissingTasks(stage: Stage, jobId: Int) {

  logDebug("submitMissingTasks(" + stage + ")")

  stage.pendingPartitions.clear()  // reset the pending partitions; they are re-registered below

 

  /* Determine which partitions still need computing. Two cases:
     1. ShuffleMapStage: look for outputLocs(id).isEmpty; outputLocs holds the MapStatus objects (metadata of ShuffleMapTask results)
     2. ResultStage: check !job.finished(partitionId) */

  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

 

  // Reset the internal accumulators only if this stage has no partial submission (resetting after a partial submission would overwrite earlier results)

  if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {

    stage.resetInternalAccumulators()

  }

 

  // Register running state

  val properties = jobIdToActiveJob(jobId).properties

  runningStages += stage

 

  // Register the per-partition attempt state for this stage. OutputCommitCoordinator exists on both the driver and the executors; it tracks the stage's execution state and HDFS-related commit authorization

  // SparkListenerStageSubmitted should be posted before testing whether tasks are serializable. If tasks are not serializable, a SparkListenerStageCompleted event will be posted, which should always come after a corresponding SparkListenerStageSubmitted event.

  stage match {

    case s: ShuffleMapStage =>

      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)

    case s: ResultStage =>

      outputCommitCoordinator.stageStart(

        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)

  }

 

// Assign candidate executor locations to every missing partition, applying the locality preference policy. TaskLocation carries the executor's host and executorId. An empty value in this map means the partition has no preferred executor (not that it cannot run anywhere)

 val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {

    stage match {

      case s: ShuffleMapStage => partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap

      case s: ResultStage =>

        val job = s.activeJob.get

        partitionsToCompute.map { id =>

          val p = s.partitions(id)

          (id, getPreferredLocs(stage.rdd, p))

        }.toMap

    }

  } catch {

    case NonFatal(e) =>

      // Record one (already failed) stage attempt. StageInfo is a POJO that wraps the stage's state

      stage.makeNewStageAttempt(partitionsToCompute.size)

      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))

      runningStages -= stage

      return

  }

 

  // Record a new stage attempt

  stage.makeNewStageAttempt(partitionsToCompute.size,  taskIdToLocations.values.toSeq)

  // Post the StageSubmitted event asynchronously; it is consumed by implementations of the SparkListener trait

  // ListenerBus is the asynchronous message bus used for overall driver-side event dispatching. Events are received by all SparkListener implementations (some unimportant ones are omitted here; a hedged registration sketch follows after this method):

  // SparkFirehoseListener: user-defined event listening; it exposes a single onEvent method so all event types can be handled in one place

  // JavaSparkListener: user-defined listener (Java API)

  // SQLListener: used by Spark SQL

  // ExecutorAllocationListener: dynamically adds/removes executors based on load; note that an executor is a separate process on a worker (slave) node

  // JobProgressListener: job progress listener, maintains the running state of each stage

  // StorageListener: storage status listener, including what is shown on the Web UI

  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

 

  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

  // Broadcast the task payload (the serialized RDD binary) to all executors. Each task gets its own deserialized copy of the RDD, which gives tasks better isolation; any modification a task makes to references inside its closure stays private to that task, so there are no concurrency issues. This is necessary in Hadoop where the JobConf/Configuration object is not thread-safe.

  var taskBinary: Broadcast[Array[Byte]] = null

  try {

    // For a ShuffleMapTask, broadcast the rdd together with its shuffleDep dependency

    // For a ResultTask, broadcast the rdd together with func (the func is the Action itself, e.g. foreach)

    val taskBinaryBytes: Array[Byte] = stage match {

      case stage: ShuffleMapStage =>

      // closureSerializer defaults to JavaSerializer, i.e. standard java.io object serialization (ObjectOutputStream). The long writeExternal stack traces seen on serialization failures come from this call

        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))

      case stage: ResultStage =>

        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))

    }

    taskBinary = sc.broadcast(taskBinaryBytes) // broadcast it

  } catch {

    // Errors from serialization failure; if the task cannot be serialized, give up on the stage immediately

    case e: NotSerializableException =>

      abortStage(stage, "Task not serializable: " + e.toString, Some(e))

      runningStages -= stage

      return

    case NonFatal(e) =>

      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))

      runningStages -= stage

      return

  }

 

  // Create the Task instances. Task information reaches the executors through several channels: the DAG via broadcast, dependency jars via file download, and the task itself (bytecode) via Akka

  val tasks: Seq[Task[_]] = try {

    stage match {

      case stage: ShuffleMapStage =>

        partitionsToCompute.map { id =>

          // attach the location and partition info

          val locs = taskIdToLocations(id)

          val part = stage.rdd.partitions(id)

          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, stage.internalAccumulators)

        }

      case stage: ResultStage =>

        val job = stage.activeJob.get

        partitionsToCompute.map { id =>

          val p: Int = stage.partitions(id)

          val part = stage.rdd.partitions(p)

          val locs = taskIdToLocations(id)

          new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id, stage.internalAccumulators)

        }

    }

  } catch {

    case NonFatal(e) =>

      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))

      runningStages -= stage

      return

  }

 

  // Submit the tasks

  if (tasks.size > 0) {

    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")

    stage.pendingPartitions ++= tasks.map(_.partitionId)

    logDebug("New pending partitions: " + stage.pendingPartitions)

    // Wrap the tasks into a TaskSet and submit it; a TaskSet is essentially just a collection of tasks. Control now passes to the TaskScheduler

    taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

  } else {

    // No tasks left to run; mark the stage as finished

    markStageAsFinished(stage, None)

    val debugString = stage match {

      case stage: ShuffleMapStage =>

        s"Stage ${stage} is actually done; " + s"(available: ${stage.isAvailable}," + s"available outputs: ${stage.numAvailableOutputs}," + s"partitions: ${stage.numPartitions})"

      case stage : ResultStage =>

        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"

    }

    logDebug(debugString)

  }

}
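The events posted on listenerBus throughout this method (SparkListenerStageSubmitted, and SparkListenerJobStart earlier) are delivered to every registered SparkListener, including the built-in listeners listed above. User code can hook into the same bus; below is a minimal, hedged sketch using the public SparkListener API, where the listener class is invented for the example and sc is an existing SparkContext.

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageSubmitted}

class StageLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
    println(s"Stage ${stageSubmitted.stageInfo.stageId} submitted")
}

// Registered on the driver; the asynchronous listener bus then invokes these callbacks
// for every matching event posted by the DAGScheduler.
sc.addSparkListener(new StageLoggingListener)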

 

 

private def getPreferredLocsInternal(rdd: RDD[_],partition: Int, visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {

  // If the partition has already been visited, no need to re-visit.

  // This avoids exponential path exploration.  SPARK-695

  if (!visited.add((rdd, partition))) {

    // Nil has already been returned for previously visited partitions.

    return Nil

  }

  // Check the cache first

  val cached = getCacheLocs(rdd)(partition)

  if (cached.nonEmpty) {

    return cached

  }

  // If the RDD has preset preferences, return them. This is the end of the recursion: all preferences propagate from the input RDDs along narrow dependencies

  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

  if (rddPrefs.nonEmpty) {

    return rddPrefs.map(TaskLocation(_))

  }

  // Recurse through each narrow dependency

  rdd.dependencies.foreach {

    case n: NarrowDependency[_] =>

      for (inPart <- n.getParents(partition)) {

        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)

        if (locs != Nil) {

          return locs

        }

      }

    case _ =>

  }

  Nil

}
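As a concrete case of the recursion above: a mapped RDD defines no preferred locations of its own, so the lookup falls through to its narrow-dependency parent. sc.makeRDD with explicit location preferences makes this easy to see; the host names are made up and sc is an existing SparkContext.

// makeRDD can attach explicit preferred hosts to each element (one partition per element).
val withPrefs = sc.makeRDD(Seq(
  (1, Seq("host-a")),   // hypothetical host names
  (2, Seq("host-b"))
))
val mapped = withPrefs.map(_ * 10)

// The parent reports its preset preferences; this is the recursion's terminal case.
println(withPrefs.preferredLocations(withPrefs.partitions(0)))   // contains host-a
// The child reports none, so getPreferredLocsInternal walks the narrow dependency
// back to withPrefs and uses its preferences instead.
println(mapped.preferredLocations(mapped.partitions(0)))         // empty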

 

This completes the DAGScheduler submission logic; the remaining steps are carried out by the TaskScheduler. To be continued.
