How to handle null values in Scala case classes in Flink?


Problem description

Hello, I'm reading JSON data from S3 with Flink and then parsing the input into a case class. The problem is that when a JSON string contains a null value, the case class does not accept it, because each field of the case class has a concrete type (Int, String, etc.).

While searching for a solution, I found that I can use Option to allow null values in a case class, and that works. The problem appears when I try to run a query after converting the DataSet to a Table.

If I try to get the rows where score is null, I get nothing back, because with the Option type the missing value is represented by an object.

// Case class used to parse the JSON
case class TestTableSchema(
    id: String,
    score: Option[Double]
)

val data: DataSet[TestTableSchema] = ...

tableEnv.registerDataSet("test_table", data)

val result = tableEnv.sqlQuery("""
    |SELECT *
    |FROM test_table
    |WHERE score IS NULL
    """.stripMargin
)

I also saw that I could use the Row data type, which accepts null values, instead of a case class, but I didn't find an example showing how to implement it.

How can I handle null values when I want to keep a schema?

Recommended answer

So what you could do is either:

  1. Check for empty fields in your .map and transform them into something else, such as the string "NULL" or "N/A", if you still want to keep those records in your result, or
  2. Filter them out of your stream using .filter
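
The two options above can be sketched roughly as follows. This is a minimal illustration, not the answerer's own code. To keep it runnable without a Flink dependency, a plain Scala Seq stands in for the DataSet (an assumption; the lambdas passed to map and filter would have the same shape on a DataSet). The ScoreRow class, the NullHandlingSketch object, and the sample records are hypothetical names introduced only for this example.

```scala
// Sketch only: a plain Scala Seq stands in for the Flink DataSet so the
// example runs without a cluster. This substitution is an assumption.
case class TestTableSchema(id: String, score: Option[Double])

// Hypothetical flattened record type that no longer has an Option field.
case class ScoreRow(id: String, score: String)

object NullHandlingSketch {
  // Hypothetical sample records; real data would come from the parsed JSON.
  val data: Seq[TestTableSchema] = Seq(
    TestTableSchema("a", Some(1.5)),
    TestTableSchema("b", None)
  )

  // Option 1: keep every record, replacing a missing score with a placeholder.
  val withPlaceholder: Seq[ScoreRow] =
    data.map(r => ScoreRow(r.id, r.score.map(_.toString).getOrElse("N/A")))

  // Option 2: drop the records whose score is missing.
  val onlyScored: Seq[TestTableSchema] = data.filter(_.score.isDefined)
}
```

With the first option the score column becomes a string, so a later query would compare against the placeholder (for example score = 'N/A') instead of using IS NULL. With the second option the records without a score are no longer available to query at all.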
