遍历Dataset中具有键值对数组的列,并找出具有最大值的对 [英] Iterate through a column in Dataset which have array of key value pairs and find out a pair with max value

查看:78
本文介绍了遍历Dataset中具有键值对数组的列,并找出具有最大值的对的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个数据帧中的数据,该数据帧是从azure eventhub获得的.然后,我将这些数据转换为json对象,并将所需的数据存储到数据集中,如下所示.

I have data in a dataframe , which was obtained from azure eventhub. Then I convert this data to json object and stored the required data into a dataset as shown below.

用于从eventhub获取数据并将其存储到数据帧中的代码.

Code for obtaining data from eventhub and store it into a dataframe.

val connectionString = ConnectionStringBuilder(<ENDPOINT URL>)
    .setEventHubName(<EVENTHUB NAME>).build

val currTime = Instant.now
val ehConf = EventHubsConf(connectionString)
    .setConsumerGroup("<CONSUMER GRP>")
    .setStartingPosition(EventPosition
             .fromEnqueuedTime(currTime.minus(Duration.ofMinutes(30))))
    .setEndingPosition(EventPosition.fromEnqueuedTime(currTime))

val reader =  spark.read.format("eventhubs").options(ehConf.toMap).load()

var SIGNALS =  reader
    .select(get_json_object(($"body").cast("string"),"$.NUM").alias("NUM"),
            get_json_object(($"body").cast("string"),"$.SIG1").alias("SIG1"),
            get_json_object(($"body").cast("string"),"$.SIG2").alias("SIG2"),
            get_json_object(($"body").cast("string"),"$.SIG3").alias("SIG3"),
            get_json_object(($"body").cast("string"),"$.SIG4").alias("SIG4")
     )

val SIGNALSFiltered = SIGNALS.filter(col("SIG1").isNotNull &&
    col("SIG2").isNotNull && col("SIG3").isNotNull && col("SIG4").isNotNull)

SIGNALSFiltered 处获得的数据如下所示.

The data obtained at SIGNALSFiltered is shown below.

+-----------------+--------------------+--------------------+--------------------+--------------------+
|              NUM|                SIG1|                SIG2|                SIG3|                SIG4|
+-----------------+--------------------+--------------------+--------------------+--------------------+
|XXXXX01|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX02|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|
|XXXXX03|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX04|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX05|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX06|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|
|XXXXX07|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX08|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|

如果我们检查单个行的全部数据,将如下所示.

If we check entire data for a single row it will be as below.

|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825},{"TIME":1569560475000,"VALUE":3.7812},{"TIME":1569560483000,"VALUE":3.7812},{"TIME":1569560491000,"VALUE":34.7875}]|
    [{"TIME":1569560537000,"VALUE":3.7825},{"TIME":1569560481000,"VALUE":34.7825},{"TIME":1569560489000,"VALUE":34.7825},{"TIME":1569560497000,"VALUE":34.7825}]|
    [{"TIME":1569560505000,"VALUE":34.7825},{"TIME":1569560513000,"VALUE":34.7825},{"TIME":1569560521000,"VALUE":34.7825},{"TIME":1569560527000,"VALUE":34.7825}]|
    [{"TIME":1569560535000,"VALUE":34.7825},{"TIME":1569560479000,"VALUE":34.7825},{"TIME":1569560487000,"VALUE":34.7825}]

我只希望每列中有最高的TIME对,而不是整个TIME VALUE对.输出应如下所示.

I want only the highest TIME pair from each column, not the entire TIME VALUE pairs. Output should be as shown below.

+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|              NUM|                         SIG1|                                   SIG2|                                   SIG3|                                    SIG4|
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":4.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":5.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":6.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":7.7825}]|
|XXXXX03|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":9.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":8.7825}]|

  1. 如何遍历每一行的每一列并获得最高的TIME-VALUE对?
  2. 在每列(SIG1,.... SIG4)中获得最高值之后,只需更新其中所有列中最高的列的TIME值即可.

  1. How to Iterate through each column in each row and get the highest TIME-VALUE pair?
  2. After getting highest in each columns (SIG1,....SIG4) have to update only the value of TIME in all columns with highest among them.

有什么方法可以如下转换基本数据集?列中的每个元素都应转换为新行.

Is there Any way to convert the base dataset as below?. Each elements in a column should be converted to a new row.

+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|    NUM|                                   SIG1|                                   SIG2|                                   SIG3|                                    SIG4|
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|        null                           |[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|```

Any leads or help is appreciated! Thanks in Advance.

推荐答案

您必须编写一个用户定义的函数,如下所示.这将循环您的数据并获得最大时间值".注意:UDF仅供参考,您可以根据要求进行更改

You have to write one user defined function like below. which will loop your data and get Max Time Value. Note: UDF is just for reference, you can change it as per requirement

  1. 如何遍历每一行的每一列并获得最高的TIME-VALUE对?

scala> import org.apache.spark.sql.expressions.{UserDefinedFunction} 

scala> def MaxTime:UserDefinedFunction = udf((json:String) => {
   val pars = JSON.parseFull(json)
   var output=""
   pars.foreach{ x => val y = x.asInstanceOf[List[Any]]
     var i = 1
     var TimeMap = scala.collection.mutable.Map[String, Long]()
     var ValueMap = scala.collection.mutable.Map[String, Double]()
     y.foreach{ zz => val z =  zz.asInstanceOf[Map[String,Double]]
       TimeMap(i.toString) =  z("TIME").toLong
       ValueMap(i.toString) =  z("VALUE")
       i = i + 1
     }
   output = """[{"TIME" : """ + TimeMap.maxBy(_._2)._2.toString + """ ,"VALUE": """ + ValueMap(TimeMap.maxBy(_._2)._1) + """}]"""
  } 
output})

scala> SIGNALSFiltered.withColumn("SIG1", MaxTime(col("SIG1")).withColumn("SIG2", MaxTime(col("SIG2")))).withColumn("SIG3", MaxTime(col("SIG3"))).withColumn("SIG4", MaxTime(col("SIG4"))).show(false)

  1. 在每一列(SIG1,.... SIG4)达到最高值之后,只需更新其中所有列中最高的列的TIME值即可.

编写与上述相同的UDF,并将完整的行作为参数传递.然后将每个列的值解析到Map中,并在所有列中获得最大值.

Write same UDF like above and pass complete row as a parameter. Then parse each column value into Map and get Maximum among all columns.

这篇关于遍历Dataset中具有键值对数组的列,并找出具有最大值的对的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆