PySpark-逐行转换为JSON [英] PySpark - Convert to JSON row by row

查看:1802
本文介绍了PySpark-逐行转换为JSON的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个非常大的pyspark数据框.我需要将数据帧转换为每行JSON格式的字符串,然后将字符串发布到Kafka主题.我最初使用以下代码.

I have a very large pyspark data frame. I need to convert the dataframe into a JSON formatted string for each row then publish the string to a Kafka topic. I originally used the following code.

for message in df.toJSON().collect():
        kafkaClient.send(message) 

但是,数据帧非常大,因此在尝试collect()时会失败.

However the dataframe is very large so it fails when trying to collect().

我一直在考虑使用UDF,因为它逐行处理它.

I was thinking of using a UDF since it processes it row by row.

from pyspark.sql.functions import udf, struct

def get_row(row):
    json = row.toJSON()
    kafkaClient.send(message) 
    return "Sent"

send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()

但是我收到一个错误,因为列是输入到函数中而不是行中.

But I am getting an error because the column is inputed to the function and not the row.

出于说明目的,我们可以使用下面的df来假设必须发送Col1和Col2.

For illustrative purposes, we can use the df below where we can assume Col1 and Col2 must be send over.

df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])

每行的JSON字符串:

The JSON string for each row:

'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'

推荐答案

您不能像这样使用select.使用foreach/foreachPartition:

You cannot use select like this. Use foreach / foreachPartition:

import json

def send(part):
    kafkaClient = ...
    for r in part:
        kafkaClient.send(json.dumps(r.asDict()))

如果您需要诊断信息,请使用Accumulator.

If you need diagnostic information just use Accumulator.

在当前版本中,我将直接使用Kafka源码(2.0及更高版本):

In current releases I would use Kafka source directly (2.0 and later):

from pyspark.sql.functions import to_json, struct

(df.select(to_json(struct([df[x] for x in df.columns])).alias("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic)
    .save())

例如,您将需要Kafka SQL软件包:

You'll need Kafka SQL package for example:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1

这篇关于PySpark-逐行转换为JSON的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆