Spark performance for Scala vs Python


Problem description


I prefer Python over Scala. But, as Spark is natively written in Scala, I was expecting my code to run faster in Scala than in the Python version, for obvious reasons.

With that assumption, I decided to learn and write the Scala version of some very common preprocessing code for about 1 GB of data. The data is taken from the SpringLeaf competition on Kaggle. To give an overview, it contains 1,936 dimensions and 145,232 rows, composed of various types, e.g. int, float, string, boolean. I am using 6 cores out of 8 for Spark processing; that's why I used minPartitions=6 so that every core has something to process.

Scala Code

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Python Code

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala Performance Stage 0 (38 mins), Stage 1 (18 sec)

Python Performance Stage 0 (11 mins), Stage 1 (7 sec)

Both produce different DAG visualization graphs (which is why the two pictures show different Stage 0 functions for Scala (map) and Python (reduceByKey)).

But essentially, both pieces of code try to transform the data into an RDD of (dimension_id, string of list of values) and save it to disk. The output will be used to compute various statistics for each dimension.

Performance-wise, the Scala code for real data like this seems to run 4 times slower than the Python version. The good news for me is that it gives me good motivation to stay with Python. The bad news is that I don't quite understand why.

Solution


The original answer discussing the code can be found below.


First of all, you have to distinguish between different types of API, each with its own performance considerations.

RDD API

(pure Python structures with JVM based orchestration)

This is the component which will be most affected by the performance of the Python code and the details of the PySpark implementation. While Python performance is rather unlikely to be a problem, there are at least a few factors you have to consider:

  • Overhead of JVM communication. Practically all data that comes to and from the Python executor has to be passed through a socket and a JVM worker. While this is relatively efficient local communication, it is still not free.
  • Process-based executors (Python) versus thread-based (single JVM, multiple threads) executors (Scala). Each Python executor runs in its own process. As a side effect, it provides stronger isolation than its JVM counterpart and some control over the executor lifecycle, but potentially significantly higher memory usage:

    • interpreter memory footprint
    • footprint of the loaded libraries
    • less efficient broadcasting (each process requires its own copy of a broadcast)
  • Performance of the Python code itself. Generally speaking, Scala is faster than Python, but it will vary from task to task. Moreover, you have multiple options, including JITs like Numba, C extensions (Cython) or specialized libraries like Theano. Finally, if you don't use ML / MLlib (or simply the NumPy stack), consider using PyPy as an alternative interpreter. See SPARK-3094.

  • PySpark configuration provides the spark.python.worker.reuse option, which can be used to choose between forking a Python process for each task and reusing an existing process. The latter option (the default) seems to be useful to avoid expensive garbage collection (this is more an impression than a result of systematic tests), while the former can help in case of expensive broadcasts and imports (a minimal configuration sketch follows this list).
  • Reference counting, used as the first-line garbage collection method in CPython, works pretty well with typical Spark workloads (stream-like processing, no reference cycles) and reduces the risk of long GC pauses.
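
For illustration, here is a minimal sketch of toggling that worker-reuse option in a standalone PySpark script; the application name and the tiny job are just placeholders:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("worker-reuse-demo")   # placeholder name
        # "true" (the default in the releases I have checked) keeps Python
        # worker processes alive between tasks; "false" forks a fresh worker
        # for every task.
        .set("spark.python.worker.reuse", "false"))

sc = SparkContext(conf=conf)
print(sc.parallelize(range(100), 4).map(lambda x: x * x).sum())
sc.stop()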

MLlib

(mixed Python and JVM execution)

Basic considerations are pretty much the same as before with a few additional issues. While basic structures used with MLlib are plain Python RDD objects, all algorithms are executed directly using Scala.

This means an additional cost of converting Python objects to Scala objects and the other way around, increased memory usage, and some additional limitations we'll cover later.

As of now (Spark 2.x), the RDD-based API is in maintenance mode and is scheduled to be removed in Spark 3.0.

DataFrame API and Spark ML

(JVM execution with Python code limited to the driver)

These are probably the best choice for standard data processing tasks. Since Python code is mostly limited to high-level logical operations on the driver, there should be no performance difference between Python and Scala.

A single exception is usage of row-wise Python UDFs, which are significantly less efficient than their Scala equivalents. While there is some chance for improvement (there has been substantial development in Spark 2.0.0), the biggest limitation is the full roundtrip between the internal representation (JVM) and the Python interpreter. If possible, you should favor a composition of built-in expressions. Python UDF behavior has been improved in Spark 2.0.0, but it is still suboptimal compared to native execution.
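
As a rough illustration (toy data, not from the question), compare a row-wise Python UDF with the equivalent built-in expression:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("udf-vs-builtin").getOrCreate()
df = spark.createDataFrame([(1, "true"), (2, "false")], ["id", "flag"])

# Row-wise Python UDF: every row takes a JVM -> Python -> JVM round trip.
to_bit = udf(lambda v: "1" if v == "true" else "0", StringType())
df.withColumn("bit", to_bit(col("flag"))).show()

# Built-in expressions: evaluated entirely inside the JVM, no round trip.
df.withColumn("bit", when(col("flag") == "true", "1").otherwise("0")).show()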

This has improved significantly with the introduction of vectorized UDFs (SPARK-21190 and further extensions), which use Arrow streaming for efficient data exchange with zero-copy deserialization. For most applications, their secondary overheads can simply be ignored.
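
A minimal sketch of such a vectorized UDF, assuming Spark 2.3+ with pandas and PyArrow installed (column names are illustrative):

import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("pandas-udf-demo").getOrCreate()
df = spark.range(0, 10).withColumn("x", col("id").cast("double"))

@pandas_udf(DoubleType())
def times_two(v: pd.Series) -> pd.Series:
    # One call per Arrow batch instead of one call per row.
    return v * 2.0

df.withColumn("doubled", times_two(col("x"))).show()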

Also be sure to avoid passing data unnecessarily between DataFrames and RDDs. This requires expensive serialization and deserialization, not to mention data transfer to and from the Python interpreter.
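
For example, something along these lines (toy columns, not from the question) shows the anti-pattern versus the preferred form:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("df-vs-rdd-roundtrip").getOrCreate()
df = spark.range(0, 1000).withColumn("x", col("id").cast("double"))

# Anti-pattern: dropping to the RDD API serializes every row out to the
# Python workers and back just to double a column.
doubled_rdd = (df.rdd
                 .map(lambda row: (row.id, row.x * 2))
                 .toDF(["id", "doubled"]))

# Preferred: the same logic as a built-in expression stays inside the JVM.
doubled_df = df.select(col("id"), (col("x") * 2).alias("doubled"))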

It is worth noting that Py4J calls have pretty high latency. This includes simple calls like:

from pyspark.sql.functions import col

col("foo")

Usually, it shouldn't matter (the overhead is constant and doesn't depend on the amount of data), but in the case of soft real-time applications, you may consider caching/reusing Java wrappers.
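
A rough way to see this from the driver (numbers are machine dependent; the point is only that the cost is per call, not per row of data):

import timeit

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("py4j-latency").getOrCreate()

# Each col() call goes through the Py4J gateway to build a JVM-side Column.
per_call = timeit.timeit(lambda: col("foo"), number=1000) / 1000
print("approx seconds per col() call: %.6f" % per_call)

# For latency-sensitive driver loops, build the wrapper once and reuse it.
foo = col("foo")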

GraphX and Spark Datasets

As of now (Spark 2.1), neither one provides a PySpark API, so you can say that PySpark is infinitely worse than Scala.

GraphX

In practice, GraphX development stopped almost completely and the project is currently in maintenance mode, with related JIRA tickets closed as won't fix. The GraphFrames library provides an alternative graph processing framework with Python bindings.
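
A minimal GraphFrames sketch, assuming the external graphframes package is available (for example added via --packages); the vertex and edge data is made up for illustration:

from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.appName("graphframes-demo").getOrCreate()

# GraphFrames expects an "id" column for vertices and "src"/"dst" for edges.
v = spark.createDataFrame([("a", "Alice"), ("b", "Bob")], ["id", "name"])
e = spark.createDataFrame([("a", "b", "follows")], ["src", "dst", "relationship"])

g = GraphFrame(v, e)
g.inDegrees.show()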

Dataset

Subjectively speaking, there is not much place for statically typed Datasets in Python, and even if there were, the current Scala implementation is too simplistic and doesn't provide the same performance benefits as DataFrame.

Streaming

From what I've seen so far, I would strongly recommend using Scala over Python. It may change in the future if PySpark gets support for structured streaming, but right now the Scala API seems to be much more robust, comprehensive and efficient. My experience is quite limited.

Structured streaming in Spark 2.x seems to reduce the gap between languages, but for now it is still in its early days. Nevertheless, the RDD-based API is already referenced as "legacy streaming" in the Databricks documentation (date of access 2017-03-03), so it is reasonable to expect further unification efforts.

Non-performance considerations

Feature parity

Not all Spark features are exposed through PySpark API. Be sure to check if the parts you need are already implemented and try to understand possible limitations.

It is particularly important when you use MLlib and similar mixed contexts (see Calling Java/Scala function from a task). To be fair, some parts of the PySpark API, like mllib.linalg, provide a more comprehensive set of methods than Scala.

API design

The PySpark API closely reflects its Scala counterpart and as such is not exactly Pythonic. It means that it is pretty easy to map between languages but at the same time, Python code can be significantly harder to understand.

Complex architecture

PySpark data flow is relatively complex compared to pure JVM execution. It is much harder to reason about PySpark programs or to debug them. Moreover, at least a basic understanding of Scala and the JVM in general is pretty much a must-have.

Spark 2.x and beyond

The ongoing shift towards the Dataset API, with the RDD API frozen, brings both opportunities and challenges for Python users. While high-level parts of the API are much easier to expose in Python, the more advanced features are pretty much impossible to use directly.

Moreover, native Python functions continue to be second-class citizens in the SQL world. Hopefully this will improve in the future with Apache Arrow serialization (current efforts target data collection, but UDF serde is a long-term goal).
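
The collection path can already benefit from Arrow. A hedged sketch, assuming Spark 2.3+ with pandas and PyArrow installed (in later releases the key was renamed to spark.sql.execution.arrow.pyspark.enabled):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-collect").getOrCreate()

# Enable Arrow-based columnar transfer for toPandas() / createDataFrame(pandas).
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = spark.range(0, 1000000).toPandas()   # columnar transfer via Arrow
print(len(pdf))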

For projects depending strongly on a Python codebase, pure Python alternatives (like Dask or Ray) could be an interesting option.

It doesn't have to be one vs. the other

The Spark DataFrame (SQL, Dataset) API provides an elegant way to integrate Scala/Java code in a PySpark application. You can use DataFrames to expose data to native JVM code and read back the results. I've explained some options elsewhere, and you can find a working example of a Python-Scala roundtrip in How to use a Scala class inside Pyspark.
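
The general shape of such a bridge looks roughly like this; com.example.udfs.Enrich and its transform method are hypothetical placeholders for your own Scala code, and _jvm / _jdf are PySpark internals commonly used for this purpose:

from pyspark.sql import DataFrame

def call_scala_transform(spark, df):
    # Hand the JVM-side DataFrame to a (hypothetical) Scala object and wrap
    # the returned JVM DataFrame back into a Python DataFrame.
    jdf = spark._jvm.com.example.udfs.Enrich.transform(df._jdf)
    return DataFrame(jdf, df.sql_ctx)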

It can be further augmented by introducing User Defined Types (see How to define schema for custom type in Spark SQL?).


What is wrong with the code provided in the question

(Disclaimer: Pythonista point of view. Most likely I've missed some Scala tricks)

First of all, there is one part of your code which doesn't make sense at all. If you already have (key, value) pairs created using zipWithIndex or enumerate, what is the point of creating a string just to split it right afterwards? flatMap doesn't work recursively, so you can simply yield tuples and skip the following map altogether.

Another part I find problematic is reduceByKey. Generally speaking, reduceByKey is useful if applying the aggregate function can reduce the amount of data that has to be shuffled. Since you simply concatenate strings, there is nothing to gain here. Ignoring low-level stuff, like the number of references, the amount of data you have to transfer is exactly the same as for groupByKey.

Normally I wouldn't dwell on that, but as far as I can tell it is the bottleneck in your Scala code. Joining strings on the JVM is a rather expensive operation (see for example: Is string concatenation in Scala as costly as it is in Java?). It means that something like _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2), which is equivalent to input4.reduceByKey(valsConcat) in your code, is not a good idea.

If you want to avoid groupByKey you can try to use aggregateByKey with StringBuilder. Something similar to this should do the trick:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    // within a partition: append the element, comma-separated
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    // across partitions: merge the two builders, comma-separated if both are non-empty
    if(acc1.isEmpty | acc2.isEmpty) acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

but I doubt it is worth all the fuss.

Keeping the above in mind, I've rewritten your code as follows:

Scala:

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python:

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

Results

In local[6] mode (Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz) with 4GB memory per executor it takes (n = 3):

  • Scala - mean: 250.00s, stdev: 12.49
  • Python - mean: 246.66s, stdev: 1.15

I am pretty sure that most of that time is spent on shuffling, serializing, deserializing and other secondary tasks. Just for fun, here's naive single-threaded code in Python that performs the same task on this machine in less than a minute:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])
