从Spark Streaming DataFrame中删除(损坏)不适合架构的行(从Kafka接收JSON数据) [英] Remove (corrupt) rows from Spark Streaming DataFrame that don't fit schema (incoming JSON data from Kafka)

查看:144
本文介绍了从Spark Streaming DataFrame中删除(损坏)不适合架构的行(从Kafka接收JSON数据)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个正在从Kafka中阅读的Spark结构化的汽蒸应用程序.这是我的代码的基本结构.

I have a spark structured steaming application that I'm reading in from Kafka. Here is the basic structure of my code.

我创建了Spark会话.

I create the Spark session.

val spark = SparkSession
  .builder
  .appName("app_name")
  .getOrCreate()

然后我从信息流中阅读

val data_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server_list")
  .option("subscribe", "topic")
  .load()

在Kafka记录中,我将值"转换为字符串.它从二进制转换为字符串.此时,数据框中有1列

In Kafka record, I cast the "value" as a string. It converts from binary to string. At this point there is 1 column in the data frame

val df = data_stream
    .select($"value".cast("string") as "json")

基于预定义的架构,我尝试将JSON结构解析为列.但是,这里的问题是,如果数据错误"或其他格式,则与定义的架构不匹配.因此,下一个数据帧(df2)将空值插入列中.

Based off of a pre-defined schema, I try to parse out the JSON structure into columns. However, the problem here is if the data is "bad", or a different format then it doesn't match the defined schema. So the next dataframe (df2) get's null values into the columns.

val df2 = df.select(from_json($"json", schema) as "data")
  .select("data.*")

我希望能够从df2过滤出某一列中具有"null"的行(我将其用作数据库中的主键),即忽略与模式不匹配的不良数据?

I'd like to be able to filter out from df2 the row's that have "null" in a certain column (one that I use as a primary key in a database) i.e. ignore bad data that doesn't match the schema?

我在某种程度上能够做到这一点,但没有达到我想要的方式.在我的过程中,我使用一个使用 .foreach(writer)过程的查询.它的作用是打开与数据库的连接,处理每一行,然后关闭连接.结构化流的文档提到此过程所需的必需品.在process方法中,我从每一行获取值,并检查主键是否为null,如果为null,则不将其插入数据库中.

I was somewhat able to accomplish this but not the way I intended to. In my process, I use a query that uses the .foreach(writer) process. What this does is it opens a connection to a database, processes each row, and then closes the connection. The documentation for structured streaming mentions the necessities you need for this process. In the process method, I get the values from each row and check if my primary key is null, if it is null I don't insert it into the database.

推荐答案

只需过滤掉不需要的任何空值:

Just filter out any null values you don't want:

df2
  .filter(row => row("colName") != null)

这篇关于从Spark Streaming DataFrame中删除(损坏)不适合架构的行(从Kafka接收JSON数据)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆