使用pyspark保存并附加HDFS文件 [英] save and append a file in HDFS using pyspark

查看:3661
本文介绍了使用pyspark保存并附加HDFS文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

pyspark 中有一个数据框叫做 df 。我已将 df 注册为 temptable ,如下所示。

I have a data frame in pyspark called df. I have registered this df as a temptable like below.

df.registerTempTable('mytempTable')

date=datetime.now().strftime('%Y-%m-%d %H:%M:%S')

现在从这张临时表中,我会得到某些值,如max_id列 id

Now from this temp table I will get certain values, like max_id of a column id

min_id = sqlContext.sql("select nvl(min(id),0) as minval from mytempTable").collect()[0].asDict()['minval']

max_id = sqlContext.sql("select nvl(max(id),0) as maxval from mytempTable").collect()[0].asDict()['maxval']


$ b $

Now I will collect all these values like below.

test = ("{},{},{}".format(date,min_id,max_id))

我发现 test 不是数据框,但它是 str 字符串

I found that test is not a data frame but it is a str string

>>> type(test)
<type 'str'>

现在我要保存这个 test 作为文件放在 HDFS 中。我还想将数据附加到 hdfs 中的同一个文件中。

Now I want save this test as a file in HDFS. I would also like to append data to the same file in hdfs.

如何使用 pyspark

仅供参考使用 spark 1.6 t有权访问 databricks csv 包。

FYI I am using spark 1.6 and don't have access to databricks csv package.

推荐答案

你只需要将你的数据与 concat_ws 连接起来,并将其作为文本输入就可以了:

Here you go, you'll just need to concat your data with concat_ws and right it as a text:

query = """select concat_ws(',', date, nvl(min(id), 0), nvl(max(id), 0))
from mytempTable"""

sqlContext.sql(query).write("text").mode("append").save("/tmp/fooo")

甚至是更好的选择:

Or even a better alternative :

from pyspark.sql import functions as f

(sqlContext
    .table("myTempTable")
    .select(f.concat_ws(",", f.first(f.lit(date)), f.min("id"), f.max("id")))
    .coalesce(1)
    .write.format("text").mode("append").save("/tmp/fooo"))

这篇关于使用pyspark保存并附加HDFS文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆