Elasticsearch Connector 作为 Flink 中的源 [英] Elasticsearch Connector as Source in Flink
问题描述
我使用 Elasticsearch Connector 作为 Sink 将数据插入 Elasticsearch(参见:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html).
I used Elasticsearch Connector as a Sink to insert data into Elasticsearch (see : https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html).
但是,我没有找到任何连接器来从 Elasticsearch 获取数据作为源.
But, I did not found any connector to get data from Elasticsearch as source.
是否有任何连接器或示例可以在 Flink 管道中使用 Elasticsearch 文档作为源?
Is there any connector or example to use Elasticsearch documents as source in a Flink pipline?
问候,
阿里
推荐答案
我最终定义了一个简单的从 ElasticSearch 读取的函数
I finaly defined a simple read from ElasticSearch function
public static class ElasticsearchFunction
extends ProcessFunction<MetricMeasurement, MetricPrediction> {
public ElasticsearchFunction() throws UnknownHostException {
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName("YOUR_IP"), PORT_NUMBER));
}
@Override
public void processElement(MetricMeasurement in, Context context, Collector<MetricPrediction> out) throws Exception {
MetricPrediction metricPrediction = new MetricPrediction();
metricPrediction.setMetricId(in.getMetricId());
metricPrediction.setGroupId(in.getGroupId());
metricPrediction.setBucket(in.getBucket());
// Get the metric measurement from Elasticsearch
SearchResponse response = client.prepareSearch("YOUR_INDEX_NAME")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("YOUR_TERM", in.getMetricId())) // Query
.setPostFilter(QueryBuilders.rangeQuery("value").from(0L).to(50L)) // Filter
.setFrom(0).setSize(1).setExplain(true)
.get();
SearchHit[] results = response.getHits().getHits();
for(SearchHit hit : results){
String sourceAsString = hit.getSourceAsString();
if (sourceAsString != null) {
ObjectMapper mapper = new ObjectMapper();
MetricMeasurement obj = mapper.readValue(sourceAsString, MetricMeasurement.class);
obj.getMetricId();
metricPrediction.setPredictionValue(obj.getValue());
}
}
out.collect(metricPrediction);
}
}
这篇关于Elasticsearch Connector 作为 Flink 中的源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!