Azure Databricks Scala: How to replace rows following a respective hierarchy

Problem description

Given the following dataset (the same rows as the `df` built in my attempt below):

I would like to obtain:

The idea is to follow the path indicated by the ACTUAL_ID column until it reaches a row whose ACTUAL_ID is null (if it wasn't null already).

I tried to use a UDF to which I passed the full initial DataFrame, so that it would recursively find what I want, but it seems it is not possible to pass DataFrames to UDFs. I also looked into replacing a value in a row, but that does not seem to be possible either.

My latest attempt:

def calculateLatestImdate(df: DataFrame, lookupId: String) : String = {
  var foundId = df.filter($"ID" === lookupId).select($"ACTUAL_ID").first.getAs[String]("ACTUAL_ID");
  if (foundId == "" || foundId == null)
  {
    lookupId
  }
  else
  {
    calculateLatestImdate(df, foundId);
  }
}

val calculateLatestImdateUdf = udf((df:DataFrame, s:String) => {
  calculateLatestImdate(df,s)
})

val df = sc.parallelize(Seq(("1", "", "A"), ("2", "3", "B"), ("3", "6", "C"), ("4", "5", "D"), ("5", "", "E"), ("6", "", "F"))).toDF("ID","ACTUAL_ID", "DATA")

val finalDf = df.withColumn("FINAL_ID", when(isEmpty($"ACTUAL_ID"), $"ID").otherwise(calculateLatestImdateUdf(df, $"ACTUAL_ID")))

Solution

This looked like a graph problem to me, so I worked up an answer using Scala and GraphFrames. It uses the connectedComponents algorithm and the outDegrees method of the GraphFrame. I've assumed that the end of each tree is unique, as in your sample data, but this assumption needs to be checked. I'd be interested to see how it performs with more data; let me know what you think of the solution.

The complete script:

// NB graphframes had to be installed separately with the right Scala version 
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._


// Create the test data

// Vertices dataframe
val v2 = sqlContext.createDataFrame(List(
  ( 1, 0, "A" ), ( 2, 3, "B" ), ( 3, 6, "C" ),
    ( 4, 5, "D" ), ( 5, 0, "E" ), ( 6, 0, "F" )
)).toDF("id", "actual_id", "data")

// Edge dataframe
val e2 = sqlContext.createDataFrame(List(
  (2, 3, "is linked to"),
  (3, 6, "is linked to"),
  (4, 5, "is linked to")
)).toDF("src", "dst", "relationship")


// Create the graph frame
val g2 = GraphFrame(v2, e2)
print(g2)


// The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")

val components = g2.connectedComponents.run() // doesn't work on Spark 1.4
display(components)




// "end" of tree nodes have no outDegree, so add that in to the component df
val endOfTree = components.join(g2.outDegrees, Seq("id"), "left")
  .where("outDegree is null") // filter before the select drops the outDegree column
  .select("component", "data")

endOfTree.show()


components.as("c").join(endOfTree.as("t"), $"c.component" === $"t.component")
  .select($"c.id", $"c.component", $"t.data")
  .orderBy("id")
  .show()

My results:

If your data is already in a dataframe, it's easy to generate the edges dataframe from your original with just a select and a where filter, e.g.:

// Create the GraphFrame from the dataframe
val v2 = df

val e2 = df
  .select("id", "actual_id")
  .withColumn("rel", lit("is linked to"))
  .where("actual_id > 0")
  .toDF("src", "dst", "rel")

val g2 = GraphFrame(v2, e2)
print(g2)

g2.vertices.show()
g2.edges.show()
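
As a cross-check on small data, the "follow ACTUAL_ID until it is empty" idea from the question can also be sketched in plain Scala, without Spark or GraphFrames. This is only a minimal sketch under assumptions: the ID-to-ACTUAL_ID pairs are small enough to hold in memory as a Map, and the names `HierarchyResolver`, `resolveFinalId`, `links` and `resolved` are hypothetical, not part of the original question or answer. A guard against cycles is included because a circular chain would otherwise never terminate.

```scala
import scala.annotation.tailrec

object HierarchyResolver {
  // Follows ACTUAL_ID links starting at `id` until a row with an empty,
  // null, or missing ACTUAL_ID is reached, and returns that row's ID.
  // Returns None if the chain loops back on itself (a cycle).
  def resolveFinalId(links: Map[String, String], id: String): Option[String] = {
    @tailrec
    def loop(current: String, seen: Set[String]): Option[String] =
      if (seen.contains(current)) None // already visited: cycle detected
      else links.get(current) match {
        case Some(next) if next != null && next.nonEmpty => loop(next, seen + current)
        case _ => Some(current) // no further link: this is the end of the path
      }
    loop(id, Set.empty)
  }

  // Same rows as the sample data in the question: (ID, ACTUAL_ID, DATA)
  val rows: Seq[(String, String, String)] = Seq(
    ("1", "", "A"), ("2", "3", "B"), ("3", "6", "C"),
    ("4", "5", "D"), ("5", "", "E"), ("6", "", "F")
  )

  // Lookup table ID -> ACTUAL_ID (assumes IDs are unique)
  val links: Map[String, String] =
    rows.map { case (id, actualId, _) => id -> actualId }.toMap

  // (ID, FINAL_ID) pairs; a cycle would show up as None
  val resolved: Seq[(String, Option[String])] =
    rows.map { case (id, _, _) => id -> resolveFinalId(links, id) }
}
```

With the sample rows, `HierarchyResolver.resolved` maps IDs 2 and 3 to 6, ID 4 to 5, and leaves 1, 5 and 6 pointing at themselves. This approach only makes sense while the lookup table fits in memory; for larger data the GraphFrames approach above is the one this answer proposes.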
