Apache Flink: Correctly make async webservice calls within MapReduce()

Problem description

I have a program with the following mapPartition function:

public void mapPartition(Iterable<Tuple> values, Collector<Tuple2<Integer, String>> out)

I collect batches of 100 records from the input values and send them to a web service for conversion. The results are added back to the out collector.

In order to speed up the process, I made the web-service calls asynchronous through the use of Executors. This created issues: I either get the taskManager-released exception or an AskTimeoutException. I increased memory and timeouts, but it didn't help. There is quite a lot of input data, and I believe this resulted in a lot of jobs being queued up with the ExecutorService and hence taking up lots of memory.
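
(A minimal sketch of the kind of executor setup involved; the class and names below are illustrative placeholders, not my actual code. A standard fixed thread pool uses an unbounded work queue, which is where the queued-up batches would sit; a bounded queue with a CallerRunsPolicy is one way to cap that growth.)

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedWebServiceExecutor {

    // Executors.newFixedThreadPool(...) backs its workers with an unbounded
    // LinkedBlockingQueue, so every batch submitted from mapPartition waits in
    // memory until a thread becomes free, which is the build-up described above.
    // A bounded queue plus CallerRunsPolicy applies back-pressure instead:
    public static ThreadPoolExecutor create(int threads, int maxQueuedBatches) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(maxQueuedBatches),
                new ThreadPoolExecutor.CallerRunsPolicy()); // submitter runs the task itself when the queue is full
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = create(4, 10);
        final List<String> batch = Arrays.asList("a", "b", "c");
        // the web-service call itself is a hypothetical stand-in here
        pool.execute(() -> System.out.println("would convert a batch of " + batch.size() + " records"));
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}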

What would be the best approach here?

I was also looking at the taskManager vs. taskSlot configuration, but got a little confused about the difference between the two (I guess they are similar to processes vs. threads?). I wasn't sure at what point I should increase taskManagers vs. taskSlots. E.g., if I have three machines with 4 CPUs per machine, should my taskManager count be 3 while my taskSlot count is 4?
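
(For reference, a sketch of the flink-conf.yaml entries this maps to, assuming the common rule of thumb of one TaskManager per machine and one slot per CPU core, i.e. 3 TaskManagers with 4 slots each for 12 slots in total; the values are illustrative:)

# flink-conf.yaml (illustrative values)
# one TaskManager process per machine, each offering one slot per CPU core
taskmanager.numberOfTaskSlots: 4

# default job parallelism; at most 3 TaskManagers * 4 slots = 12
parallelism.default: 12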

I was also considering increasing the parallelism of the mapPartition alone, to say 10, to get more threads hitting the web service. Comments or suggestions?
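
(A minimal sketch of what that would look like in the DataSet API; input and WebServiceMapPartition are placeholders for my data set and mapPartition function:)

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// 'input' and 'WebServiceMapPartition' are placeholder names
DataSet<Tuple2<Integer, String>> converted = input
        .mapPartition(new WebServiceMapPartition())
        .setParallelism(10); // run 10 parallel instances of this operator only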

Recommended answer

You should check out Flink Async I/O, which would enable you to query your web service in an asynchronous way in your streaming application.

One thing to note is that the async function is not called in a multi-threaded fashion; it is called once per record, per partition, sequentially. So your web application needs to return deterministically and, ideally, quickly, so that the job is not held up.

Also, a higher number of partitions would potentially help your case, but again your web service needs to fulfil those requests fast enough.

Sample code block from Flink's website:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // DatabaseClient, host, port and credentials are placeholders for your own async client
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(final String str, final AsyncCollector<Tuple2<String, String>> asyncCollector) throws Exception {

        // issue the asynchronous request, receive a future for the result
        Future<String> resultFuture = client.query(str);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the collector
        resultFuture.thenAccept( (String result) -> {

            asyncCollector.collect(Collections.singleton(new Tuple2<>(str, result)));

        });
    }
}

// create the original stream (in your case, the stream you are mapPartitioning)
DataStream<String> stream = ...;

// apply the async I/O transformation:
// 1000 ms timeout per request, at most 100 concurrent requests in flight
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

As you want to create batches of size 100, and Async I/O is specific to the Streaming API for the moment, the best way would be to create count windows with a size of 100.
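
To illustrate, a minimal sketch of building such batches with a non-keyed count window; stream is assumed to be the DataStream<String> from the example above, and each emitted batch can then be handed to the web service (for example through the async I/O operator):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// assuming 'stream' is the DataStream<String> from the example above
DataStream<List<String>> batches = stream
        .countWindowAll(100)   // emit a window for every 100 elements
        .apply(new AllWindowFunction<String, List<String>, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<String> values, Collector<List<String>> out) {
                List<String> batch = new ArrayList<>();
                for (String value : values) {
                    batch.add(value);
                }
                out.collect(batch);   // one batch of (up to) 100 records
            }
        });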

Also, to flush the last window, which might not contain 100 events, custom Triggers could be used: a combination of count triggers and time-based triggers, such that the trigger fires after a count of elements or after every few minutes.
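
A minimal sketch of such a trigger, using processing time for the timeout; this is an illustrative version, not the exact trigger from the mailing-list thread:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

// Fires when maxCount elements have arrived, or when timeoutMs of processing time
// has passed since the first element of the current batch, whichever comes first.
public class CountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxCount;
    private final long timeoutMs;

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
    private final ValueStateDescriptor<Long> timerDesc =
            new ValueStateDescriptor<>("timer", LongSerializer.INSTANCE);

    public CountOrTimeoutTrigger(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);

        if (timer.value() == null) {
            // first element of this batch: arm the timeout timer
            long timeoutTime = ctx.getCurrentProcessingTime() + timeoutMs;
            ctx.registerProcessingTimeTimer(timeoutTime);
            timer.update(timeoutTime);
        }

        count.add(1L);
        if (count.get() >= maxCount) {
            return fireAndReset(ctx);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        // timeout expired: emit whatever has accumulated, even if fewer than maxCount
        return fireAndReset(ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        fireAndReset(ctx); // reuse the cleanup; the returned value is ignored here
    }

    private TriggerResult fireAndReset(TriggerContext ctx) throws Exception {
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        if (timer.value() != null) {
            ctx.deleteProcessingTimeTimer(timer.value());
            timer.clear();
        }
        ctx.getPartitionedState(countDesc).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

// usage for a non-keyed stream (GlobalWindows' default trigger never fires on its own):
//   stream.windowAll(GlobalWindows.create())
//         .trigger(new CountOrTimeoutTrigger<>(100, 60_000))
//         .apply(...)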

A good follow-up is available here on the Flink mailing list, where the user "Kostya" created a custom trigger, which is available here.
