How to write rows asynchronously in a Spark Streaming application to speed up batch execution?


Problem description

I have a Spark job where I need to write the output of a SQL query in every micro-batch. The write is an expensive operation performance-wise and is causing the batch execution time to exceed the batch interval.

I am looking for ways to improve the performance of the write.

  1. Is doing the write action asynchronously in a separate thread, as shown below, a good option?

Would this cause any side effects, given that Spark itself executes in a distributed manner?

Are there other/better ways of speeding up the write?

// Create a fixed thread pool to execute asynchronous tasks
import java.util.concurrent.Executors
val executorService = Executors.newFixedThreadPool(2)
dstream.foreachRDD { rdd =>
  import org.apache.spark.sql._
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
  import spark.implicits._
  import spark.sql

  val records = rdd.toDF("record")
  records.createOrReplaceTempView("records")
  val result = spark.sql("select * from records")

  // Submit an asynchronous task to write
  executorService.submit {
    new Runnable {
      override def run(): Unit = {
        result.write.parquet(output)
      }
    }
  }
}

Answer

1 - Is doing the write action in a separate thread asynchronously, as shown below, a good option?

No. The key to understanding the issue here is to ask "who is doing the write". The write is done by the resources allocated to your job on the executors of the cluster. Placing the write command on an async threadpool is like adding a second office manager to an office with a fixed staff. Given that they have to share the same staff, will two managers get more work done than one alone? Well, a reasonable answer is "only if the first manager was not giving them enough work, so there's some free capacity".

Going back to our cluster: we are dealing with a write operation that is heavy on IO. Parallelizing write jobs leads to contention for IO resources, making each individual job take longer. Initially, our job might look better than the "single manager version", but trouble will eventually hit us. I've made a chart that attempts to illustrate how that works. Note that the parallel jobs will take proportionally longer by the amount of time that they run concurrently in the timeline.

Once we reach the point where jobs start getting delayed, we have an unstable job that will eventually fail.

2 - Would this cause any side effects, given that Spark itself executes in a distributed manner?

Some effects I can think of:

  • Probably higher cluster load and IO contention.
  • Jobs queue up on the threadpool queue instead of on the Spark streaming queue. We lose the ability to monitor our job through the Spark UI and the monitoring APIs, as the delays are "hidden" and everything looks fine from the Spark Streaming point of view.
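If the writes do stay on the Spark streaming queue, the delays remain visible and can even be logged programmatically. A minimal sketch, assuming `ssc` is your application's `StreamingContext`:

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Log per-batch delays; schedulingDelay grows when batches start
// queuing up behind slow writes, which is the early warning sign
// of an unstable streaming job.
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch ${info.batchTime}: " +
      s"processing=${info.processingDelay.getOrElse(-1L)} ms, " +
      s"scheduling=${info.schedulingDelay.getOrElse(-1L)} ms")
  }
})
```

With the async-threadpool approach, by contrast, `processingDelay` would look healthy while the real backlog accumulates invisibly inside the thread pool.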

3 - Are there other/better ways of speeding up the writes? (in order from cheap to expensive)

  • If you are appending to Parquet files, create a new file often. Appends become expensive over time.
  • Increase your batch interval or use window operations to write larger chunks of Parquet. Parquet likes large files.
  • Tune the partitioning and distribution of your data => make sure that Spark can do the writes in parallel.
  • Increase cluster resources; add more nodes if necessary.
  • Use faster storage.
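The partitioning point above can be sketched in the question's own `foreachRDD` body. This is a hedged illustration, not the definitive fix: the target of 8 partitions is an assumption and should be tuned to the cluster's parallel write throughput.

```scala
val result = spark.sql("select * from records")
result
  .repartition(8)    // one write task (and one Parquet file) per partition;
                     // 8 is an illustrative value, tune to your cluster
  .write
  .mode("append")
  .parquet(output)
```

Too few partitions underuses the executors; too many produces a flood of tiny Parquet files, which conflicts with the "Parquet likes large files" advice above.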

