Pyspark: dynamically generate condition for when() clause during runtime
Question
I have read a csv file into a pyspark dataframe. If I apply conditions in a when() clause, it works fine as long as the conditions are written out before runtime.
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions
from pyspark.sql.functions import col
sc = SparkContext('local', 'example')
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# Sample content of csv file
# col1,value
# 1,aa
# 2,bbb
s_df = sql_sc.createDataFrame(pandas_df)
new_df = s_df.withColumn(
    'value',
    functions.when((col("col1") == 2) | (col("value") == "aa"), s_df.value).otherwise(2))
new_df.show(truncate=False)
But I need to build the conditions inside the when clause dynamically, from a list such as:
[{'column': 'col1', 'operator': '==', 'value': 2}, {'column': 'value', 'operator': '==', 'value': "aa"}]
Is there any way to achieve this?
Thanks in advance.
Recommended answer
You can:
- dynamically generate the SQL string; Python 3.6+ f-strings are really convenient for this.
- pass it to pyspark.sql.functions.expr to make a pyspark.sql.column.Column out of it.
For your example, something like this should work:
Given the schema of s_df:
root
|-- col1: long (nullable = false)
|-- value: string (nullable = false)
Import the functions and instantiate your conditions collection:
[...]
from pyspark.sql.functions import col, expr, when
conditions = [
{'column': 'col1', 'operator': '==', 'value': 3},
{'column': 'value', 'operator': '==', 'value': "'aa'"}
]
- generate the whole if statement:
new_df = s_df.withColumn('value', expr(
    f"if({conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
    f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']},"
    "value, 2)"))
# show() returns None, so call it separately instead of assigning its result
new_df.show()
- or generate only the condition and pass it to the when function:
new_df = s_df.withColumn('value', when(
    expr(
        f"{conditions[0]['column']}{conditions[0]['operator']}{conditions[0]['value']}"
        f" OR {conditions[1]['column']}{conditions[1]['operator']}{conditions[1]['value']}"
    ),
    col("value")).otherwise(2))
# show() returns None, so call it separately instead of assigning its result
new_df.show()
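The snippets above index conditions[0] and conditions[1] by hand, so they only cover exactly two conditions. As a rough sketch of how the same idea might extend to a list of any length, the condition string can be assembled in a loop. The helper name build_condition and its joiner parameter are hypothetical and not part of the original answer. The sketch also assumes the values arrive unquoted, as in the question's list, so it quotes Python strings itself and rejects values containing a single quote or backslash rather than trying to escape them.

```python
# Hypothetical helper (not from the original answer): builds one SQL
# condition string from a list of condition dicts of any length.
def build_condition(conditions, joiner="OR"):
    """Return a string such as "(col1 == 2) OR (value == 'aa')"."""
    parts = []
    for cond in conditions:
        value = cond["value"]
        if isinstance(value, str):
            # Assumption: string values are plain text; anything that would
            # need escaping inside a SQL literal is rejected, not escaped.
            if "'" in value or "\\" in value:
                raise ValueError(f"unsupported character in value: {value!r}")
            value = f"'{value}'"
        # Parentheses keep each comparison separate from its neighbours.
        parts.append(f"({cond['column']} {cond['operator']} {value})")
    return f" {joiner} ".join(parts)


conditions = [
    {'column': 'col1', 'operator': '==', 'value': 2},
    {'column': 'value', 'operator': '==', 'value': "aa"},
]
condition_sql = build_condition(conditions)
print(condition_sql)  # (col1 == 2) OR (value == 'aa')
```

The resulting string can then take the place of the hand-written f-strings, for example when(expr(condition_sql), col("value")).otherwise(2). Because the column names and operators are interpolated into SQL as-is, this is only reasonable when the list comes from a trusted source.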