pySpark mapping multiple variables
Question
The code below maps the values and column names of my reference df to my actual dataset, looking for exact matches; when an exact match is found, it returns the OutputValue. However, I'm trying to add a rule so that when PrimaryValue = DEFAULT, the OutputValue is also returned.
The solution I'm trying in order to tackle this is to create a new dataframe with null values, since no match was produced by the code below. The next step would then be to target the null values whose corresponding PrimaryValue = DEFAULT and replace the null with that OutputValue.
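The intended fallback order can be sketched in plain Python before wiring it into Spark. The dictionary contents and the lookup helper below are illustrative assumptions, not the actual reference data:

```python
# Plain-Python sketch of the lookup order: (1) try the exact "Name\0Value" key,
# (2) fall back to that column's DEFAULT entry, (3) otherwise return None.
# The reference dict here is made-up sample data for illustration only.
reference = {
    'LeaseStatus\x00Abandoned': 'Active',
    'LeaseStatus\x00DEFAULT': 'Pending',
}

def lookup(column, value):
    exact = reference.get(f'{column}\x00{value}')      # exact match first
    default = reference.get(f'{column}\x00DEFAULT')    # then the DEFAULT row
    return exact if exact is not None else default

print(lookup('LeaseStatus', 'Abandoned'))  # exact match -> 'Active'
print(lookup('LeaseStatus', 'Unknown'))    # no exact match, falls back -> 'Pending'
```

This is exactly the null-then-default chain described above, just expressed as two dictionary lookups instead of two Spark map columns.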
from itertools import chain
from pyspark.sql.functions import concat_ws, collect_set, array, create_map, lit, col

#create a map based on columns from reference_df
map_key = concat_ws('\0', final_reference.PrimaryName, final_reference.PrimaryValue)
map_value = final_reference.OutputValue

#dataframe of concatenated mappings to get the corresponding OutputValues from reference table
d = final_reference.agg(collect_set(array(concat_ws('\0', 'PrimaryName', 'PrimaryValue'), 'OutputValue')).alias('m')).first().m
#display(d)

#build a map expression from the flattened key/value pairs
mappings = create_map([lit(i) for i in chain.from_iterable(d)])

#dataframe with corresponding matched OutputValues
dataset = datasetM.select("*", *[mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c, c_name in matched_List.items()])
display(dataset)
Answer
From the discussion in comments, I think you just need to add a default mapping derived from the existing one and then use the coalesce() function to find the first non-null value, see below:
from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce
# skip some old code
d
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseStatus\x00DEFAULT', 'Pending'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]
# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])
# default mapping
mappings_default = create_map([ lit(j.split('\0')[0]) for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>
# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('\0')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}
# use coalesce to find the first non-null value from mappings, mappings_default, etc.
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*",*[
coalesce(
mappings[concat_ws('\0', lit(c), col(c))],
mappings_default[c],
lit("Not Specified at Source" if c in available_list else "Lookup not found")
).alias(c_name) for c,c_name in matchedAttributeName_List.items()])
Some explanation:

(1) d is a list of lists retrieved from reference_df. We use the list comprehension [ lit(j) for i in d for j in i ] to flatten it into a single list and pass that flattened list to the create_map function.
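Using two entries from the sample d shown above, the flattening step can be checked in plain Python (with lit() omitted) so the shape of the result is visible:

```python
# Two entries from the sample d above; each inner list is [map_key, OutputValue].
d = [['LeaseStatus\x00Abandoned', 'Active'],
     ['LeaseStatus\x00DEFAULT', 'Pending']]

# The same comprehension as in the answer, minus lit():
# flattens to key, value, key, value, ...
flat = [j for i in d for j in i]
print(flat)
# ['LeaseStatus\x00Abandoned', 'Active', 'LeaseStatus\x00DEFAULT', 'Pending']

# create_map consumes these alternating entries; pairing them up in a dict
# mirrors the map column it builds.
as_dict = dict(zip(flat[::2], flat[1::2]))
print(as_dict)
# {'LeaseStatus\x00Abandoned': 'Active', 'LeaseStatus\x00DEFAULT': 'Pending'}
```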
(2) mappings_default is similar to the above, but adds an if condition as a filter, keeping only entries whose map_key (the first item i[0] of each inner list) ends with \x00DEFAULT, and then uses split to strip the \x00DEFAULT suffix off the map_key.
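The filter-and-strip step from (2) can likewise be verified in plain Python against a few entries of the sample d:

```python
# Sample entries from d above; each inner list is [map_key, OutputValue].
d = [['LeaseStatus\x00Abandoned', 'Active'],
     ['LeaseStatus\x00DEFAULT', 'Pending'],
     ['LeaseRecoveryType\x00Gross', 'Gross']]

# Keep only inner lists whose map_key ends with '\x00DEFAULT', then split on
# the '\0' separator. The key becomes the bare column name, and the OutputValue
# (which contains no '\0') passes through unchanged.
pairs = [j.split('\0')[0] for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i]
print(pairs)  # ['LeaseStatus', 'Pending']
```

Feeding these pairs to create_map yields map(LeaseStatus, Pending), which is exactly the Column shown in the comment above.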