Elasticsearch connector works in IDE but not on local cluster


Problem description


I am trying to write a Twitter stream into an Elasticsearch 2.3 index using the provided Elasticsearch2 connector


Running my job in IntelliJ works fine, but when I run the job jar on a local cluster I get the following error:

05/09/2016 13:26:58 Job execution switched to status RUNNING.
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to SCHEDULED 
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to DEPLOYING 
05/09/2016 13:26:58 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to RUNNING 
05/09/2016 13:26:59 Source: Custom Source -> (Sink: Unnamed, Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED 
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)

05/09/2016 13:26:59 Job execution switched to status FAILING.
java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)
05/09/2016 13:26:59 Job execution switched to status FAILED.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
    at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
    at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
    at java.lang.Thread.run(Thread.java:745)


My code in Scala:

import java.net.{InetAddress, InetSocketAddress}
import java.util

import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "elasticsearch")
config.put("node.name", "node-1")
config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2/bin")

val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getLocalHost(), 9300))
transports.add(new InetSocketAddress(InetAddress.getLoopbackAddress(), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300))

stream.addSink(new ElasticsearchSink(config, transports, new ElasticSearchSinkTwitter()))
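Before digging into packaging, one quick sanity check is to confirm, from the machine the TaskManagers run on, that the transport port is open and that the cluster name Elasticsearch reports matches the `cluster.name` in `config`. These are hypothetical diagnostic commands assuming a local Elasticsearch 2.x node on the default ports (9200 HTTP, 9300 transport):

```shell
# The cluster_name reported here must match config.put("cluster.name", ...) above.
curl -s http://localhost:9200 | grep cluster_name

# Check that the transport port the sink connects to is actually open.
nc -z localhost 9300 && echo "transport port 9300 reachable"
```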


What is the difference between running that program from an IDE and the local cluster?

Answer


Problems like this are often caused by the different ways that dependencies are managed / included by IDEs (IntelliJ, Eclipse) and Flink's job submission via fat jars.
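In particular, Lucene registers its `PostingsFormat` implementations through `META-INF/services` files, and when a fat jar is built those files from different dependencies can overwrite one another unless they are merged. If the job is built with the maven-shade-plugin, the `ServicesResourceTransformer` merges them. This is a hedged sketch of the relevant plugin configuration, not part of the original question:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <transformers>
      <!-- Merge META-INF/services files instead of letting one jar's copy win;
           Lucene discovers SPI classes like PostingsFormat through these files. -->
      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
  </configuration>
</plugin>
```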


I had the same problem the other day, and the task manager log file revealed the following root cause:



java.lang.IllegalArgumentException: An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]


Searching for the error I found this answer on SO that solved the issue:

https://stackoverflow.com/a/38354027/3609571


by adding the following dependency to my pom.xml:

<dependency>
    <groupId>org.apache.lucene</groupId>
    <artifactId>lucene-core</artifactId>
    <version>5.4.1</version>
</dependency>


Note that the order of dependencies matters in this case. It only worked when putting the lucene-core dependency on top; adding it at the end did not work for me. So this is more a "hack" than a proper fix.
