spark的job提交之后,是由DAGScheduler进行stage划分,task的切割,最终完成执行的。 这里主要就是了解一个job是怎么切割为Task并发执行的。

Stage划分

直接进入正题,也就是org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted的实现。

先了解整体的逻辑,不必要的逻辑直接简化掉

private[scheduler] def handleJobSubmitted(  
    jobId: Int,  
    finalRDD: RDD[_],  
    func: (TaskContext, Iterator[_]) => _,  
    partitions: Array[Int],  
    callSite: CallSite,  
    listener: JobListener,  
    artifacts: JobArtifactSet,  
    properties: Properties): Unit = {  
  // 前置检查,已经取消的job直接结束
  ...
  
  var finalStage: ResultStage = null  
  try {  
    // 创建Stage 
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)  
  } catch {
	// 异常处理,忽略  
    ...
  }  
  ...
 
  // 提交stage
  submitStage(finalStage)  
}

job提交之后的处理可以很简单地划分为两步,创建Stage与提交Stage。很明显,任务的切割主要看createResultStage的实现。

private def createResultStage(  
    rdd: RDD[_],  
    func: (TaskContext, Iterator[_]) => _,  
    partitions: Array[Int],  
    jobId: Int,  
    callSite: CallSite): ResultStage = { 
    // 通过BFS遍历RDD,根据ShuffleDependency为界进行切割
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)  
  val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)  
 
  // 创建父stage 
  val parents = getOrCreateParentStages(shuffleDeps, jobId)  
  // 申请stage id并创建stage
  val id = nextStageId.getAndIncrement()  
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, 
    callSite, resourceProfile.id)  
	// 更新stage对应的信息
  stageIdToStage(id) = stage  
  updateJobIdStageIdMaps(jobId, stage)  
  stage  
}

首先是getShuffleDependenciesAndResourceProfiles,对RDD进行广度优先遍历,找到RDD直接依赖的shuffle操作

private[scheduler] def getShuffleDependenciesAndResourceProfiles(  
    rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {  
  val parents = new HashSet[ShuffleDependency[_, _, _]]  
  val resourceProfiles = new HashSet[ResourceProfile]  
  val visited = new HashSet[RDD[_]]  
  val waitingForVisit = new ListBuffer[RDD[_]]  
  waitingForVisit += rdd  
  // 广度优先遍历
  while (waitingForVisit.nonEmpty) {  
    val toVisit = waitingForVisit.remove(0)  
    if (!visited(toVisit)) {  
      visited += toVisit  
      Option(toVisit.getResourceProfile()).foreach(resourceProfiles += _)  
      toVisit.dependencies.foreach {  
        case shuffleDep: ShuffleDependency[_, _, _] =>  
          // shuffle直接依赖项,加入到parent,同时这个分支剪枝掉了
          parents += shuffleDep  
        case dependency =>  
	      // 非shuffle依赖加入到队列中等待继续遍历
          waitingForVisit.prepend(dependency.rdd)  
      }  
    }  
  }  
  (parents, resourceProfiles)  
}

其实这里就是切割stage了,以shuffle操作为界进行切割,也就是对于一个RDD树来说,每一层的的ShuffleDependency节点就是一个stage的切割点。

这里返回的resourceProfiles就是当前RDD的stage执行所需的资源信息,因此接着又将其合并,表示一个stage执行过程中的所有资源需求。

我们前面划分了RDD的最外层stage以及直接依赖的shuffle节点。shuffle节点本身的运算可能还会依赖别的shuffle,也就是还可以继续划分stage。

因此接下来就是遍历shuffle,然后递归划分出所有的stage,最终构建出所有的stage,以及stage之间的依赖关系。

// 为shuffleDeps创建stage
val parents = getOrCreateParentStages(shuffleDeps, jobId) 
val id = nextStageId.getAndIncrement()  
// 为当前rdd这个stage,以及依赖的stage,创建最终的stage
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,  
  callSite, resourceProfile.id)

getOrCreateParentStage是一个递归实现

private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]],  
    firstJobId: Int): List[Stage] = {  
    // 遍历shuffleDeps,创建对应的stage
  shuffleDeps.map { shuffleDep =>  
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)  
  }.toList  
}
 
private def getOrCreateShuffleMapStage(  
    shuffleDep: ShuffleDependency[_, _, _],  
    firstJobId: Int): ShuffleMapStage = {  
 // shuffleIdToMapStage记录了shuffle id和stage的关系,如果存在映射,那么可以直接返回,相反,不存在的话就需要创建了
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {  
    case Some(stage) =>  
      stage  
  
    case None =>  
      // 先查找这个shuffle依赖的还没有创建的shuffle节点,不存在的话则先为这些依赖项创建stage 
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>  
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {  
          createShuffleMapStage(dep, firstJobId)  
        }  
      }  
      // 将shuffle节点自身创建为stage 
      createShuffleMapStage(shuffleDep, firstJobId)  
  }  
}

createShuffleMapStage的逻辑和createResultStage类似。只不过前者明确创建的是ShuffleStage,后者明确创建的是ResultStage

def createShuffleMapStage[K, V, C](  
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {  
	// 前置逻辑和createResultStage类似
  val rdd = shuffleDep.rdd  
  val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)  
  val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)  
 
  val numTasks = rdd.partitions.length  
  // 递归调用
  val parents = getOrCreateParentStages(shuffleDeps, jobId)  
  val id = nextStageId.getAndIncrement()  
  // 创建ShuffleMapStage
  val stage = new ShuffleMapStage(  
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker,  
    resourceProfile.id)  
  // 更新stage信息
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage  
 
  stage  
}

到这里为止,Stage的划分已经依赖关系已经构建完毕了。总结下来就是通过BFS遍历树的依赖关系,以Shuffle节点作为分割点进行Stage的划分。

Stage提交

Stage划分结束之后就可以提交Stage了。经过前面的介绍,这部分逻辑也很容易理解,就是深度优先遍历stage,stage存在依赖的parent stage还没有执行则提交执行parent stage,都执行了则提交自己去执行

private def submitStage(stage: Stage): Unit = {  
  val jobId = activeJobForStage(stage)  
  if (jobId.isDefined) {  
	  // 做一些检查工作,判断要不要中断stage
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {  
      if (stage.getNextAttemptId >= maxStageAttempts) {  
        abortStage(stage, reason, None)  
      } else {  
        // 查找缺失的依赖的stage列表
        val missing = getMissingParentStages(stage).sortBy(_.id)  
        logDebug("missing: " + missing)  
        if (missing.isEmpty) {  
          // 都存在,则提交本stage执行
          submitMissingTasks(stage, jobId.get)  
        } else {  
	        // 否则先提交执行依赖的stage
          for (parent <- missing) {  
            submitStage(parent)  
          }  
          waitingStages += stage  
        }  
      }  
    }  
  } else {  
    abortStage(stage, "No active job for stage " + stage.id, None)  
  }  
}

Stage执行

最终所有父stage都完成的stage通过submitMissingTasks来提交执行,也算是stage提交的一部分,不过最终执行的逻辑也在这里,而且逻辑比较复杂,就单独用一节来说明。

同样地,我们只关注主要的逻辑代码,其余地方直接忽略

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
	// 确定要计算的分区索引
	val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
 
	// 根据分区的id,获取TaskId,以及Task要运行的位置
	val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {  
	stage match {  
	case s: ShuffleMapStage =>  
	  partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap  
	case s: ResultStage =>  
	  partitionsToCompute.map { id =>  
		val p = s.partitions(id)  
		(id, getPreferredLocs(stage.rdd, p))  
	  }.toMap  
	}  
 
	var taskBinary: Broadcast[Array[Byte]] = null  
	var partitions: Array[Partition] = null  
	try {  
	  // 将stage序列化
	  RDDCheckpointData.synchronized {  
	    taskBinaryBytes = stage match {  
	      case stage: ShuffleMapStage =>  
	        JavaUtils.bufferToArray(  
	          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))  
	      case stage: ResultStage =>  
	        JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))  
	    }  
	  
	    partitions = stage.rdd.partitions  
	  }  
	  // 将序列化后的stage广播
	  taskBinary = sc.broadcast(taskBinaryBytes)  
	}
 
	// 依据分区创建Task列表
	val tasks: Seq[Task[_]] = try {  
	  val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()  
	  stage match {  
	    case stage: ShuffleMapStage =>  
	      stage.pendingPartitions.clear()  
	      partitionsToCompute.map { id =>  
	        val locs = taskIdToLocations(id)  
	        val part = partitions(id)  
	        stage.pendingPartitions += id  
	        new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber(), taskBinary,  
	          part, stage.numPartitions, locs, artifacts, properties, serializedTaskMetrics,  
	          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,  
	          stage.rdd.isBarrier())  
	      }  
	  
	    case stage: ResultStage =>  
	      partitionsToCompute.map { id =>  
	        val p: Int = stage.partitions(id)  
	        val part = partitions(p)  
	        val locs = taskIdToLocations(id)  
	        new ResultTask(stage.id, stage.latestInfo.attemptNumber(),  
	          taskBinary, part, stage.numPartitions, locs, id, artifacts, properties,  
	          serializedTaskMetrics, Option(jobId), Option(sc.applicationId),  
	          sc.applicationAttemptId, stage.rdd.isBarrier())  
	      }  
	  }  
	}
 
	if (tasks.nonEmpty) {  
	// 有任务则提交任务
	  val shuffleId = stage match {  
	    case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)  
	    case _: ResultStage => None  
	  }  
	  // 任务通过taskScheduler提交执行
	  taskScheduler.submitTasks(new TaskSet(  
	    tasks.toArray, stage.id, stage.latestInfo.attemptNumber(), jobId, properties,  
	    stage.resourceProfileId, shuffleId))  
	} else {  
	  // 没有任务执行则直接标记stage为完成态
	  stage match {  
	    case stage: ShuffleMapStage =>    
	      if (!stage.shuffleDep.isShuffleMergeFinalizedMarked &&  
	        stage.shuffleDep.getMergerLocs.nonEmpty) {  
	        checkAndScheduleShuffleMergeFinalize(stage)  
	      } else {  
	        processShuffleMapStageCompletion(stage)  
	      }  
	    case stage : ResultStage =>  
	      markStageAsFinished(stage)  
	      submitWaitingChildStages(stage)  
	  }  
	}
}

执行的过程总结一下就是:

  1. 根据数据分布情况以及分区情况为stage创建出Tasks
  2. 将Task提交到TaskScheduler执行
  3. stage的task执行结束则stage执行完成

到此为止,一个job的提交到Stage的调度执行就完成了。