StackOverflow-error when applying pyspark ALS's "recommendProductsForUsers" (although cluster of >300GB Ram available)


Problem description

Looking for expertise to guide me on the issue below.

Context:

  • I'm trying to get going with a basic PySpark script, inspired by this example.
  • As deployment infrastructure I use a Google Cloud Dataproc cluster.
  • The cornerstone of my code is the function "recommendProductsForUsers", documented here, which gives me back the top X products for all users in the model (a rough sketch of this flow follows the list).
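
For orientation only, the overall shape of such a script might look like the sketch below. This is an assumption-laden illustration, not the original code: the input path, column layout, rank and iteration count are placeholders.

    # Sketch only: path, rank and iteration count are assumed placeholders.
    from pyspark import SparkContext
    from pyspark.mllib.recommendation import ALS, Rating

    sc = SparkContext(appName="product-recommendations")

    # Assume a CSV of (customer_id, sku_id, score) rows in a bucket.
    ratings = (sc.textFile("gs://my-bucket/ratings.csv")
                 .map(lambda line: line.split(","))
                 .map(lambda t: Rating(int(t[0]), int(t[1]), float(t[2]))))

    model = ALS.train(ratings, rank=10, iterations=10, lambda_=0.01)

    # RDD of (user, [Rating(user, product, score), ...]) with the top 10 products per user.
    top10PerUser = model.recommendProductsForUsers(10)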

The issue I run into

  • The ALS.Train script runs smoothly and scales well on GCP (easily >1mn customers).

  • However, applying the predictions, i.e. using the functions 'PredictAll' or 'recommendProductsForUsers', does not scale at all. My script runs smoothly for a small dataset (<100 customers with <100 products), but when I bring it to a business-relevant size I don't manage to scale it (e.g. >50k customers and >10k products).

The error I then get is below:

 16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager:
   Lost task 22.0 in stage 411.0 (TID 15139,
   productrecommendation-high-w-2.c.main-nova-558.internal):
   java.lang.StackOverflowError
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)

  • I even went as far as getting a 300 GB cluster (1 main node of 108 GB plus 2 nodes of 108 GB RAM) to try to run it; it works for 50k customers, but not for anything more.

The ambition is to have a setup that I can run for >800k customers.

Details

Line of code that fails

    # Imports needed by this snippet (the rest of the script builds `model` and `sqlContext`).
    import pprint
    from pyspark.sql.types import StructType, StructField, StringType, FloatType

    # Flatten the top-10 recommendations per user into (customer, sku, prediction) tuples.
    predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2])))
    pprint.pprint(predictions.take(10))
    schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)])
    dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates()

How do you suggest I proceed? I feel that the 'merging' part at the end of my script (i.e. when I write it to dfToSave) causes the error; is there a way to bypass this and save part by part?

Recommended answer

Judging from the stack trace, this appears to be the same issue as Spark gives a StackOverflowError when training using ALS.

Basically, Spark expresses RDD lineage recursively, so when nothing forces evaluation over the course of an iterative workload you end up with deeply nested objects. Calling sc.setCheckpointDir and adjusting the checkpoint interval will mitigate the length of this RDD lineage. A rough sketch of that setup follows.
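
As a rough sketch under assumptions (the checkpoint directory below is a placeholder): set the checkpoint directory on the SparkContext before training. The RDD-based MLlib ALS then truncates the lineage periodically during training (every 10 iterations by default), while the DataFrame-based pyspark.ml ALS exposes the interval explicitly via its checkpointInterval parameter.

    # Placeholder HDFS/GCS path; it must be reachable by all workers in the cluster.
    sc.setCheckpointDir("gs://my-bucket/spark-checkpoints/")

    # With a checkpoint directory set, the RDD-based ALS checkpoints intermediate
    # RDDs during training (default interval: every 10 iterations), which keeps
    # the lineage short before recommendProductsForUsers is called.
    model = ALS.train(ratings, rank=10, iterations=20, lambda_=0.01)

    # The DataFrame-based API makes the interval explicit, e.g.:
    # from pyspark.ml.recommendation import ALS as MLALS
    # als = MLALS(userCol="customer", itemCol="sku", ratingCol="score", checkpointInterval=5)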
