PySpark reversing StringIndexer in nested array

Problem description

I'm using PySpark to do collaborative filtering with ALS. My original user and item IDs are strings, so I used StringIndexer to convert them to numeric indices (PySpark's ALS model obliges us to do so).

After I've fitted the model, I can get the top 3 recommendations for each user like so:

recs = (
    model
    .recommendForAllUsers(3)
)

The recs dataframe looks like so:

+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

I want to create a huge JSON dump of this dataframe, which I can do like so:

(
    recs
    .toJSON()
    .saveAsTextFile("name_i_must_hide.recs")
)

A sample of the resulting JSON records:

{
  "userIdIndex": 1580,
  "recommendations": [
    {
      "productIdIndex": 10096,
      "rating": 3.6725707
    },
    {
      "productIdIndex": 10141,
      "rating": 3.61542
    },
    {
      "productIdIndex": 11591,
      "rating": 3.536216
    }
  ]
}

The userIdIndex and productIdIndex keys are due to the StringIndexer transformation.

How can I get the original values of these columns back? I suspect I must use the IndexToString transformer, but I can't quite figure out how, since the data is nested in an array inside the recs DataFrame.

I tried to use a Pipeline estimator (stages=[StringIndexer, ALS, IndexToString]), but it looks like it doesn't support these indexers.

Cheers!

Solution

In both cases you'll need access to the list of labels. It can be obtained either from a StringIndexerModel

user_indexer_model = ...  # type: StringIndexerModel
user_labels = user_indexer_model.labels

product_indexer_model = ...  # type: StringIndexerModel
product_labels = product_indexer_model.labels

or column metadata.
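As a rough sketch of the metadata route: Spark ML normally records the labels of an indexed column under the `ml_attr` key of that column's metadata, in a `vals` list. The helper below only reads such a dict. The name `labels_from_metadata`, the DataFrame name `indexed`, and the sample labels are hypothetical, and the metadata is assumed to be present on the DataFrame returned by the indexer's `transform` (the `recs` DataFrame produced by ALS may not carry it).

```python
def labels_from_metadata(metadata):
    """Return the StringIndexer labels stored in a column's metadata dict.

    Assumes the layout Spark ML writes for nominal attributes:
    {"ml_attr": {"type": "nominal", "name": ..., "vals": [...]}}.
    """
    attr = metadata.get("ml_attr", {})
    if "vals" not in attr:
        raise ValueError("column metadata carries no label list")
    return list(attr["vals"])


# With a live DataFrame `indexed` (hypothetical name) the call would be
#   labels_from_metadata(indexed.schema["userIdIndex"].metadata)
# Here a hand-written dict stands in for that metadata.
example_metadata = {
    "ml_attr": {"type": "nominal", "name": "userIdIndex", "vals": ["alice", "bob"]}
}
user_labels_from_meta = labels_from_metadata(example_metadata)
```

The position of each label in the list is the index that StringIndexer assigned to it, so the list can be passed to IndexToString in the same way as `StringIndexerModel.labels`.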

For userIdIndex you can just apply IndexToString:

from pyspark.ml.feature import IndexToString

user_id_to_label = IndexToString(
    inputCol="userIdIndex", outputCol="userId", labels=user_labels)
user_id_to_label.transform(recs)

For recommendations you'll need either a udf or an expression like this:

from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems

product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)
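The udf alternative mentioned above could look roughly like the sketch below. The names `relabel` and `make_relabel_udf` are hypothetical and not part of the original answer. The mapping itself is plain Python, so it can be checked without a Spark session, and pyspark is imported only when the udf is actually built.

```python
def relabel(recommendations, labels):
    """Swap each productIdIndex for its original label, keeping the rating."""
    if recommendations is None:
        return None
    return [
        (labels[rec["productIdIndex"]], float(rec["rating"]))
        for rec in recommendations
    ]


def make_relabel_udf(labels):
    """Wrap relabel in a Spark udf; pyspark is imported only when this is called."""
    from pyspark.sql.functions import udf
    from pyspark.sql.types import (
        ArrayType, FloatType, StringType, StructField, StructType)

    # Output schema: array of (productId: string, rating: float) structs.
    return_type = ArrayType(StructType([
        StructField("productId", StringType()),
        StructField("rating", FloatType()),
    ]))
    return udf(lambda recs: relabel(recs, labels), return_type)
```

Under these assumptions it would be applied as `recs.withColumn("recommendations", make_relabel_udf(product_labels)(col("recommendations")))`. Unlike the expression above, it does not depend on `n`, but each row passes through the Python worker, so it will usually be slower.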
