PySpark UDF optimization challenge using a dictionary with regex's (Scala?)


Problem description

I am trying to optimize the code below (PySpark UDF).

It gives me the desired result (based on my data set) but it's too slow on very large datasets (approx. 180M).

The results (accuracy) are better than available Python modules (e.g. geotext, hdx-python-country). So I'm not looking for another module.

The dataframe:

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],   
  ["Kalverstraat Amsterdam","Mary"],   
  ["Kalverstraat Amsterdam, Netherlands","Lex"] 
]).toDF("address","name")

regex.csv:

iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\bNetherlands\b
NL;\bNL$

......<many, many more>

Creating a Pandas DataFrame from regex.csv, grouping by iso2 and joining the keywords (\bArizona\b|\bTexas\b|\bFlorida\b|\bUS$):

import pandas as pd

df = pd.read_csv('regex.csv', sep=';')
df_regex = df.groupby('iso2').agg({'keywords': '|'.join}).reset_index()
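
For reference, with the sample regex.csv above, df_regex should then look roughly like this:

print(df_regex)
#   iso2                                                                       keywords
# 0   CA               \bAlberta\b|\bNova Scotia\b|\bNova Scotia\b|\bWhitehorse\b|\bCA$
# 1   NL                                            \bAmsterdam\b|\bNetherlands\b|\bNL$
# 2   US  \bArizona\b|\bTexas\b|\bFlorida\b|\bChicago\b|\bAmsterdam\b|\bProsper\b|\bUS$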

The function:

import re

def get_iso2(x):

    iso2 = {}

    for j, row in df_regex.iterrows():

        regex = re.compile(row['keywords'], re.I | re.M)
        matches = re.finditer(regex, x)

        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1

    return [key for key, value in iso2.items() for _ in range(value)]
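
As an aside, this recompiles every pattern for every input row; compiling each pattern once outside the function avoids that overhead, although the row-at-a-time Python UDF itself remains the main cost on ~180M rows. A minimal sketch (the name get_iso2_precompiled is hypothetical; the behaviour is intended to match get_iso2):

import re

# hypothetical variant: compile each country's combined pattern once, up front
compiled_patterns = [
    (row['iso2'], re.compile(row['keywords'], re.I | re.M))
    for _, row in df_regex.iterrows()
]

def get_iso2_precompiled(x):
    iso2 = {}
    for code, pattern in compiled_patterns:
        # the patterns contain no capture groups, so findall returns matched strings
        n = len(pattern.findall(x))
        if n:
            iso2[code] = iso2.get(code, 0) + n
    return [code for code, count in iso2.items() for _ in range(count)]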

PySpark UDF:

from pyspark.sql import functions as F, types as T

get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))

Creating a new column:

df_new = df.withColumn('iso2', get_iso2_udf('address'))

Expected sample output:

[US,US,NL]
[CA]
[BE,BE,AU]

Some places occur in more than one country (the input is an address column with city, province, state, country...)

Examples:

3030 Whispering Pines Circle, Prosper Texas, US -> [US,US,US]
Kalverstraat Amsterdam -> [US,NL]
Kalverstraat Amsterdam, Netherlands -> [US, NL, NL]

Maybe using Scala UDFs in PySpark is an option, but I have no idea how.
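
For context, the usual shape is to implement org.apache.spark.sql.api.java.UDF1 in Scala, package it into a jar, ship the jar with --jars, and register it from PySpark. A hedged sketch of the Python side only (the class name com.example.GetIso2 is an assumption; the Scala class would take the address String and return an Array[String] of iso2 codes):

from pyspark.sql import functions as F, types as T

# assumes a jar on the classpath containing a Scala class such as:
#   class GetIso2 extends org.apache.spark.sql.api.java.UDF1[String, Array[String]] { ... }
spark.udf.registerJavaFunction("get_iso2_scala", "com.example.GetIso2",
                               T.ArrayType(T.StringType()))

df_new = df.withColumn("iso2", F.expr("get_iso2_scala(address)"))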

Your optimisation recommendations are highly appreciated!

Recommended answer

IIUC, you can try the following steps without using a UDF:

from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd

df = spark.createDataFrame([
  ["3030 Whispering Pines Circle, Prosper Texas, US","John"],
  ["Kalverstraat Amsterdam","Mary"],
  ["Kalverstraat Amsterdam, Netherlands","Lex"],
  ["xvcv", "ddd"]
]).toDF("address","name")

Step-1: convert df_regex to a Spark dataframe df1 and add a unique id to df.

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

# adjust keywords to uppercase except chars preceded with backslash:
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()

df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords                                                                         |
+----+---------------------------------------------------------------------------------+
|CA  |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$                             |
|NL  |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$                                          |
|US  |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+
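
To see what the str.replace transform above does (it uppercases everything except backslash escapes such as \b), a quick check:

import re

print(re.sub(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), r'\bNova Scotia\b'))
# \bNOVA SCOTIA\b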

df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address                                        |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0  |
|Kalverstraat Amsterdam                         |Mary|1  |
|Kalverstraat Amsterdam, Netherlands            |Lex |2  |
|xvcv                                           |ddd |3  |
+-----------------------------------------------+----+---+

Step-2: left join the broadcast df1 to df using rlike:

df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
|             address|name| id|iso2|            keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|
|                xvcv| ddd|  3|null|                null|
+--------------------+----+---+----+--------------------+

Step-3: count the number of matched d2.keywords in d1.address by splitting d1.address by d2.keywords, and then reduce the size of the resulting array by 1:

df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
df3.show()
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|         -2|
+--------------------+----+---+----+--------------------+-----------+
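
To see the counting trick in isolation: splitting an uppercased address on the pattern yields one more piece than there are matches (Spark's split keeps trailing empty strings by default, which is what makes the -1 exact). A small sketch:

from pyspark.sql.functions import size, split

demo = spark.createDataFrame([("3030 WHISPERING PINES CIRCLE, PROSPER TEXAS, US",)], ["addr"])
demo.select((size(split("addr", r"(?m)\bTEXAS\b|\bPROSPER\b|\bUS$")) - 1).alias("num_matches")).show()
# +-----------+
# |num_matches|
# +-----------+
# |          3|
# +-----------+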

Step-4: use array_repeat to repeat the value of iso2 num_matches times (requires Spark 2.4+). The -2 on the unmatched row is harmless because array_repeat returns an empty array for counts below 1:

df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
df4.show()
+--------------------+----+---+------------+--------------------+-----------+
|             address|name| id|        iso2|            keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John|  0|[US, US, US]|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|        [NL]|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|        [US]|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|    [NL, NL]|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|        [US]|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|          []|                null|         -2|
+--------------------+----+---+------------+--------------------+-----------+

Step-5: groupby and do the aggregation:

df_new = df4 \
    .groupby('id') \
    .agg(
      first('address').alias('address'),
      first('name').alias('name'),
      flatten(collect_list('iso2')).alias('countries')
)
df_new.show()
+---+--------------------+----+------------+
| id|             address|name|   countries|
+---+--------------------+----+------------+
|  0|3030 Whispering P...|John|[US, US, US]|
|  1|Kalverstraat Amst...|Mary|    [NL, US]|
|  3|                xvcv| ddd|          []|
|  2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+
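
If the helper id is no longer needed downstream (an assumption), you can restore the original order and drop it:

df_new = df_new.orderBy('id').drop('id')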

Alternative: Step-3 can also be handled by a Pandas UDF:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re

@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    return Series([ 0 if p is None else len(re.findall(p,s)) for p,s in zip(ptn,addr) ])

df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
|             address|name| id|iso2|            keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John|  0|  US|(?m)\bARIZONA\b|\...|          3|
|Kalverstraat Amst...|Mary|  1|  NL|(?m)\bAMSTERDAM\b...|          1|
|Kalverstraat Amst...|Mary|  1|  US|(?m)\bARIZONA\b|\...|          1|
|Kalverstraat Amst...| Lex|  2|  NL|(?m)\bAMSTERDAM\b...|          2|
|Kalverstraat Amst...| Lex|  2|  US|(?m)\bARIZONA\b|\...|          1|
|                xvcv| ddd|  3|null|                null|          0|
+--------------------+----+---+----+--------------------+-----------+

Notes:

  1. As case-insensitive pattern matching is expensive, we converted all chars of the keywords (except anchors or escaped chars like \b, \B, \A, \z) to upper case.
  2. Just a reminder: patterns used in rlike and regexp_replace are Java-based, while in pandas_udf they are Python-based, which might lead to slight differences when setting up patterns in regex.csv (see the sketch below).
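
For example, Java regex accepts the \z end-of-input anchor, while Python's re module historically rejects it, so a pattern like \bUS\z would work in rlike but break in the pandas_udf path:

import re

re.search(r'US\Z', 'Prosper Texas, US')      # fine: \Z is Python's end-of-string anchor
try:
    re.search(r'US\z', 'Prosper Texas, US')  # Java-style \z
except re.error as e:
    print(e)                                 # bad escape \z at position 2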

Method-2: using pandas_udf

As using join and groupby triggers data shuffling, the above method could be slow. Just one more option for your testing:

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")

df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())

df_ptn = spark.sparkContext.broadcast(
    df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}

# REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten
import re

def __get_iso2(addr, ptn):
    return Series([ reduce(iconcat, [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()]) for s in addr ])

get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+

Or return an array of arrays in pandas_udf (without reduce and iconcat) and do the flatten with Spark (the output is the same as above):

def __get_iso2_2(addr, ptn):
    return Series([ [[k]*len(re.findall(v,s)) for k,v in ptn.value.items()] for s in addr ])

get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)

df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()

Update: to find unique countries, do the following:

def __get_iso2_3(addr, ptn):
    return Series([ [k for k,v in ptn.value.items() if re.search(v,s)] for s in addr ])

get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)

df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
|             address|name|    iso2|
+--------------------+----+--------+
|3030 Whispering P...|John|    [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
|                xvcv| ddd|      []|
+--------------------+----+--------+
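
Alternatively, if you already computed the repeated lists (e.g. Method-1's countries column), Spark 2.4's array_distinct collapses them to unique countries:

from pyspark.sql.functions import array_distinct

df_new.withColumn('countries', array_distinct('countries')).show()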

Method-3: use a list comprehension:

Similar to @CronosNull's method, in case the list in regex.csv is manageable you can handle this using a list comprehension:

from pyspark.sql.functions import size, split, upper, col, array, expr, flatten

df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()

df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])

df1.select(*df.columns, flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
|             address|name| id|        iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John|  0|[US, US, US]|
|Kalverstraat Amst...|Mary|  1|    [NL, US]|
|Kalverstraat Amst...| Lex|  2|[NL, NL, US]|
|                xvcv| ddd|  3|          []|
+--------------------+----+---+------------+
