LiveListenerBus¶
LiveListenerBus is an event bus to dispatch Spark events to registered SparkListeners.
LiveListenerBus is a single-JVM SparkListenerBus that uses listenerThread to poll events.
Note
The event queue is a java.util.concurrent.LinkedBlockingQueue with a capacity of 10000 SparkListenerEvent events.
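To make the capacity limit concrete, here is a minimal sketch of how a bounded LinkedBlockingQueue behaves once it is full. The Event class and the fill helper are hypothetical stand-ins introduced for illustration; they are not part of Spark.

```scala
import java.util.concurrent.LinkedBlockingQueue

object BoundedQueueSketch {
  // Hypothetical stand-in for SparkListenerEvent, for illustration only.
  final case class Event(id: Int)

  /** Offers `n` events to a queue with the given capacity.
    * Returns how many were accepted and how many were rejected. */
  def fill(capacity: Int, n: Int): (Int, Int) = {
    val queue = new LinkedBlockingQueue[Event](capacity)
    // offer returns false right away (it does not block) when the queue is full
    val accepted = (1 to n).count(i => queue.offer(Event(i)))
    (accepted, n - accepted)
  }
}
```

With a capacity of 10000, any event offered beyond the 10000th is rejected until a consumer removes something from the queue.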
Creating Instance¶
LiveListenerBus
takes the following to be created:
LiveListenerBus
is created (and started) when SparkContext
is requested to initialize.
Event Queues¶
queues: CopyOnWriteArrayList[AsyncEventQueue]
LiveListenerBus
manages AsyncEventQueues.
queues
is initialized empty when LiveListenerBus
is created.
queues
is used when:
- Registering Listener with Queue
- Posting Event to All Queues
- Deregistering Listener
- Starting LiveListenerBus
LiveListenerBusMetrics¶
metrics: LiveListenerBusMetrics
LiveListenerBus
creates a LiveListenerBusMetrics
when created.
metrics
is registered (with a MetricsSystem) when LiveListenerBus
is started.
metrics
is used to:
- Increment the number of events posted on every event posting
- Create an AsyncEventQueue when adding a listener to a queue
Starting LiveListenerBus¶
start(
sc: SparkContext,
metricsSystem: MetricsSystem): Unit
start starts AsyncEventQueues (from the queues internal registry).
In the end, start requests the given MetricsSystem to register the LiveListenerBusMetrics.
start is used when:
- SparkContext is created
Posting Event to All Queues¶
post(
event: SparkListenerEvent): Unit
post puts the input event onto the internal eventQueue queue and releases the internal eventLock semaphore. If the event cannot be placed (which can happen since the queue is capped at 10000 events), the onDropEvent method is called.
Events can only be posted while the stopped flag is not enabled.
post
is used when...FIXME
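The posting flow described in this section can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not Spark's code: PostSketch, markStopped and the accessor methods are hypothetical names, and events are plain strings.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical, simplified bus used only to illustrate the posting flow.
class PostSketch(capacity: Int) {
  private val eventQueue = new LinkedBlockingQueue[String](capacity)
  private val eventLock = new Semaphore(0)
  private val stopped = new AtomicBoolean(false)
  private val dropped = new AtomicInteger(0)

  def post(event: String): Unit = {
    // Nothing is posted once the stopped flag is enabled
    if (!stopped.get()) {
      // offer fails (returns false) when the bounded queue is full
      if (eventQueue.offer(event)) eventLock.release()
      else onDropEvent(event)
    }
  }

  // Here the callback only counts drops
  private def onDropEvent(event: String): Unit = {
    dropped.incrementAndGet()
    ()
  }

  def markStopped(): Unit = stopped.set(true)
  def queued: Int = eventQueue.size()
  def permits: Int = eventLock.availablePermits()
  def droppedCount: Int = dropped.get()
}
```

Each accepted event releases one permit, so a consumer that acquires the semaphore before polling never finds the queue empty while events are pending.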
postToQueues¶
postToQueues(
event: SparkListenerEvent): Unit
postToQueues
...FIXME
Event Dropped Callback¶
onDropEvent(
event: SparkListenerEvent): Unit
onDropEvent
is called when no further events can be added to the internal eventQueue
queue (while posting a SparkListenerEvent event).
It simply prints out the following ERROR message to the logs and ensures that it happens only once.
Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
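One common way to make sure a message is logged only once is an atomic compare-and-set on a flag. The sketch below assumes that approach; DropLogger and its messages buffer (standing in for a real logger) are hypothetical and the message text is abbreviated.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

object DropLogSketch {
  // Hypothetical helper: records the error message at most once.
  final class DropLogger {
    private val logged = new AtomicBoolean(false)
    // Stands in for the log output in this sketch
    val messages: ListBuffer[String] = ListBuffer.empty[String]

    def onDropEvent(event: String): Unit = {
      // compareAndSet succeeds only for the first caller
      if (logged.compareAndSet(false, true)) {
        messages += "ERROR Dropping SparkListenerEvent because no remaining room in event queue."
      }
    }
  }
}
```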
Stopping LiveListenerBus¶
stop(): Unit
stop releases the internal eventLock semaphore and waits until listenerThread dies. The thread can only die after all posted events have been processed (and polling eventQueue gives nothing).
The stopped flag is enabled.
listenerThread for Event Polling¶
LiveListenerBus uses a single SparkListenerBus daemon thread that polls events from the event queue only after the listener bus has been started, and processes only one event at a time.
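The interplay between the polling thread, the eventLock semaphore, and stop can be sketched as below. This is a minimal illustration under assumptions, not the actual implementation: PollingBusSketch, the handled collection, and the thread name are hypothetical, and events are plain strings.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified bus illustrating semaphore-driven polling.
class PollingBusSketch {
  private val eventQueue = new LinkedBlockingQueue[String](10000)
  private val eventLock = new Semaphore(0)
  private val stopped = new AtomicBoolean(false)
  // Events the thread has processed, in order (stands in for listener calls)
  val handled = new ConcurrentLinkedQueue[String]()

  private val listenerThread = new Thread("listener-bus-sketch") {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        eventLock.acquire()            // wait for a post or for stop
        val event = eventQueue.poll()  // one event at a time
        if (event == null) running = false // woken by stop with nothing left
        else handled.add(event)
      }
    }
  }

  def start(): Unit = listenerThread.start()

  def post(event: String): Unit =
    if (!stopped.get() && eventQueue.offer(event)) eventLock.release()

  def stop(): Unit =
    if (stopped.compareAndSet(false, true)) {
      eventLock.release()   // extra permit lets the thread see an empty queue
      listenerThread.join() // returns once all pending events are handled
    }
}
```

Because every post adds one permit and stop adds exactly one more, the thread drains all pending events before the final acquire yields an empty poll and the loop ends.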
Registering Listener with Status Queue¶
addToStatusQueue(
listener: SparkListenerInterface): Unit
addToStatusQueue
adds the given SparkListenerInterface to appStatus queue.
addToStatusQueue
is used when:
- BarrierCoordinator is requested to onStart
- SparkContext is created
- HiveThriftServer2 utility is used to createListenerAndUI
- SharedState (Spark SQL) is requested to create a SQLAppStatusStore
Registering Listener with Shared Queue¶
addToSharedQueue(
listener: SparkListenerInterface): Unit
addToSharedQueue
adds the given SparkListenerInterface to shared queue.
addToSharedQueue
is used when:
- SparkContext is requested to register a SparkListener and register extra SparkListeners
- ExecutionListenerBus (Spark Structured Streaming) is created
Registering Listener with executorManagement Queue¶
addToManagementQueue(
listener: SparkListenerInterface): Unit
addToManagementQueue
adds the given SparkListenerInterface to executorManagement queue.
addToManagementQueue
is used when:
Registering Listener with eventLog Queue¶
addToEventLogQueue(
listener: SparkListenerInterface): Unit
addToEventLogQueue
adds the given SparkListenerInterface to eventLog queue.
addToEventLogQueue
is used when:
- SparkContext is created (with event logging enabled)
Registering Listener with Queue¶
addToQueue(
listener: SparkListenerInterface,
queue: String): Unit
addToQueue
finds the queue in the queues internal registry.
If found, addToQueue requests it to add the given listener.
If not found, addToQueue creates an AsyncEventQueue (with the given name, the LiveListenerBusMetrics, and this LiveListenerBus) and requests it to add the given listener. The AsyncEventQueue is started and added to the queues internal registry.
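The find-or-create logic above can be sketched as follows. NamedQueue is a hypothetical stand-in for AsyncEventQueue, listeners are plain strings, and the metrics and bus references are left out, so treat this as an illustration of the lookup pattern rather than Spark's code. The queue names come from the sections above.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._

object QueueRegistrySketch {
  // Hypothetical stand-in for AsyncEventQueue.
  final class NamedQueue(val name: String) {
    val listeners: ListBuffer[String] = ListBuffer.empty[String]
    var started: Boolean = false
    def addListener(listener: String): Unit = { listeners += listener; () }
    def start(): Unit = { started = true }
  }

  final class Registry {
    val queues = new CopyOnWriteArrayList[NamedQueue]()

    def addToQueue(listener: String, queue: String): Unit = synchronized {
      queues.asScala.find(_.name == queue) match {
        case Some(existing) =>
          existing.addListener(listener)
        case None =>
          // No queue with that name yet: create, populate, start, register
          val created = new NamedQueue(queue)
          created.addListener(listener)
          created.start()
          queues.add(created)
          ()
      }
    }

    // Convenience methods that only fix the queue name
    def addToSharedQueue(listener: String): Unit = addToQueue(listener, "shared")
    def addToStatusQueue(listener: String): Unit = addToQueue(listener, "appStatus")
  }
}
```

A CopyOnWriteArrayList suits this registry because posting iterates over the queues far more often than listeners are added or removed.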
addToQueue
is used when:
- LiveListenerBus is requested to addToSharedQueue, addToManagementQueue, addToStatusQueue, addToEventLogQueue
- StreamingQueryListenerBus (Spark Structured Streaming) is created
Deregistering Listener¶
removeListener(
listener: SparkListenerInterface): Unit
removeListener
...FIXME
removeListener
is used when:
- BarrierCoordinator is requested to onStop
- SparkContext is requested to deregister a SparkListener
- AsyncEventQueue is requested to deregister a listener on error