斯卡拉VS Python的性能星火 [英] Spark performance for Scala vs Python

查看:302
本文介绍了斯卡拉VS Python的性能星火的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我preFER的Python在斯卡拉。但是,由于星火本身是写在斯卡拉,我期待我的code运行在斯卡拉快于Python版本显而易见的原因。

I prefer Python over Scala. But, as Spark is natively written in Scala, I was expecting my code to run faster in the Scala than the Python version for obvious reasons.

通过这样的假设,我想学习和放大器;写一些很普通的preprocessing code的斯卡拉版本对一些1 NBSP; GB的数据。数据从 Kaggle 的春叶采摘的竞争。只是得到的数据(它包含1936年的尺寸和145232行)的概述。数据由各种类型的例如整型,浮点,字符串,布尔。我使用6芯出8星火处理;这就是为什么我用minPartitions = 6,使每个核心拥有的东西来处理。

With that assumption, I thought to learn & write the Scala version of some very common preprocessing code for some 1 GB of data. Data is picked from the SpringLeaf competition on Kaggle. Just to give an overview of the data (it contains 1936 dimensions and 145232 rows). Data is composed of various types e.g. int, float, string, boolean. I am using 6 cores out of 8 for Spark processing; that's why I used minPartitions=6 so that every core has something to process.

斯卡拉code

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

的Python code

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"')) for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

斯卡拉表演
0级(38分钟),第1阶段(18秒)

Python的性能
0级(11分钟),第1阶段(7秒)

两者会产生不同的DAG的可视化图形(由于其中两个图片显示不同的阶段0功能斯卡拉(图)和Python(reduceByKey))

Both produces different DAG visualisation graphs (due to which both pics show different stage 0 functions for Scala (map) and Python (reduceByKey))

不过,基本上都code尝试将数据转换成(dimension_id,值列表的字符串)RDD并保存到磁盘。输出将被用于计算每一维度的各种统计

But, essentially both code tries to transform data into (dimension_id, string of list of values) RDD and save to disk. The output will be used to compute various statistics for each dimension.

在性能方面,斯卡拉code像这样这个现实数据似乎运行的 4倍慢比Python版本。
对我来说好消息是,它给了我很好的动机留在了Python。坏消息是我不明白为什么?

Performance wise, Scala code for this real data like this seems to run 4 times slower than the Python version. Good news for me is that it gave me good motivation to stay with Python. Bad news is I didn't quite understand why?

推荐答案

原来答案在讨论code可以在下面找到。


The original answer discussing the code can be found below.

你有不同类型的API来区分首先,每个都有自己的性能上的考虑。

First of all you have to distinguish between different types of API, each with its own performance consideration.

(基于JVM编排纯Python结构)

这是这将是受影响最大的Python code的性能和PySpark执行细节的组件。而Python的性能,而不太可能是一个问题有至少几个因素必须考虑:

This is the component which will be most affected by a performance of the Python code and the details of PySpark implementation. While Python performance is rather unlikely to be a problem there at least few factors you have to consider:


  • JVM通信的开销。实际上这涉及到从Python的遗嘱执行人的每一个数据必须通过插座与JVM工人进行传递。虽然这是一个相对高效的本地通信仍然是不自由的。

  • 基于过程的执行者(蟒蛇)与基于线程(单个JVM多线程)执行人(斯卡拉)。每个Python的执行者在自己的进程中运行。作为一个副作用,它提供了比其JVM对应较强的隔离和过执行人生命周期的某些控制,但可能显著更高的内存使用情况:

  • an overhead of JVM communication. Practically every data that comes to and from Python executor has to be passed through socket and JVM worker. While this is a relatively efficient local communication it is still not free.
  • process based executors (Python) versus thread based (single JVM multiple threads) executors (Scala). Each Python executor runs in its own process. As a side effect it provides stronger isolation than its JVM counterpart and some control over executor lifecycle but potentially significantly higher memory usage:


  • 间preTER内存footrpint

  • 加载库的足迹

  • 低效率的广播(每个进程都需要有自己的广播复印件)

一个Python code本身一般来说Scala是比普通的Python快,但它会在任务间变化的表现。此外,您有多个选项,包括如 Numba 即时编译器,C扩展(用Cython )或专业图书馆如 Theano 。最后,如​​果你不使用ML / MLlib(或简称numpy的堆栈),可以考虑使用 PyPy 作为替代间preTER。请参见 SPARK-3094

performance of a Python code itself Generally speaking Scala is faster than a plain Python but it will vary on task to task. Moreover you have multiple options including JITs like Numba, C extensions (Cython) or specialized libraries like Theano. Finally, if you don't use ML / MLlib (or simply NumPy stack), consider using PyPy as an alternative interpreter. See SPARK-3094.

(混合Python和JVM执行)

基本考虑是pretty大致相同的几个其他问题之前。虽然与MLlib使用的基本结构是普通的Python对象RDD所有算法使用Scala的直接执行。

Basic considerations are pretty much the same as before with a few additional issues. While basic structures used with MLlib are plain Python RDD objects all algorithms are executed directly using Scala.

这意味着把Python对象到斯卡拉的额外费用的对象和周围的其他方法,提高内存的使用和一些额外的限制,我们在后面会介绍。

It means additional cost of converting Python objects to Scala objects and the other way around, increased memory usage and some additional limitations we'll cover later.

(JVM执行与Python code受限于驱动程序)

这些可能是用于标准数据处理任务的最佳选择。因为Python code大多局限于对驾驶员的高级别的逻辑运算应该有Python和斯卡拉之间没有性能差异。

These are probably the best choice for standard data processing tasks. Since Python code is mostly limited to the high level logical operations on the driver there should be no performance difference between Python and Scala.

一个唯一的例外是Python的UDF的这显著比斯卡拉等值效率较低。虽然有一些机会改进最大的局限性在于内部重新presentation(JVM)和Python间preTER之间的完全往返。如果这是唯一可能的,你应该偏向内置前pressions的成分。例如,见堆栈溢出而与处理几列的UDF

A single exception are Python UDFs which significantly less efficient than its Scala equivalents. While there are some chances for improvements the biggest limitations is full roundtrip between internal representation (JVM) and Python interpreter. If it is only possible you should favor a compositions of built-in expressions. See for example Stack Overflow while processing several columns with a UDF

至于现在(星火1.6)没有一个提供PySpark API,因此可以说是PySpark比infinitively斯卡拉(更糟虽然引进 GraphFrames < /一>使得第一个不太重要)。

As for now (Spark 1.6) neither one provides PySpark API so you can say that PySpark is infinitively worse than Scala (although introduction of GraphFrames makes the first one less important).

并非所有的Spark功能是通过PySpark API暴露。一定要检查,如果你需要的是已经实现的部分,并尝试了解可能的限制。

Not all Spark features are exposed through PySpark API. Be sure to check if the parts you need are already implemented and try to understand possible limitations.

当您使用MLlib和类似的混合环境(见如何使用Java / Scala的功能,从一个动作或一个转变尤其重要?)。为了公平PySpark API的某些部分,如 mllib.linalg ,提供更多的COM prehensive一套比Scala的方法。

It is particularly important when you use MLlib and similar mixed contexts (see How to use Java/Scala function from an action or a transformation?). To be fair some parts of the PySpark API, like mllib.linalg, provide far more comprehensive set of methods than Scala.

星火数据框(SQL,数据集)API提供了一个优雅的方式在PySpark应用程序中集成的Scala / Java的code。您可以使用 DataFrames 来公开数据本机JVM code和读回的结果。我已经解释了一些选项别的地方,然后你可以找到的How使用Pyspark 内的Scala类。

Spark DataFrame (SQL, Dataset) API provides an elegant way to integrate Scala / Java code in PySpark application. You can use DataFrames to expose data to a native JVM code and read back the results. I've explained some options somewhere else and you can find a working example of Python-Scala roundtrip in How to use a Scala class inside Pyspark.

它可以通过引入用户定义类型(见如何定义架构在星火SQL自定义类型得到进一步增强? )。

It can be further augmented by introducing User Defined Types (see How to define schema for custom type in Spark SQL?).

(免责声明:观点Pythonista点最有可能的,我错过了一些斯卡拉技巧)

首先出现在你的code这是没有道理可言的一部分。如果你已经有了(键,值)使用创造了对 zipWithIndex 枚举什么是创建字符串只是事后权分裂的地步呢? flatMap 不递归工作,所以你可以简单地产生的元组,然后跳到下面的地图任何责任。

First of all there is one part in your code which doesn't make sense at all. If you already have (key, value) pairs created using zipWithIndex or enumerate what is the point point in creating string just to split it right afterwards? flatMap doesn't work recursively so you can simply yield tuples and skip following map whatsoever.

我觉得有问题另一部分是 reduceByKey 。一般来说 reduceByKey 是有用的,如果应用聚合函数可以减少已被洗牌的数据量。既然你只需连接字符串没有什么收获在这里。忽略低水平的东西,喜欢引用的数量,数据量,你必须转移是完全一样的 groupByKey

Another part I find problematic is reduceByKey. Generally speaking reduceByKey is useful if applying aggregate function can reduce amount of data that has to be shuffled. Since you simply concatenate strings there is nothing to gain here. Ignoring low level stuff, like number of references, amount of data you have to transfer is exactly the same as for groupByKey.

通常我不会纠缠于这一点,但据我可以告诉它是在你的斯卡拉code的瓶颈。 JVM上加入字符串是相当昂贵的操作(例如,见:是字符串连接Scala中的昂贵,因为它是在Java中? )。这意味着,像这样 _ reduceByKey((V1:字符串,V2:字符串)=&GT; V1 +,+ V2)。相当于 input4.reduceByKey(valsConcat)在code是不是一个好主意。

Normally I wouldn't dwell on that, but as far as I can tell it is a bottleneck in your Scala code. Joining strings on JVM is rather expensive operation (See for example: Is string concatenation in scala as costly as it is in Java?). It means that something like this _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) which is equivalent to input4.reduceByKey(valsConcat) in your code is not a good idea.

如果你想避免 groupByKey 您可以尝试使用 aggregateByKey 的StringBuilder 。类似的东西这个应该做的伎俩:

If you want to avoid groupByKey you can try to use aggregateByKey with StringBuilder. Something similar to this should do the trick:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

但我怀疑这是值得大惊小怪。

but I doubt it is worth all the fuss.

以上牢记我已经重写你的code如下:

Keeping above in mind I've rewritten your code as follows:

斯卡拉

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

的Python

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

结果

本地[6] 模式(英特尔(R)至强(R)CPU E3-1245 V2 @ 3.40GHz),每个执行人需要4GB内存(N = 3):

Results

In local[6] mode (Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz) with 4GB memory per executor it takes (n = 3):


  • 斯卡拉 - 意思是:250.00s,标准偏差:12.49

  • 的Python - 意思是:246.66s,标准偏差:1.15

我是pretty确保大部分时间是花费在洗牌,序列化,反序列化和其他次要任务。只是为了好玩天真单线程code Python中,在不到一分钟内完成这台机器上相同的任务:

I am pretty sure that most of that time is spend on shuffling, serializing, deserializing and other secondary tasks. Just for fun a naive single-threaded code in Python that performs the same task on this machine in less than a minute:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

这篇关于斯卡拉VS Python的性能星火的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆