pySpark iterating repetitive variables
Problem description
I have code that currently works, but I'd like to make it more efficient and avoid hard coding:
1) Avoid hard coding: for NotDefined_filterDomainLookup, I'd like to reference the default_reference df for the corresponding Code and Name when Id = 4, instead of hard-coding the Code and Name values.
Question 1
List of column names and their corresponding new column names:
test_matchedAttributeName_List = dict(matchedDomains.agg(collect_set(array('DomainName', 'TargetAttributeForName')).alias('m')).first().m)
Output: {'LeaseType': 'ConformedLeaseTypeName', 'LeaseRecoveryType': 'ConformedLeaseRecoveryTypeName', 'LeaseStatus': 'ConformedLeaseStatusName'}
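For reference, the dict(...) call above just turns the collected list of [DomainName, TargetAttributeForName] pairs into a mapping; a minimal pure-Python sketch, with the list contents taken from the output shown above:

```python
# Simulated result of collect_set(array('DomainName', 'TargetAttributeForName')):
# a list of two-element lists, one per domain.
collected = [
    ['LeaseType', 'ConformedLeaseTypeName'],
    ['LeaseRecoveryType', 'ConformedLeaseRecoveryTypeName'],
    ['LeaseStatus', 'ConformedLeaseStatusName'],
]

# dict() consumes the (key, value) pairs directly.
test_matchedAttributeName_List = dict(collected)
print(test_matchedAttributeName_List['LeaseType'])  # ConformedLeaseTypeName
```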
Working code, except for the hard coding I'd like to avoid. Specifically, I want to reference the default_reference df for the corresponding Code and Name when Id = 4:
cond = col('PrimaryLookupAttributeName').isNull() & col('SecondaryLookupAttributeName').isNull()
NotDefined_filterDomainLookup = filterDomainLookup \
.withColumn('OutputItemIdByAttribute', when(cond, lit('4')).otherwise(col('OutputItemIdByAttribute'))) \
.withColumn('OutputItemCodeByAttribute', when(cond, lit('N/D')).otherwise(col('OutputItemCodeByAttribute'))) \
.withColumn('OutputItemNameByAttribute', when(cond, lit('Not Defined')).otherwise(col('OutputItemNameByAttribute')))
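One way to avoid the hard-coded 'N/D' / 'Not Defined' literals is to collect the Id = 4 row from default_reference once on the driver and substitute its values into the lit(...) calls. A minimal pure-Python sketch of that lookup (the default_reference contents here are hypothetical; the Id/Code/Name column names are assumptions):

```python
# Hypothetical driver-side snapshot of default_reference
# (in PySpark this would come from default_reference.collect()).
default_reference_rows = [
    {'Id': '4', 'Code': 'N/D', 'Name': 'Not Defined'},
]

# Build an Id -> row lookup so no Code/Name value is hard coded.
defaults_by_id = {r['Id']: r for r in default_reference_rows}
nd = defaults_by_id['4']

# These values would then feed lit(nd['Code']) / lit(nd['Name'])
# instead of the literal strings in the withColumn calls above.
print(nd['Code'], nd['Name'])  # N/D Not Defined
```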
Answer
For Question 2, based on your code, I'd suggest the following adjustments:
- Set up item_keys covering Id, Name and Code, and merge the repeated logic with list comprehensions
- Use struct instead of array to implement the logic above
- No need to create a Python dictionary for NotDefined_Attribute_List; a list of tuples is enough and simpler
See the steps below:
(1) Set up two aggregate expressions to compute the item_map used for testing_mappings and NotDefined_Attribute_List. Check named_struct and struct (two methods for the same task, as an exercise):
from itertools import chain
from pyspark.sql.functions import expr, collect_set, struct, col, create_map, lit, concat_ws
item_keys = ['Id', 'Name', 'Code']
# use SQL expression
m1_by_sql_expr = expr("""
collect_set(
named_struct(
'attr_name', PrimaryLookupAttributeName,
'attr_value', PrimaryLookupAttributeValue,
'Id', OutputItemIdByValue,
'Name', OutputItemNameByValue,
'Code', OutputItemCodeByValue
)
) as item_map
""")
# use PySpark API functions
m2_by_func = collect_set(
struct(
col('DomainName').alias('domain'),
col('TargetAttributeForId').alias('Id'),
col('TargetAttributeForName').alias('Name'),
col('TargetAttributeForCode').alias('Code')
)
).alias('item_map')
(2) Map ItemKey (Id, Code or Name) + PrimaryLookupAttributeName + PrimaryLookupAttributeValue to ItemValue:
m1 = NotDefined_filterDomainLookup.agg(m1_by_sql_expr).first().item_map
"""create a list of tuples of (map_key, map_value) to create MapType column:
| map_key = concat_ws('\0', item_key, attr_name, attr_value)
| map_value = item_value
"""
testingId = [('\0'.join([k, row.attr_name, row.attr_value]), row[k]) for row in m1 for k in item_keys if row[k]]
#[('Id\x00LeaseRecoveryType\x00Gross w/base year', '18'),
# ('Name\x00LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'),
# ('Id\x00LeaseStatus\x00Abandoned', '10'),
# ('Name\x00LeaseStatus\x00Abandoned', 'Active'),
# ('Id\x00LeaseStatus\x00Draft', '10'),
# ('Name\x00LeaseStatus\x00Draft', 'Pending'),
# ('Id\x00LeaseStatus\x00Archive', '11'),
# ('Name\x00LeaseStatus\x00Archive', 'Expired'),
# ('Id\x00LeaseStatus\x00Terminated', '10'),
# ('Name\x00LeaseStatus\x00Terminated', 'Terminated'),
# ('Id\x00LeaseRecoveryType\x00Gross', '11'),
# ('Name\x00LeaseRecoveryType\x00Gross', 'Gross'),
# ('Id\x00LeaseRecoveryType\x00Gross-modified', '15'),
# ('Name\x00LeaseRecoveryType\x00Gross-modified', 'Modified Gross')]
# this could be a problem for too many entries.
testing_mappings = create_map([lit(i) for i in chain.from_iterable(testingId)])
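The key construction and the chain.from_iterable flattening can be illustrated without Spark; a minimal sketch using one of the rows behind the commented output above:

```python
from itertools import chain

item_keys = ['Id', 'Name', 'Code']

# Rows as they would come back from the collect_set aggregation
# (only one shown here; fields mirror the named_struct above).
rows = [
    {'attr_name': 'LeaseStatus', 'attr_value': 'Draft',
     'Id': '10', 'Name': 'Pending', 'Code': None},
]

# '\0' is used as a separator unlikely to appear in real values.
testingId = [('\0'.join([k, r['attr_name'], r['attr_value']]), r[k])
             for r in rows for k in item_keys if r[k]]
# Code is None, so only the Id and Name entries are produced.

# create_map expects a flat key, value, key, value, ... sequence,
# hence the chain.from_iterable flattening before wrapping in lit().
flat = list(chain.from_iterable(testingId))
print(flat)  # ['Id\x00LeaseStatus\x00Draft', '10', 'Name\x00LeaseStatus\x00Draft', 'Pending']
```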
(3) Create NotDefined_Attribute_List (same logic as in (2), using PySpark API functions for m2):
m2 = matchedDomains.agg(m2_by_func).first().item_map
NotDefined_Attribute_List = [(k, row.domain, row[k]) for row in m2 for k in item_keys if row[k]]
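The same comprehension pattern, sketched in plain Python (the row fields mirror the struct(...) aggregation over matchedDomains; the values are hypothetical):

```python
item_keys = ['Id', 'Name', 'Code']

# One row as produced by the struct(...) aggregation over matchedDomains.
rows = [
    {'domain': 'LeaseType', 'Id': 'ConformedLeaseTypeId',
     'Name': 'ConformedLeaseTypeName', 'Code': 'ConformedLeaseTypeCode'},
]

# Each (item_key, domain, target_column) tuple drives one output column later.
NotDefined_Attribute_List = [(k, r['domain'], r[k])
                             for r in rows for k in item_keys if r[k]]
print(NotDefined_Attribute_List[0])  # ('Id', 'LeaseType', 'ConformedLeaseTypeId')
```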
(4) Get a list of additional columns based on NotDefined_Attribute_List:
additional_cols = [
testing_mappings[concat_ws('\0', lit(k), lit(c), col(c))].alias(c_name)
for k,c,c_name in NotDefined_Attribute_List
]
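What the testing_mappings[...] lookup does per row, sketched with a plain dict standing in for the MapType column (the column values here are hypothetical):

```python
# Stand-in for the MapType column built in step (2).
testing_mappings = {
    'Id\x00LeaseStatus\x00Draft': '10',
    'Name\x00LeaseStatus\x00Draft': 'Pending',
}

# One entry from NotDefined_Attribute_List:
# (item_key, source column, output column name).
k, c, c_name = ('Id', 'LeaseStatus', 'ConformedLeaseStatusId')

# Per row, concat_ws('\0', lit(k), lit(c), col(c)) builds this key
# from the row's value in column c (here 'Draft').
row_value = 'Draft'
key = '\x00'.join([k, c, row_value])
print(c_name, '=', testing_mappings[key])  # ConformedLeaseStatusId = 10
```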
(5) Select the additional columns:
if count_ND > 0:
# move code above in (2), (3) and (4) here
# set up testing_NotDefined
testing_NotDefined = datasetMatchedPortfolio.select("*", *additional_cols)
else:
    print("no Not Defined rows exist")