Spark MLLib's LassoWithSGD doesn't scale?


Question


I have code similar to what follows:

val fileContent = sc.textFile("file:///myfile")

val dataset = fileContent.map(row => {
    val explodedRow = row.split(",").map(s => s.toDouble)
    new LabeledPoint(explodedRow(13), Vectors.dense(
        Array(explodedRow(10), explodedRow(11), explodedRow(12))
    ))
})

val algo = new LassoWithSGD().setIntercept(true)

val lambda = 0.0
algo.optimizer.setRegParam(lambda)
algo.optimizer.setNumIterations(100)
algo.optimizer.setStepSize(1.0)

val model = algo.run(dataset)

I'm running this in the cloud on my virtual server with 20 cores. The file is a "local" (i.e. not in HDFS) file with a few million rows. I run this in local mode, with sbt run (i.e. I don't use a cluster, I don't use spark-submit).

I would have expected this to get increasingly faster as I increase the spark.master=local[*] setting from local[8] to local[40]. Instead, it takes the same amount of time regardless of what setting I use (but I notice from the Spark UI that my executor has a maximum number of Active Tasks at any given time that is equal to the expected amount, i.e. ~8 for local[8], ~40 for local[40], etc. -- so it seems that the parallelization works).

By default the number of partitions of my dataset RDD is 4. I tried forcing the number of partitions to 20, without success -- in fact it slows the Lasso algorithm down even more...

Is my expectation of the scaling process incorrect? Can somebody help me troubleshoot this?

Solution

Is my expectation of the scaling process incorrect?

Well, kind of. I hope you don't mind if I use a little bit of Python to prove my point.

  1. Let's be generous and say a few million rows is actually ten million. With 50 000 000 values (intercept + 3 features + label per row) that gives around 380 MB of data (a Java Double is a double-precision 64-bit IEEE 754 floating point). Let's create some dummy data:

    import numpy as np
    
    n = 10 * 1000**2
    X = np.random.uniform(size=(n, 4))  # Features
    y = np.random.uniform(size=(n, 1))  # Labels
    theta = np.random.uniform(size=(4, 1))  # Estimated parameters
    

  2. Each step of gradient descent (since the default miniBatchFraction for LassoWithSGD is 1.0, it is not really stochastic), ignoring regularization, requires an operation like this:

    def step(X, y, theta):
        # Gradient of the squared-error loss w.r.t. theta (up to a constant factor)
        return ((X.dot(theta) - y) * X).sum(0)
    

    So let's see how long it takes locally on our data:

    %timeit -n 15 step(X, y, theta)
    ## 15 loops, best of 3: 743 ms per loop
    

    Less than a second per step, without any additional optimizations. Intuitively it is pretty fast and it won't be easy to match this. Just for fun, let's see how long it takes to get a closed-form solution for data like this:

    %timeit -n 15 np.linalg.inv(X.transpose().dot(X)).dot(X.transpose()).dot(y)
    ## 15 loops, best of 3: 1.33 s per loop
    

  3. Now let's go back to Spark. Residuals for a single point can be computed in parallel, so this is the part which scales linearly as you increase the number of partitions that are processed in parallel.

    The problem is that after each step you have to aggregate data locally, serialize it, transfer it to the driver, deserialize it, and reduce locally to get the final result. Then you have to compute the new theta, serialize it, send it back, and so on (a rough sketch of one such step is shown at the end of this item).

    All of that can be improved by proper usage of mini-batches and some further optimizations, but at the end of the day you are limited by the latency of the whole system. It is worth noting that when you increase parallelism on the worker side you also increase the amount of work that has to be performed sequentially on the driver, and the other way round. One way or another, Amdahl's law will bite you.

    Also, all of the above ignores the actual implementation.

    Now let's perform another experiment. First, some dummy data:

    nCores = 8  # Number of cores on local machine I use for tests
    rdd = sc.parallelize([], nCores)
    

    and a benchmark:

    %timeit -n 40 rdd.mapPartitions(lambda x: x).count()
    ## 40 loops, best of 3: 82.3 ms per loop
    

    This means that with 8 cores, without any real processing or network traffic, we get to the point where we cannot do much better by increasing parallelism in Spark (743 ms / 8 = 92.875 ms per partition, assuming linear scalability of the parallelized part).
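
    To make the sequential/parallel split concrete, below is a rough sketch of what a single distributed gradient-descent step could look like (an illustration only, not MLlib's actual implementation; distributed_step and partition_gradient are hypothetical helpers). The per-partition gradients are the part that parallelizes; the final reduce and the theta update happen on the driver and form the serial fraction that Amdahl's law refers to.

    import numpy as np

    def distributed_step(points, theta, step_size):
        # points: RDD of (features: np.ndarray, label: float) pairs
        # theta:  current parameter vector as a 1-D np.ndarray
        def partition_gradient(rows):
            grad = np.zeros_like(theta)
            for x, y in rows:
                grad += (x.dot(theta) - y) * x   # computed in parallel per partition
            yield grad

        # Per-partition gradients are serialized, shipped to the driver and summed
        # there -- the sequential, latency-bound part of every iteration.
        total_grad = points.mapPartitions(partition_gradient).reduce(lambda a, b: a + b)
        return theta - step_size * total_grad    # new theta is computed on the driver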

Just to summarize the above:

  • if data can be easily processed locally with a closed-form solution, using gradient descent is just a waste of time. If you want to increase parallelism / reduce latency you can use good linear algebra libraries (see the sketch after this list)
  • Spark is designed to handle large amounts of data, not to reduce latency. If your data fits in the memory of a few-years-old smartphone, that is a good sign that Spark is not the right tool
  • if computations are cheap then constant costs become a limiting factor
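
For the unregularized case from the question (lambda = 0.0), the closed-form route mentioned above can be as simple as the sketch below. This is an illustration only, reusing the same dummy data as before; np.linalg.lstsq is preferable to the explicit matrix inverse used earlier because it delegates to an optimized and numerically stabler LAPACK solver.

import numpy as np

n = 10 * 1000**2
X = np.random.uniform(size=(n, 4))   # same dummy design matrix as before
y = np.random.uniform(size=(n, 1))   # labels

# Ordinary least squares in a single local call; with regParam = 0.0 this is
# essentially the objective LassoWithSGD iterates towards.
theta, residuals, rank, singular_values = np.linalg.lstsq(X, y, rcond=None)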

Side notes:

  • a relatively large number of cores per machine is, generally speaking, not the best choice unless you can match it with IO throughput
