Pyspark: dynamically generate condition for when() clause during runtime

Question

I have read a CSV file into a PySpark DataFrame. If I apply conditions in a when() clause, it works fine as long as the conditions are known before runtime:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import col

sc = SparkContext('local', 'example')
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# Sample content of csv file
# col1,value
# 1,aa
# 2,bbb

s_df = sql_sc.createDataFrame(pandas_df)

new_df = s_df.withColumn(
    'value',
    functions.when((col("col1") == 2) | (col("value") == "aa"), s_df.value).otherwise(2)
)

new_df.show(truncate=False)

But I need to build the conditions inside the when() clause dynamically, from a list such as:

[{'column': 'col1', 'operator': '==', 'value': 2}, {'column': 'value', 'operator': '==', 'value': "aa"}]

Is there any way to achieve this?

Thanks.

Recommended answer

You can:

  1. Dynamically generate the SQL string; Python 3.6+ f-strings are really convenient for this.
  2. Pass it to pyspark.sql.functions.expr to turn it into a pyspark.sql.column.Column.
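The two steps above can be sketched generically, so that any number of conditions is handled instead of hard-coding conditions[0] and conditions[1]. This is a minimal sketch, assuming the dict format from the question, OR/AND as the only combinators, and Spark SQL's default backslash escaping inside string literals. The helper names (sql_identifier, sql_literal, conditions_to_sql) and the operator whitelist are hypothetical, not part of PySpark. Unlike the answer's example, a plain Python string such as "aa" is quoted automatically, and unknown operators are rejected rather than pasted into the SQL.

```python
# Hypothetical helpers: turn condition dicts into a Spark SQL boolean expression string.
ALLOWED_OPERATORS = {"==", "=", "!=", "<>", "<", "<=", ">", ">="}


def sql_identifier(name):
    # Backtick-quote the column name; embedded backticks are doubled.
    return "`" + name.replace("`", "``") + "`"


def sql_literal(value):
    # Render a Python value as a Spark SQL literal (extend for other types as needed).
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Escape backslashes first, then single quotes (assumes default escaping rules).
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return "'" + escaped + "'"
    raise TypeError(f"unsupported literal type: {type(value).__name__}")


def conditions_to_sql(conditions, combine="OR"):
    # Build one parenthesized comparison per dict and join them with OR or AND.
    if combine not in ("OR", "AND"):
        raise ValueError(f"unsupported combinator: {combine}")
    parts = []
    for cond in conditions:
        op = cond["operator"]
        if op not in ALLOWED_OPERATORS:
            raise ValueError(f"unsupported operator: {op}")
        parts.append(f"({sql_identifier(cond['column'])} {op} {sql_literal(cond['value'])})")
    return f" {combine} ".join(parts)


conditions = [
    {"column": "col1", "operator": "==", "value": 2},
    {"column": "value", "operator": "==", "value": "aa"},
]
print(conditions_to_sql(conditions))  # (`col1` == 2) OR (`value` == 'aa')
```

The resulting string can then be used exactly as in the answer, e.g. when(expr(conditions_to_sql(conditions)), col("value")).otherwise(2).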


For your example, something like this should work:

Given this schema for s_df:

root
 |-- col1: long (nullable = false)
 |-- value: string (nullable = false)

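Import the functions and instantiate your conditions collection. Note that the string value is written as "'aa'": it is pasted verbatim into the SQL string, so it must carry its own SQL quotes.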

[...]
from pyspark.sql.functions import col, expr, when
conditions = [
    {'column': 'col1', 'operator': '==', 'value':  3}, 
    {'column': 'value', 'operator': '==', 'value': "'aa'"}
]

  • Generate the whole if expression:

    # builds the SQL string: if(col1==3 OR value=='aa',value, 2)
    new_df = s_df.withColumn('value', expr(
        f"if({conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
        f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']},"
        "value, 2)"))
    # .show() returns None, so call it separately instead of assigning its result
    new_df.show()

  • Or generate only the condition, then pass it to the when function:

    new_df = s_df.withColumn('value', when(
        expr(
            f"{conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
            f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']}"
        ),
        col("value")).otherwise(2))
    new_df.show()
      
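If you would rather not build SQL strings at all, a different technique (not the one in the answer above) is to map each operator string to a function from Python's operator module and fold the resulting Column comparisons together with functools.reduce, relying on PySpark Columns overloading ==, !=, <, <=, >, >=, | and &. This is a sketch under that assumption; the helper build_condition and the OPS table are hypothetical. The column factory is passed in as a parameter (pyspark.sql.functions.col in real use), which also lets you sanity-check the logic on a plain dict without a Spark session. String values are plain Python strings here ("aa", not "'aa'").

```python
import operator
from functools import reduce

# Hypothetical mapping from the operator strings in the condition dicts to Python operators.
OPS = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def build_condition(conditions, col, combine=operator.or_):
    # Turn each dict into `col(column) <op> value`, then fold them with | (pass operator.and_ for &).
    # Note: reduce() raises TypeError if `conditions` is empty.
    comparisons = [OPS[c["operator"]](col(c["column"]), c["value"]) for c in conditions]
    return reduce(combine, comparisons)


conditions = [
    {"column": "col1", "operator": "==", "value": 2},
    {"column": "value", "operator": "==", "value": "aa"},
]

# With PySpark (assumes s_df from the question):
#   from pyspark.sql.functions import col, when
#   cond = build_condition(conditions, col)
#   new_df = s_df.withColumn("value", when(cond, col("value")).otherwise(2))

# Without Spark: evaluate the same predicate against a plain dict standing in for one row.
row = {"col1": 1, "value": "aa"}
print(build_condition(conditions, row.__getitem__))  # True
```

Because every comparison is built with Python operators, the same function produces a Column when given pyspark's col and a plain bool when given a dict lookup.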
