StackOverflowError when applying pyspark ALS's "recommendProductsForUsers" (although a cluster of >300GB RAM is available)


Question

Looking for expertise to guide me on the issue below.

Background:

  • I am trying to build on this example.
  • As deployment infrastructure, I use a Google Cloud Dataproc cluster.
  • The cornerstone of my code is the "recommendProductsForUsers" function, as documented here.

The problem

  • The ALS.train script runs smoothly and scales well on GCP (easily >1M customers).

However, applying the predictions, i.e. using the functions 'predictAll' or 'recommendProductsForUsers', does not scale at all. My script runs smoothly for a small dataset (<100 customers with <100 products), but when I bring it to a business-relevant size (e.g. >50k customers and >10k products) I cannot get it to scale.

The error I then get is below:

 16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager:
   Lost task 22.0 in stage 411.0 (TID 15139,
   productrecommendation-high-w-2.c.main-nova-558.internal):
   java.lang.StackOverflowError
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)

  • I even went as far as getting a 300 GB cluster (1 main node of 108 GB + 2 nodes of 108 GB RAM) to try to run it; it works for 50k customers but not for anything more.

    The ambition is to have a setup where I can run for >800k customers.

Details

The line of code where it fails:

    import pprint
    from pyspark.sql.types import StructType, StructField, StringType, FloatType

    # 'model' is the trained MatrixFactorizationModel and 'sqlContext' the SQLContext from earlier in the script.
    # Top-10 recommendations per customer, flattened into (customer, sku, prediction) tuples.
    predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2])))
    pprint.pprint(predictions.take(10))
    schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)])
    dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates()

How do you suggest I proceed? My feeling is that the 'merging' part at the end of the script (i.e. when I write it into dfToSave) causes the error; is there a way to bypass this and save part-by-part?
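
For reference, a hedged sketch of what saving "part-by-part" could look like: a DataFrame (or RDD) write in Spark is already distributed, with each partition written as its own part-file, so nothing needs to be collected on the driver. The output paths below are placeholders.

    # Placeholder GCS path; each partition of dfToSave becomes its own part-file.
    dfToSave.write.mode("overwrite").parquet("gs://your-bucket/recommendations/")

    # Alternatively, save the raw prediction tuples as text, partition by partition:
    # predictions.map(lambda r: ",".join(map(str, r))).saveAsTextFile("gs://your-bucket/recommendations_txt/")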

Answer

From the stack trace this appears to be the same issue as Spark gives a StackOverflowError when training using ALS.

Basically, Spark expresses RDD lineage recursively, so you end up with deeply nested objects when things have not been lazily evaluated over the course of an iterative workload. Calling sc.setCheckpointDir and adjusting the checkpoint interval will mitigate the length of this RDD lineage.
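
As an illustration, here is a minimal sketch of enabling checkpointing before training; the checkpoint path, the toy ratings RDD and the ALS parameters are placeholders rather than the asker's actual setup.

    from pyspark import SparkContext
    from pyspark.mllib.recommendation import ALS

    sc = SparkContext(appName="als-recommendations")

    # With a checkpoint directory set, Spark can periodically materialize
    # intermediate RDDs there instead of carrying the full recursive lineage
    # through every ALS iteration.
    sc.setCheckpointDir("gs://your-bucket/spark-checkpoints/")  # placeholder path

    # Toy ratings RDD of (user, product, rating) tuples
    ratings = sc.parallelize([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 5.0)])
    model = ALS.train(ratings, rank=10, iterations=20, lambda_=0.01)

    # Explicitly checkpointing the prediction RDD before building the DataFrame
    # also truncates its lineage (checkpoint() takes effect on the next action).
    recs = model.recommendProductsForUsers(10).flatMap(lambda p: p[1])
    recs.checkpoint()

If you need finer control over how often this happens, the DataFrame-based pyspark.ml.recommendation.ALS exposes a checkpointInterval parameter; the RDD-based Python API sketched above relies on the default interval.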

