Scala Spark: Create List of Dataset from a Dataset map operation


Problem Description


Suppose I want to create 2 types of metric, metricA and metricB, after transforming another dataset. If a certain condition is met, it'll generate both metricA and metricB; if the condition is not met, it generates only metricA. The idea is to write the 2 metrics to 2 different paths (pathA, pathB).


The approach I took was to create a Dataset of GeneralMetric and then, based on what's inside, write to the different paths, but obviously it didn't work, since pattern matching inside a Dataset wouldn't work:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

val s: SparkSession = SparkSession
    .builder()
    .appName("Metric")
    .getOrCreate()
import s.implicits._

case class original(id: Int, units: List[Double])

case class MetricA(a: Int, b: Int, filtered_unit: List[Double])
case class MetricB(a: Int, filtered_unit: List[Double])
case class GeneralMetric(metricA: MetricA, metricB: Option[MetricB])

def createA: MetricA = {
    MetricA(1, 1, List(1.0, 2.0))
}

def createB: MetricB = {
    MetricB(1, List(10.0, 20.0))
}
def create(isBoth: Boolean): GeneralMetric = {
    if (isBoth) {
       val a: MetricA = createA
       val b: MetricB = createB
       GeneralMetric(a, Some(b))
    }
    else {
       val a: MetricA = createA
       GeneralMetric(a, None)
    }
}

val originalDF: DataFrame = ??? // the input data, built elsewhere

val result : Dataset[GeneralMetric] =
                 originalDF.as[original]
                 .map { r =>
                      if(r.id == 21) create(true)
                      else create(false)
                 }

val pathA: String = "s3://pathA"
val pathB: String = "s3://pathB"

//below code obviously wouldn't work
result.map(x => {
    case (metricA, Some(metricB)) => {
      metricA.write.parquet(pathA)
      metricB.write.parquet(pathB)
    }
    case (metricA, None) => metricA.write.parquet(pathA)

  })


The next approach I was thinking of was putting the results in a List[GeneralMetric], where GeneralMetric is a sealed trait extended by both MetricA and MetricB, but how can I make a dataset transformation return a list of GeneralMetric?

Any ideas would be helpful.

Answer

Why wouldn't

result.map({
    case GeneralMetric(metricA, Some(metricB)) =>
      metricA.write.parquet(pathA)
      metricB.write.parquet(pathB)
    case GeneralMetric(metricA, None) =>
      metricA.write.parquet(pathA)
  })


work in your case? Is this just a syntax problem?
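
(An aside beyond the original answer: even with the partial-function syntax, this snippet won't run as written. MetricA and MetricB are plain case classes with no write method, and DataFrameWriter calls can't execute inside a map closure anyway, since that closure runs on the executors. A minimal driver-side sketch, assuming the question's result, pathA and pathB, that splits the Dataset first and then writes each half once:)

// sketch only: split Dataset[GeneralMetric] on the driver,
// then use the ordinary DataFrameWriter API once per path
val aDS: Dataset[MetricA] = result.map(_.metricA)           // every row carries a MetricA
val bDS: Dataset[MetricB] = result.flatMap(_.metricB.toSeq) // rows with None drop out
aDS.write.parquet(pathA)
bDS.write.parquet(pathB)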


Also: it seems that you write the metrics independently (at least in this example). You could model it as:

sealed trait Metric {
  def write: Unit
}
case class MetricA(a: Int, b: Int, filtered_unit: List[Double]) extends Metric {
  override def write: Unit = ???
}
case class MetricB(a: Int, filtered_unit: List[Double]) extends Metric {
  override def write: Unit = ???
}

and call:

import org.apache.spark.sql.{Encoder, Encoders}

implicit val enc: Encoder[Metric] = Encoders.kryo[Metric]
val result: Dataset[Metric] =
    originalDF.as[original]
      .flatMap { r =>
        // annotate the element type so the implicit Encoder[Metric] is found
        if (r.id == 21) List[Metric](createA, createB)
        else List[Metric](createA)
      }
result.foreach(_.write)
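
(A caveat on that last line, again beyond the original answer: foreach runs write on the executors once per record, which fits pushing metrics to an external sink but not producing parquet files under pathA/pathB. For file output, the kryo-encoded Dataset[Metric] can be split back into typed Datasets by runtime type, a sketch under the same assumptions:)

// sketch only: recover typed Datasets from Dataset[Metric],
// relying on the product encoders from s.implicits._
val aDS: Dataset[MetricA] = result.flatMap {
  case m: MetricA => Seq(m)
  case _          => Nil
}
val bDS: Dataset[MetricB] = result.flatMap {
  case m: MetricB => Seq(m)
  case _          => Nil
}
// then aDS.write.parquet(pathA) and bDS.write.parquet(pathB) as before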
