How to do range lookup and search in PySpark

Question

I am trying to write a function in PySpark that can do a combined search and lookup of values within a range. The detailed description follows.

I have two data sets. One data set, say D1, is basically a lookup table, as shown below:

MinValue  MaxValue Value1 Value2
---------------------------------
1          1000      0.5     0.6
1001       2000      0.8     0.1
2001       4000      0.2     0.5
4001       9000      0.04    0.06

The other data set, say D2, is a table with millions of records, for example:

ID      InterestsRate       Days       
----------------------------------
1       19.99               29
2       11.99               49

For each ID, I need to calculate the maximum return based on different credit limits, with possible values of 500, 1000, 2000, 3000, and 5000.

The return is calculated as

f(x) = InterestsRate * Days * Value1 * Value2.

Value1 and Value2 are determined by looking up the credit limit in D1. For example, if the credit limit is 3000, looking it up in D1 returns 0.2 and 0.5.
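
As a quick worked example (not in the original post, but consistent with the tables above): for ID 1 with a credit limit of 1000, the lookup gives Value1 = 0.5 and Value2 = 0.6, so the return is 19.99 * 29 * 0.5 * 0.6 ≈ 173.913.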

For each record in D2, I want to calculate the return for each credit limit and find the credit limit that gives the maximum return.

So far I have completed two functions.

I defined the lookup function as

def LookUp(value):
    # Return the row(s) in D1 whose [MinValue, MaxValue] range contains the given value
    filter_str = "MinValue <= " + str(value) + " AND MaxValue >= " + str(value)
    return D1.filter(filter_str)
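
For example, assuming D1 is the lookup DataFrame shown above, calling it with a credit limit of 3000 should return the single matching row:

LookUp(3000).show()
#+--------+--------+------+------+
#|MinValue|MaxValue|Value1|Value2|
#+--------+--------+------+------+
#|    2001|    4000|   0.2|   0.5|
#+--------+--------+------+------+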

I also defined a search function as

def Search(rate, day):
    credit_limit = [500, 1000, 2000, 3000, 5000]
    max_return = 0
    cl = -1
    for i in range(len(credit_limit)):
        # Look up Value1 and Value2 for this credit limit in D1
        v1 = LookUp(credit_limit[i]).select("Value1")
        v2 = LookUp(credit_limit[i]).select("Value2")
        tmp = rate * day * v1 * v2
        if max_return < tmp:
            max_return = tmp
            cl = credit_limit[i]

    return (cl, max_return)

I then call the following transformation on D2:

res = D2.mapValues(lambda row: Search(row[1], row[2]))

To my surprise, I ran into errors, and from searching I learned that I cannot use a DataFrame (D1) inside a transformation on an RDD (D2).

I also found that a possible workaround is to broadcast D1, but I don't know how to make it work.
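
For reference, one common pattern (not from the original question, so treat it as a sketch) is to collect the small lookup table to the driver and broadcast it as a plain Python list, so that the function applied inside the RDD transformation never touches a DataFrame. A minimal sketch, assuming a SparkContext named sc and the D1/D2 shown above (search_local is a hypothetical helper name):

# Collect the small lookup table to the driver and broadcast it as plain tuples
lookup_rows = [(r["MinValue"], r["MaxValue"], r["Value1"], r["Value2"]) for r in D1.collect()]
bc_lookup = sc.broadcast(lookup_rows)

def search_local(rate, day):
    credit_limit = [500, 1000, 2000, 3000, 5000]
    best_cl, best_return = -1, 0.0
    for limit in credit_limit:
        # Find the range that contains this credit limit, then compute the return
        for lo, hi, v1, v2 in bc_lookup.value:
            if lo <= limit <= hi:
                ret = rate * day * v1 * v2
                if ret > best_return:
                    best_cl, best_return = limit, ret
    return (best_cl, best_return)

res = D2.rdd.map(lambda row: (row["ID"], search_local(row["InterestsRate"], row["Days"])))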

Would you please comment on how to implement this function in PySpark?

Thanks!

Answer

When you're using Spark, you should think in terms of SQL and table joins instead of looping over lists.

So the first thing I would do is turn your credit-limit list into a table, let's call it D3:

credit_limit = [500, 1000, 2000, 3000, 5000]
D3 = spark.createDataFrame([[x] for x in credit_limit], ["CreditLimit"])
D3.show()
#+-----------+
#|CreditLimit|
#+-----------+
#|        500|
#|       1000|
#|       2000|
#|       3000|
#|       5000|
#+-----------+

Now, you can join this table to D1 and D2 to compute the return for each credit limit, and then use a Window function to rank the returns and keep the maximum. As you stated in the comments, we will pick the maximum credit limit if there is a tie.

import pyspark.sql.functions as f
from pyspark.sql import Window

# Rank returns per ID; on ties, prefer the larger credit limit
w = Window.partitionBy("ID").orderBy(f.desc("Return"), f.desc("CreditLimit"))
D2.alias("D2").crossJoin(D3.alias("D3"))\
    .crossJoin(D1.alias("D1"))\
    .where("D3.CreditLimit BETWEEN D1.MinValue AND D1.MaxValue")\
    .withColumn("Return", f.expr("D2.InterestsRate*D2.Days*D1.Value1*D1.Value2"))\
    .withColumn("Rank", f.rank().over(w))\
    .where("Rank = 1")\
    .drop("Rank")\
    .show()
#+---+-------------+----+-----------+--------+--------+------+------+------------------+
#| ID|InterestsRate|Days|CreditLimit|MinValue|MaxValue|Value1|Value2|            Return|
#+---+-------------+----+-----------+--------+--------+------+------+------------------+
#|  1|        19.99|  29|       1000|       1|    1000|   0.5|   0.6|173.91299999999998|
#|  2|        11.99|  49|       1000|       1|    1000|   0.5|   0.6|           176.253|
#+---+-------------+----+-----------+--------+--------+------+------+------------------+

We're doing two Cartesian products here, so this may not scale well, but give it a try.
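
A possible refinement, not from the original answer: since D1 and D3 are tiny compared to D2, you can hint Spark to broadcast them so the cross joins are executed as broadcast nested-loop joins instead of shuffles. A minimal sketch, assuming the same D1, D2, and D3 as above:

import pyspark.sql.functions as f
from pyspark.sql.functions import broadcast

# Broadcasting the two small tables keeps the heavy work on the D2 partitions
returns = D2.alias("D2")\
    .crossJoin(broadcast(D3.alias("D3")))\
    .crossJoin(broadcast(D1.alias("D1")))\
    .where("D3.CreditLimit BETWEEN D1.MinValue AND D1.MaxValue")\
    .withColumn("Return", f.expr("D2.InterestsRate*D2.Days*D1.Value1*D1.Value2"))

The ranking step with the Window function stays the same.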
