How to write rows asynchronously in a Spark Streaming application to speed up batch execution?

Problem description

I have a Spark job where I need to write the output of a SQL query every micro-batch. The write is an expensive operation performance-wise and is causing the batch execution time to exceed the batch interval.

I am looking for ways to improve the performance of the write.

  1. Is doing the write action in a separate thread asynchronously, like shown below, a good option?

Would this cause any side effects, given that Spark itself executes in a distributed manner?

Are there other/better ways of speeding up the write?

import java.util.concurrent.Executors

// Create a fixed thread pool to execute asynchronous tasks
val executorService = Executors.newFixedThreadPool(2)
dstream.foreachRDD { rdd =>
  import org.apache.spark.sql._
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
  import spark.implicits._
  import spark.sql

  val records = rdd.toDF("record")
  records.createOrReplaceTempView("records")
  val result = spark.sql("select * from records")

  // Submit an asynchronous task to write the result
  executorService.submit {
    new Runnable {
      override def run(): Unit = {
        result.write.parquet(output)
      }
    }
  }
}

Recommended answer

1- Is doing the write action in a separate thread asynchronously (like shown below) a good option?

No. The key to understanding the issue here is to ask "who is doing the write". The write is done by the resources allocated to your job on the executors of the cluster. Placing the write command on an async thread pool is like adding a new office manager to an office with a fixed staff. Will two managers be able to do more work than one alone, given that they have to share the same staff? Well, one reasonable answer is "only if the first manager was not giving them enough work, so there's some free capacity".

Going back to our cluster, we are dealing with a write operation that is heavy on IO. Parallelizing write jobs will lead to contention for IO resources, making each independent job longer. Initially, our job might look better than the 'single manager version', but trouble will eventually hit us. I've made a chart that attempts to illustrate how that works. Note that the parallel jobs will take longer proportionally to the amount of time that they are concurrent in the timeline.

Once we reach that point where jobs start getting delayed, we have an unstable job that will eventually fail.

2- Would this cause any side effects, given that Spark itself executes in a distributed manner?

Some effects I can think of:

  • Higher cluster load and IO contention.
  • Jobs are queuing up in the thread pool queue instead of the Spark Streaming queue. We lose the ability to monitor our job through the Spark UI and monitoring APIs, as the delays are "hidden" and all looks fine from the Spark Streaming point of view.

3- Are there other/better ways of speeding up the write? (ordered from cheap to expensive)

  • If you are appending to a Parquet file, create a new file often instead. Appending gets increasingly expensive over time.
  • Increase your batch interval or use window operations to write larger chunks of Parquet. Parquet likes large files.
  • Tune the partitioning and distribution of your data => make sure that Spark can do the write in parallel.
  • Increase cluster resources, adding more nodes if necessary.
  • Use faster storage.
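The middle two suggestions could look roughly like the sketch below: a windowed stream that writes one larger Parquet chunk per window, repartitioned so every executor takes part in the write. This is only a sketch; `dstream` and `output` are taken from the question's code, and the 5-minute window sizes are arbitrary assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.Minutes

// Collect 5 minutes of data per write instead of one micro-batch,
// producing fewer, larger Parquet files.
dstream.window(Minutes(5), Minutes(5)).foreachRDD { rdd =>
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  rdd.toDF("record")
    .repartition(spark.sparkContext.defaultParallelism) // spread the write across executors
    .write.mode("append").parquet(output)               // one larger Parquet chunk per window
}
```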
