Apache Flink: Correctly make async webservice calls within MapReduce()

Question

I've a program with the following mapPartition function:

public void mapPartition(Iterable<Tuple> values, Collector<Tuple2<Integer, String>> out)

I collect batches of 100 from the inputted values & send them to a web-service for conversion. The result I add back to the out collection.

In order to speed up the process, I made the web-service calls async through the use of Executors. This created issues, either I get the taskManager released exception, or AskTimeoutException. I increased memory & timeouts, but it didn't help. There's quite a lot of input data. I believe this resulted in a lot of jobs being queued up with ExecutorService & hence taking up lots of memory.
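A minimal plain-Java sketch of the memory problem described above. It assumes the executor was created with an unbounded queue (for example `Executors.newFixedThreadPool`, whose `LinkedBlockingQueue` has no size limit), so every submitted batch waits in memory. Bounding the queue and using `CallerRunsPolicy` makes the submitting thread slow down instead. This is a generic `java.util.concurrent` technique, not Flink's Async I/O; the class name and the `Thread.sleep` stand-in for the web-service call are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedExecutorSketch {

    // Fixed-size pool whose work queue holds at most queueCapacity tasks.
    // When the queue is full, CallerRunsPolicy runs the task on the submitting
    // thread, which throttles the producer (backpressure) instead of queuing more.
    static ThreadPoolExecutor boundedExecutor(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits `batches` simulated web-service calls.
    // Returns {number of completed calls, largest queue size observed}.
    static int[] run(int batches, int threads, int queueCapacity) {
        ThreadPoolExecutor executor = boundedExecutor(threads, queueCapacity);
        AtomicInteger completed = new AtomicInteger();
        int maxQueued = 0;
        for (int i = 0; i < batches; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(1); // hypothetical stand-in for sending one batch of 100 records
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
            maxQueued = Math.max(maxQueued, executor.getQueue().size());
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES); // wait for queued and running tasks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), maxQueued};
    }

    public static void main(String[] args) {
        int[] result = run(1_000, 4, 10);
        System.out.println("completed=" + result[0] + ", max queued=" + result[1]);
    }
}
```

However many batches are submitted, at most `queueCapacity` of them are ever waiting in memory; the trade-off is that the producing thread sometimes does the work itself.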

What's the best way to do this?

I was also looking at the taskManager vs taskSlot configuration, but got a little confused about the difference between the two (I guess they're similar to processes vs threads?). I wasn't sure at what point to increase the number of taskManagers vs taskSlots. E.g. if I've got three machines with 4 CPUs each, should I set taskManager=3 and taskSlot=4?

I was also considering increasing the mapPartition's parallelism alone to say 10 to get more threads hitting the web-service. Comments or suggestions?

Answer

You should check out Flink's Async I/O, which lets you query your web service asynchronously from within your streaming application.

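One thing to note is that the async function is not called from multiple threads: each parallel instance calls it sequentially, once per record. It therefore has to return quickly (issue the request, register a callback and return, rather than waiting for the response), and your web service needs to respond reliably and fast enough, within the configured timeout, so that the job is not held up.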
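The "issue the request and return" pattern can be modelled in plain Java to show why a sequentially called function still produces overlapping requests. This is a sketch under stated assumptions: `FakeWebServiceClient` is a hypothetical client whose `query` returns an incomplete `CompletableFuture`, and the `Consumer` passed to `invoke` stands in for Flink's result collector. Responses are delivered manually so the demo is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class NonBlockingInvokeSketch {

    // Hypothetical async web-service client. query() returns at once with an
    // incomplete future; respondToAll() later delivers the responses. A real
    // client would complete the futures from its own I/O threads.
    static class FakeWebServiceClient {
        private final List<Runnable> pendingResponses = new ArrayList<>();

        CompletableFuture<String> query(String request) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pendingResponses.add(() -> future.complete(request.toUpperCase()));
            return future;
        }

        int inFlight() {
            return pendingResponses.size();
        }

        void respondToAll() {
            List<Runnable> responses = new ArrayList<>(pendingResponses);
            pendingResponses.clear();
            responses.forEach(Runnable::run); // completing a future runs its callbacks
        }
    }

    // Same shape as asyncInvoke: issue the request, register a callback, return immediately.
    static void invoke(FakeWebServiceClient client, String record, Consumer<String> collector) {
        client.query(record).thenAccept(result -> collector.accept(record + "->" + result));
    }

    public static void main(String[] args) {
        FakeWebServiceClient client = new FakeWebServiceClient();
        List<String> results = new ArrayList<>();
        for (String record : List.of("a", "b", "c")) {
            invoke(client, record, results::add); // called one after another, never waits
        }
        System.out.println("in flight: " + client.inFlight() + ", results so far: " + results);
        client.respondToAll();
        System.out.println("results: " + results);
    }
}
```

All three requests are in flight before any response arrives, even though `invoke` was called strictly one record at a time. A blocking call such as `future.get()` inside `invoke` would remove that overlap.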

Also, a higher number of partitions (i.e. higher parallelism) could help in your case, but again your web service needs to serve those requests fast enough.
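As a rough back-of-the-envelope bound (Little's law), let $p$ be the parallelism, $c$ the number of requests each parallel instance may have in flight (the capacity argument of `AsyncDataStream.unorderedWait`), and $L$ the mean request latency. Then the request rate the job can sustain is at most about $p \cdot c / L$. This assumes $L$ stays constant under the extra load; the numbers below are illustrative, not measured.

```latex
\lambda_{\max} \approx \frac{p \cdot c}{L},
\qquad
p = 10,\; c = 100,\; L = 0.2\,\text{s}
\;\Rightarrow\;
\lambda_{\max} \approx \frac{10 \cdot 100}{0.2\,\text{s}} = 5000\ \text{requests/s}.
```

Once the web service saturates, raising $p$ or $c$ mostly raises $L$ instead, which is why the service's own speed remains the limiting factor.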

Sample code block based on the Flink documentation:

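// This example implements the asynchronous request and callback with Java 8's CompletableFuture.
// It uses the current Flink Async I/O API (ResultFuture); older releases (1.2 to 1.4) used AsyncCollector.
// DatabaseClient is a placeholder for your own (web-service) client; its query() method is
// assumed to return a CompletableFuture<String>.

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    private final String host;
    private final int port;
    private final String credentials;

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    AsyncDatabaseRequest(String host, int port, String credentials) {
        this.host = host;
        this.port = port;
        this.credentials = credentials;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, port, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(final String str, final ResultFuture<Tuple2<String, String>> resultFuture) {
        // issue the asynchronous request; this method must not block
        client.query(str).whenComplete((result, error) -> {
            if (error != null) {
                // propagate the failure to Flink instead of silently dropping the record
                resultFuture.completeExceptionally(error);
            } else {
                // forward the result downstream
                resultFuture.complete(Collections.singleton(new Tuple2<>(str, result)));
            }
        });
    }
}

// create the original stream (in your case, the stream you are applying mapPartition to)
DataStream<String> stream = ...;

// apply the async I/O transformation: 1000 ms timeout per request and at most
// 100 requests in flight per parallel instance (this bounds memory use);
// host, port and credentials are your own connection settings
DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(
    stream, new AsyncDatabaseRequest(host, port, credentials), 1000, TimeUnit.MILLISECONDS, 100);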

Since you want to create batches of 100, and Async I/O is (at the time of writing) only available in the DataStream (streaming) API, the best way would be to create count windows of size 100.

Also, to purge the last window, which might hold fewer than 100 events, a custom Trigger can combine a count trigger with a time-based trigger, so that it fires after a given number of elements or after a few minutes, whichever comes first.
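The count-or-time behaviour can be modelled in plain Java to check the logic before writing a real Flink `Trigger`. This is only a sketch of the semantics, not Flink's Trigger API. In Flink the same decisions would live in a custom trigger's `onElement` and `onProcessingTime` callbacks. `CountOrTimeoutBatcher` is a hypothetical name, and timestamps are passed in explicitly so the behaviour is easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a "fire after N elements OR after a timeout" trigger.
class CountOrTimeoutBatcher<T> {
    private final int maxCount;
    private final long timeoutMillis;
    private final Consumer<List<T>> sink; // receives each fired batch
    private final List<T> buffer = new ArrayList<>();
    private long oldestArrival; // arrival time of the oldest buffered element

    CountOrTimeoutBatcher(int maxCount, long timeoutMillis, Consumer<List<T>> sink) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
        this.sink = sink;
    }

    // Count part: fire as soon as maxCount elements are buffered.
    void add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            oldestArrival = nowMillis;
        }
        buffer.add(element);
        if (buffer.size() >= maxCount) {
            fire();
        }
    }

    // Time part: meant to be called periodically (like a timer callback); purges a
    // partial batch once its oldest element has waited at least timeoutMillis.
    void onTimer(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - oldestArrival >= timeoutMillis) {
            fire();
        }
    }

    int buffered() {
        return buffer.size();
    }

    private void fire() {
        sink.accept(new ArrayList<>(buffer)); // hand over a copy, then start a new batch
        buffer.clear();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        CountOrTimeoutBatcher<Integer> batcher = new CountOrTimeoutBatcher<>(100, 60_000L, batches::add);
        for (int i = 0; i < 250; i++) {
            batcher.add(i, 0L);
        }
        System.out.println("after count trigger: " + batches.size() + " batches, " + batcher.buffered() + " buffered");
        batcher.onTimer(60_000L);
        System.out.println("after timeout: " + batches.size() + " batches, " + batcher.buffered() + " buffered");
    }
}
```

With 250 elements, the count part alone emits two full batches and leaves 50 elements waiting indefinitely. The timeout part is what flushes that last partial batch.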

A good follow-up is available here on the Flink mailing list, where the user "Kostya" created a custom trigger, which is available here.
