Pyspark: dynamically generate condition for when() clause during runtime

Question

I have read a CSV file into a PySpark DataFrame. If I apply conditions in a when() clause, it works fine as long as the conditions are known before runtime.

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import col

sc = SparkContext('local', 'example')
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# Sample content of csv file
# col1,value
# 1,aa
# 2,bbb

s_df = sql_sc.createDataFrame(pandas_df)

new_df = s_df.withColumn(
    'value',
    functions.when((col("col1") == 2) | (col("value") == "aa"), s_df.value).otherwise(2))

new_df.show(truncate=False)

But I need to build the conditions inside the when() clause dynamically from a list.

[{'column': 'col1', 'operator': '==', 'value': 2}, {'column': 'value', 'operator': '==', 'value': "aa"}]

Is there any way to achieve this?

Thanks in advance.

Recommended answer

You can:

  1. Dynamically generate the SQL string; Python 3.6+ f-strings are really convenient for this.
  2. Pass it to pyspark.sql.functions.expr to make a pyspark.sql.column.Column out of it.

For your example, something like this should work:

Given the schema of s_df:

root
 |-- col1: long (nullable = false)
 |-- value: string (nullable = false)

Import the functions and instantiate your conditions collection:

[...]
from pyspark.sql.functions import col, expr, when
conditions = [
    {'column': 'col1', 'operator': '==', 'value':  3}, 
    {'column': 'value', 'operator': '==', 'value': "'aa'"}
]

  • Generate the whole if statement:

    new_df = s_df.withColumn('value', expr(
        f"if({conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
        f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']},"
        "value, 2)"))
    new_df.show()  # show() returns None, so call it separately to keep new_df a DataFrame
    

  • Or generate only the condition and pass it to the when function:

    new_df = s_df.withColumn('value', when(
        expr(
            f"{conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
            f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']}"
        ),
        col("value")).otherwise(2))
    new_df.show()  # show() returns None, so call it separately to keep new_df a DataFrame
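Both snippets above index conditions[0] and conditions[1] by hand, so they only cover a list of exactly two entries. For a list of arbitrary length, the condition string could be assembled in a loop. The following is a minimal sketch in plain Python, not part of the original answer: ALLOWED_OPERATORS, to_sql_literal and build_condition are hypothetical helper names, the operator whitelist is an assumption, and column names are assumed to come from a trusted source. Unlike the conditions list in the answer, it takes plain Python values such as "aa" and adds the SQL quotes itself.

```python
# Sketch only: ALLOWED_OPERATORS, to_sql_literal and build_condition are
# hypothetical helpers for illustration, not part of PySpark.
ALLOWED_OPERATORS = {"==", "!=", "<", "<=", ">", ">="}


def to_sql_literal(value):
    """Render a Python value as a SQL literal (strings get single quotes)."""
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, str):
        # Assumption: reject quotes and backslashes instead of escaping them.
        if "'" in value or "\\" in value:
            raise ValueError(f"unsupported character in string value: {value!r}")
        return f"'{value}'"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def build_condition(conditions, joiner="OR"):
    """Join every condition dict into one SQL boolean expression string."""
    if not conditions:
        raise ValueError("at least one condition is required")
    parts = []
    for cond in conditions:
        if cond["operator"] not in ALLOWED_OPERATORS:
            raise ValueError(f"unsupported operator: {cond['operator']!r}")
        literal = to_sql_literal(cond["value"])
        parts.append(f"({cond['column']} {cond['operator']} {literal})")
    return f" {joiner} ".join(parts)


conditions = [
    {'column': 'col1', 'operator': '==', 'value': 2},
    {'column': 'value', 'operator': '==', 'value': "aa"},
]
condition_sql = build_condition(conditions)
print(condition_sql)

# With a live DataFrame such as s_df, the string plugs into the answer's code:
#   new_df = s_df.withColumn(
#       'value', when(expr(condition_sql), col('value')).otherwise(2))
```

Wrapping each comparison in parentheses keeps the precedence unambiguous if the joiner is later changed to AND or the result is embedded in a larger expression.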
      
