Converting EPOCH to Date in Elasticsearch Spark
Question
I have a DataFrame that I am writing to ES. Before writing to ES, I am converting the EVTExit column, which is in EPOCH, to Date:
workset = workset.withColumn("EVTExit", to_date(from_unixtime($"EVTExit".divide(1000))))
workset.select("EVTExit").show(10)
+----------+
|   EVTExit|
+----------+
|2014-06-03|
|      null|
|2012-10-23|
|2014-06-03|
|2015-11-05|
+----------+
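For reference, the epoch-milliseconds-to-date logic that `from_unixtime($"EVTExit".divide(1000))` followed by `to_date` performs can be sketched in plain Python. This is only a minimal model: Spark's `to_date` uses the session timezone, while this sketch pins UTC, so the day can differ near midnight.

```python
from datetime import datetime, timezone

def epoch_millis_to_date(millis):
    """Model of from_unixtime(col / 1000) + to_date: epoch millis -> 'yyyy-MM-dd'.
    Nulls pass through, matching the null row in the show() output."""
    if millis is None:
        return None
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).date().isoformat()

print(epoch_millis_to_date(1401778800000))  # -> 2014-06-03
print(epoch_millis_to_date(None))           # -> None
```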
As I can see, the EVTExit column is converted to Date.
workset.write.format("org.elasticsearch.spark.sql").save("workset/workset1")
But after writing it to ES, I am still getting it in EPOCH format:
"EVTExit" : 1401778800000
Does anyone have any idea what's going wrong here?

Thanks
Answer
Let's consider the DataFrame example from your question:
scala> val df = workset.select("EVTExit")
// df: org.apache.spark.sql.DataFrame = [EVTExit: date]
scala> df.printSchema
// root
// |-- EVTExit: date (nullable = true)
You would need to cast the column into a string and disable es.mapping.date.rich, which is true by default:
The parameter defines whether to create a rich Date-like object for Date fields in Elasticsearch or return them as primitives (String or long). The actual object type is based on the library used; a notable exception is Map/Reduce, which provides no built-in Date object, and as such LongWritable and Text are returned regardless of this setting.
I agree, this is counter-intuitive, but it's the only solution for now if you want elasticsearch not to convert it into long format. This is actually quite painful.
scala> val df2 = df.withColumn("EVTExit_1", $"EVTExit".cast("string"))
// df2: org.apache.spark.sql.DataFrame = [EVTExit: date, EVTExit_1: string]
scala> df2.show
// +----------+----------+
// | EVTExit| EVTExit_1|
// +----------+----------+
// |2014-06-03|2014-06-03|
// | null| null|
// |2012-10-23|2012-10-23|
// |2014-06-03|2014-06-03|
// |2015-11-05|2015-11-05|
// +----------+----------+
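The effect of `cast("string")` on a DateType column can be modeled in plain Python (a hypothetical helper for illustration, not part of Spark): each date becomes its `yyyy-MM-dd` text form and nulls stay null, which is what makes the string column safe for Elasticsearch to re-parse as a date.

```python
from datetime import date

def cast_date_to_string(d):
    """Model of Spark's cast('string') on a DateType value: 'yyyy-MM-dd' or None."""
    return None if d is None else d.isoformat()

rows = [date(2014, 6, 3), None, date(2012, 10, 23)]
print([cast_date_to_string(d) for d in rows])  # -> ['2014-06-03', None, '2012-10-23']
```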
Now you can write your data to elasticsearch:
scala> df2.write.format("org.elasticsearch.spark.sql").option("es.mapping.date.rich", "false").save("workset/workset1")
Now let's check what's on ES. First, let's look at the mapping:
$ curl -XGET localhost:9200/workset?pretty=true
{
  "workset" : {
    "aliases" : { },
    "mappings" : {
      "workset1" : {
        "properties" : {
          "EVTExit" : {
            "type" : "long"
          },
          "EVTExit_1" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1475063310916",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "i3Rb014sSziCmYm9LyIc5A",
        "version" : {
          "created" : "2040099"
        }
      }
    },
    "warmers" : { }
  }
}
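If you'd rather verify the mapping programmatically than eyeball the curl output, a sketch like this (parsing an abridged copy of the JSON above) would do:

```python
import json

# Abridged copy of the mapping returned by the curl call above
mapping = json.loads("""
{
  "workset": {
    "mappings": {
      "workset1": {
        "properties": {
          "EVTExit":   {"type": "long"},
          "EVTExit_1": {"type": "date",
                        "format": "strict_date_optional_time||epoch_millis"}
        }
      }
    }
  }
}
""")

props = mapping["workset"]["mappings"]["workset1"]["properties"]
print(props["EVTExit"]["type"])    # -> long  (the rich date was stored as epoch millis)
print(props["EVTExit_1"]["type"])  # -> date  (the string column was mapped as a date)
```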
It seems like we have our dates. Now let's check the contents:
$ curl -XGET localhost:9200/workset/_search?pretty=true -d '{ "size" : 1 }'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "workset",
      "_type" : "workset1",
      "_id" : "AVdwn-vFWzMbysX5OjMA",
      "_score" : 1.0,
      "_source" : {
        "EVTExit" : 1401746400000,
        "EVTExit_1" : "2014-06-03"
      }
    } ]
  }
}
Note 1: I kept both fields for demonstration purposes, but I think you get the point.
Note 2: This was tested with the spark-shell:
$ spark-shell --master local[*] --packages org.elasticsearch:elasticsearch-spark_2.10:2.3.2
Note 3: The same solution with pyspark:
from pyspark.sql.functions import col
df2 = df.withColumn("EVTExit_1",col("EVTExit").cast("string"))
df2.write.format("org.elasticsearch.spark.sql") \
.option("es.mapping.date.rich", "false").save("workset/workset1")