Remove (corrupt) rows from Spark Streaming DataFrame that don't fit schema (incoming JSON data from Kafka)


Question

I have a Spark Structured Streaming application that reads from Kafka. Here is the basic structure of my code.

I create the Spark session.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("app_name")
  .getOrCreate()

Then I read from the stream.

val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_list")
  .option("subscribe", "topic")
  .load()

From the Kafka record, I cast the "value" field to a string, which converts it from binary. At this point there is one column in the data frame.

import spark.implicits._

val df = data_stream
    .select($"value".cast("string") as "json")

Based on a pre-defined schema, I try to parse the JSON structure out into columns. The problem is that if the data is "bad", or in a different format, it doesn't match the defined schema, so the next dataframe (df2) gets null values in its columns.

import org.apache.spark.sql.functions.from_json

val df2 = df.select(from_json($"json", schema) as "data")
  .select("data.*")

I'd like to be able to filter out of df2 the rows that have null in a certain column (one that I use as a primary key in a database), i.e. ignore bad data that doesn't match the schema.

I was somewhat able to accomplish this, but not the way I intended. In my process I use a query with the .foreach(writer) pattern: it opens a connection to a database, processes each row, and then closes the connection. The Structured Streaming documentation covers what this process requires. In the process method I get the values from each row, check whether my primary key is null, and if it is null I don't insert it into the database.
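
A minimal sketch of that ForeachWriter approach, assuming a JDBC sink; the connection string, table name, and the "id" key column are all placeholders, not from the original post:

import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  var connection: Connection = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Placeholder JDBC URL and credentials; substitute your own.
    connection = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pass")
    true
  }

  override def process(row: Row): Unit = {
    // Skip rows whose primary key is null, i.e. data that didn't match the schema.
    if (!row.isNullAt(row.fieldIndex("id"))) {
      val stmt = connection.prepareStatement("INSERT INTO table_name (id) VALUES (?)")
      stmt.setString(1, row.getAs[String]("id"))
      stmt.executeUpdate()
      stmt.close()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (connection != null) connection.close()
  }
}

val query = df2.writeStream.foreach(writer).start()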

Answer

Just filter out any null values you don't want:

df2
  .filter($"colName".isNotNull)
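
Equivalently, na.drop drops rows that are null in the named columns; "colName" again stands in for whatever column holds your primary key:

val cleaned = df2.na.drop(Seq("colName"))

Both forms are plain filters, so they work on streaming DataFrames as well, and the null check inside the ForeachWriter's process method becomes unnecessary.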
