从 Spark Scala 中的数据帧创建动态查询 [英] Create dynamic query from the Dataframe present in Spark Scala

查看:56
本文介绍了从 Spark Scala 中的数据帧创建动态查询的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个如下的数据帧 DF.基于问题列和数据类型列,我想创建一个动态查询.如果问题列是 YES 然后检查数据类型,如果它的 StringType 添加 Trim(DiffColumnName) 到查询或者如果数据类型是整数做一些其他操作,如 round(COUNT,2)对于问题类型为 NO 的列,什么都不做并选择列本身

I have a dataframe DF as below. Based on the Issue column and Datatype column I wants to create a dynamic query. If Issue column is YES then check for the Datatype, If its StringType add Trim(DiffColumnName) to the query or if Datatype is integer do some other operation like round(COUNT,2) And for the column for which Issue type is NO do nothing and select the Column itself

查询应该是这样的

Select DEST_COUNTRY_NAME, trim(ORIGIN_COUNTRY_NAME),round(COUNT,2)
+-------------------+-----------+-----+
|     DiffColumnName|   Datatype|Issue|
+-------------------+-----------+-----+
|  DEST_COUNTRY_NAME| StringType|   NO|
|ORIGIN_COUNTRY_NAME| StringType|  YES|
|              COUNT|IntegerType|  YES|
+-------------------+-----------+-----+

我不确定是否应该在此处使用 If else 条件或 case 语句或创建 UDF.我的数据框(即列)也是动态的,每次都会改变.

I am not sure if I should be using If else condition here or case statement or create a UDF. Also my dataframe (i.e. columns) are dynamic and will be changed every time.

需要一些建议如何在此处进行.谢谢

Need some suggestions how to proceed here. Thanks

推荐答案

这可以使用以下代码来完成.

This can be accomplished using the following piece of code.

  • 通过应用所需的操作来派生新列
  • 使用 collect_list 将值聚合到数组中
  • 使用 concat_ws 和 concat 格式化输出
val origDF=Seq(("DEST_COUNTRY_NAME","StringType","NO"),
("ORIGIN_COUNTRY_NAME","StringType","YES"),
("COUNT","IntegerType","YES"),
("TESTCOL","StringType","NO")
).toDF("DiffColumnName","Datatype","Issue")

val finalDF=origDF.withColumn("newCol",when(col("Issue")==="YES" && col("DataType")==="StringType",concat(lit("trim("),col("DiffColumnName"),lit(")")))
when(col("Issue")==="YES" && col("DataType")==="IntegerType",concat(lit("round("),col("DiffColumnName"),lit(",2)")))
when(col("Issue")==="NO",col("DiffColumnName"))
)

finalDF.agg(collect_list("newCol").alias("queryout")).select(concat(lit("select "),concat_ws(",",col("queryout")))).show(false)

我在数据中添加了一个额外的列进行测试,它给了我想要的输出.

I included an additional column to the data for testing and it is giving me the desired output.

+-------------------------------------------------------------------------+
|concat(select , concat_ws(,, queryout))                                  |
+-------------------------------------------------------------------------+
|select DEST_COUNTRY_NAME,trim(ORIGIN_COUNTRY_NAME),round(COUNT,2),TESTCOL|
+-------------------------------------------------------------------------+

这篇关于从 Spark Scala 中的数据帧创建动态查询的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆