pySpark iterating repetitive variables


Problem Description

I have code that currently works; however, I'm looking to make it more efficient and avoid hard-coding:

1) Avoid hard-coding: for NotDefined_filterDomainLookup, I'd like to reference the default_reference df for the corresponding Code and Name when Id = 4, instead of hard-coding the Code and Name values.

Question 1

List of column names and their corresponding new column names:

test_matchedAttributeName_List = dict(matchedDomains.agg(collect_set(array('DomainName', 'TargetAttributeForName')).alias('m')).first().m)

Output: {'LeaseType': 'ConformedLeaseTypeName', 'LeaseRecoveryType': 'ConformedLeaseRecoveryTypeName', 'LeaseStatus': 'ConformedLeaseStatusName'}

Working code, except that I'd like to avoid the hard-coding. Specifically, I want to reference the default_reference df for the corresponding Code and Name when Id = 4:

from pyspark.sql.functions import col, when, lit

cond = col('PrimaryLookupAttributeName').isNull() & col('SecondaryLookupAttributeName').isNull()

NotDefined_filterDomainLookup = filterDomainLookup \
    .withColumn('OutputItemIdByAttribute', when(cond, lit('4')).otherwise(col('OutputItemIdByAttribute'))) \
    .withColumn('OutputItemCodeByAttribute', when(cond, lit('N/D')).otherwise(col('OutputItemCodeByAttribute'))) \
    .withColumn('OutputItemNameByAttribute', when(cond, lit('Not Defined')).otherwise(col('OutputItemNameByAttribute'))) 
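
To avoid the hard-coding itself, one option is to fetch the Code and Name for Id = 4 from default_reference once on the driver and pass them to lit(). A minimal sketch reusing cond from the block above, and assuming default_reference has columns named Id, Code, and Name (those column names are assumptions):

# fetch the default Code/Name for Id = 4 once on the driver (assumed columns: Id, Code, Name)
default_row = default_reference.filter(col('Id') == 4).select('Code', 'Name').first()

NotDefined_filterDomainLookup = filterDomainLookup \
    .withColumn('OutputItemIdByAttribute', when(cond, lit('4')).otherwise(col('OutputItemIdByAttribute'))) \
    .withColumn('OutputItemCodeByAttribute', when(cond, lit(default_row.Code)).otherwise(col('OutputItemCodeByAttribute'))) \
    .withColumn('OutputItemNameByAttribute', when(cond, lit(default_row.Name)).otherwise(col('OutputItemNameByAttribute')))

Only the two string literals change; lit('4') stays as-is, since Id = 4 is itself the trigger for the default row.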


Answer

For Question 2, based on your code, I'd suggest the following adjustments:

  • Set up item_keys including Id, Name, and Code, and merge the shared logic using list comprehensions
  • Use struct instead of array to implement the above logic
  • There is no need to create a Python dictionary for NotDefined_Attribute_List; a list of tuples is enough

See the steps below:

(1) Set up two aggregate functions to calculate the item_map used for testing_mappings and NotDefined_Attribute_List. Check named_struct and struct (two methods for the same task, for your exercise):

from itertools import chain
from pyspark.sql.functions import expr, collect_set, struct, col, create_map, lit, concat_ws

item_keys = ['Id', 'Name', 'Code']

# use SQL expression
m1_by_sql_expr = expr("""
  collect_set(
    named_struct(
      'attr_name', PrimaryLookupAttributeName,
      'attr_value', PrimaryLookupAttributeValue,
      'Id', OutputItemIdByValue,
      'Name', OutputItemNameByValue,
      'Code', OutputItemCodeByValue
    )
  ) as item_map
""")

# use PySpark API functions
m2_by_func = collect_set(
    struct(
      col('DomainName').alias('domain'),
      col('TargetAttributeForId').alias('Id'),
      col('TargetAttributeForName').alias('Name'),
      col('TargetAttributeForCode').alias('Code')
    )
  ).alias('item_map')

(2) Set up the mapping from ItemKey (Id, Code, or Name) + PrimaryLookupAttributeName + PrimaryLookupAttributeValue to ItemValue:

  m1 = NotDefined_filterDomainLookup.agg(m1_by_sql_expr).first().item_map

  """create a list of tuples of (map_key, map_value) to create MapType column:
     | map_key = concat_ws('\0', item_key, attr_name, attr_value)
     | map_value = item_value
  """
  testingId = [('\0'.join([k, row.attr_name, row.attr_value]), row[k]) for row in m1 for k in item_keys if row[k]]
  #[('Id\x00LeaseRecoveryType\x00Gross w/base year', '18'),
  # ('Name\x00LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'),
  # ('Id\x00LeaseStatus\x00Abandoned', '10'),
  # ('Name\x00LeaseStatus\x00Abandoned', 'Active'),
  # ('Id\x00LeaseStatus\x00Draft', '10'),
  # ('Name\x00LeaseStatus\x00Draft', 'Pending'),
  # ('Id\x00LeaseStatus\x00Archive', '11'),
  # ('Name\x00LeaseStatus\x00Archive', 'Expired'),
  # ('Id\x00LeaseStatus\x00Terminated', '10'),
  # ('Name\x00LeaseStatus\x00Terminated', 'Terminated'),
  # ('Id\x00LeaseRecoveryType\x00Gross', '11'),
  # ('Name\x00LeaseRecoveryType\x00Gross', 'Gross'),
  # ('Id\x00LeaseRecoveryType\x00Gross-modified', '15'),
  # ('Name\x00LeaseRecoveryType\x00Gross-modified', 'Modified Gross')]

  # this could be a problem for too many entries.
  testing_mappings = create_map([lit(i) for i in chain.from_iterable(testingId)])
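
As a quick sanity check (not part of the original answer, and assuming an active SparkSession named spark), the resulting MapType column behaves like a dictionary lookup: a present key returns its mapped value and an absent key returns null. Using the entries printed above:

  spark.range(1).select(
      testing_mappings[lit('Id\x00LeaseStatus\x00Draft')].alias('hit'),  # -> '10'
      testing_mappings[lit('Id\x00No\x00Such')].alias('miss')            # -> null
  ).show()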

(3) Create NotDefined_Attribute_List (same logic as in (2); use the PySpark API functions for m2):

  m2 = matchedDomains.agg(m2_by_func).first().item_map

  NotDefined_Attribute_List = [(k, row.domain, row[k]) for row in m2 for k in item_keys if row[k]]

(4) Get a list of additional columns based on NotDefined_Attribute_List:

  additional_cols = [
    testing_mappings[concat_ws('\0', lit(k), lit(c), col(c))].alias(c_name)
      for k,c,c_name in NotDefined_Attribute_List
  ]

(5) Select the additional columns:

if count_ND > 0: 

  # move code above in (2), (3) and (4) here

  # set up testing_NotDefined
  testing_NotDefined = datasetMatchedPortfolio.select("*", *additional_cols)

else:
  print("no Not Defines exist")
