Converting EPOCH to Date in Elasticsearch Spark


Question

I have a DataFrame that I am writing to ES.

Before writing to ES, I convert the EVTExit column, which is in epoch milliseconds, to a Date:

import org.apache.spark.sql.functions.{from_unixtime, to_date}
workset = workset.withColumn("EVTExit", to_date(from_unixtime($"EVTExit".divide(1000))))

workset.select("EVTExit").show(10)

+----------+
|   EVTExit|
+----------+
|2014-06-03|
|      null|
|2012-10-23|
|2014-06-03|
|2015-11-05|
+----------+

As you can see, EVTExit is converted to a Date.

workset.write.format("org.elasticsearch.spark.sql").save("workset/workset1")

But after writing it to ES, I still get it in epoch format:

"EVTExit" : 1401778800000

Does anyone have any idea what's going wrong here?

Thanks

Answer

Let's consider the DataFrame example from your question:

scala> val df = workset.select("EVTExit")
// df: org.apache.spark.sql.DataFrame = [EVTExit: date]

scala> df.printSchema
// root
//  |-- EVTExit: date (nullable = true)

You would need to cast the column to a string and disable es.mapping.date.rich, which is true by default.

That parameter defines whether to create a rich Date-like object for Date fields in Elasticsearch or return them as primitives (String or long). The actual object type depends on the library used; a notable exception is Map/Reduce, which provides no built-in Date object, so LongWritable and Text are returned regardless of this setting.

I agree, this is counter-intuitive, but it's the only solution for now if you don't want Elasticsearch to convert it to long. This is actually quite painful.

scala> val df2 = df.withColumn("EVTExit_1", $"EVTExit".cast("string"))
// df2: org.apache.spark.sql.DataFrame = [EVTExit: date, EVTExit_1: string]

scala> df2.show
// +----------+----------+
// |   EVTExit| EVTExit_1|
// +----------+----------+
// |2014-06-03|2014-06-03|
// |      null|      null|
// |2012-10-23|2012-10-23|
// |2014-06-03|2014-06-03|
// |2015-11-05|2015-11-05|
// +----------+----------+

Now you can write your data to Elasticsearch:

scala> df2.write.format("org.elasticsearch.spark.sql").option("es.mapping.date.rich", "false").save("workset/workset1")
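As a side note, elasticsearch-hadoop also reads its es.* settings from the SparkConf, so you could set this once for the whole job instead of repeating the option on every write. A minimal sketch (the app name is made up):

import org.apache.spark.SparkConf

// Hypothetical job setup: es.mapping.date.rich is set globally on the SparkConf,
// so every org.elasticsearch.spark.sql write inherits it.
val sparkConf = new SparkConf()
  .setAppName("es-date-demo")
  .set("es.mapping.date.rich", "false")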

Now let's check what's in ES. First, let's look at the mapping:

$ curl -XGET localhost:9200/workset?pretty=true
{
  "workset" : {
    "aliases" : { },
    "mappings" : {
      "workset1" : {
        "properties" : {
          "EVTExit" : {
            "type" : "long"
          },
          "EVTExit_1" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "creation_date" : "1475063310916",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "i3Rb014sSziCmYm9LyIc5A",
        "version" : {
          "created" : "2040099"
        }
      }
    },
    "warmers" : { }
  }
}

It seems like we have our dates (note that the original EVTExit field stays mapped as long; Elasticsearch fixes a field's type when it is first indexed, so changing it afterwards would require reindexing). Now let's check the contents:

$ curl -XGET localhost:9200/workset/_search?pretty=true -d '{ "size" : 1 }'
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "workset",
      "_type" : "workset1",
      "_id" : "AVdwn-vFWzMbysX5OjMA",
      "_score" : 1.0,
      "_source" : {
        "EVTExit" : 1401746400000,
        "EVTExit_1" : "2014-06-03"
      }
    } ]
  }
}

Note 1: I kept both fields for demonstration purposes, but I think you get the point.
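If you don't want to carry the extra field around, the same cast also works in place. A sketch along the lines above (workset/workset2 is a hypothetical fresh index, since workset/workset1 already maps EVTExit as long):

// Overwrite EVTExit with its string representation so only the
// date-formatted value reaches Elasticsearch.
val df3 = df.withColumn("EVTExit", $"EVTExit".cast("string"))
df3.write.format("org.elasticsearch.spark.sql")
   .option("es.mapping.date.rich", "false")
   .save("workset/workset2")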

Note 2: Tested with Elasticsearch 2.4, Spark 1.6.2, Scala 2.10 and elasticsearch-spark 2.3.2 inside the spark-shell:

$ spark-shell --master local[*] --packages org.elasticsearch:elasticsearch-spark_2.10:2.3.2

Note 3: The same solution with pyspark:

from pyspark.sql.functions import col

df2 = df.withColumn("EVTExit_1", col("EVTExit").cast("string"))
df2.write.format("org.elasticsearch.spark.sql") \
   .option("es.mapping.date.rich", "false").save("workset/workset1")

