Querying a Spark Streaming application from spark-shell (pyspark)
Problem description
I am following this example in the pyspark
console and everything works perfectly.
After that, I wrote it as a PySpark application as follows:
# -*- coding: utf-8 -*-
import sys
import click
import logging

from pyspark.sql import SparkSession
from pyspark.sql.types import *


@click.command()
@click.option('--master')
def most_idiotic_bi_query(master):
    spark = SparkSession \
        .builder \
        .master(master) \
        .appName("stream-test") \
        .getOrCreate()

    spark.sparkContext.setLogLevel('ERROR')

    some_schema = ....  # Schema removed

    some_stream = spark \
        .readStream \
        .option("sep", ",") \
        .schema(some_schema) \
        .option("maxFilesPerTrigger", 1) \
        .csv("/data/some_stream", header=True)

    streaming_counts = (
        some_stream.groupBy(some_stream.field_1).count()
    )

    query = streaming_counts.writeStream \
        .format("memory") \
        .queryName("counts") \
        .outputMode("complete") \
        .start()

    query.awaitTermination()


if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    most_idiotic_bi_query()
The application is executed with:
spark-submit test_stream.py --master spark://master:7077
Now, if I open a new Spark driver in another terminal:
pyspark --master spark://master:7077
and try to run:
spark.sql("select * from counts")
it fails with:
During handling of the above exception, another exception occurred:
AnalysisExceptionTraceback (most recent call last)
<ipython-input-3-732b22f02ef6> in <module>()
----> 1 spark.sql("select * from id_counts").show()
/usr/spark-2.0.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
541 [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
542 """
--> 543 return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
544
545 @since(2.0)
/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/spark-2.0.2/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Table or view not found: counts; line 1 pos 14'
I have no idea what is happening.
Answer
This is expected behavior. If you check the documentation for the memory sink:
The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.
As you can see, the memory sink doesn't create a persistent table or a global temporary view, but a local structure limited to the driver. Hence it cannot be queried from another Spark application.
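For instance, the writing application itself can read the table at any time. A minimal sketch (my assumption, not part of the original answer): replace the blocking query.awaitTermination() in the script above with a polling loop:

import time

# Poll the in-memory "counts" table from the same driver that owns it;
# this works because the memory sink lives in this driver's memory.
while query.isActive:
    spark.sql("SELECT * FROM counts").show()
    time.sleep(10)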
In other words, the memory output has to be queried from the driver in which it is written. For example, you could mimic console mode as shown below.
Fake writer:
import pandas as pd
import numpy as np
import tempfile
import shutil


def producer(path):
    temp_path = tempfile.mkdtemp()

    def producer(i):
        df = pd.DataFrame({
            "group": np.random.randint(10, size=1000)
        })
        df["val"] = (
            np.random.randn(1000) +
            np.random.random(1000) * df["group"] +
            np.random.random(1000) * i % 7
        )
        # Write to a scratch directory first, then move, so the file
        # appears atomically in the watched directory.
        f = tempfile.mktemp(dir=temp_path)
        df.to_csv(f, index=False)
        shutil.move(f, path)

    return producer
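producer(path) returns a closure that, on each call, drops a fresh random CSV file into path. It can also be driven by hand (a hypothetical one-off call, assuming path is the watched directory created in the Spark application below):

write_batch = producer(path)  # returns the inner closure
write_batch(0)                # writes one random 1000-row CSV into `path`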
Spark application:
import tempfile

from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField

schema = StructType([
    StructField("group", IntegerType()),
    StructField("val", DoubleType())
])

path = tempfile.mkdtemp()
query_name = "foo"

stream = (spark.readStream
    .schema(schema)
    .format("csv")
    .option("header", "true")
    .load(path))

query = (stream
    .groupBy("group")
    .avg("val")
    .writeStream
    .format("memory")
    .queryName(query_name)
    .outputMode("complete")
    .start())
Some events:
from rx import Observable

# Emit the first event after 5 s, then one every 5 s.
timer = Observable.timer(5000, 5000)

# Every tick drops a new CSV batch into the watched directory.
timer.subscribe(producer(path))

# From the second tick on, print the current state of the in-memory table;
# this works because we query from the same driver that runs the sink.
timer.skip(1).subscribe(lambda *_: spark.table(query_name).show())

query.awaitTermination()
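If the aggregates genuinely have to be visible to other applications, one possible workaround (my own sketch, not part of the original answer; the output path is an assumption) is to register, before the blocking query.awaitTermination() call, a subscription that snapshots the driver-local table to a shared location:

# Hypothetical extension: periodically persist the in-memory table to parquet
# so that a second Spark application can read it with spark.read.parquet(...).
timer.skip(1).subscribe(
    lambda *_: spark.table(query_name)
        .write.mode("overwrite")
        .parquet("/tmp/counts_snapshot")  # assumed shared path
)

Note that overwrite is not atomic, so a concurrent reader may occasionally see a partial snapshot.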