How to add a SparkListener from pySpark in Python?

Question

I want to create a Jupyter/IPython extension to monitor Apache Spark jobs.

Spark provides a REST API.

However, instead of polling the server, I want the event updates to be sent through callbacks.
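Polling would look roughly like the following sketch (assumptions: the driver UI is reachable at localhost:4040 and the requests library is available):

import requests

# Poll the Spark monitoring REST API for the list of applications.
apps = requests.get("http://localhost:4040/api/v1/applications").json()
for app in apps:
    print(app["id"], app["name"])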

I am trying to register a SparkListener with SparkContext.addSparkListener(). This feature is not available on the PySpark SparkContext object in Python, so how can I register a Python listener with the Scala/Java version of the context from Python? Is it possible to do this through py4j? I want Python functions to be called when the events fire in the listener.

Answer

It is possible, although it is a bit involved. We can use the Py4j callback mechanism to pass messages from a SparkListener. First, let's create a Scala package with all the required classes. Directory structure:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── Manager.scala
                                └── TaskListener.scala

build.sbt:

name := "listener"

organization := "net.zero323"

scalaVersion := "2.11.7"

val sparkVersion = "2.1.0"

libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "net.sf.py4j" % "py4j" % "0.10.4"  // Just for the record
)

Listener.scala defines a Python interface we are going to implement later:

package net.zero323.spark.examples.listener

/* You can add arbitrary methods here, 
 * as long as these match the corresponding Python interface
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types, 
   * for example here String => Unit */
  def notify(x: Any): Any
}

Manager.scala will be used to forward messages to the Python listener:

package net.zero323.spark.examples.listener

object Manager {
  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }

}

Finally, a simple SparkListener:

package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts the number of records written by the task
 * and converts it to JSON. You can of course add handlers
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" ->  recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}

Let's package our extension:

sbt package

and start a PySpark session, adding the generated jar to the class path and registering the listener:

 $SPARK_HOME/bin/pyspark \
   --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
   --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener
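Alternatively, if you don't want to rely on spark.extraListeners, the listener can also be attached at runtime through Py4j (a sketch; it assumes the jar is already on the driver class path):

# Instantiate the Scala listener through Py4j and attach it to the
# underlying Scala SparkContext. The jar must be on the driver class path.
java_listener = sc._jvm.net.zero323.spark.examples.listener.PythonNotifyListener()
sc._jsc.sc().addSparkListener(java_listener)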

Next we have to define a Python object which implements the Listener interface:

from pyspark import SparkContext


class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager = PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]

Start the callback server:

sc._gateway.start_callback_server()

Create a listener:

listener = PythonListener()

and register it:

listener.register()

And test:

>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}

On exit you should shut down the callback server:

sc._gateway.shutdown_callback_server()

Note:

This should be used with caution when working with Spark Streaming, which uses the callback server internally.
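A defensive sketch, assuming Py4j keeps a running callback server in the gateway's private _callback_server attribute, is to start one only if none exists yet:

gw = sc._gateway
# PySpark Streaming may have started its own callback server already,
# so only start one if the gateway does not have one running.
if not gw.__dict__.get("_callback_server"):
    gw.start_callback_server()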

Edit:

If this is too much hassle, you can just implement org.apache.spark.scheduler.SparkListenerInterface directly:

class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]

extend it:

class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())

and use it directly:

>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)

While simpler, this method is not selective (more traffic between the JVM and Python) and requires handling Java objects inside the Python session.
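Any filtering then has to happen on the Python side by drilling into the proxied Java objects, for example (a sketch; Py4j exposes the Scala fields as zero-argument method calls):

class RecordsWrittenListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        # taskEnd is a Java SparkListenerTaskEnd proxied by Py4j,
        # so Scala fields are read via zero-argument method calls.
        metrics = taskEnd.taskMetrics()
        if metrics is not None:
            print(metrics.outputMetrics().recordsWritten())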
