如何使用 scala 或 python 在 apache spark 中运行多线程作业? [英] How to run Multi threaded jobs in apache spark using scala or python?

查看:19
本文介绍了如何使用 scala 或 python 在 apache spark 中运行多线程作业?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正面临一个与 Spark 中的并发相关的问题,这使我无法在生产中使用它,但我知道有办法摆脱它.我正在尝试使用订单历史在 700 万用户上为 10 亿产品运行 Spark ALS.首先,我正在获取不同用户的列表,然后对这些用户运行循环以获取推荐,这是一个非常缓慢的过程,需要几天时间才能为所有用户获取推荐.我尝试使用笛卡尔用户和产品来一次性获得所有推荐,但再次将其提供给 elasticsearch 我必须为每个用户过滤和排序记录,然后我才能将其提供给 elasticsearch 以供其他 API 使用.

I am facing a problem related to concurrency in spark which is stopping me from using it in production but I know there is a way out of it. I am trying to run Spark ALS on 7 million users for a billion products using order history. Firstly I am taking a list of distinct Users and then running a loop on these users to get recommendations, which is pretty slow process and will take days to get recommendations for all users. I tried doing cartesian users and products to get recommendations for all at once but again to feed this to elasticsearch I have to filter and sort records for each users and only then I can feed it to elasticsearch to be consumed by other APIs.

所以请给我推荐一个解决方案,它在这种用例中具有很强的可扩展性,并且可以在生产中使用并提供实时建议.

So please suggest me a solution which is pretty scalable in such use case and to be used in production with realtime recommendations.

这是我在 Scala 中的代码片段,它可以让您了解我目前正在如何解决问题:

Here is my code snippet in scala which will give you an idea how I am currently approaching to solve the problem:

  //    buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
  def recommend_for_user(user: Int): Unit = {
      println("Recommendations for User ID: " + user);
      // Product IDs which are not bought by user 
      val candidates = buys_values
        .filter(x => x("customer_id").toString.toInt != user)
        .map(x => x("product_id").toString.toInt)
        .distinct().map((user, _))
      // find 30 products with top rating
      val recommendations = bestModel.get
        .predict(candidates)
        .takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))

      var i = 1
      var ESMap = Map[String, String]()
      recommendations.foreach { r =>
        ESMap += r.product.toString -> bitem_ids.value(r.product)
      }
      //  push to elasticsearch with user as id
      client.execute {
        index into "recommendation" / "items" id user fields ESMap
      }.await
      // remove candidate RDD from memory
      candidates.unpersist()
  }
  // iterate on each user to get recommendations for the user [slow process]
  user_ids.foreach(recommend_for_user)

推荐答案

很明显,程序中的瓶颈在于搜索 candidates.鉴于 Spark 架构,它严重限制了您的并行化能力,并通过为每个用户启动 Spark 作业而增加了大量开销.

It is pretty clear that bottleneck in your program is a search for candidates. Given the Spark architecture it severely limits your ability to parallelize and adds substantial overhead by starting Spark job for each user.

假设典型场景,有 700 万用户10 亿产品,大部分时间您将预测整个范围的产品减去用户已经购买的少数产品.至少在我看来重要的问题是为什么还要费心过滤.即使你推荐以前买过的产品真的有害吗?

Assuming typical scenario, with 7 million users and a billion products most of time you'll predict over a whole range of products minus few already bought by the user. At least in my opinion important question is why even bother with filtering. Even if you recommend product which has been previously bought is it really harmful?

除非你有非常严格的要求,否则我会简单地忽略这个问题并使用 MatrixFactorizationModel.recommendProductsForUsers 几乎可以为您完成所有工作,不包括数据导出.之后,您可以执行批量导出,一切顺利.

Unless you have very strict requirements I would simply ignore the problem and use MatrixFactorizationModel.recommendProductsForUsers which pretty much does all the job, excluding data export, for you. After that you can perform bulk export and you're good to go.

现在假设您有一个明确的无重复政策.在假设典型用户仅购买相对少量产品的情况下,您可以从为每个用户获取一组产品开始:

Now lets say you have a clear no-duplicates policy. Working under assumption that a typical user purchased only a relatively small number of products you can start with obtaining a set of products for each user:

val userProdSet = buy_values
    .map{case (user, product, _) => (user, product)} 
    .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)

接下来您可以简单地映射 userProdSet 以获取预测:

Next you can simply map userProdSet to get predictions:

// Number of predictions for each user
val nPred = 30;

userProdSet.map{case (user, prodSet) => {
    val recommended = model
         // Find recommendations for user
        .recommendProducts(_, nPred + prodSet.size))
        // Filter to remove already purchased 
        .filter(rating => !prodSet.contains(rating.product))
        // Sort and limit
        .sortBy(_.rating)
        .reverse
        .take(nPred)
    (user, recommended)
}}

您可以通过使用可变集进行聚合和广播模型来进一步改进,但这是一个总体思路.

You can improve further by using mutable sets for aggregation and by broadcasting the model but thats a general idea.

如果 user_ids 中的用户数低于整个集合中的用户数 (buy_values),您可以简单地过滤 userProdSet 以保留只有一部分用户.

If number of user in user_ids is lower than number of user in a whole set (buy_values) you can simply filter userProdSet to keep only a subset of users.

这篇关于如何使用 scala 或 python 在 apache spark 中运行多线程作业?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆