本文共 1278 字,大约阅读时间需要 4 分钟。
本节书摘来自华章社区《深入理解Spark:核心思想与源码分析》一书中的第3章,第3.13节创建DAGSchedulerSource和BlockManagerSource,作者耿嘉安,更多章节内容可以访问云栖社区“华章社区”公众号查看
3.13 创建DAGSchedulerSource和BlockManagerSource
在创建DAGSchedulerSource、BlockManagerSource之前首先调用taskScheduler的post-StartHook方法,其目的是为了等待backend就绪,见代码清单3-53。postStartHook的实现见代码清单3-54。创建DAGSchedulerSource和BlockManagerSource的过程类似于ExecutorSource,只不过DAGSchedulerSource测量的信息是stage. failedStages、stage. runningStages、stage. waiting-Stages、stage. allJobs、stage. activeJobs,BlockManagerSource测量的信息是memory. maxMem_MB、memory. remainingMem_MB、memory. memUsed_MB、memory. diskSpace-Used_MB。代码清单3-53 创建DAGSchedulerSource和BlockManagerSourcetaskScheduler.postStartHook() private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)private def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) SparkEnv.get.metricsSystem.registerSource(blockManagerSource)}initDriverMetrics()代码清单3-54 postStartHook的实现override def postStartHook() { waitBackendReady() }private def waitBackendReady(): Unit = { if (backend.isReady) { return } while (!backend.isReady) { synchronized { this.wait(100) } }}
转载地址:http://vueyl.baihongyu.com/