转换列并更新 DataFrame [英] Transforming a column and update the DataFrame
问题描述
所以,我在下面所做的是从 DataFrame
中删除一列 A
因为我想应用一个转换(这里我只是 json.加载
JSON 字符串)并用转换后的列替换旧列.转换后,我只是加入了两个结果数据帧.
So, what I'm doing below is I drop a column A
from a DataFrame
because I want to apply a transformation (here I just json.loads
a JSON string) and replace the old column with the transformed one. After the transformation I just join the two resulting data frames.
df = df_data.drop('A').join(
df_data[['ID', 'A']].rdd\
.map(lambda x: (x.ID, json.loads(x.A))
if x.A is not None else (x.ID, None))\
.toDF()\
.withColumnRenamed('_1', 'ID')\
.withColumnRenamed('_2', 'A'),
['ID']
)
我不喜欢的当然是我面临的开销,因为我必须执行 withColumnRenamed
操作.
The thing I dislike about this is of course the overhead I'm faced because I had to do the withColumnRenamed
operations.
使用熊猫我会做这样的事情:
With pandas All I'd do something like this:
pdf = pd.DataFrame([json.dumps([0]*np.random.randint(5,10)) for i in range(10)], columns=['A'])
pdf.A = pdf.A.map(lambda x: json.loads(x))
pdf
但以下在 pyspark 中不起作用:
but the following does not work in pyspark:
df.A = df[['A']].rdd.map(lambda x: json.loads(x.A))
那么有没有比我在第一个代码片段中所做的更简单的方法?
So is there an easier way than what I'm doing in my first code snipped?
推荐答案
我认为您不需要删除列并进行连接.以下代码应该*与您发布的内容相同:
I do not think you need to drop the column and do the join. The following code should* be equivalent to what you posted:
cols = df_data.columns
df = df_data.rdd\
.map(
lambda row: tuple(
[row[c] if c != 'A' else (json.loads(row[c]) if row[c] is not None else None)
for c in cols]
)
)\
.toDF(cols)
*我还没有真正测试过这段代码,但我认为这应该可行.
但要回答您的一般问题,您可以使用 withColumn()
就地转换列.
But to answer your general question, you can transform a column in-place using withColumn()
.
df = df_data.withColumn("A", my_transformation_function("A").alias("A"))
其中 my_transformation_function()
可以是 udf
或 pyspark sql 函数
.
Where my_transformation_function()
can be a udf
or a pyspark sql function
.
这篇关于转换列并更新 DataFrame的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!