spark structured streaming: not writing correctly


Problem Description

I am streaming meter-reading records as JSON from kafka_2.11-0.10.0.1 into Spark 2.1. I switched to structured streaming; and although the kafka consumer confirms incoming data, the console and writeStream don't move. I am testing using

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

My code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("interval") \
    .master("local[4]") \
    .getOrCreate()
schema = StructType().add("customer_id", StringType()) 
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

query = df.writeStream \
    .option("checkpointLocation", "/user/XX/checkpoint5") \
    .format("parquet") \
    .start("/user/XX/interval5")

It creates the checkpoint and data directories with a 388-byte parquet file; however, no streamed data is ever written.

$ hdfs dfs -ls interval5
drwxr-xr-x   ... interval5/_spark_metadata
-rw-r--r--   ... interval5/part-00000-0b2eb00a-c361-4dfe-a24e-9589d150a911.snappy.parquet
-rw-r--r--   ... interval5/part-00000-e0cb12d1-9c29-4eb0-92a8-688f468a42ce.snappy.parquet
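One quick way to check whether those part files actually contain any rows is to read the output directory back with the batch reader; a minimal sketch, reusing the same SparkSession and the output path from the question:

batch = spark.read.parquet("/user/XX/interval5")
batch.printSchema()
print(batch.count())  # a count of 0 would confirm that only empty part files were committed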

kafka-consumer confirms data is being shipped:

{"customer_id":"customer_736"}
{"customer_id":"customer_995"}
{"customer_id":"customer_1899"}
{"customer_id":"customer_35"}

kafka-consumer displays the streamed data.
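Since the sink stays empty even though records arrive, it is worth ruling out a schema mismatch. A minimal, non-streaming sanity check (assuming the same SparkSession) that pushes one of the sample records through from_json with the question's schema:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

schema = StructType().add("customer_id", StringType())
# one of the sample payloads, wrapped in a single-column DataFrame for testing
sample = spark.createDataFrame([('{"customer_id":"customer_736"}',)], ["value"])
sample.select(from_json(col("value"), schema).alias("parsed_value")) \
      .select("parsed_value.customer_id") \
      .show()  # should print customer_736 if the schema matches the payload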

I think I'm missing an essential step to dequeue and save the streamed rows; a day of trawling Stack Overflow has not helped. (Edited to remove the references to the console, as it is not relevant.)

Recommended Answer

The same structured streaming .py code works under spark-submit, but it never processes any data in the pyspark shell: no error message, no console output, and no parquet data (apart from directory creation and metadata). Go figure.
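For reference, a minimal standalone script along the lines of the question's code, with query.awaitTermination() added so the driver stays alive when launched through spark-submit (broker address, topic, and HDFS paths are the question's placeholders):

# interval_stream.py -- sketch based on the question's code; launch with:
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 interval_stream.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType

spark = SparkSession \
    .builder \
    .appName("interval") \
    .master("local[4]") \
    .getOrCreate()

schema = StructType().add("customer_id", StringType())

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

query = df.writeStream \
    .option("checkpointLocation", "/user/XX/checkpoint5") \
    .format("parquet") \
    .start("/user/XX/interval5")

# keep the driver alive so the streaming query keeps running until it is stopped or fails
query.awaitTermination()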

