Iterate through a column in a Dataset that holds arrays of key-value pairs and find the pair with the max value


Problem description

I have data in a DataFrame that was obtained from Azure Event Hubs. I then parse the JSON body of each event and store the required fields in a Dataset, as shown below.

Code for reading the data from the event hub into a DataFrame:

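// Assumes the azure-event-hubs-spark connector is on the classpath and that
// spark.implicits._ is in scope for the $"..." syntax (as in spark-shell).
import java.time.{Duration, Instant}
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf, EventPosition}
import org.apache.spark.sql.functions.{col, get_json_object}

val connectionString = ConnectionStringBuilder(<ENDPOINT URL>)
    .setEventHubName(<EVENTHUB NAME>).build

// Read only the events enqueued during the last 30 minutes
val currTime = Instant.now
val ehConf = EventHubsConf(connectionString)
    .setConsumerGroup("<CONSUMER GRP>")
    .setStartingPosition(EventPosition
             .fromEnqueuedTime(currTime.minus(Duration.ofMinutes(30))))
    .setEndingPosition(EventPosition.fromEnqueuedTime(currTime))

val reader = spark.read.format("eventhubs").options(ehConf.toMap).load()

// Extract NUM and the four signal arrays from the JSON body as strings
val SIGNALS = reader
    .select(get_json_object(($"body").cast("string"), "$.NUM").alias("NUM"),
            get_json_object(($"body").cast("string"), "$.SIG1").alias("SIG1"),
            get_json_object(($"body").cast("string"), "$.SIG2").alias("SIG2"),
            get_json_object(($"body").cast("string"), "$.SIG3").alias("SIG3"),
            get_json_object(($"body").cast("string"), "$.SIG4").alias("SIG4")
     )

// Keep only the rows in which all four signals are present
val SIGNALSFiltered = SIGNALS.filter(col("SIG1").isNotNull &&
    col("SIG2").isNotNull && col("SIG3").isNotNull && col("SIG4").isNotNull)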

The data obtained at SIGNALSFiltered is shown below.

+-----------------+--------------------+--------------------+--------------------+--------------------+
|              NUM|                SIG1|                SIG2|                SIG3|                SIG4|
+-----------------+--------------------+--------------------+--------------------+--------------------+
|XXXXX01|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX02|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|
|XXXXX03|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX04|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX05|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX06|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|
|XXXXX07|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX08|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|

The full data for a single row looks like this:

|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825},{"TIME":1569560475000,"VALUE":3.7812},{"TIME":1569560483000,"VALUE":3.7812},{"TIME":1569560491000,"VALUE":34.7875}]|
    [{"TIME":1569560537000,"VALUE":3.7825},{"TIME":1569560481000,"VALUE":34.7825},{"TIME":1569560489000,"VALUE":34.7825},{"TIME":1569560497000,"VALUE":34.7825}]|
    [{"TIME":1569560505000,"VALUE":34.7825},{"TIME":1569560513000,"VALUE":34.7825},{"TIME":1569560521000,"VALUE":34.7825},{"TIME":1569560527000,"VALUE":34.7825}]|
    [{"TIME":1569560535000,"VALUE":34.7825},{"TIME":1569560479000,"VALUE":34.7825},{"TIME":1569560487000,"VALUE":34.7825}]

I want only the pair with the highest TIME from each column, not all of the TIME-VALUE pairs. The output should look like this:

+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|              NUM|                         SIG1|                                   SIG2|                                   SIG3|                                    SIG4|
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":4.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":5.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":6.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":7.7825}]|
|XXXXX03|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":9.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":8.7825}]|

  1. How do I iterate through each column in each row and get the TIME-VALUE pair with the highest TIME?
  2. After getting the highest pair in each column (SIG1, ..., SIG4), I have to update only the TIME value in all columns to the highest TIME among them.
  3. Is there any way to convert the base dataset as shown below? Each element in a column should become a new row.

+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|    NUM|                                   SIG1|                                   SIG2|                                   SIG3|                                    SIG4|
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|        null                           |[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|

Any leads or help are appreciated. Thanks in advance!

Solution

You have to write a user-defined function (UDF) like the one below, which loops over the pairs in a column and returns the one with the maximum TIME. Note: the UDF is just for reference; you can change it as required.

  1. How to iterate through each column in each row and get the TIME-VALUE pair with the highest TIME?

scala> import org.apache.spark.sql.expressions.UserDefinedFunction
scala> import org.apache.spark.sql.functions.{col, udf}
scala> import scala.util.parsing.json.JSON

scala> def MaxTime: UserDefinedFunction = udf((json: String) => {
   var output = ""
   // JSON.parseFull returns every JSON number as a Double
   JSON.parseFull(json).foreach { x =>
     val pairs = x.asInstanceOf[List[Map[String, Double]]]
     if (pairs.nonEmpty) {
       // Keep only the pair with the largest TIME
       val latest = pairs.maxBy(_("TIME"))
       output = """[{"TIME":""" + latest("TIME").toLong + ""","VALUE":""" + latest("VALUE") + "}]"
     }
   }
   output
})

scala> SIGNALSFiltered
   .withColumn("SIG1", MaxTime(col("SIG1")))
   .withColumn("SIG2", MaxTime(col("SIG2")))
   .withColumn("SIG3", MaxTime(col("SIG3")))
   .withColumn("SIG4", MaxTime(col("SIG4")))
   .show(false)

  2. After getting the highest pair in each column (SIG1, ..., SIG4), update only the TIME value in all columns to the highest TIME among them.

Write a UDF similar to the one above, but pass the complete row as a parameter. Then parse each column value into a Map and take the maximum TIME across all columns.
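The answer gives no code for this second step, so here is a minimal sketch of the core logic in plain Scala, without Spark. It assumes each column already holds a single-pair JSON string in the shape produced by MaxTime. The names `Pair`, `AlignTimes`, `parse`, `render` and `align` are hypothetical, and the regular expression is a simplification that only understands that one-pair shape.

```scala
// Hypothetical helper, not part of the original answer.
final case class Pair(time: Long, value: Double)

object AlignTimes {
  // Matches one {"TIME": <integer>, "VALUE": <number>} object, with optional spaces
  private val PairPattern =
    """\{"TIME"\s*:\s*(\d+)\s*,\s*"VALUE"\s*:\s*([-+0-9.eE]+)\s*\}""".r

  // Parse a single-pair JSON string; None if the string does not match
  def parse(json: String): Option[Pair] =
    PairPattern.findFirstMatchIn(json)
      .map(m => Pair(m.group(1).toLong, m.group(2).toDouble))

  // Render a pair back into the one-element array form
  def render(p: Pair): String = s"""[{"TIME":${p.time},"VALUE":${p.value}}]"""

  // Replace TIME in every column with the highest TIME found in any column;
  // VALUE is left untouched, and unparseable columns are returned as-is
  def align(columns: Seq[String]): Seq[String] = {
    val parsed = columns.map(parse)
    val times = parsed.flatten.map(_.time)
    if (times.isEmpty) columns
    else {
      val maxTime = times.max
      columns.zip(parsed).map {
        case (_, Some(p)) => render(p.copy(time = maxTime))
        case (raw, None)  => raw
      }
    }
  }
}
```

In Spark this could probably be wrapped as `udf((a: String, b: String, c: String, d: String) => AlignTimes.align(Seq(a, b, c, d)))` and applied to the four SIG columns. That returns a single array column whose elements would still need to be split back into SIG1 to SIG4.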
