How to get the output from console streaming sink in Zeppelin?


Question

I'm struggling to get the console sink working with PySpark Structured Streaming when run from Zeppelin. Basically, I'm not seeing any results printed to the screen, or to any logfiles I've found.

My question: Does anyone have a working example of using PySpark Structured Streaming with a sink that produces output visible in Apache Zeppelin? Ideally it would also use the socket source, as that's easy to test with.

I'm using:

  • Ubuntu 16.04
  • spark-2.2.0-bin-hadoop2.7
  • zeppelin-0.7.3-bin-all
  • Python3

I've based my code on the structured_network_wordcount.py example. It works when run from the PySpark shell (./bin/pyspark --master local[2]); I see tables per batch.

%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, '10 seconds', '1 seconds'),
    words.word
).count().orderBy('window')

# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'false')\
    .start()

print("Starting...")
query.awaitTermination(20)

I'd expect to see printouts of results for each batch, but instead I just see Starting..., and then False, the return value of query.awaitTermination(20).

In a separate terminal I'm entering some data into a nc -lk 9999 netcat session while the above is running.

Answer

The console sink is not a good choice for an interactive notebook-based workflow. Even in Scala, where the output can be captured, it requires an awaitTermination call (or equivalent) in the same paragraph, effectively blocking the note:

%spark

spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("includeTimestamp", "true")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination() // Block execution, to force Zeppelin to capture the output

The chained awaitTermination could be replaced with a standalone call in the same paragraph; this would work as well:

%spark

val query = df
  .writeStream
  ...
  .start()

query.awaitTermination()

Without it, Zeppelin has no reason to wait for any output. PySpark adds another problem on top of that: indirect execution. Because of that, even blocking the query won't help you here; the console sink writes to the JVM's stdout, which is not the Python output that Zeppelin captures for a %pyspark paragraph.

Moreover, continuous output from the stream can cause rendering issues and memory problems when browsing the note (it might be possible to use the Zeppelin display system via InterpreterContext or the REST API to achieve more sensible behavior, where the output is overwritten or periodically cleared).

A much better choice for testing with Zeppelin is the memory sink. This way you can start a query without blocking:

%pyspark

query = (windowedCounts
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("some_name")
  .start())

and query the result on demand in another paragraph:

%pyspark

spark.table("some_name").show()

It can be coupled with reactive streams or a similar solution to provide interval-based updates.
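It doesn't even require rx; a plain polling loop in its own paragraph would do for interval-based updates. A minimal sketch, assuming the some_name query from above is running:

%pyspark

import time

# Redisplay the contents of the memory sink every 5 seconds,
# 10 times in total.
for _ in range(10):
    spark.table("some_name").show()
    time.sleep(5)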

It is also possible to use a StreamingQueryListener with Py4j callbacks to couple rx with onQueryProgress events, although query listeners are not supported in PySpark and a bit of glue code is required. Scala interface:

package com.example.spark.observer

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

trait PythonObserver {
  def on_next(o: Object): Unit
}

class PythonStreamingQueryListener(observer: PythonObserver) 
    extends StreamingQueryListener {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    observer.on_next(event)
  }
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Build a jar, adjusting the build definition to reflect the desired Scala and Spark versions:

scalaVersion := "2.11.8"  

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)

Put it on the Spark classpath (for example via --jars or spark.jars) and patch StreamingQueryManager:

%pyspark

from pyspark.sql.streaming import StreamingQueryManager
from pyspark import SparkContext

def addListener(self, listener):
    jvm = SparkContext._active_spark_context._jvm
    jlistener = jvm.com.example.spark.observer.PythonStreamingQueryListener(
        listener
    )
    self._jsqm.addListener(jlistener)
    return jlistener


StreamingQueryManager.addListener = addListener

Start the callback server:

%pyspark

sc._gateway.start_callback_server()

and add the listener:

%pyspark

from rx.subjects import Subject

class StreamingObserver(Subject):
    class Java:
        implements = ["com.example.spark.observer.PythonObserver"]

observer = StreamingObserver()
spark.streams.addListener(observer)

Finally, you can subscribe and block execution:

%pyspark

(observer
    .map(lambda p: p.progress().name())
    # .filter() can be used to print only for a specific query
    .subscribe(lambda n: spark.table(n).show() if n else None))
input()  # Block execution to capture the output 

The last step should be executed after you start the streaming query.

It is also possible to skip rx and use a minimal observer like this:

class StreamingObserver(object):
    class Java:
        implements = ["com.example.spark.observer.PythonObserver"]

    def on_next(self, value):
        # value is the Py4j proxy of the QueryProgressEvent
        try:
            name = value.progress().name()
            if name:
                spark.table(name).show()
        except Exception:
            # Swallow errors so a failing callback doesn't bring down
            # the Py4j callback server.
            pass

Subject 相比,它提供的控制更少(一个警告是,这会干扰其他代码打印到标准输出,并且只能通过 删除侦听器.使用 Subject,您可以轻松地dispose subscribed 观察者,一旦完成),否则应该工作或多或少相同.

It gives a bit less control than the Subject (one caveat is that this can interfere with other code printing to stdout and can be stopped only by removing listener. With Subject you can easily dispose subscribed observer, once you're done), but otherwise should work more or less the same.
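For illustration, disposing could look like this with the Subject-based observer (a sketch assuming RxPY 1.x, as imported above, where subscribe returns a disposable handle):

%pyspark

# Keep the handle returned by subscribe so the observer can be
# detached once you're done watching the stream.
subscription = (observer
    .map(lambda p: p.progress().name())
    .subscribe(lambda n: spark.table(n).show() if n else None))

# ... later, stop receiving updates:
subscription.dispose()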

Note that any blocking action will be sufficient to capture the output from the listener, and it doesn't have to be executed in the same cell. For example:

%pyspark

observer = StreamingObserver()
spark.streams.addListener(observer)

%pyspark

import time

time.sleep(42)

would work in a similar way, printing the table for a defined time interval.

For completeness, you can implement StreamingQueryManager.removeListener.
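A minimal sketch, mirroring the addListener monkey patch above; it relies on the jlistener handle that the patched addListener returns:

%pyspark

from pyspark.sql.streaming import StreamingQueryManager

def removeListener(self, jlistener):
    # jlistener is the JVM-side PythonStreamingQueryListener
    # returned by the patched addListener.
    self._jsqm.removeListener(jlistener)

StreamingQueryManager.removeListener = removeListener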
