Reading Nested Data from ElasticSearch via Spark Scala


Problem Description

I am trying to read data from Elasticsearch via Spark Scala:

Scala 2.11.8, Spark 2.3.0, Elasticsearch 5.6.8

To connect: spark2-shell --jars elasticsearch-spark-20_2.11-5.6.8.jar

val df = spark.read.format("org.elasticsearch.spark.sql")
  .option("es.nodes", "xxxxxxx")
  .option("es.port", "xxxx")
  .option("es.net.http.auth.user", "xxxxx")
  .option("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .option("es.net.http.auth.pass", "xxxxxx")
  .option("es.net.ssl", "true")
  .option("es.nodes.wan.only", "true")
  .option("es.net.ssl.cert.allow.self.signed", "true")
  .option("es.net.ssl.truststore.location", "xxxxx")
  .option("es.net.ssl.truststore.pass", "xxxxx")
  .option("es.read.field.as.array.include", "true")  // note: this key is set again below; the later value overrides this one
  .option("pushdown", "true")
  .option("es.read.field.as.array.include", "a4,a4.a41,a4.a42,a4.a43,a4.a43.a431,a4.a43.a432,a4.a44,a4.a45")
  .load("<index_name>")

The schema is as below:

 |-- a1: string (nullable = true)
 |-- a2: string (nullable = true)
 |-- a3: struct (nullable = true)
 |    |-- a31: integer (nullable = true)
 |    |-- a32: struct (nullable = true)
 |-- a4: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a41: string (nullable = true)
 |    |    |-- a42: string (nullable = true)
 |    |    |-- a43: struct (nullable = true)
 |    |    |    |-- a431: string (nullable = true)
 |    |    |    |-- a432: string (nullable = true)
 |    |    |-- a44: string (nullable = true)
 |    |    |-- a45: string (nullable = true)
 |-- a8: string (nullable = true)
 |-- a9: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a91: string (nullable = true)
 |    |    |-- a92: string (nullable = true)
 |-- a10: string (nullable = true)
 |-- a11: timestamp (nullable = true)

I am able to read data from the top-level columns and from level-1 nested fields (i.e. the a9 or a3 columns) via the command:

df.select(explode($"a9").as("exploded")).select("exploded.*").show

The problem occurs when I try to read the a4 elements, as it throws the error below:

    [Stage 18:>                                                         (0 + 1) / 1]20/02/28 02:43:23 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 18.0 (TID 54, xxxxxxx, executor 12): scala.MatchError: Buffer() (of class scala.collection.convert.Wrappers$JListWrapper)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:275)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:241)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:164)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:164)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:154)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

20/02/28 02:43:23 ERROR scheduler.TaskSetManager: Task 0 in stage 18.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 4 times, most recent failure: Lost task 0.3 in stage 18.0 (TID 57, xxxxxxx, executor 12): scala.MatchError: Buffer() (of class scala.collection.convert.Wrappers$JListWrapper)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:275)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:241)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)

Is there anything I am doing wrong, or any step I am missing? Please help.

Solution

Off the top of my head, this error occurs when the schema guessed by the Spark/ElasticSearch connector is not actually compatible with the data being read.

Keep in mind that ES is schemaless, while SparkSQL has a "hard" schema. Bridging this gap is not always possible, so it's all just a best effort.

When connecting the two, the connector samples the documents and tries to guess a schema: "field A is a string, field B is an object structure with two subfields: B.1 is a date, and B.2 is an array of strings, ... and so on".

If it guesses wrong (typically: a given column / subcolumn is guessed to be a String, but in some documents it is in fact an array or a number), then the JSON-to-SparkSQL conversion emits this kind of error.

In the words of the documentation:

Elasticsearch treats fields with single or multi-values the same; in fact, the mapping provides no information about this. As a client, it means one cannot tell whether a field is single-valued or not until is actually being read. In most cases this is not an issue and elasticsearch-hadoop automatically creates the necessary list/array on the fly. However in environments with strict schema such as Spark SQL, changing a field actual value from its declared type is not allowed. Worse yet, this information needs to be available even before reading the data. Since the mapping is not conclusive enough, elasticsearch-hadoop allows the user to specify the extra information through field information, specifically es.read.field.as.array.include and es.read.field.as.array.exclude.

So I'd advise you to check whether the schema you reported in your question (the schema guessed by Spark) is actually valid against all your documents.

If it's not, you have a few options going forward:

  1. Correct the mapping individually. If the problem is linked to an array type not being recognized as such, you can fix it with configuration options. See the es.read.field.as.array.include (resp. .exclude) option, which actively tells Spark which properties in the documents are arrays (resp. are not arrays). If a field is unused, es.read.field.exclude will exclude that field from Spark altogether, bypassing possible schema issues for it. A minimal sketch follows after this list.

  2. If there is no way to provide a schema that is valid for all cases (e.g. some field is sometimes a number, sometimes a string, and there is no way to tell), then basically you have to drop back to the RDD level (and, if need be, move back to a Dataset / DataFrame once the schema is well defined); see the second sketch below.
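As a minimal sketch of option 1, assuming (as the stack trace suggests) that some field under a4 is a single value in some documents and a list in others. The field lists below are illustrative only and must be adapted to what your documents actually contain:

    // Sketch only: keep the auth/SSL options from the original read; only the field handling changes.
    val dfFixed = spark.read.format("org.elasticsearch.spark.sql")
      .option("es.nodes", "xxxxxxx")
      .option("es.port", "xxxx")
      // fields whose values really are arrays in the documents (illustrative list; verify against your data)
      .option("es.read.field.as.array.include", "a4,a9,a4.a42")
      // a field that is not needed and whose type is inconsistent can be dropped entirely
      .option("es.read.field.exclude", "a4.a45")
      .load("<index_name>")

    dfFixed.select(explode($"a4").as("x")).select("x.a41", "x.a43.a431").show

With the array fields declared up front, the connector types them as arrays in the inferred schema, so the Catalyst converters no longer hit the Buffer() MatchError for those fields; es.read.field.exclude is the escape hatch for fields you can simply live without.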
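And a rough sketch of option 2, dropping to the RDD level via the connector's esRDD API and rebuilding a DataFrame under an explicit schema. The columns kept and the normalization logic are purely illustrative assumptions:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.elasticsearch.spark._   // adds esRDD to the SparkContext

    // Each record comes back as (documentId, Map[String, AnyRef]); fields keep whatever shape each document had.
    val raw = sc.esRDD("<index_name>")

    // Explicit schema for just the columns needed here (an illustrative subset of the question's schema).
    val schema = StructType(Seq(
      StructField("a1", StringType, nullable = true),
      StructField("a2", StringType, nullable = true)
    ))

    // Normalize each document by hand, coping with fields that are sometimes a scalar and sometimes a list.
    val rows = raw.map { case (_, doc) =>
      def asString(key: String): String = doc.get(key) match {
        case Some(s: Seq[_]) => s.headOption.map(_.toString).orNull // take the first element when it is a list
        case Some(v)         => String.valueOf(v)
        case None            => null
      }
      Row(asString("a1"), asString("a2"))
    }

    val dfManual = spark.createDataFrame(rows, schema)

From there you can reshape the data however you like and only promote it to a Dataset / DataFrame once the schema is truly settled.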
