PySpark - 逐行转换为 JSON [英] PySpark - Convert to JSON row by row
问题描述
我有一个非常大的 pyspark 数据框.我需要将数据帧转换为每一行的 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题.我最初使用了以下代码.
I have a very large pyspark data frame. I need to convert the dataframe into a JSON formatted string for each row then publish the string to a Kafka topic. I originally used the following code.
for message in df.toJSON().collect():
kafkaClient.send(message)
然而,数据帧非常大,因此在尝试 collect()
时失败.
However the dataframe is very large so it fails when trying to collect()
.
我正在考虑使用 UDF
因为它会逐行处理它.
I was thinking of using a UDF
since it processes it row by row.
from pyspark.sql.functions import udf, struct
def get_row(row):
json = row.toJSON()
kafkaClient.send(message)
return "Sent"
send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()
但是我收到一个错误,因为输入到函数的是列而不是行.
But I am getting an error because the column is inputed to the function and not the row.
出于说明目的,我们可以使用下面的 df,我们可以假设必须发送 Col1 和 Col2.
For illustrative purposes, we can use the df below where we can assume Col1 and Col2 must be send over.
df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])
每一行的 JSON 字符串:
The JSON string for each row:
'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'
推荐答案
您不能像这样使用 select
.使用 foreach
/foreachPartition
:
You cannot use select
like this. Use foreach
/ foreachPartition
:
import json
def send(part):
kafkaClient = ...
for r in part:
kafkaClient.send(json.dumps(r.asDict()))
如果您需要诊断信息,只需使用 Accumulator
.
If you need diagnostic information just use Accumulator
.
在当前版本中,我会直接使用 Kafka 源(2.0 及更高版本):
In current releases I would use Kafka source directly (2.0 and later):
from pyspark.sql.functions import to_json, struct
(df.select(to_json(struct([df[x] for x in df.columns])).alias("value"))
.write
.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("topic", topic)
.save())
例如,您将需要 Kafka SQL 包:
You'll need Kafka SQL package for example:
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1
这篇关于PySpark - 逐行转换为 JSON的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!