Column filtering in PySpark


Question

I have a dataframe df loaded from a Hive table, and it has a timestamp column, say ts, stored as a string in the format dd-MMM-yy hh.mm.ss.MS a (in terms of the Python datetime library, this is %d-%b-%y %I.%M.%S.%f %p).

Now I want to filter the rows of the dataframe that are from the last five minutes:

only_last_5_minutes = df.filter(
    datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5)
)

However, this does not work and I get this message:

TypeError: strptime() argument 1 must be string, not Column

It looks like I am applying the column operation incorrectly, and it seems to me I have to create a lambda function to filter each column that satisfies the desired condition, but being a newbie to Python and lambda expressions in particular, I don't know how to create my filter correctly. Please advise.

P.S. I prefer to express my filter as native Python (or SparkSQL) rather than as a filter inside the 'WHERE' clause of a Hive SQL query.

Preferred:

df = sqlContext.sql("SELECT * FROM my_table")
df.filter(...)  # filter here

Not preferred:

df = sqlContext.sql("SELECT * FROM my_table WHERE...")

Answer

Spark >= 1.5

Since Spark 1.5 you can parse the date string as follows:

from pyspark.sql.functions import col, expr, from_unixtime, lit, unix_timestamp
from pyspark.sql.types import TimestampType

parsed_df = df.select(from_unixtime(unix_timestamp(
    # Note: for Spark >= 3.0 the am/pm pattern letter length should be 1 ("a")
    df.datetime, "dd-MMM-yy h.mm.ss.SSSSSS a"
)).cast(TimestampType()).alias("datetime"))

and then apply the interval:

parsed_df.where(col("datetime") >= lit(now) - expr("INTERVAL 5 minutes"))

Here now is the reference datetime defined in the example below; to filter relative to the actual current time, use current_timestamp() from pyspark.sql.functions instead of lit(now).
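On more recent Spark releases the same parsing can be written more compactly with to_timestamp; this is a minimal sketch assuming Spark >= 2.2, and is not part of the original answer:

from pyspark.sql.functions import to_timestamp

# to_timestamp is available from Spark 2.2 onwards and returns a TimestampType column directly
parsed_df = df.select(
    to_timestamp(df.datetime, "dd-MMM-yy h.mm.ss.SSSSSS a").alias("datetime"))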

Spark < 1.5

It is possible to use a user defined function:

from datetime import datetime, timedelta
from pyspark.sql.types import BooleanType, TimestampType
from pyspark.sql.functions import udf, col

def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p')
        return then_parsed > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())

With some dummy data:

df = sqlContext.createDataFrame([
    (1, '14-Jul-15 11.34.29.000000 AM'),
    (2, '14-Jul-15 11.34.27.000000 AM'),
    (3, '14-Jul-15 11.32.11.000000 AM'),
    (4, '14-Jul-15 11.29.00.000000 AM'),
    (5, '14-Jul-15 11.28.29.000000 AM')
], ('id', 'datetime'))

now = datetime(2015, 7, 14, 11, 35)
df.where(in_last_5_minutes(now)(col("datetime"))).show()

And as expected we get only 3 entries:

+--+--------------------+
|id|            datetime|
+--+--------------------+
| 1|14-Jul-15 11.34.2...|
| 2|14-Jul-15 11.34.2...|
| 3|14-Jul-15 11.32.1...|
+--+--------------------+

Parsing the datetime string all over again is rather inefficient, so you may consider storing a TimestampType instead:

def parse_dt():
    def _parse(dt):
        return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p')
    return udf(_parse, TimestampType())

df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime))

def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        return then > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())

df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp")))

And the result:

+--+--------------------+--------------------+
|id|            datetime|           timestamp|
+--+--------------------+--------------------+
| 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...|
+--+--------------------+--------------------+

Finally, it is possible to use a raw SQL query with timestamps:

import time

query = """SELECT * FROM df
     WHERE unix_timestamp(datetime, 'dd-MMM-yy HH.mm.ss.SSSSSS a') > {0}
     """.format(time.mktime((now - timedelta(minutes=5)).timetuple()))

sqlContext.sql(query)

Same as above, it would be more efficient to parse the date strings only once.
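One way to avoid re-parsing inside the SQL query, sketched here under the assumption that the df_with_timestamp dataframe from above is registered as a temporary table (the table name df_parsed is made up for this example):

# register the already-parsed dataframe so SQL can compare timestamps directly
df_with_timestamp.registerTempTable("df_parsed")

query = """SELECT * FROM df_parsed
     WHERE timestamp > CAST('{0}' AS TIMESTAMP)
     """.format((now - timedelta(minutes=5)).strftime('%Y-%m-%d %H:%M:%S'))

sqlContext.sql(query)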

If the column is already a timestamp, it is possible to use a datetime literal:

from pyspark.sql.functions import lit

df_with_timestamp.where(
    df_with_timestamp.timestamp > lit(now - timedelta(minutes=5)))
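And if you want "the last five minutes" measured from the moment the query runs rather than from a fixed now, a column expression should work as well; a minimal sketch using current_timestamp and expr (both available since Spark 1.5):

from pyspark.sql.functions import current_timestamp, expr

# filter relative to the cluster's current time instead of a fixed Python datetime
df_with_timestamp.where(
    df_with_timestamp.timestamp > current_timestamp() - expr("INTERVAL 5 minutes"))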
