How to add a SparkListener from pySpark in Python?
Question
I want to create a Jupyter/IPython extension to monitor Apache Spark jobs.
Spark provides a REST API.
However, instead of polling the server, I want the event updates to be sent through callbacks.
I am trying to register a SparkListener with SparkContext.addSparkListener(). This feature is not available on the PySpark SparkContext object in Python. So how can I register a Python listener with the Scala/Java version of the context from Python? Is it possible to do this through py4j? I want Python functions to be called when the events fire in the listener.
Answer
It is possible, although it is a bit involved. We can use the Py4j callback mechanism to pass messages from a SparkListener. First, let's create a Scala package with all the required classes. Directory structure:
.
├── build.sbt
└── src
└── main
└── scala
└── net
└── zero323
└── spark
└── examples
└── listener
├── Listener.scala
├── Manager.scala
└── TaskListener.scala
build.sbt:
name := "listener"
organization := "net.zero323"
scalaVersion := "2.11.7"
val sparkVersion = "2.1.0"
libraryDependencies ++= List(
"org.apache.spark" %% "spark-core" % sparkVersion,
"net.sf.py4j" % "py4j" % "0.10.4" // Just for the record
)
Listener.scala defines a Python interface we are going to implement later:
package net.zero323.spark.examples.listener

/* You can add arbitrary methods here,
 * as long as these match corresponding Python interface
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types,
   * for example here String => Unit */
  def notify(x: Any): Any
}
Manager.scala will be used to forward messages to the Python listener:
package net.zero323.spark.examples.listener

object Manager {
  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }
}
And finally a simple SparkListener:
package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts numbers of records written by the task
 * and converts to JSON. You can of course add handlers
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" -> recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}
Let's package our extension:
sbt package
and start a PySpark session, adding the generated jar to the class path and registering the listener:
$SPARK_HOME/bin/pyspark \
--driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
--conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener
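Since the goal is a Jupyter/IPython extension, you may prefer not to launch bin/pyspark directly. A common alternative is to set the submit arguments from the notebook process before the SparkContext is created. This is only a hedged sketch, not part of the original recipe: it assumes the jar path built by sbt above, and relies on PYSPARK_SUBMIT_ARGS (which must end with pyspark-shell in Spark 2.x):

import os

# Hedged sketch: pass the same flags through PYSPARK_SUBMIT_ARGS so a notebook
# kernel starts the JVM with the listener jar on the driver class path.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar "
    "--conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener "
    "pyspark-shell"
)

from pyspark import SparkContext
sc = SparkContext(appName="listener-demo")  # the app name is an arbitrary choice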
Next we have to define a Python object which implements the Listener interface:
from pyspark import SparkContext


class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by the Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager = PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]
Start the callback server:
sc._gateway.start_callback_server()
Create the listener:
listener = PythonListener()
register it:
listener.register()
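At this point you can optionally sanity-check the wiring without running a job by calling the Scala Manager directly through py4j. This is just a hedged sketch reusing the get_manager() helper defined above, and the message text is arbitrary:

# Hedged smoke test: if the callback server and registration work,
# PythonListener.notify() should print the message below.
manager = PythonListener.get_manager()
manager.notifyAll("ping from Python")  # expected output: ping from Python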
and test:
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}
On exit you should shut down the callback server:
sc._gateway.shutdown_callback_server()
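Putting the pieces together, a minimal sketch of the whole lifecycle could look like the following. The try/finally and the output path are my own assumptions about how you might want to guarantee cleanup, not part of the original recipe:

sc._gateway.start_callback_server()
listener = PythonListener()
listener.register()
try:
    # Placeholder job; any action that writes output will trigger onTaskEnd.
    sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_lifecycle_test")
finally:
    # Always unregister and stop the callback server, even if the job fails.
    listener.unregister()
    sc._gateway.shutdown_callback_server()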
Note: This should be used with caution when working with Spark Streaming, which internally uses the callback server.
Edit: If this is too much hassle you could just define org.apache.spark.scheduler.SparkListenerInterface:
class SparkListener(object):

    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass

    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
extend it:
class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())
and use it directly:
>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)
While simpler, this method is not selective (more traffic between the JVM and Python) and requires handling Java objects inside the Python session.
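If you want some of that selectivity back while keeping the simpler approach, you can pull out only the fields you need from the Java object inside the handler. The accessor chain below is a hedged sketch (case-class fields of SparkListenerTaskEnd surface as no-argument methods through py4j), and the class name is purely illustrative; check the field names against your Spark version:

class RecordsWrittenListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        # Access the Java object's fields as methods via py4j and keep only
        # the values we care about, instead of printing the whole event.
        metrics = taskEnd.taskMetrics()
        written = metrics.outputMetrics().recordsWritten()
        print("stage {} task {}: {} records written".format(
            taskEnd.stageId(), taskEnd.taskInfo().taskId(), written))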