spark的job提交之后,是由DAGScheduler进行stage划分,task的切割,最终完成执行的。 这里主要就是了解一个job是怎么切割为Task并发执行的。
Stage划分
直接进入正题,也就是org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted的实现。
先了解整体的逻辑,不必要的逻辑直接简化掉
private[scheduler] def handleJobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
artifacts: JobArtifactSet,
properties: Properties): Unit = {
// 前置检查,已经取消的job直接结束
...
var finalStage: ResultStage = null
try {
// 创建Stage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
// 异常处理,忽略
...
}
...
// 提交stage
submitStage(finalStage)
}job提交之后的处理可以很简单地划分为两步,创建Stage与提交Stage。很明显,任务的切割主要看createResultStage的实现。
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
// 通过BFS遍历RDD,根据ShuffleDependency为界进行切割
val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
// 创建父stage
val parents = getOrCreateParentStages(shuffleDeps, jobId)
// 申请stage id并创建stage
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
callSite, resourceProfile.id)
// 更新stage对应的信息
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}首先是getShuffleDependenciesAndResourceProfiles,对RDD进行广度优先遍历,找到RDD直接依赖的shuffle操作
private[scheduler] def getShuffleDependenciesAndResourceProfiles(
rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val resourceProfiles = new HashSet[ResourceProfile]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
// 广度优先遍历
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
visited += toVisit
Option(toVisit.getResourceProfile()).foreach(resourceProfiles += _)
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
// shuffle直接依赖项,加入到parent,同时这个分支剪枝掉了
parents += shuffleDep
case dependency =>
// 非shuffle依赖加入到队列中等待继续遍历
waitingForVisit.prepend(dependency.rdd)
}
}
}
(parents, resourceProfiles)
}其实这里就是切割stage了,以shuffle操作为界进行切割,也就是对于一个RDD树来说,每一层的的ShuffleDependency节点就是一个stage的切割点。
这里返回的resourceProfiles就是当前RDD的stage执行所需的资源信息,因此接着又将其合并,表示一个stage执行过程中的所有资源需求。
我们前面划分了RDD的最外层stage以及直接依赖的shuffle节点。shuffle节点本身的运算可能还会依赖别的shuffle,也就是还可以继续划分stage。
因此接下来就是遍历shuffle,然后递归划分出所有的stage,最终构建出所有的stage,以及stage之间的依赖关系。
// 为shuffleDeps创建stage
val parents = getOrCreateParentStages(shuffleDeps, jobId)
val id = nextStageId.getAndIncrement()
// 为当前rdd这个stage,以及依赖的stage,创建最终的stage
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
callSite, resourceProfile.id)getOrCreateParentStage是一个递归实现
private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
firstJobId: Int): List[Stage] = {
// 遍历shuffleDeps,创建对应的stage
shuffleDeps.map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
// shuffleIdToMapStage记录了shuffle id和stage的关系,如果存在映射,那么可以直接返回,相反,不存在的话就需要创建了
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
// 先查找这个shuffle依赖的还没有创建的shuffle节点,不存在的话则先为这些依赖项创建stage
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
// 将shuffle节点自身创建为stage
createShuffleMapStage(shuffleDep, firstJobId)
}
}createShuffleMapStage的逻辑和createResultStage类似。只不过前者明确创建的是ShuffleStage,后者明确创建的是ResultStage
def createShuffleMapStage[K, V, C](
shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
// 前置逻辑和createResultStage类似
val rdd = shuffleDep.rdd
val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
val numTasks = rdd.partitions.length
// 递归调用
val parents = getOrCreateParentStages(shuffleDeps, jobId)
val id = nextStageId.getAndIncrement()
// 创建ShuffleMapStage
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker,
resourceProfile.id)
// 更新stage信息
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
stage
}到这里为止,Stage的划分已经依赖关系已经构建完毕了。总结下来就是通过BFS遍历树的依赖关系,以Shuffle节点作为分割点进行Stage的划分。
Stage提交
Stage划分结束之后就可以提交Stage了。经过前面的介绍,这部分逻辑也很容易理解,就是深度优先遍历stage,stage存在依赖的parent stage还没有执行则提交执行parent stage,都执行了则提交自己去执行
private def submitStage(stage: Stage): Unit = {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
// 做一些检查工作,判断要不要中断stage
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
if (stage.getNextAttemptId >= maxStageAttempts) {
abortStage(stage, reason, None)
} else {
// 查找缺失的依赖的stage列表
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
// 都存在,则提交本stage执行
submitMissingTasks(stage, jobId.get)
} else {
// 否则先提交执行依赖的stage
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}Stage执行
最终所有父stage都完成的stage通过submitMissingTasks来提交执行,也算是stage提交的一部分,不过最终执行的逻辑也在这里,而且逻辑比较复杂,就单独用一节来说明。
同样地,我们只关注主要的逻辑代码,其余地方直接忽略
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
// 确定要计算的分区索引
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// 根据分区的id,获取TaskId,以及Task要运行的位置
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
// 将stage序列化
RDDCheckpointData.synchronized {
taskBinaryBytes = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
partitions = stage.rdd.partitions
}
// 将序列化后的stage广播
taskBinary = sc.broadcast(taskBinaryBytes)
}
// 依据分区创建Task列表
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber(), taskBinary,
part, stage.numPartitions, locs, artifacts, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber(),
taskBinary, part, stage.numPartitions, locs, id, artifacts, properties,
serializedTaskMetrics, Option(jobId), Option(sc.applicationId),
sc.applicationAttemptId, stage.rdd.isBarrier())
}
}
}
if (tasks.nonEmpty) {
// 有任务则提交任务
val shuffleId = stage match {
case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)
case _: ResultStage => None
}
// 任务通过taskScheduler提交执行
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber(), jobId, properties,
stage.resourceProfileId, shuffleId))
} else {
// 没有任务执行则直接标记stage为完成态
stage match {
case stage: ShuffleMapStage =>
if (!stage.shuffleDep.isShuffleMergeFinalizedMarked &&
stage.shuffleDep.getMergerLocs.nonEmpty) {
checkAndScheduleShuffleMergeFinalize(stage)
} else {
processShuffleMapStageCompletion(stage)
}
case stage : ResultStage =>
markStageAsFinished(stage)
submitWaitingChildStages(stage)
}
}
}执行的过程总结一下就是:
- 根据数据分布情况以及分区情况为stage创建出Tasks
- 将Task提交到TaskScheduler执行
- stage的task执行结束则stage执行完成
到此为止,一个job的提交到Stage的调度执行就完成了。