Spark 消息队列机制源码学习

2022-06-13 11:16:19

源码学习

spark源码注释中有下面一句话:

Asynchronously passes SparkListenerEvents to registered SparkListeners

即所有spark消息SparkListenerEvents 被异步的发送给已经注册过的SparkListeners.
在SparkContext中, 首先会创建LiveListenerBus实例,这个类主要功能如下:

  • 保存有消息队列,负责消息的缓存
  • 保存有注册过的listener,负责消息的分发

该类的继承层次如下所示:
这里写图片描述

listener链表保存在ListenerBus类中,为了保证并发访问的安全性,此处采用Java的CopyOnWriteArrayList类来存储listener. 当需要对listener链表进行更改时,CopyOnWriteArrayList的特性使得会先复制整个链表,然后在复制的链表上面进行修改.当一旦获得链表的迭代器,在迭代器的生命周期中,可以保证数据的一致性.

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {// Marked `private[spark]` for access in tests.private[spark] val listeners =new CopyOnWriteArrayList[L]/**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */final def addListener(listener: L) {
    listeners.add(listener)
  }
  ...

消息队列实际上是保存在类AsynchronousListenerBus中的:

  private val EVENT_QUEUE_CAPACITY = 10000
  private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)

事件队列的长度为10000,当缓存事件数量达到上限后,新来的事件会被丢弃,具体的丢弃处理函数位于LiveListenerBus类中:

private[spark]classLiveListenerBusextendsAsynchronousListenerBus[SparkListener,SparkListenerEvent]("SparkListenerBus")withSparkListenerBus {private val logDroppedEvent =new AtomicBoolean(false)

  override def onDropEvent(event: SparkListenerEvent): Unit = {if (logDroppedEvent.compareAndSet(false,true)) {// Only log the following message once to avoid duplicated annoying logs.
      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +"This likely means one of the SparkListeners is too slow and cannot keep up with " +"the rate at which tasks are being started by the scheduler.")
    }
  }

}

通过上面代码可以看到, 处理方式输出错误日志,且通过使用变量logDroppedEvent来保证仅输出一次.
继续把目光放在类AsynchronousListenerBus上,该类是消息机制的核心.既然是消息队列,就涉及到消息的生产和消费.首先来看消息的消费方式,AsynchronousListenerBus类会创建一个消费者线程listenerThread,来从消息队列中取得消息并进行分发,下面是实现代码:

private val listenerThread =new Thread(name) {
    setDaemon(true)override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {while (true) {
        eventLock.acquire()
        self.synchronized {
          processingEvent =true
        }try {
          valevent = eventQueue.pollif (event ==null) {// Get out of the while loop and shutdown the daemon threadif (!stopped.get) {thrownew IllegalStateException("Polling `null` from eventQueue means" +" the listener bus has been stopped. So `stopped` must be true")
            }return
          }
          postToAll(event)
        }finally {
          self.synchronized {
            processingEvent =false
          }
        }
      }
    }
  }

整个思想就是典型的生产者消费者思想.为了保证生产者和消费者对消息队列的并发访问,在每次需要获取消息的时候,调用eventLock.acquire()来获取信号量, 信号量的值就是当前队列中所含有的事件数量.如果正常获取到事件,就调用postToAll将事件分发给所有listener, 继续下一次循环. 如果获取到null值, 则有下面两种情况:

  1. 整个application正常结束, 此时stopped值已经被设置为true
  2. 系统发生了错误, 立即终止运行

下面来看看生产者,代码如下:

  def post(event: E) {if (stopped.get) {// Drop further events to make `listenerThread` exit ASAP
      logError(s"$name has already stopped! Dropping event $event")return
    }
    val eventAdded = eventQueue.offer(event)if (eventAdded) {
      eventLock.release()
    }else {
      onDropEvent(event)
    }
  }

该函数用来将事件放入到消息队列中,每成功放入一个事件,就调用eventLock.release()来增加信号量额值.以供消费者线程来进行消费. 如果队列满了,就调用onDropEvent来处理, 该函数已经在上面列出,此处不再赘述.

真正的消息路由是由SparkListenerBus的onPostEvent函数完成的:

override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
    event match {case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)case logStart: SparkListenerLogStart =>// ignore event log metadata
    }
  }

上面的代码很直观,根据不同的消息类型,调用listener对应的方法来进行处理.
下面来看看listener的定义, 所有的listener都混入了SparkListener特质.该特质定义了针对所有消息的处理函数, 定义全部为空:

@DeveloperApitraitSparkListener {/**
   * Called when a stage completes successfully or fails, with information on the completed stage.
   */def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }/**
   * Called when a stage is submitted
   */def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }

  ......

对于特定的listener,在混入SparkListener特质之后,只需要重写相应的处理函数即可. 为了方便在进行消息路由时进行模式匹配,所有的具体的listener类均被定义为样本类.

对于spark中的事件来说,思想与listener类似,只是混入的特质不同而已, 事件混入的特质为SparkListenerEvent.


消息队列建立及发送流程介绍:

具体的消息发送流程如下所示:
消息发送流程
在SparkContext中,会

  1. 创建LiveListenerBus类类型的成员变量listenerBus
  2. 创建各种listener,并加入到listenerBus中
  3. post一些事件到listenerBus中
  4. 调用listenerBus.start() 来启动事件处理程序

这里有一点需要注意的是, 在listenerBus.start() 调用之前, 可以向其中post消息, 这些消息会被缓存起来,等start函数调用之后, 消费者线程会分发这些缓存的消息. listenerBus.start()是在SparkContext中的setupAndStartListenerBus函数中被调用的, 下面来看看该函数的实现:

privatedef setupAndStartListenerBus(): Unit = {// Use reflection to instantiate listeners specified via `spark.extraListeners`try {val listenerClassNames: Seq[String] =
        conf.get("spark.extraListeners","").split(',').map(_.trim).filter(_ !="")for (className <- listenerClassNames) {// Use reflection to find the right constructorval constructors = {val listenerClass = Utils.classForName(className)
          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
        }val constructorTakingSparkConf = constructors.find { c =>
          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
        }lazyval zeroArgumentConstructor = constructors.find { c =>
          c.getParameterTypes.isEmpty
        }val listener: SparkListener = {if (constructorTakingSparkConf.isDefined) {
            constructorTakingSparkConf.get.newInstance(conf)
          }elseif (zeroArgumentConstructor.isDefined) {
            zeroArgumentConstructor.get.newInstance()
          }else {thrownew SparkException(
              s"$className did not have a zero-argument constructor or a" +" single-argument constructor that accepts SparkConf. Note: if the class is" +" defined inside of another Scala class, then its constructors may accept an" +" implicit parameter that references the enclosing class; in this case, you must" +" define the listener as a top-level class in order to prevent this extra" +" parameter from breaking Spark's ability to find a valid constructor.")
          }
        }
        listenerBus.addListener(listener)
        logInfo(s"Registered listener $className")
      }
    }catch {case e: Exception =>try {
          stop()
        }finally {thrownew SparkException(s"Exception when registering SparkListener", e)
        }
    }

    listenerBus.start(this)
    _listenerBusStarted =true
  }

这段代码首先运用反射机制来处理spark.extraListeners设置, 在spark doc中有关于该设置的解释:

A comma-separated list of classes that implement SparkListener; when initializing SparkContext, instances of these classes will be created and registered with Spark’s listener bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor will be called; otherwise, a zero-argument constructor will be called. If no valid constructor can be found, the SparkContext creation will fail with an exception.

大意为:该设置制定的listener会在初始化SparkContext时被创建注册,然而对于listener的构造函数时有要求的:

  • 有一个单参数构造函数且参数为SparkConf类型, 则该构造函数被调用
  • 否则,如果有无参构造函数, 则将被调用
  • 如果没有定义构造函数,则程序异常结束

当extraListeners被构造并注册之后, listenerBus.start被调用:

def start(sc: SparkContext) {if (started.compareAndSet(false,true)) {
      sparkContext = sc
      listenerThread.start()
    }else {thrownew IllegalStateException(s"$name already started!")
    }
  }

与此同时,启动消费者线程listenerThread, 开始进行消息路由.

当程序运行结束后,会调用stop函数:

  def stop() {if (!started.get()) {thrownew IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }if (stopped.compareAndSet(false,true)) {// Call eventLock.release() so that listenerThread will poll `null` from `eventQueue`and know// `stop`is called.
      eventLock.release()
      listenerThread.join()
    }else {// Keep quiet
    }
  }

这里可以看到,在stop函数中调用了eventLock.release()来增加信号量的值. 然而并未向消息队列中加入新的消息,这就导致在消费者线程listenerThread读取队列时会返回null值,进而达到结束listenerThread线程的目的.

  • 作者:sivolin
  • 原文链接:https://blog.csdn.net/sivolin/article/details/47316099
    更新时间:2022-06-13 11:16:19