Remove (corrupt) rows from Spark Streaming DataFrame that don't fit schema (incoming JSON data from Kafka)
Question
I have a Spark Structured Streaming application that reads from Kafka. Here is the basic structure of my code.
I create the Spark session.
val spark = SparkSession
.builder
.appName("app_name")
.getOrCreate()
Then I read in from the stream:
val data_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server_list")
.option("subscribe", "topic")
.load()
From each Kafka record, I cast the "value" field to a string, converting it from binary. At this point there is one column in the DataFrame:
val df = data_stream
.select($"value".cast("string") as "json")
Based on a pre-defined schema, I try to parse the JSON structure into columns. The problem is that if the data is "bad" or in a different format, it doesn't match the defined schema, so the next DataFrame (df2) gets null values in its columns.
val df2 = df.select(from_json($"json", schema) as "data")
.select("data.*")
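For reference, the schema passed to from_json is a StructType. A minimal sketch follows; the field names here are hypothetical, since the actual schema isn't shown in the question:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical schema; replace these fields with the ones actually
// expected in the incoming JSON.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = false), // used as the primary key downstream
  StructField("timestamp", LongType, nullable = true),
  StructField("payload", StringType, nullable = true)
))
```

When from_json cannot parse a record against this schema, the resulting struct is null, so the parsed columns in df2 (including the key column) come out null, which is what makes filtering on the key column a workable way to drop bad records.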
I'd like to be able to filter out from df2 the rows that have null in a certain column (one that I use as a primary key in a database), i.e. ignore bad data that doesn't match the schema.
I was somewhat able to accomplish this, but not the way I intended. In my process, I use a query that uses the .foreach(writer) sink. What this does is open a connection to a database, process each row, and then close the connection. The Structured Streaming documentation describes what this process requires. In the process method, I get the values from each row and check whether my primary key is null; if it is null, I don't insert the row into the database.
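The foreach sink described above can be sketched with a ForeachWriter. The class name, the key column "id", and the commented-out connection handling are placeholders, not the original code:

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Sketch of a ForeachWriter that skips rows whose primary key is null.
class DbWriter extends ForeachWriter[Row] {
  def open(partitionId: Long, epochId: Long): Boolean = {
    // open the database connection here; returning true means
    // this partition's rows will be processed
    true
  }

  def process(row: Row): Unit = {
    // "id" is a placeholder for the actual primary-key column name
    if (!row.isNullAt(row.fieldIndex("id"))) {
      // insert the row into the database
    }
  }

  def close(errorOrNull: Throwable): Unit = {
    // close the connection; errorOrNull is non-null if the epoch failed
  }
}
```

It would be wired in with something like df2.writeStream.foreach(new DbWriter).start(). The downside of filtering here is that every bad row still travels through the pipeline to the sink, which is why filtering earlier on the DataFrame is preferable.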
Answer
Just filter out any null values you don't want:
df2
.filter($"colName".isNotNull)
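In context, the filter slots in right before the sink. A sketch of the full streaming query, with the console sink standing in for the real database writer ("colName" remains a placeholder for the primary-key column):

```scala
// Drop rows whose key column is null (i.e. rows that failed schema parsing),
// then start the streaming query with a stand-in console sink.
val query = df2
  .filter($"colName".isNotNull)
  .writeStream
  .format("console")
  .start()

query.awaitTermination()
```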