Querying a Spark Streaming application from the spark-shell (pyspark)


Problem description

I am following this example in the pyspark console and everything works perfectly.

After that I wrote it as a PySpark application as follows:

# -*- coding: utf-8 -*-
import sys
import click
import logging

from pyspark.sql import SparkSession
from pyspark.sql.types import *

@click.command()
@click.option('--master')
def most_idiotic_bi_query(master):
    spark = SparkSession \
            .builder \
            .master(master)\
            .appName("stream-test")\
            .getOrCreate()

    spark.sparkContext.setLogLevel('ERROR')

    some_schema = ....  # Schema removed 

    some_stream    = spark\
                     .readStream\
                     .option("sep", ",")\
                     .schema(some_schema)\
                     .option("maxFilesPerTrigger", 1)\
                     .csv("/data/some_stream", header=True)

    streaming_counts = (
        some_stream.groupBy(some_stream.field_1).count()
    )

    query = streaming_counts.writeStream\
                            .format("memory")\
                            .queryName("counts")\
                            .outputMode("complete")\
                            .start()

    query.awaitTermination()

if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    most_idiotic_bi_query()

The application is executed with:

spark-submit test_stream.py --master spark://master:7077

Now, if I open a new Spark driver in another terminal:

pyspark --master spark://master:7077

and try to run:

spark.sql("select * from counts")

it fails with:

During handling of the above exception, another exception occurred:

AnalysisExceptionTraceback (most recent call last)
<ipython-input-3-732b22f02ef6> in <module>()
----> 1 spark.sql("select * from id_counts").show()

/usr/spark-2.0.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
    541         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    542         """
--> 543         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    544 
    545     @since(2.0)

/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/spark-2.0.2/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Table or view not found: counts; line 1 pos 14'

I have no idea what is happening.

Answer

This is expected behavior. If you check the documentation for the memory sink:

The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.

As you can see, the memory sink doesn't create a persistent table or a global temporary view, but a local structure limited to the driver. Hence it cannot be queried from another Spark application.
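Concretely, that means the counts table can only be read from inside the same test_stream.py driver that started the streaming query. A minimal sketch of doing so (the table name counts comes from queryName above; the polling loop stands in for the blocking awaitTermination call):

import time

query = streaming_counts.writeStream\
                        .format("memory")\
                        .queryName("counts")\
                        .outputMode("complete")\
                        .start()

# The "counts" table lives only in this driver's memory, so it has to be
# queried here, not from a separate pyspark shell.
while query.isActive:
    spark.sql("select * from counts").show()
    time.sleep(5)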

So the memory output has to be queried from the driver in which it is written. For example, you could mimic console mode as shown below.

A dummy writer:

import pandas as pd
import numpy as np
import tempfile
import shutil

def producer(path):
    temp_path = tempfile.mkdtemp()

    def producer(i):
        # Build a small random batch with a "group" key and a "val" measure
        df = pd.DataFrame({
            "group": np.random.randint(10, size=1000)
        })
        df["val"] = (
            np.random.randn(1000) +
            np.random.random(1000) * df["group"] +
            np.random.random(1000) * i % 7
        )
        # Write to a scratch directory first, then move the finished file
        # into the directory watched by the stream
        f = tempfile.mktemp(dir=temp_path)
        df.to_csv(f, index=False)
        shutil.move(f, path)
    return producer
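Each call to the returned function drops one new CSV file into the watched directory, which the stream then picks up as a micro-batch. For instance, a quick manual check (assuming path is the directory the Spark application below reads from):

write_batch = producer(path)
write_batch(0)  # writes one CSV with "group" and "val" columns into path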

Spark application:

import tempfile

from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField

schema = StructType([
   StructField("group", IntegerType()),
   StructField("val", DoubleType())
])

path = tempfile.mkdtemp()
query_name = "foo"

stream = (spark.readStream
    .schema(schema)
    .format("csv")
    .option("header", "true")
    .load(path))

query = (stream
    .groupBy("group")
    .avg("val")
    .writeStream
    .format("memory")
    .queryName(query_name)
    .outputMode("complete")
    .start())

Some events:

from rx import Observable

timer = Observable.timer(5000, 5000)
timer.subscribe(producer(path))
timer.skip(1).subscribe(lambda *_: spark.table(query_name).show())

query.awaitTermination()
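If you would rather not depend on RxPy, a plain loop gives roughly the same produce-and-show cycle; a rough equivalent using only the standard library (assuming the producer, path, query, and query_name objects defined above):

import time

write_batch = producer(path)

for i in range(10):
    write_batch(i)                   # push a new CSV file into the watched directory
    time.sleep(5)                    # give the file trigger time to pick it up
    spark.table(query_name).show()   # read the in-memory table from this driver

query.stop()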

