Spark MLLib's LassoWithSGD doesn't scale?

Question

I have code similar to what follows:

val fileContent = sc.textFile("file:///myfile")

val dataset = fileContent.map(row => {
    val explodedRow = row.split(",").map(s => s.toDouble)
    new LabeledPoint(explodedRow(13), Vectors.dense(
        Array(explodedRow(10), explodedRow(11), explodedRow(12))
    ))
})

val algo = new LassoWithSGD().setIntercept(true)

val lambda = 0.0
algo.optimizer.setRegParam(lambda)
algo.optimizer.setNumIterations(100)
algo.optimizer.setStepSize(1.0)

val model = algo.run(dataset)

I'm running this in the cloud on my virtual server with 20 cores. The file is a "local" (i.e. not in HDFS) file with a few million rows. I run this in local mode, with sbt run (i.e. I don't use a cluster, I don't use spark-submit).

I would have expected this to get increasingly faster as I increase the spark.master=local[*] setting from local[8] to local[40]. Instead, it takes the same amount of time regardless of what setting I use (but I notice from the Spark UI that my executor has a maximum number of Active Tasks at any given time that is equal to the expected amount, i.e. ~8 for local[8], ~40 for local[40], etc. -- so it seems that the parallelization works).

By default the number of partitions of my dataset RDD is 4. I tried forcing the number of partitions to 20 (a sketch of the kind of change I mean is below), without success -- in fact it slows the Lasso algorithm down even more...
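
For reference, a minimal sketch of the kind of change I mean (the textFile minPartitions hint plus the standard repartition RDD method; field indices as in the snippet above, not necessarily exactly what I ran):

val fileContent = sc.textFile("file:///myfile", 20)   // minPartitions hint

val dataset = fileContent.map { row =>
  val explodedRow = row.split(",").map(_.toDouble)
  new LabeledPoint(explodedRow(13), Vectors.dense(
    Array(explodedRow(10), explodedRow(11), explodedRow(12))
  ))
}.repartition(20)   // force exactly 20 partitions (triggers a shuffle)

val model = algo.run(dataset)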

Is my expectation of the scaling process incorrect? Can somebody help me troubleshoot this?

Answer

Is my expectation of the scaling process incorrect?

Well, kind of. I hope you don't mind that I use a little bit of Python to prove my point.

  1. Let's be generous and say a few million rows is actually ten million. With 50 000 000 values (intercept + 3 features + label per row) that gives around 380 MB of data (10 000 000 × 5 × 8 bytes, since a Java Double is a double-precision 64-bit IEEE 754 floating point). Let's create some dummy data:

import numpy as np

n = 10 * 1000**2
X = np.random.uniform(size=(n, 4))  # Features
y = np.random.uniform(size=(n, 1))  # Labels
theta = np.random.uniform(size=(4, 1))  # Estimated parameters

  • Each step of gradient descent (since the default miniBatchFraction for LassoWithSGD is 1.0, it is not really stochastic), ignoring regularization, requires an operation like this:

    def step(X, y, theta):
        return ((X.dot(theta) - y) * X).sum(0)
    

    So let's see how long it takes locally on our data:

    %timeit -n 15 step(X, y, theta)
    ## 15 loops, best of 3: 743 ms per loop
    

    Less than a second per step, without any additional optimizations. Intuitively it is pretty fast and it won't be easy to match this. Just for fun, let's see how long it takes to get a closed form solution for data like this:

    %timeit -n 15 np.linalg.inv(X.transpose().dot(X)).dot(X.transpose()).dot(y)
    ## 15 loops, best of 3: 1.33 s per loop
    

  • Now let's go back to Spark. Residuals for a single point can be computed in parallel, so that is the part which scales linearly as you increase the number of partitions processed in parallel.

    The problem is that after each step you have to aggregate the partial results locally, serialize them, transfer them to the driver, deserialize them, and reduce them there to get the final result. Then you have to compute the new theta, serialize it, send it back, and so on.
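
    Schematically, each iteration does the kind of work sketched below. This is a simplified illustration in Scala, not MLlib's actual code; it only assumes the standard RDD treeAggregate method and Breeze vectors (which Spark already ships with), and it omits regularization and mini-batching:

    import breeze.linalg.DenseVector
    import org.apache.spark.rdd.RDD

    // One squared-loss gradient step: workers sum partial gradients in parallel,
    // the sum is shipped to the driver, and the driver emits the next weight vector.
    def iteration(data: RDD[LabeledPoint],
                  weights: DenseVector[Double],
                  stepSize: Double,
                  numExamples: Long): DenseVector[Double] = {
      val gradientSum = data.treeAggregate(DenseVector.zeros[Double](weights.length))(
        seqOp = (acc, point) => {
          val x = DenseVector(point.features.toArray)
          acc + x * ((x dot weights) - point.label)   // residual * x
        },
        combOp = (a, b) => a + b
      )
      // The sequential part: runs on the driver once per iteration.
      weights - gradientSum * (stepSize / numExamples.toDouble)
    }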

    All of that can be improved by proper use of mini-batches and some further optimizations, but at the end of the day you are limited by the latency of the system as a whole. It is worth noting that when you increase parallelism on the worker side you also increase the amount of work that has to be performed sequentially on the driver, and the other way round. One way or another, Amdahl's law will bite you.
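
    As a purely hypothetical illustration (the 20% serial share below is made up, not measured), even a modest sequential fraction caps the achievable speedup:

    // Amdahl's law: speedup(n) = 1 / (s + (1 - s) / n), where s is the serial fraction.
    def amdahlSpeedup(serialFraction: Double, workers: Int): Double =
      1.0 / (serialFraction + (1.0 - serialFraction) / workers)

    amdahlSpeedup(0.2, 8)     // ≈ 3.33
    amdahlSpeedup(0.2, 40)    // ≈ 4.55
    amdahlSpeedup(0.2, 1000)  // ≈ 4.98 -- the ceiling is 1 / 0.2 = 5, no matter how many cores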

    Also, all of the above ignores the actual implementation.

    Now let's perform another experiment. First, some dummy data:

    nCores = 8  # Number of cores on local machine I use for tests
    rdd = sc.parallelize([], nCores)
    

    And the benchmark:

    %timeit -n 40 rdd.mapPartitions(lambda x: x).count()
    ## 40 loops, best of 3: 82.3 ms per loop
    

    It means that with 8 cores, without any real processing or network traffic, we already reach the point where increasing parallelism in Spark cannot buy us much: the fixed overhead of scheduling and running an empty 8-partition job (~82 ms) is about the same as the ideal compute time per step with 8-way parallelism (743 ms / 8 = 92.875 ms per partition, assuming linear scalability of the parallelized part).

    To summarize briefly:

    • if the data can easily be handled locally, using gradient descent when a closed form solution exists is simply a waste of time. If you want to increase parallelism / reduce latency you can use a good linear algebra library (see the sketch right after this list)
    • Spark is designed to handle large amounts of data, not to reduce latency. If your data fits in the memory of a few-years-old smartphone, it is a good sign that it is not the right tool
    • if computations are cheap then fixed costs become the limiting factor
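
    As a sketch of the first point (assuming the feature matrix X, including an intercept column, and the label vector y have already been collected into local memory; Breeze is the JVM linear algebra library Spark itself depends on), the normal-equations solve mirrors the numpy one-liner above:

    import breeze.linalg.{DenseMatrix, DenseVector, inv}

    // Placeholder data of the same shape as the numpy example above.
    val X: DenseMatrix[Double] = DenseMatrix.rand(10 * 1000 * 1000, 4)
    val y: DenseVector[Double] = DenseVector.rand(10 * 1000 * 1000)

    // Closed form solution via the normal equations -- no iterations, no cluster.
    val theta: DenseVector[Double] = inv(X.t * X) * (X.t * y)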

    On a side note:

    • a relatively large number of cores per machine is usually not the best choice unless you can match it with IO throughput
