datetime range filter in PySpark SQL
Question
What is the correct way to filter a data frame by a timestamp field?
I have tried different date formats and forms of filtering, but nothing helps: either pyspark returns 0 objects, or it throws an error saying it doesn't understand the datetime format.
Here is what I got so far:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from django.utils import timezone
from django.conf import settings
from myapp.models import Collection
sc = SparkContext("local", "DjangoApp")
sqlc = SQLContext(sc)
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')
Range for the timestamp field:
system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)
attempt 1
date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
    date_from.isoformat(), date_to.isoformat()
)
sf = sf.filter(date_filter)
sf.count()
Out[12]: 0
attempt 2
sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to)
sf.count()
---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o63.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 4.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 4.0 (TID 3, localhost): org.postgresql.util.PSQLException:
ERROR: syntax error at or near "18"
#
# oops.. JDBC doesn't understand the 24h time format??
attempt 3
sf = sf.filter("my_col BETWEEN '%s' AND '%s'" % \
(date_from.isoformat(), date_to.isoformat())
)
---------------------------------------------------------------------------
Py4JJavaError: An error occurred while calling o97.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 17.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 17.0 (TID 13, localhost): org.postgresql.util.PSQLException:
ERROR: syntax error at or near "18"
The data do exist in the table, though:
django_filters = {
    'my_col__gte': date_from,
    'my_col__lte': date_to
}
Collection.objects.filter(**django_filters).count()
Out[17]: 1093436
Or this way:
django_range_filter = {'my_col__range': (date_from, date_to)}
Collection.objects.filter(**django_range_filter).count()
Out[19]: 1093436
Answer
Let's assume your data frame looks as follows:
sf = sqlContext.createDataFrame([
    [datetime.datetime(2013, 6, 29, 11, 34, 29)],
    [datetime.datetime(2015, 7, 14, 11, 34, 27)],
    [datetime.datetime(2012, 3, 10, 19, 00, 11)],
    [datetime.datetime(2016, 2, 8, 12, 21)],
    [datetime.datetime(2014, 4, 4, 11, 28, 29)]
], ('my_col', ))
with schema:
root
|-- my_col: timestamp (nullable = true)
and you want to find dates in the following range:
import datetime, time
dates = ("2013-01-01 00:00:00", "2015-07-01 00:00:00")
timestamps = (
    time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
    for s in dates)
It is possible to query using timestamps, either computed on the driver side:
q1 = "CAST(my_col AS INT) BETWEEN {0} AND {1}".format(*timestamps)
sf.where(q1).show()
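The same epoch-seconds comparison can also be written with Column methods instead of a SQL string; a minimal sketch (note that timestamps above is a generator, so the bounds are rebuilt as a tuple here):
# Rebuild the two epoch-second bounds as a tuple (the generator above can
# only be consumed once).
ts_from, ts_to = tuple(
    time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
    for s in dates
)

# Same filter expressed through the DataFrame API: cast the timestamp column
# to epoch seconds and keep rows between the two bounds.
sf.where(sf.my_col.cast("int").between(ts_from, ts_to)).show()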
or using the unix_timestamp function:
q2 = """CAST(my_col AS INT)
BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss')
AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*dates)
sf.where(q2).show()
It is also possible to use a udf in a similar way to the one I described in another answer.
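That other answer isn't reproduced here, but as a rough illustration of the idea, a UDF-based range filter might look like this (a sketch, not taken from that answer; it assumes Python datetime bounds and that my_col values arrive in the UDF as datetime.datetime objects):
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

lower = datetime.datetime(2013, 1, 1)
upper = datetime.datetime(2015, 7, 1)

# Plain Python predicate wrapped as a UDF; Spark hands each my_col value to
# the lambda as a datetime.datetime, so the comparison runs in Python.
in_range = udf(lambda ts: ts is not None and lower <= ts < upper, BooleanType())

sf.where(in_range(sf.my_col)).show()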
If you use raw SQL, it is possible to extract different elements of a timestamp using year, date, etc.
sqlContext.sql("""SELECT * FROM sf
    WHERE YEAR(my_col) BETWEEN 2014 AND 2015""").show()
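Note that the raw SQL query above refers to sf by name, which requires the DataFrame to have been registered as a temporary table first; a minimal sketch:
# Register the DataFrame under the name used in the raw SQL query above
# (Spark 1.x API; in later versions this is createOrReplaceTempView).
sf.registerTempTable("sf")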
EDIT:
Since Spark 1.5, you can use built-in functions:
from pyspark.sql.functions import lit, to_date
from pyspark.sql.types import TimestampType

dates = ("2013-01-01", "2015-07-01")
date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]
sf.where((sf.my_col > date_from) & (sf.my_col < date_to))
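Applied back to the question, one plausible fix for the original syntax error is to render the bounds as plain 'yyyy-MM-dd HH:mm:ss' strings (no 'T' separator or microseconds, unlike isoformat()) and reuse the unix_timestamp filter from above; a sketch using naive datetimes for the question's bounds, leaving timezone handling aside:
import datetime

# Hypothetical bounds mirroring the question (tzinfo dropped for brevity;
# the strings below are interpreted in the Spark session's local timezone).
dt_from = datetime.datetime(2014, 4, 16, 18, 30, 0)
dt_to = datetime.datetime(2015, 6, 15, 18, 11, 59)
bounds = [d.strftime("%Y-%m-%d %H:%M:%S") for d in (dt_from, dt_to)]

q = """CAST(my_col AS INT)
       BETWEEN unix_timestamp('{0}', 'yyyy-MM-dd HH:mm:ss')
           AND unix_timestamp('{1}', 'yyyy-MM-dd HH:mm:ss')""".format(*bounds)
sf.where(q).count()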