pySpark mapping multiple variables


Problem Description

The code below maps values and column names of my reference df onto my actual dataset, finding exact matches; if an exact match is found, it returns the OutputValue. However, I'm trying to add a rule that also returns the OutputValue when PrimaryValue = DEFAULT.

The solution I'm trying is to create a new dataframe containing null values wherever the code below found no match. The next step would then be to target those nulls whose corresponding PrimaryValue = DEFAULT and replace them with the OutputValue.

  from itertools import chain
  from pyspark.sql.functions import concat_ws, collect_set, array, lit, col, create_map

  #create a map based on columns from reference_df
  map_key = concat_ws('\0', final_reference.PrimaryName, final_reference.PrimaryValue)
  map_value = final_reference.OutputValue

  #dataframe of concatenated mappings to get the corresponding OutputValues from reference table
  d = final_reference.agg(collect_set(array(concat_ws('\0','PrimaryName','PrimaryValue'), 'OutputValue')).alias('m')).first().m
  #display(d)

  #flatten the mapped key/value pairs into a single map column
  mappings = create_map([lit(i) for i in chain.from_iterable(d)])

  #dataframe with corresponding matched OutputValues
  dataset = datasetM.select("*",*[ mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c,c_name in matched_List.items()])
  display(dataset)
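The lookup scheme in the question can be sketched in plain Python (the reference rows and column names below are made-up sample data, not from the actual dataset): each map key is the column name and cell value joined with a `'\0'` separator, and a missing key yields the null the question wants to backfill.

```python
# Plain-Python sketch of the '\0'-keyed exact-match lookup.
# reference_rows is hypothetical sample data standing in for reference_df.
reference_rows = [
    ("LeaseStatus", "Abandoned", "Active"),
    ("LeaseRecoveryType", "Gross", "Gross"),
]

# Same key scheme as the PySpark code: PrimaryName + '\0' + PrimaryValue -> OutputValue
lookup = {name + "\0" + value: out for name, value, out in reference_rows}

def match(column, cell):
    # Exact match only: returns None when no reference entry exists,
    # which is the "null" case the question wants to handle via DEFAULT.
    return lookup.get(column + "\0" + cell)

print(match("LeaseStatus", "Abandoned"))  # exact match found
print(match("LeaseStatus", "OnHold"))     # no match -> None
```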

Answer

From the discussion in the comments, I think you just need to derive a default mapping from the existing one and then use the coalesce() function to find the first non-null value, see below:

from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce

# skip some old code

d    
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseStatus\x00DEFAULT', 'Pending'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]

# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])

# default mapping
mappings_default = create_map([ lit(j.split('\0')[0]) for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>

# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('\0')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}

# use coalesce to find the first non-null values from mappings, mappings_default etc
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*",*[ 
  coalesce(
    mappings[concat_ws('\0', lit(c), col(c))],
    mappings_default[c],
    lit("Not Specified at Source" if c in available_list else "Lookup not found")
  ).alias(c_name) for c,c_name in matchedAttributeName_List.items()])
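The fallback order that coalesce() implements above can be emulated in plain Python (using the sample entries of `d` shown earlier; `resolve` is a hypothetical helper, not part of the answer's code):

```python
# Plain-Python emulation of the coalesce() fallback chain:
# 1) exact match, 2) per-column DEFAULT, 3) fixed fallback string.
d = [
    ["LeaseStatus\x00Abandoned", "Active"],
    ["LeaseStatus\x00DEFAULT", "Pending"],
    ["LeaseRecoveryType\x00Gross", "Gross"],
]

mappings = dict(d)
mappings_default = {k.split("\x00")[0]: v for k, v in d
                    if k.upper().endswith("\x00DEFAULT")}
available = {k.split("\x00")[0] for k, _ in d}

def resolve(column, cell):
    exact = mappings.get(column + "\x00" + cell)
    if exact is not None:
        return exact
    if column in mappings_default:
        return mappings_default[column]
    return "Not Specified at Source" if column in available else "Lookup not found"

print(resolve("LeaseStatus", "Abandoned"))   # 'Active'
print(resolve("LeaseStatus", "OnHold"))      # 'Pending' via the DEFAULT entry
print(resolve("LeaseRecoveryType", "Net"))   # 'Not Specified at Source'
print(resolve("Unknown", "x"))               # 'Lookup not found'
```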

Some explanations:

(1) d is a list of lists retrieved from reference_df. We use a list comprehension [ lit(j) for i in d for j in i ] to flatten it into a single list and pass the flattened list to the create_map function.
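The flattening step alone can be shown with plain-Python sample pairs; note it is equivalent to the chain.from_iterable() used in the question's code:

```python
from itertools import chain

# Sample list of [key, value] pairs, standing in for d.
d = [["k1", "v1"], ["k2", "v2"]]

# Two equivalent ways to flatten the pairs into the alternating
# key, value, key, value, ... sequence that create_map expects:
flat_comprehension = [j for i in d for j in i]
flat_chain = list(chain.from_iterable(d))

print(flat_comprehension)  # ['k1', 'v1', 'k2', 'v2']
assert flat_comprehension == flat_chain
```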

(2) mappings_default is similar to the above, but adds an if condition to serve as a filter, keeping only entries whose PrimaryLookupAttributeValue (the first item of the inner list, i[0]) ends with \x00DEFAULT, and then uses split to strip the PrimaryLookupAttributeValue part (which is basically \x00DEFAULT) off the map_key.
