Spark代码组织和最佳实践 [英] Spark code organization and best practices

查看:96
本文介绍了Spark代码组织和最佳实践的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

因此,在面向对象的世界中花了很多年,始终将代码重用,设计模式和最佳实践考虑在内,我发现自己在Spark世界中在代码组织和代码重用方面有些挣扎.

So, having spend many years in an object oriented world with code reuse, design patterns and best practices always taken into account, I find myself struggling somewhat with code organization and code reuse in world of Spark.

如果我尝试以可重用的方式编写代码,那么它几乎总是会带来性能成本,最终我会将其重写为适合我的特定用例的最佳方式.这个常量写出最适合该特定用例的内容"也会影响代码的组织,因为当它们全部真正属于一起"时,很难将代码拆分为不同的对象或模块,因此我最终只有很少的包含长代码的上帝"对象复杂的转换链.实际上,我经常认为,如果我回顾了当我在面向对象的世界中工作时所写的大部分Spark代码,我会退缩并将其视为意大利面条式代码".

If I try to write code in a reusable way, it nearly always comes with a performance cost and I end up rewriting it to whatever is optimal for my particular use case. This constant "write what is optimal for this particular use case" also affects code organization, because splitting code into different objects or modules is difficult when "it all really belongs together" and I thus end up with very few "God" object containing long chains of complex transformations. In fact, I frequently think that if I had taken a look at most of the Spark code I'm writing now back when I was working in the object oriented world, I would have winced and dismissed it as "spaghetti code".

我已经在互联网上冲浪,试图找到某种等同于面向对象世界的最佳实践的方法,但是运气不佳.我可以找到一些函数式编程的最佳实践",但是Spark只是增加了一层,因为性能是这里的主要因素.

I have surfed the internet trying to find some sort of equivalent to the best practices of the object oriented world, but without much luck. I can find some "best practices" for functional programming but Spark just adds an extra layer, because performance is such a major factor here.

所以我想问的是,您中的任何Spark专家都发现了一些您可以推荐的编写Spark代码的最佳实践吗?

So my question to you is, have any of you Spark gurus found some best practices for writing Spark code that you can recommend?

编辑

正如评论中所写,我实际上并不希望有人发布有关如何解决该问题的答案,但我希望这个社区中的某人遇到过某种马丁·福勒(Martin Fowler)类型,他曾在某处写过som文章或博客文章,介绍了如何解决Spark世界中的代码组织问题.

As written in a comment, I did not actually expect anyone to post an answer on how to solve this problem, but rather I was hoping that someone in this community had come across some Martin Fowler type, who had written som articles or blog posts somewhere on how to address problems with code organization in the world of Spark.

@DanielDarabos建议我举一个例子说明代码组织和性能冲突的情况.虽然我发现我在日常工作中经常遇到此问题,但我很难将其归结为一个很好的最小示例;),但我会尝试的.

@DanielDarabos suggested that I might put in an example of a situation where code organization and performance are conflicting. While I find that I frequently have issues with this in my everyday work, I find it a bit hard to boil it down to a good minimal example ;) but I will try.

在面向对象的世界中,我是单一责任原则"的忠实拥护者,因此,我将确保我的方法仅负责一件事.它使它们可重用且易于测试.因此,例如,如果我必须计算列表中某些数字的总和(与某些条件匹配),并且必须计算同一数字的平均值,那么我绝对可以创建两种方法-一种计算总和,而另一种则计算计算平均值.像这样:

In the object oriented world, I'm a big fan of the Single Responsibility Principle, so I would make sure that my methods were only responsible for one thing. It makes them reusable and easily testable. So if I had to, say, calculate the sum of some numbers in a list (matching some criteria) and I had to calculate the average of the same number, I would most definitely create two methods - one that calculated the sum and one that calculated the average. Like this:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}

我当然可以继续遵守Spark中的SRP:

I can of course continue to honor SRP in Spark:

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK")
  println("Averaged weights for DK = " + averagedWeights(df, "DK")
}


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(avg('weight))

  summedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

但是因为我的df可能包含数十亿行,所以我宁愿不必执行两次filter.实际上,性能与EMR成本直接相关,因此我真的不希望如此.为了克服它,我决定违反SRP并简单地将两个功能合而为一,并确保我对经过国家/地区过滤的DataFrame调用持久化,如下所示:

But because my df may contain billions of rows I would rather not have to perform the filter twice. In fact, performance is directly coupled to EMR cost, so I REALLY don't want that. To overcome it, I thus decide to violate SRP and simply put the two functions in one and make sure I call persist on the country-filtered DataFrame, like this:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

现在,这个示例当然可以大大简化现实生活中遇到的事情.在这里,我可以简单地通过将df 移交给sum和avg函数(也将是更多的SRP)进行过滤和持久化来解决它,但是在现实生活中可能会有许多中间计算一次又一次地进行下去.换句话说,这里的filter函数仅是尝试制作一个简单示例,该示例将从持久化中受益.实际上,我认为对persist的调用是此处的关键字.调用persist将极大地加快我的工作,但是代价是我必须紧密耦合所有依赖于持久化DataFrame的代码-即使它们在逻辑上是分开的.

Now, this example if of course a huge simplification of what's encountered in real life. Here I could simply solve it by filtering and persisting df before handing it to the sum and avg functions (which would also be more SRP), but in real life there may be a number of intermediate calculations going on that are needed again and again. In other words, the filter function here is merely an attempt to make a simple example of something that will benefit from being persisted. In fact, I think calls to persist is a keyword here. Calling persist will vastly speed up my job, but the cost is that I have to tightly couple all code that depends on the persisted DataFrame - even if they are logically separate.

推荐答案

我认为您可以订阅youtube上的Apache Sparkdatabricks频道,收听更多内容,特别是其他人的经验教训.

I think you can subscribe Apache Spark, databricks channel on youtube, listen more and know more, especially for the experiences and lessons from others.

  • Apache Spark
  • databricks
  • Spark Technology Center

这里推荐一些视频:

  • SparkUI Visualization
  • slide SparkUI Visualization

生产中的火花:来自100多个生产用户的经验教训

slide生产中的火花:来自100多个生产用户的经验教训

slide Spark in Production: Lessons from 100+ Production Users

针对企业系统管理员的火花调整

针对企业系统管理员的火花调整

构建,调试和调整Spark机器学习管道-Joseph Bradley(Databricks)

slide构建,调试和调整Spark机器学习管道

slide Building, Debugging, and Tuning Spark Machine Learning Pipelines

编写Spark应用程序时的前5个错误

slide出现以下5个错误时编写Spark应用程序

slide Top 5 mistakes when writing Spark applications

调整和调试Apache Spark

slide调试Apache Spark

slide Tuning and Debugging Apache Spark

对Spark内部结构有更深入的了解-亚伦·戴维森(Databricks)

并且我已经发布并仍在我的github和博客上对其进行了更新:

and I've posted and still updating it on my github and blog:

  • github post
  • blog post

希望这可以帮助您〜

这篇关于Spark代码组织和最佳实践的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆