flink 1.3.1 elasticsearch 5.5.1. ElasticsearchSinkFunction失败,并出现java.lang.NoSuchMethodError [英] flink 1.3.1 elasticsearch 5.5.1. ElasticsearchSinkFunction fails with java.lang.NoSuchMethodError

查看:267
本文介绍了flink 1.3.1 elasticsearch 5.5.1. ElasticsearchSinkFunction失败,并出现java.lang.NoSuchMethodError的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Scala / sbt进行以下示例:

I'm going through following samples using Scala / sbt:

flink/elasticsearch/kibana
我的built.sbt包括以下版本:

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.3.1" % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % "1.3.1" % "provided",
  "org.apache.flink" %% "flink-clients" % "1.3.1" % "provided",
  "joda-time" % "joda-time" % "2.9.9",
  "com.google.guava" % "guava" % "22.0",
  "com.typesafe" % "config" % "1.3.0",
  "org.apache.flink" % "flink-connector-kafka-0.10_2.10" % "1.2.0",
  "org.elasticsearch" % "elasticsearch" % "5.5.1",
  "org.elasticsearch.client" % "transport" % "5.5.1",
  "org.apache.flink" % "flink-connector-elasticsearch5_2.10" % "1.3.1"
)

程序失败,出现以下异常(减去):

Program fails with following exception (substract):

    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
    at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
    at org.aj.flink.stream.TaxiDashboard$MyElasticsearchInserter.process(TaxiDashboard.scala:157)
    at org.aj.flink.stream.TaxiDashboard$MyElasticsearchInserter.process(TaxiDashboard.scala:137)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)

以下是我相关的进口商品:

Here are my related imports:

import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

这是MyElasticsearchInserter的代码:

  final class MyElasticsearchInserter(index: String, mappingType: String)
    extends ElasticsearchSinkFunction[(Int, Long, (Float, Float), Short)] {

    def process(element: (Int, Long, (Float, Float), Short),
                ctx: RuntimeContext,
                indexer: RequestIndexer): Unit = {

      // construct JSON document to index
      val json = Map(
        "time" -> element._2.toString,
        "location" -> (element._3._1 + "," + element._3._2),
        "cnt" -> element._4.toString
      )

      val rqst: IndexRequest = Requests.indexRequest
        .index(index)
        .`type`(mappingType)
        .source(json.asJava)
      indexer.add(rqst)
    }
  }

以下是添加接收器的代码:

Here is code for adding a Sink:

if (writeToElasticsearch) {
  // write to Elasticsearch
  nycRides.addSink(new ElasticsearchSink(esConfig,
    esTransport, new MyElasticsearchInserter("nyc-idx", "popular-locations")))

}

看起来问题在于flink 1.3.1是否可以与elasticsearch 5.5.1一起使用?找不到以下方法?

It looks the question is rather if flink 1.3.1 can be used with elasticsearch 5.5.1? Following method is not found?

Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
        at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
        at org.aj.flink.stream.TaxiDashboard$MyElasticsearchInserter.process(TaxiDashboard.scala:157)

非常感谢

推荐答案

在快速浏览了ES代码库之后,我认为他们在5.2.0中删除了该方法(BulkProcessor#add(ActionRequest)).

Just after a quick look at ES code base I think they removed that method(BulkProcessor#add(ActionRequest)) in 5.2.0.

Flink在内部使用该方法.不幸的是,我担心Flink连接器现在无法与ES 5.3+连接器一起使用.

Flink uses that method internally. Unfortunately I fear Flink connector won't work with ES 5.3+ connector right now.

这篇关于flink 1.3.1 elasticsearch 5.5.1. ElasticsearchSinkFunction失败,并出现java.lang.NoSuchMethodError的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆