pySpark mapping multiple variables
Question
The code below maps values and column names from my reference df onto my actual dataset, looking for exact matches; when an exact match is found, it returns the OutputValue. However, I'm trying to add a rule that also returns the OutputValue when PrimaryValue = DEFAULT.
The solution I'm trying out is to create a new dataframe with null values, since the code below provides no match for those rows. The next step would then be to target the null values whose corresponding PrimaryValue = DEFAULT and replace each null with that OutputValue.
from itertools import chain
from pyspark.sql.functions import concat_ws, collect_set, array, create_map, lit, col

# create a map key/value based on columns from reference_df
map_key = concat_ws('\0', final_reference.PrimaryName, final_reference.PrimaryValue)
map_value = final_reference.OutputValue

# list of concatenated mappings to get the corresponding OutputValues from the reference table
d = final_reference.agg(collect_set(array(concat_ws('\0', 'PrimaryName', 'PrimaryValue'), 'OutputValue')).alias('m')).first().m
#display(d)

# flatten the mapped key/value pairs into a single map column
mappings = create_map([lit(i) for i in chain.from_iterable(d)])

# dataframe with the corresponding matched OutputValues
dataset = datasetM.select("*", *[mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c, c_name in matched_List.items()])
display(dataset)
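For context on why the unmatched rows come out null: a Spark map-column lookup (mappings[key]) returns NULL when the key is absent, much like dict.get in plain Python. A minimal sketch, with a plain dict standing in for the map column and sample keys from the reference data:

```python
# Plain-Python stand-in for the Spark map column built above.
mappings = {
    'LeaseStatus\x00Abandoned': 'Active',
    'LeaseRecoveryType\x00Gross': 'Gross',
}

# Exact match found -> the OutputValue comes back.
print(mappings.get('LeaseStatus\x00Abandoned'))  # Active

# No exact match ('Unknown' never appears in the reference) -> None,
# which is why these rows surface as null in the resulting dataframe.
print(mappings.get('LeaseStatus\x00Unknown'))    # None
```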
Answer
From the discussion in the comments, I think you just need to add a default mapping derived from the existing one and then use the coalesce() function to find the first non-null value, see below:
from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce
# skip some old code
d
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseStatus\x00DEFAULT', 'Pending'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]
# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])
# default mapping
mappings_default = create_map([ lit(j.split('\0')[0]) for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>
# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('\0')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}
# use coalesce to find the first non-null value from mappings, mappings_default, etc.
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*", *[
    coalesce(
        mappings[concat_ws('\0', lit(c), col(c))],
        mappings_default[c],
        lit("Not Specified at Source" if c in available_list else "Lookup not found")
    ).alias(c_name) for c, c_name in matchedAttributeName_List.items()])
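The fallback order of that coalesce() can be simulated in plain Python, with dicts standing in for the Spark map columns. The lookup function below is purely illustrative (it is not part of the answer's Spark code), but it fires the same three branches:

```python
# Plain-dict stand-ins for the map columns built in the answer.
mappings = {
    'LeaseStatus\x00Abandoned': 'Active',
    'LeaseRecoveryType\x00Gross': 'Gross',
}
mappings_default = {'LeaseStatus': 'Pending'}
available_list = {'LeaseStatus', 'LeaseRecoveryType'}

def lookup(column, value):
    # 1) exact match on column\0value, like mappings[concat_ws(...)]
    exact = mappings.get(f'{column}\x00{value}')
    if exact is not None:
        return exact
    # 2) per-column DEFAULT, like mappings_default[c]
    default = mappings_default.get(column)
    if default is not None:
        return default
    # 3) literal fallback depending on whether the column is referenced at all
    return 'Not Specified at Source' if column in available_list else 'Lookup not found'

print(lookup('LeaseStatus', 'Abandoned'))   # exact match -> Active
print(lookup('LeaseStatus', 'Unknown'))     # falls back to DEFAULT -> Pending
print(lookup('LeaseRecoveryType', 'Net'))   # no DEFAULT -> Not Specified at Source
print(lookup('OtherColumn', 'X'))           # column unknown -> Lookup not found
```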
Some explanation:
(1) d is a list of lists retrieved from reference_df. We use the list comprehension [ lit(j) for i in d for j in i ] to flatten it into a single list and pass the flattened list to the create_map function.
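The flattening step can be seen on the sample contents of d shown above, without Spark (lit() is dropped here so the list manipulation itself is visible):

```python
# A few of the [map_key, OutputValue] pairs from the sample d above.
d = [
    ['LeaseStatus\x00Abandoned', 'Active'],
    ['LeaseStatus\x00DEFAULT', 'Pending'],
    ['LeaseRecoveryType\x00Gross', 'Gross'],
]

# [ j for i in d for j in i ] interleaves key, value, key, value, ...
flat = [j for i in d for j in i]
print(flat)
# ['LeaseStatus\x00Abandoned', 'Active', 'LeaseStatus\x00DEFAULT',
#  'Pending', 'LeaseRecoveryType\x00Gross', 'Gross']
```

create_map() expects exactly this alternating key/value sequence of column expressions, which is why each element is wrapped in lit() in the real code.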
(2) mappings_default is built the same way, but adds an if condition as a filter to keep only the entries whose map_key (the first item of the inner list, i[0]) ends with \x00DEFAULT, and then uses split to strip the \x00DEFAULT suffix off the map_key, leaving just the PrimaryName as the key.
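Again on plain Python lists, the filter-and-strip step for the DEFAULT entries looks like this (lit() omitted as before):

```python
# Sample pairs, one of which is a DEFAULT entry.
d = [
    ['LeaseStatus\x00Abandoned', 'Active'],
    ['LeaseStatus\x00DEFAULT', 'Pending'],
    ['LeaseRecoveryType\x00Gross', 'Gross'],
]

default_pairs = [
    j.split('\0')[0]   # 'LeaseStatus\x00DEFAULT' -> 'LeaseStatus'; 'Pending' has no '\0' and is unchanged
    for i in d
    if i[0].upper().endswith('\x00DEFAULT')  # keep only DEFAULT entries
    for j in i
]
print(default_pairs)  # ['LeaseStatus', 'Pending']
```

So the resulting map is keyed by column name alone, which is what lets mappings_default[c] act as the per-column fallback in the coalesce() chain.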