Column filtering in PySpark


Problem description

I have a dataframe df loaded from a Hive table, and it has a timestamp column, say ts, stored as a string in the format dd-MMM-yy hh.mm.ss.MS a (in Python datetime terms, this is %d-%b-%y %I.%M.%S.%f %p).

Now I want to filter rows from the dataframe that are from the last five minutes:

only_last_5_minutes = df.filter(
    datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5)
)

However, this does not work, and I get this message:

TypeError: strptime() argument 1 must be string, not Column

It looks like I am applying the column operation incorrectly, and it seems I have to create a lambda function that filters each row satisfying the desired condition, but being a newbie to Python, and to lambda expressions in particular, I don't know how to write my filter correctly. Please advise.

P.S. I prefer to express my filters natively in Python (or Spark SQL) rather than inside a Hive SQL 'WHERE' query expression.

preferred:

df = sqlContext.sql("SELECT * FROM my_table")
df.filter(...)  # filter here

not preferred:

df = sqlContext.sql("SELECT * FROM my_table WHERE...")

Solution

It is possible to use a user-defined function.

from datetime import datetime, timedelta
from pyspark.sql.types import BooleanType, TimestampType
from pyspark.sql.functions import udf, col

def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p')
        return then_parsed > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())

Using some dummy data:

df = sqlContext.createDataFrame([
    (1, '14-Jul-15 11.34.29.000000 AM'),
    (2, '14-Jul-15 11.34.27.000000 AM'),
    (3, '14-Jul-15 11.32.11.000000 AM'),
    (4, '14-Jul-15 11.29.00.000000 AM'),
    (5, '14-Jul-15 11.28.29.000000 AM')
], ('id', 'datetime'))

now = datetime(2015, 7, 14, 11, 35)
df.where(in_last_5_minutes(now)(col("datetime"))).show()

As expected, we get only 3 entries:

+--+--------------------+
|id|            datetime|
+--+--------------------+
| 1|14-Jul-15 11.34.2...|
| 2|14-Jul-15 11.34.2...|
| 3|14-Jul-15 11.32.1...|
+--+--------------------+

Parsing the datetime string over and over is rather inefficient, so you may consider storing a TimestampType instead.

def parse_dt():
    def _parse(dt):
        return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p')
    return udf(_parse, TimestampType())

df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime))

def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        return then > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())

df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp"))).show()

and the result:

+--+--------------------+--------------------+
|id|            datetime|           timestamp|
+--+--------------------+--------------------+
| 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...|
+--+--------------------+--------------------+

Finally, it is possible to use a raw SQL query with timestamps:

import time

df.registerTempTable("df")  # the query below refers to the table "df"

query = """SELECT * FROM df
     WHERE unix_timestamp(datetime, 'dd-MMM-yy HH.mm.ss.SSSSSS a') > {0}
     """.format(time.mktime((now - timedelta(minutes=5)).timetuple()))

sqlContext.sql(query)

As above, it would be more efficient to parse the date strings only once.
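
For example, a minimal sketch of the same filter expressed in raw SQL against the already-parsed column (assuming df_with_timestamp and now from above; the table name "df_ts" is made up for illustration):

# Register the pre-parsed dataframe and compare its timestamp column
# against a cutoff literal instead of re-parsing the string for every row.
df_with_timestamp.registerTempTable("df_ts")

cutoff = (now - timedelta(minutes=5)).strftime('%Y-%m-%d %H:%M:%S')
sqlContext.sql(
    "SELECT * FROM df_ts WHERE `timestamp` > CAST('{0}' AS TIMESTAMP)".format(cutoff)
).show()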

If the column is already a timestamp, it is possible to use datetime literals:

from pyspark.sql.functions import lit

df_with_timestamp.where(
    df_with_timestamp.timestamp > lit(now - timedelta(minutes=5)))

EDIT

Since Spark 1.5 you can parse the date string as follows:

from pyspark.sql.functions import from_unixtime, unix_timestamp
from pyspark.sql.types import TimestampType

df.select((from_unixtime(unix_timestamp(
    df.datetime, "dd-MMM-yy h.mm.ss.SSSSSS aa"
))).cast(TimestampType()).alias("datetime"))
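
Putting the pieces together, here is a minimal sketch (assuming the same df, now, and sqlContext as above) of the whole last-five-minutes filter expressed without any Python UDF:

from datetime import timedelta

from pyspark.sql.functions import from_unixtime, unix_timestamp, lit
from pyspark.sql.types import TimestampType

# Parse the string column with the built-in functions (Spark 1.5+) and
# compare it against a datetime literal.
parsed = df.select(
    df.id,
    from_unixtime(unix_timestamp(
        df.datetime, "dd-MMM-yy h.mm.ss.SSSSSS aa"
    )).cast(TimestampType()).alias("datetime")
)

parsed.where(parsed.datetime > lit(now - timedelta(minutes=5))).show()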
