Group spark dataframe by date
Problem description
I've loaded a DataFrame from a SQLServer table. It looks like this:
>>> df.show()
+--------------------+----------+
|           timestamp|     Value|
+--------------------+----------+
|2015-12-02 00:10:...|     652.8|
|2015-12-02 00:20:...|     518.4|
|2015-12-02 00:30:...|     524.6|
|2015-12-02 00:40:...|     382.9|
|2015-12-02 00:50:...|     461.6|
|2015-12-02 01:00:...|     476.6|
|2015-12-02 01:10:...|     472.6|
|2015-12-02 01:20:...|     353.0|
|2015-12-02 01:30:...|     407.9|
|2015-12-02 01:40:...|     475.9|
|2015-12-02 01:50:...|     513.2|
|2015-12-02 02:00:...|     569.0|
|2015-12-02 02:10:...|     711.4|
|2015-12-02 02:20:...|     457.6|
|2015-12-02 02:30:...|     392.0|
|2015-12-02 02:40:...|     459.5|
|2015-12-02 02:50:...|     560.2|
|2015-12-02 03:00:...|     252.9|
|2015-12-02 03:10:...|     228.7|
|2015-12-02 03:20:...|     312.2|
+--------------------+----------+
Now I'd like to group (and sum) values by hour (or day, or month, or...), but I don't really have a clue how I can do that.
That's how I load the DataFrame. I've got the feeling that this isn't the right way to do it, though:
query = """
SELECT column1 AS timestamp, column2 AS value
FROM table
WHERE blahblah
"""
sc = SparkContext("local", 'test')
sqlctx = SQLContext(sc)
df = sqlctx.load(source="jdbc",
                 url="jdbc:sqlserver://<CONNECTION_DATA>",
                 dbtable="(%s) AS alias" % query)
Is it ok?
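On the loading side: since Spark 1.4 the same JDBC source can also be read through the DataFrameReader API, which is the more common spelling. A minimal sketch, reusing the sqlctx, connection placeholder, and query string from above:

# Equivalent JDBC load via DataFrameReader (Spark 1.4+);
# <CONNECTION_DATA> and query are the same placeholders used above.
df = (sqlctx.read
      .format("jdbc")
      .options(url="jdbc:sqlserver://<CONNECTION_DATA>",
               dbtable="(%s) AS alias" % query)
      .load())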
Since 1.5.0 Spark provides a number of functions like dayofmonth, hour, month or year which can operate on dates and timestamps. So if timestamp is a TimestampType, all you need is a correct expression. For example:
from pyspark.sql.functions import hour, mean
(df
    .groupBy(hour("timestamp").alias("hour"))
    .agg(mean("value").alias("mean"))
    .show())
## +----+------------------+
## |hour|              mean|
## +----+------------------+
## |   0|508.05999999999995|
## |   1| 449.8666666666666|
## |   2| 524.9499999999999|
## |   3|264.59999999999997|
## +----+------------------+
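The same pattern extends to coarser buckets such as whole days. A minimal sketch, assuming timestamp is still a TimestampType, using to_date (also added in 1.5.0); sum is aliased on import to avoid shadowing the Python builtin:

from pyspark.sql.functions import to_date, sum as sum_

(df
    .groupBy(to_date("timestamp").alias("date"))
    .agg(sum_("value").alias("value_sum"))
    .show())
## With the sample data above this prints a single row for 2015-12-02,
## whose sum matches the 9183.0 seen in the yearly example below.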
Pre-1.5.0 your best option is to use HiveContext and Hive UDFs, either with selectExpr:
df.selectExpr("year(timestamp) AS year", "value").groupBy("year").sum()
## +----+---------+----------+
## |year|SUM(year)|SUM(value)|
## +----+---------+----------+
## |2015|    40300|    9183.0|
## +----+---------+----------+
or raw SQL:
df.registerTempTable("df")
sqlContext.sql("""
SELECT MONTH(timestamp) AS month, SUM(value) AS values_sum
FROM df
GROUP BY MONTH(timestamp)""")
Just remember that the aggregation is performed by Spark, not pushed down to the external source. Usually this is the desired behavior, but there are situations when you may prefer to perform the aggregation as a subquery to limit the data transfer.
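For example, a minimal sketch of that subquery approach, reusing the question's load style; the table and column names are the question's placeholders, and CAST(... AS DATE) assumes SQL Server 2008 or later:

# Let SQL Server aggregate per day so that only the summarized rows
# are transferred to Spark; names mirror the question's query.
agg_query = """
SELECT CAST(column1 AS DATE) AS day, SUM(column2) AS value_sum
FROM table
WHERE blahblah
GROUP BY CAST(column1 AS DATE)
"""

daily = sqlctx.load(source="jdbc",
                    url="jdbc:sqlserver://<CONNECTION_DATA>",
                    dbtable="(%s) AS daily_agg" % agg_query)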