300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > hadoop源码分析_Spark2.x精通:Job触发流程源码深度剖析(一)

hadoop源码分析_Spark2.x精通:Job触发流程源码深度剖析(一)

时间:2019-03-14 07:28:29

相关推荐

hadoop源码分析_Spark2.x精通:Job触发流程源码深度剖析(一)

一、概述

之前几篇文章对Spark集群的Master、Worker启动流程进行了源码剖析,后面直接从客户端角度出发,讲解了spark-submit任务提交过程及driver的启动;集群启动、任务提交、SparkContext初始化等前期准备工作完成之后,后面就是我们的主函数的代码Job如何触发的,本篇文章还是结合源码进行剖析。

软件版本:

spark2.2.0

二、Job触发流程源码剖析

1. 我们先上一段最简单的代码,读取本地文件进行WordCount,并打印统计结果,代码如下:

package com.hadoop.ljs.spark220.study;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.SparkSession;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: -03-12 08:26 * @version: v1.0 * @description: com.hadoop.ljs.spark220.study */public class Example1 {public static void main(String[] args) throws Exception{/*spark环境初始化*/ SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Example1"); SparkSession sc = SparkSession.builder().config(sparkConf).getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(sc.sparkContext()); /*读取本地文件*/ JavaRDD<String> sourceRDD = jsc.textFile("D:\\kafkaSSL\\kafka_client_jaas.conf"); /*转换多维为一维数组*/ JavaRDD<String> words = sourceRDD.flatMap(new FlatMapFunction<String, String>() {@Override public Iterator<String> call(String s) {return Arrays.asList(s.split(" ")).iterator(); } }); /*转换成(hello,1)格式*/ JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() {@Override public Tuple2<String, Integer> call(String s) {return new Tuple2<String, Integer>(s, 1); }}); /*根据key进行聚合*/ JavaPairRDD<String, Integer> wordCount = wordOne.reduceByKey(new Function2() {@Override public Integer call(Integer v1, Integer v2) {return v1+v2; } }); /*打印结果*/ wordCount.foreach(new VoidFunctionString, Integer>>() {@Override public void call(Tuple2<String, Integer> result){System.out.println("word: "+result._1+" count: "+result._2); } }); }}

我们一行行的进行分析,首先看读取本地文件textFile()函数:

/*这里直接调用的SparkContext的textFile函数*/ def textFile(path: String): JavaRDD[String] = sc.textFile(path)

2. 直接看sc.textFile()函数:

def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {assertNotStopped()/*这里调用了hadoopFile函数,传入三个,写过Mapreuce的时候都知道第二个参数就是Map的输入格式化类型,参数3是行号4是一行的内容*//*hadoopFile()函数,返回了一个HadoopRDD*/hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],minPartitions).map(pair=>pair._2.toString).setName(path) }

看hadoopFile()函数

def hadoopFile[K, V](path: String,inputFormatClass: Class[_ <: inputformat v>keyClass: Class[K],valueClass: Class[V],minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {assertNotStopped() // This is a hack to enforce loading hdfs-site.xml. // See SPARK-11227 for details.FileSystem.getLocal(hadoopConfiguration)//这里把hadoopConfiguration配置做了一个广播变量 val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))/*传入一个jobConf对输入数据进行格式化*/ val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)/*返回一个HadoopRDD实例,这里Hadoop配置文件是以广播变量的方式传进去的*//*广播变量每个Worker保存一份,被多个Executor共享*//*HadoopRDD继承自RDD*/ new HadoopRDD(this,confBroadcast,Some(setInputPathsFunc),inputFormatClass,keyClass,valueClass,minPartitions).setName(path) }

上面直接对HadopRDD做了一个map转换,这里Hadoop继承自RDD,调用的是RDD里面的map()函数,我们直接看看map函数代码:

/*最后其实是返回了一个MapPartitionsRDD,里面是(key,value),key是行号,value是内容*/ def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }

上面对返回的RDD是一个键值对,然后.map(pair=>pair._2.toString对其进行了转换,其实就是去掉了那个key行号,剩下的是一个vlaue数组,里面是每行的内容,至此textFile这一行剖析完毕。

3.主函数的第30-42行都是对RDD进行了一系列的转换,其实都是调用RDD.scala中的内容对MapPartitionsRDD进行的转换,有兴趣你可以跟进去看一下,比较简单:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) }

/*mapToPair函数里面其实是调用的rdd.map函数,刚才上面已经说过了*/ def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]] new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2]) }

4.最后调用reduceBykey进行了聚合,这里就比较重要了,我们之前讲过一个spark任务里面会有多个job,job的划分依据是action,有几个action就有几个job,而每个job的划分依据是shuffle,只要发生了shuffle就会有新的stage生成,reduceBykey是个action操作,RDD中没有这个函数,是通过里面的隐式转换调用了PairRDDFunctions.scala中的reduceBykey()函数,里面的转换先不用管,因为涉及到shuffle操作,会有新的stage的生成,这里先略过:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)}

5.最后主函数调用了wordCount.foreach()进行了结果打印,这是一个action操作,有几个action就会提交几个job,直接去看代码:

def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)/*这里是执行了runJob,跟其他操作不一样,这里会提交一个job*/ sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }

跟进代码,里面调用了SparkContext.scala中的函数:

def runJob[T, U: ClassTag](rdd: RDD[T],func: Iterator[T] => U,partitions: Seq[Int]): Array[U] = {//这里clean函数其实直接输出valcleanedFunc=clean(func)runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions) }

跟进了好几层,最后看runJob干了啥:

def runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit): Unit = {if (stopped.get()) {throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) }//SparkContext初始化的dagScheduler调用runJob函数比较任务,这样就跟之前SparkContext源码剖析内容联系在一起了 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() }

6.上面调用了DAGScheduler中的runJob函数,这个DAGScheduler是我们在SparkContext初始化的时候执行的初始化,DAGSCheduler主要工作:创建Job,推断出每一个Job的stage划分(DAG),跟踪RDD,实体化stage的输出,调度job,将stage以taskSet的形式提交给TaskScheduler的实现类,在集群上运运行,其中,TaskSet是一组可以立即运行的独立task,基于集群上已存在的数据,直接看下代码:

def runJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) => Unit,properties: Properties): Unit = {val start = System.nanoTime/* 这里就一行比较重要,这里调用submitJob进行提交 */ val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(pletionFuture, Duration.Inf)//下面这些就是任务结果的一些判断了pletionFuture.value.get match {case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } }

下面就是调用了submitJob进行任务的提交,代码如下:

def submitJob[T, U](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],callSite: CallSite,resultHandler: (Int, U) => Unit,properties: Properties): JobWaiter[U] = {//这里确认我们提交的Partition存在 val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " +"Total number of partitions: " + maxPartitions) } val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) {// Return immediately if the job is running 0 tasksreturn new JobWaiter[U](this, jobId, 0, resultHandler) } assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)//这里会触发DAGSchedulerEventProcessLoop的JobSubmitted,他里面onReceive()函数//接收消息进行处理,这里调用的是JobSubmitted,触发dagScheduler.handleJobSubmitted//函数进行处理 eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties))) waiter }

下面就是调用handleJobSubmitted()函数进行处理,它是DAGSchduler的job调度核心入口,代码如下:

private[scheduler] def handleJobSubmitted(jobId: Int,finalRDD: RDD[_],func: (TaskContext, Iterator[_]) => _,partitions: Array[Int],callSite: CallSite,listener: JobListener,properties: Properties) {//var finalStage: ResultStage = nulltry{//使用触发job的最后一个rdd,创建stage//当hdfs上的文件被删除的时候stage可能创建失败finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch {case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return }//通过finalStage创创建一个job, val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format(job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis()//将job加入到activeJob缓存中 jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //提交finalStage,但是finalStage肯定不会首先执行,它要先执行它的依赖stage submitStage(finalStage) }

7.最后调用了submitStage进行了finalStage的提交,finalStage肯定不会首先执行,它要先执行它的依赖stage,这里面就涉及到了stage的换分了,代码如下:

/** Submits stage, but first recursively submits any missing parents. */ private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage) if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {//获取stage对应的父stage,返回List[Stage]按id排序 val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing)//如果父stage为空,则调用submitMissingTasks提交stage, if (missing.isEmpty) {logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")submitMissingTasks(stage, jobId.get) } else {for (parent //如果父stage不为空,则调用submitStage提交父stage submitStage(parent)}//并将stage放入等待的队列中,先去执行父stagewaitingStages += stage }} } else {abortStage(stage, "No active job for stage " + stage.id, None) } }

我们看下getMissingParentStages()函数,如何进行stage划分的,代码如下:

//大体划分流程:遍历rdd的所有的依赖,如果是ShufDep,则通过getShuffleMapStage获取stage, // 并加入到missing队列中。如果是窄依赖的话,将放入waitingForVisit的栈中。 private def getMissingParentStages(stage: Stage): List[Stage] = {val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(rdd: RDD[_]) {if (!visited(rdd)) {visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) {for (dep dep match {//如果shufDep也就是我们说的宽依赖 case shufDep: ShuffleDependency[_, _, _] =>//宽依赖,则创建一个shuffleStage,即finalStage之前的stage是shufflestageval mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)if (!mapStage.isAvailable) {//加入到missing队列,返回 missing += mapStage}//如果narrowDep也就是我们说的窄依赖 case narrowDep: NarrowDependency[_] =>//加入等待队列中waitingForVisit.push(narrowDep.rdd) }} }} } waitingForVisit.push(stage.rdd) while (waitingForVisit.nonEmpty) {// 如果是窄依赖,将rdd放入栈中visit(waitingForVisit.pop()) } missing.toList }

submitStage()函数中如果父stage为空则,调用submitMissingTasks()函数进行提交,这个函数主要做了一下几件事:

a.首先获取stage中没有计算的partition;

b.通过 taskIdToLocations(id) 方法进行tasks运行最佳位置的确定;

c.调用taskScheduler的submitTasks进行任务的提交。

至此,Spark任务Job触发流程源码深度剖析的第一部分讲解完毕,后面会写一遍文章专门讲解submitMissingTasks()函数中task最佳位置的定位、task的提交具体流程,请继续关注。

如果觉得我的文章能帮到您,请关注微信公众号“大数据开发运维架构”,并转发朋友圈,谢谢支持!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。