Structured Streaming error py4j.protocol.Py4JNetworkError: Answer from Java side is empty


Problem Description


I'm trying to make a left outer join between two Kafka streams using PySpark and Structured Streaming (Spark 2.3).

import os
import time

from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct, explode, get_json_object
from ast import literal_eval
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

spark = SparkSession \
    .builder \
    .appName("Spark Kafka Structured Streaming") \
    .getOrCreate()

schema_impressions = StructType() \
    .add("id_req", StringType()) \
    .add("ts_imp_request", TimestampType()) \
    .add("country", StringType()) \
    .add("TS_IMPRESSION", TimestampType()) 

schema_requests = StructType() \
    .add("id_req", StringType()) \
    .add("page", StringType()) \
    .add("conntype", StringType()) \
    .add("TS_REQUEST", TimestampType()) 

impressions = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
  .option("subscribe", "ssp.datascience_impressions") \
  .load()

requests = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ip-ec2.internal:9092") \
  .option("subscribe", "ssp.datascience_requests") \
  .option("startingOffsets", "latest") \
  .load()

query_requests = requests \
        .select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_requests).alias("parsed")) \
        .select(col("timestamp").alias("timestamp_req"), "parsed.id_req", "parsed.page", "parsed.conntype", "parsed.TS_REQUEST") \
        .withWatermark("timestamp_req", "120 seconds") 

query_impressions = impressions \
        .select(col("timestamp"), col("key").cast("string"), from_json(col("value").cast("string"), schema_impressions).alias("parsed")) \
        .select(col("timestamp").alias("timestamp_imp"), col("parsed.id_req").alias("id_imp"), "parsed.ts_imp_request", "parsed.country", "parsed.TS_IMPRESSION") \
        .withWatermark("timestamp_imp", "120 seconds") 

query_requests.printSchema()        
query_impressions.printSchema()

root
 |-- timestamp_req: timestamp (nullable = true)
 |-- id_req: string (nullable = true)
 |-- page: string (nullable = true)
 |-- conntype: string (nullable = true)
 |-- TS_REQUEST: timestamp (nullable = true)

root
 |-- timestamp_imp: timestamp (nullable = true)
 |-- id_imp: string (nullable = true)
 |-- ts_imp_request: timestamp (nullable = true)
 |-- country: string (nullable = true)
 |-- TS_IMPRESSION: timestamp (nullable = true)

To summarize: I obtain data from two Kafka streams, and in the next lines I try to join them using the IDs.

rawQuery = query_requests.join(query_impressions,  expr(""" 
    (id_req = id_imp AND 
    timestamp_imp >= timestamp_req AND 
    timestamp_imp <= timestamp_req + interval 5 minutes) 
    """), 
  "leftOuter")

rawQuery = rawQuery \
        .writeStream \
        .format("parquet") \
        .option("checkpointLocation", "/home/jovyan/streaming/applicationHistory") \
        .option("path", "/home/jovyan/streaming").start()
print(rawQuery.status)

{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:33968)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2910, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "", line 3, in <module>
    print(rawQuery.status)
  File "/opt/conda/lib/python3.6/site-packages/pyspark/sql/streaming.py", line 114, in status
    return json.loads(self._jsq.status().json())
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/conda/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/conda/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o92.status

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 1828, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'Py4JError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 852, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

I'm running Spark locally using a Jupyter Notebook. In spark/conf/spark-defaults.conf I have:

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory             15g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

If I try to use Spark after the previous error, I get this error:

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

Solution

I resolved the problem! Basically, for some reason the problem was related to the Jupyter Notebook. I removed the following line from the previous code:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

And I ran the code from the console instead:

> spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 spark_structured.py

In that way, I could run all the code without problems.
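
One caveat when moving from the notebook to spark-submit (my assumption, not something stated in the original post): a standalone script exits as soon as it reaches its last line, which would stop the streaming query, so the script usually ends by blocking on awaitTermination(). A minimal sketch of the tail of spark_structured.py:

# Hypothetical tail of spark_structured.py (not shown in the original post).
# rawQuery is the StreamingQuery returned by .start() above. Block the
# driver so spark-submit keeps running while the stream is active.
try:
    rawQuery.awaitTermination()
except KeyboardInterrupt:
    rawQuery.stop()  # clean shutdown on Ctrl+C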

If you have the same problem, you can also edit spark-defaults.conf and increase spark.driver.memory and spark.executor.memory.
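
For reference, a minimal sketch of what those entries could look like in spark-defaults.conf; the 15g driver value comes from the question above, while the executor value is only an illustrative assumption:

# spark/conf/spark-defaults.conf
# spark.driver.memory matches the question above; the executor value is
# an illustrative assumption, so tune both to the memory on your machine.
spark.driver.memory              15g
spark.executor.memory            8g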
