Apache Flink integration with Elasticsearch
Question
I am trying to integrate Flink with Elasticsearch 2.1.1, using the following Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
Here is the Java code. Reading the events from the Kafka queue works fine, but the events are never indexed into Elasticsearch, and no error is reported. If I change any of the Elasticsearch-related settings in the code below (port, hostname, cluster name, or index name), I immediately see an error. With the current settings there is no error, yet no new documents are created in Elasticsearch.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);

DataStream<String> messageStream = env.addSource(
        new FlinkKafkaConsumer082<>(
                parameterTool.getRequired("topic"),
                new SimpleStringSchema(),
                parameterTool.getProperties()));

messageStream.print();

Map<String, String> config = new HashMap<>();
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, "1");
config.put("cluster.name", "FlinkDemo");

List<InetSocketAddress> transports = new ArrayList<>();
transports.add(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));

messageStream.addSink(new ElasticsearchSink<String>(config, transports, new TestElasticsearchSinkFunction()));

env.execute();
}

private static class TestElasticsearchSinkFunction implements ElasticsearchSinkFunction<String> {
    private static final long serialVersionUID = 1L;

    public IndexRequest createIndexRequest(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("flink").id("hash" + element).source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}
Recommended answer
I was indeed running it on my local machine and debugging it, but the one thing I had missed was configuring logging properly, since most Elasticsearch issues are reported in "log.warn" statements. The problem was an exception raised inside "BulkRequestHandler.java" in the elasticsearch-2.2.1 client API: "org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: type is missing;". I had created the index but not a type. I find this rather strange, since I would expect the client to be concerned primarily with the index and to create a type by default.
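To make the "type is missing" failure concrete, here is a minimal sketch that builds the same index request as the question with and without a type, then runs the client-side validation on each. It assumes the Elasticsearch 2.x Java client is on the classpath. The class name IndexRequestTypeCheck and the type name "event" are hypothetical and chosen only for illustration; they are not from the original post.

```java
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

// Hypothetical helper class; assumes the Elasticsearch 2.x client library.
public class IndexRequestTypeCheck {

    // Same request as in the question: index, id and source, but no type.
    static IndexRequest withoutType(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("flink")
                .id("hash" + element)
                .source(json);
    }

    // Same request with an explicit type; "event" is an assumed name.
    static IndexRequest withType(String element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element);
        return Requests.indexRequest()
                .index("flink")
                .type("event")
                .id("hash" + element)
                .source(json);
    }

    // validate() returns null when the request passes client-side validation.
    static String describe(IndexRequest request) {
        ActionRequestValidationException problem = request.validate();
        return problem == null ? "valid" : problem.getMessage();
    }

    public static void main(String[] args) {
        System.out.println("without type: " + describe(withoutType("a")));
        System.out.println("with type:    " + describe(withType("a")));
    }
}
```

Under these assumptions, the likely fix in the question's createIndexRequest is to add a .type(...) call to the existing chain. Calling validate() in a unit test is one way to surface this kind of problem without relying on the sink's warning logs.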