Scala Spark:通过数据集映射操作创建数据集列表 [英] Scala spark: Create List of Dataset from a Dataset map operation
问题描述
假设我要在转换另一个数据集后创建两种类型的metric:metricA或metricB.如果满足特定条件,则将同时生成metricA和B,如果不满足条件,则仅生成度量A.其想法是将2个度量写入2条不同的路径(pathA,pathB).
Suppose I want to create 2 types of metric : metricA or metricB after transforming another dataset. If a certain condition is met, it'll generate both metricA and B, if condition is not met, generate only metric A. The idea is to write the 2 metrics to 2 different paths (pathA, pathB).
我采用的方法是创建GeneralMetric的数据集,然后根据内部内容进行写入,写入不同的路径,但显然它不起作用,因为数据集内的模式匹配不起作用
The approach I took was to create a Dataset of GeneralMetric and then based on whats inside, write to different paths, but obviously it didn't work as pattern matching inside Dataset wouldn't work
val s: SparkSession = SparkSession
.builder()
.appName("Metric")
.getOrCreate()
import s.implicits._
case class original (id : Int, units: List[Double])
case class MetricA (a: Int, b: Int, filtered_unit: List[Double])
case class MetricB (a: Int, filtered_unit: List[Double])
case class GeneralMetric(metricA: MetricA, metricB: Option[MetricB])
def createA: MetricA = {
MetricA(1, 1, List(1.0, 2.0)
}
def createB: MetricB = {
MetricB(1, List(10.0, 20.0)
}
def create (isBoth: Boolean): GeneralMetric = {
if(isBoth) {
val a: MetricA = createA()
val b: MetricB = createB()
GeneralMetric(a, Some(b))
}
else {
val a: MetricA = createA()
GeneralMetric(a, None)
}
}
val originalDF: DataFrame
val result : Dataset[GeneralMetric] =
originalDF.as[original]
.map { r =>
if(r.id == 21) create(true)
else create(false)
}
val pathA: String = "s3://pathA"
val pathB: String = "s3://pathB"
//below code obviously wouldn't work
result.map(x => {
case (metricA, Some(metricB)) => {
metricA.write.parquet(pathA)
metricB.write.parquet(pathB)
}
case (metricA, None) => metricA.write.parquet(pathA)
})
我想到的下一种方法是将结果放入List [GeneralMetric]中,其中GeneralMetric是由MetricA和MetricB扩展的密封路径,但是如何创建数据集转换返回GeneralMetric列表.
The next approach I was thinking of, was putting the results in a List[GeneralMetric], where GeneralMetric is a sealed trail, extended by both MetricA and MetricB, but how can I make a dataset transformation return a list of GeneralMetric.
任何想法都会有所帮助
推荐答案
为什么不
result.map({
case (metricA, Some(metricB)) =>
metricA.write.parquet(pathA)
metricB.write.parquet(pathB)
case (metricA, None) => metricA.write.parquet(pathA)
})
在您的情况下工作?这只是语法问题吗?
work in your case? Is this just a syntax problem?
也:看来您是独立发送指标的(至少在此示例中如此).您可以将其建模为:
Also: it seems that you send metrics independently (or at least in this example). You could model it as:
sealed trait Metric {
def write
}
case class MetricA (a: Int, b: Int, filtered_unit: List[Double]) extends Metric {
override def write: Unit = ???
}
case class MetricB (a: Int, filtered_unit: List[Double]) extends Metric {
override def write: Unit = ???
}
并致电
implicit val enc: Encoder[Metric] = Encoders.kryo[Metric]
val result: Dataset[Metric] =
originalDF.as[original]
.flatMap { r =>
if (r.id == 21) createA :: createB :: Nil
else createA :: Nil
}
result.foreach(metric.write.parquet())
这篇关于Scala Spark:通过数据集映射操作创建数据集列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!