pySpark映射多个变量 [英] pySpark mapping multiple variables

查看:179
本文介绍了pySpark映射多个变量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面的代码将我的参考df的值和列名与我的实际数据集对应起来,查找完全匹配,如果找到完全匹配,则返回OutputValue.但是,我正在尝试添加规则,即PrimaryValue = DEFAULT时还要返回OutputValue.

The code below maps values and column names of my reference df with my actual dataset, finding exact matches and if an exact match is found, return the OutputValue. However, I'm trying to add the rule that when PrimaryValue = DEFAULT to also return the OutputValue.

我正在尝试解决的解决方案是创建一个具有空值的新数据框-因为下面的代码没有提供匹配项.因此,下一步将针对其对应的PrimaryValue = DEFAULT的空值,用OutputValue替换空值.

The solution I'm trying out to tackle this is to create a new dataframe with null values - since there was no match provided by code below. Thus the next step would be to target the null values whose corresponding PrimaryValue = DEFAULT to replace null by the OutputValue.

  #create a map based on columns from reference_df
  map_key = concat_ws('\0', final_reference.PrimaryName, final_reference.PrimaryValue)
  map_value = final_reference.OutputValue

  #dataframe of concatinated mappings to get the corresponding OutputValues from reference table
  d = final_reference.agg(collect_set(array(concat_ws('\0','PrimaryName','PrimaryValue'), 'OutputValue')).alias('m')).first().m
  #display(d)

  #iterate through mapped values 
  mappings = create_map([lit(i) for i in chain.from_iterable(d)])

  #dataframe with corresponding matched OutputValues
  dataset = datasetM.select("*",*[ mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c,c_name in matched_List.items()]) 
  display(dataset)

推荐答案

从注释中的讨论中,我认为您只需要从现有的映射中添加默认映射,然后使用

From discussion in comments, I think you just need to add a default mappings from the existing one and then use coalease() function to find the first non-null value, see below:

from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce

# skip some old code

d    
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseStatus\x00DEFAULT', 'Pending'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]

# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])

# default mapping
mappings_default = create_map([ lit(j.split('\0')[0]) for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>

# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('\0')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}

# use coalesce to find the first non-null values from mappings, mappings_defaul etc
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*",*[ 
  coalesce(
    mappings[concat_ws('\0', lit(c), col(c))],
    mappings_default[c],
    lit("Not Specified at Source" if c in available_list else "Lookup not found")
  ).alias(c_name) for c,c_name in matchedAttributeName_List.items()])

一些解释:

(1)d是从reference_df中检索的列表的列表,我们使用列表理解[ lit(j) for i in d for j in i ]将其展平为列表,并将展平的列表应用于create_map函数:

(1) d is a list of lists retrieved from the reference_df, we use a list comprehension [ lit(j) for i in d for j in i ] to flatten this to a list and apply the flattened list to the create_map function:

(2)mappings_default与上面的相似,但是添加了一个if条件以用作过滤器,并且仅保留具有PrimaryLookupAttributeValue(这是内部列表i[0]的第一项)以,然后使用split从map_key中剥离PrimaryLookupAttributeValue(基本上是\x00DEFAULT).

(2) The mappings_default is similar to the above, but add a if condition to serve as a filter and keep only entries having PrimaryLookupAttributeValue (which is the first item of the inner list i[0]) ending with \x00DEFAULT and then use split to strip PrimaryLookupAttributeValue(which is basically \x00DEFAULT) off from the map_key.

这篇关于pySpark映射多个变量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆