Spark structured streaming with Python
Question
I am trying to use Spark structured streaming with Kafka and Python. Requirement: I need to process streaming data from Kafka (in JSON format) in Spark (perform transformations) and then store it in a database.
I have data in JSON format like,
{"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}
I am planning to use spark.readStream for reading from Kafka like,
data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe","test").load()
I referred to this link for reference, but I didn't understand how to parse the JSON data. I tried this,
data = data.selectExpr("CAST(a AS FLOAT)","CAST(b as FLOAT)", "CAST(name as STRING)", "CAST(time as STRING)").as[(Float, Float, String, String)]
but it looks like it doesn't work.
Can anyone who has worked on Spark structured streaming with Python guide me to proceed with sample examples or links?
Using,
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

inData = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test").load()
data = inData.select(from_json(col("value").cast("string"), schema))
query = data.writeStream.outputMode("append").format("console").start()
The program runs, but I am getting values on the console like,
+-----------------------------------+
|jsontostruct(CAST(value AS STRING))|
+-----------------------------------+
| [null,null,null,2...|
| [null,null,null,2...|
+-----------------------------------+
17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: {
  "id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e",
  "runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b",
  "name" : null,
  "timestamp" : "2017-04-07T19:23:15.013Z",
  "numInputRows" : 2,
  "inputRowsPerSecond" : 125.0,
  "processedRowsPerSecond" : 12.269938650306749,
  "durationMs" : {
    "addBatch" : 112,
    "getBatch" : 8,
    "getOffset" : 2,
    "queryPlanning" : 4,
    "triggerExecution" : 163,
    "walCommit" : 26
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test]]",
    "startOffset" : {
      "test" : {
        "0" : 366
      }
    },
    "endOffset" : {
      "test" : {
        "0" : 368
      }
    },
    "numInputRows" : 2,
    "inputRowsPerSecond" : 125.0,
    "processedRowsPerSecond" : 12.269938650306749
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6aa91aa2"
  }
}
Am I missing something?
Answer
You can either use from_json with a schema:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

data.select(from_json(col("value").cast("string"), schema))
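If you want the parsed fields as top-level named columns instead of a single struct column (the jsontostruct(CAST(value AS STRING)) column in the console output above), a minimal follow-up sketch is to alias the struct and expand it; alias and the "parsed.*" selection are standard Spark APIs, and parsed is just an illustrative name:

parsed = data.select(
    from_json(col("value").cast("string"), schema).alias("parsed"))

flat = parsed.select("parsed.*")  # top-level columns: a, b, name, time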
or get individual fields as strings with get_json_object:
from pyspark.sql.functions import get_json_object

data.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])
and cast them later according to your needs.
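For example, a hedged sketch of that later cast, assuming the result of the previous select is bound to a name such as strings (a name not in the original answer):

typed = strings.select(
    col("a").cast("double"),
    col("b").cast("double"),
    col("name"),
    col("time").cast("timestamp"))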
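For the final step in the question (storing the transformed stream in a database), one hedged sketch uses foreachBatch, assuming Spark 2.4+ where that API is available and reusing the flat DataFrame from the earlier sketch; the JDBC URL, table name, and credentials below are placeholders, not values from the question:

def write_to_db(batch_df, batch_id):
    # Each micro-batch is a static DataFrame, so the batch JDBC writer applies.
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/mydb")  # placeholder URL
        .option("dbtable", "events")                             # placeholder table
        .option("user", "user")                                  # placeholder credentials
        .option("password", "password")
        .mode("append")
        .save())

query = flat.writeStream.foreachBatch(write_to_db).start()
query.awaitTermination()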