flink elasticsearch connector


Problem Description


I used the following code to connect Flink to Elasticsearch, but a lot of errors are displayed when running it with Flink. The program first takes input from a port, then reads each line entered on the command line and displays the word counts. The main problem is connecting to Elasticsearch, which unfortunately fails. What do these errors mean? Which classes are needed to connect a minimal Flink job to Elasticsearch?

public class Elastic {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);
        text.print().setParallelism(1);

        env.execute("Socket Window WordCount");

        List<HttpHost> httpHosts = new ArrayList<HttpHost>();
        httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
        httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
        httpHosts.add(new HttpHost("my-ip", 9200, "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<String, String>();
                        json.put("data", element);

                        return Requests.indexRequest()
                                .index("iran")
                                .type("int")
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        esSinkBuilder.setBulkFlushMaxActions(1);

        final Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};

        esSinkBuilder.setRestClientFactory(new RestClientFactory() {
            @Override
            public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
                restClientBuilder.setDefaultHeaders(defaultHeaders)
                        .setMaxRetryTimeoutMillis(10000)
                        .setPathPrefix("a")
                        .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
                            @Override
                            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
                                return builder.setSocketTimeout(10000);
                            }
                        });
            }
        });

        text.addSink(esSinkBuilder.build());
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

my elasticsearch version: 7.5.0
my flink version: 1.8.3

my error:

sudo /etc/flink-1.8.3/bin/flink run -c org.apache.flink.Elastic /root/FlinkElastic-1.0.jar --port 9000

------------------------------------------------------------
The program finished with the following exception:

java.lang.RuntimeException: Could not look up the main(String[]) method from the class org.apache.flink.Elastic: org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction
    at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:527)
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:246)
    ... 7 more
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:521)
    ... 7 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 13 more

my pom:

<groupId>org.apache.flink</groupId>
<artifactId>FlinkElastic</artifactId>
<version>1.0</version>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>6</source>
                <target>6</target>
            </configuration>
        </plugin>
    </plugins>
</build>


<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.8.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.8.3</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.8.3</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.8.3</version>
    </dependency>
</dependencies>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

Solution

Please find the Flink Elastic Connector code here. I used the dependencies and versions listed below.

  • Flink: 1.10.0
  • ElasticSearch: 7.6.2
  • flink-connector-elasticsearch7
  • Scala: 2.12.11
  • SBT: 1.2.8
  • Java: 11.0.4
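Translated into an sbt build definition, those versions would look roughly like this (a sketch; the module list beyond the connector itself is an assumption, not taken from the answer):

```scala
// build.sbt — hypothetical sketch matching the versions listed above
scalaVersion := "2.12.11"

val flinkVersion = "1.10.0"

libraryDependencies ++= Seq(
  // core Scala API modules (assumed; the answer only names the connector)
  "org.apache.flink" %% "flink-scala"                    % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala"          % flinkVersion,
  // the Elasticsearch 7 connector the answer uses
  "org.apache.flink" %% "flink-connector-elasticsearch7" % flinkVersion
)
```

With sbt-assembly (or an equivalent plugin), `sbt assembly` would then produce a job jar that bundles the connector classes.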

Point to note here:

  • From Elasticsearch 6.x onwards, full support for the REST Elasticsearch client was introduced; up to Elasticsearch 5.x, the Transport client was used instead.
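The `ClassNotFoundException` in the question also deserves a note: it means the connector classes were never packaged into the job jar, since connector dependencies are not part of the Flink distribution. One common fix (a sketch under assumptions — the plugin version and placement are not from the original pom) is to build a fat jar with the `maven-shade-plugin`:

```xml
<!-- Assumption: added to <build><plugins> of the job's pom so that
     the Elasticsearch connector classes are bundled into the job jar. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

With this in place, `mvn package` produces a jar that includes the connector; the core `flink-java`/`flink-streaming-java` dependencies are typically marked `<scope>provided</scope>` so classes already on the Flink runtime classpath are not duplicated.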

1. Flink DataStream

val inputStream: DataStream[(String, String)] = ...

ESSinkService.sinkToES(inputStream, index)

2. ElasticsearchSink Function

package demo.elastic

import org.apache.flink.streaming.api.scala._
import org.apache.log4j._
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.http.HttpHost
import org.elasticsearch.client.{Requests, RestClientBuilder}
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.action.ActionRequest
import org.apache.flink.streaming.api.datastream.DataStreamSink


class ESSinkService {

  val logger = Logger.getLogger(getClass.getName)

  val httpHosts = new java.util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("localhost", 9200, "http"))

  def sinkToES(counted: DataStream[(String, String)], index: String): DataStreamSink[(String, String)] = {

    val esSinkBuilder = new ElasticsearchSink.Builder[(String, String)](
      httpHosts, new ElasticsearchSinkFunction[(String, String)] {
        def process(element: (String, String), ctx: RuntimeContext, indexer: RequestIndexer) {

          indexer.add(Requests.indexRequest
            .index(element._2 + "_" + index)
            .source(element._1, XContentType.JSON))
        }
      }
    )
    esSinkBuilder.setBulkFlushMaxActions(2)
    esSinkBuilder.setBulkFlushInterval(1000L)
    esSinkBuilder.setFailureHandler(new ActionRequestFailureHandler {
      override def onFailure(actionRequest: ActionRequest, throwable: Throwable, i: Int, requestIndexer: RequestIndexer): Unit = {

        println("@@@@@@@On failure from ElasticsearchSink:-->" + throwable.getMessage)
      }
    })

    esSinkBuilder.setRestClientFactory(new RestClientFactory {
      override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
        /*restClientBuilder.setDefaultHeaders(...)
        restClientBuilder.setMaxRetryTimeoutMillis(...)
        restClientBuilder.setPathPrefix(...)
        restClientBuilder.setHttpClientConfigCallback(...)*/
      }
    })

    counted.addSink(esSinkBuilder.build())
  }
}

object ESSinkService extends ESSinkService

Note: For more details click here.
