Apache Flink integration with Elasticsearch


Question


I am trying to integrate Flink with Elasticsearch 2.1.1, using the following Maven dependency:

     <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
        <version>1.1-SNAPSHOT</version>
    </dependency>

Here is the Java code, where I read events from a Kafka queue (which works fine). Somehow, though, the events are not posted to Elasticsearch, and there is no error either. In the code below, if I change any of the Elasticsearch-related settings (port, hostname, cluster name, or index name), I immediately see an error; but as it stands, it shows no error and no new documents get created in Elasticsearch.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    messageStream.print();

    Map<String, String> config = new HashMap<>();
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
    config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");

    config.put("cluster.name", "FlinkDemo");

    List<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

    messageStream.addSink(new ElasticsearchSink<String>(config, transports, new TestElasticsearchSinkFunction()));

    env.execute();
}
private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<String> {
    private static final long serialVersionUID = 1L;

    public IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("flink").id("hash"+element).source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}

Solution

I was indeed running it on the local machine and debugging as well, but the one thing I was missing was to properly configure logging, as most Elasticsearch issues are described in "log.warn" statements. The issue was an exception inside "BulkRequestHandler.java" in the elasticsearch-2.2.1 client API, which was throwing the error "org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: type is missing;". I had created the index but not a type, which I find pretty strange, as it should primarily be concerned with the index and create the type by default.
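Based on that validation error, a likely fix is to set a document type explicitly on each `IndexRequest`, since Elasticsearch 2.x requires a type on every index request. A minimal sketch of the adjusted `createIndexRequest` is below; it is a fragment of the sink function from the question (it needs the same Elasticsearch client dependencies), and the type name `"my-type"` is a placeholder, not something from the original post:

```java
// Sketch only: same imports and dependencies as the question's code.
// The type name "my-type" is an arbitrary placeholder; ES 2.x just
// requires *some* type, which addresses "Validation Failed: 1: type is missing".
public IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);

    return Requests.indexRequest()
            .index("flink")
            .type("my-type")        // the missing piece
            .id("hash" + element)
            .source(json);
}
```

It is also worth configuring a logging backend (e.g. a log4j.properties on the classpath at WARN level or lower) before debugging sink issues like this, since, as noted above, the Elasticsearch client reports bulk-request failures through "log.warn" rather than by failing the job.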
