How to execute async operations (i.e. returning a Future) from map/filter/etc.?


Question

I have a DataSet.map operation that needs to pull data in from an external REST API.

The REST API client returns a Future[Int].

Is it possible to have the DataSet.map operation somehow await this Future asynchronously? Or will I need to block the thread using Await.result? Or is this just not the done thing... i.e. should I instead try and load the data held by the API into a DataSet of its own, and perform a join?

Thanks in advance!

Different from: Spark job with async HTTP call

Reason: This question is open to discussing how to solve the problem differently, say, using a second DataSet and a join instead. Furthermore, the linked question contains no definitive answer as to whether Spark can handle asynchronous transformations - and if it can - how they should be structured.

Answer

It's an interesting question (that I don't think is a duplicate of the other question either).

Yes, you can submit Spark jobs which is to say that the Spark jobs are going to be executed asynchronously (leaving the main calling thread free to do whatever it wants after the call). This is SparkContext.submitJob.
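As an illustration, a minimal sketch of SparkContext.submitJob (the dataset, partition function, and result handler below are made up for the example, and it assumes a local SparkSession):

```scala
// A sketch of submitting a Spark job asynchronously with SparkContext.submitJob.
// Everything here except submitJob's signature is illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("submitJob-sketch")
  .getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(1 to 100, numSlices = 4)

// submitJob returns a SimpleFutureAction (a scala.concurrent.Future):
// the calling thread is free as soon as this line returns.
val jobFuture = sc.submitJob(
  rdd,
  (it: Iterator[Int]) => it.sum,          // work done on each partition
  0 until rdd.getNumPartitions,           // which partitions to run on
  (index: Int, partSum: Int) =>           // called as each partition finishes
    println(s"partition $index summed to $partSum"),
  ()                                      // overall result once all partitions finish
)
```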

Yes, you can run Spark jobs simultaneously from multiple threads using the very same SparkContext, i.e. SparkContext is thread-safe.

Given the two options, you can have a thread pool (using java.util.concurrent.Executors) and execute Spark jobs that in turn execute an asynchronous action, say "pull data in from an external REST API that returns a Future[Int]."

Now, this part has nothing to do with Spark. How you want to get notified about the result of a Future[Int] is up to you. You can Await or just register a callback to get called when a Success or a Failure happens. It's up to you and has nothing to do with Spark.
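For illustration, here are both options on a plain Future[Int], with restCall as a hypothetical stand-in for the REST client (no Spark involved):

```scala
// Two ways to consume a Future[Int]; restCall is a hypothetical
// stand-in for the external REST API client.
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

def restCall(id: Int): Future[Int] = Future(id * 2)

// Option 1: block the calling thread until the result is available.
val result: Int = Await.result(restCall(21), 10.seconds)  // 42

// Option 2: register a callback and keep the calling thread free.
restCall(21).onComplete {
  case Success(n)  => println(s"got $n")
  case Failure(ex) => println(s"failed: ${ex.getMessage}")
}
```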

What matters is how you're going to submit or run the Spark job, since map alone won't do it; map is a transformation. I'd rather use foreachPartition instead to make the external call.
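A sketch of the per-partition body one could pass to foreachPartition: start all REST calls for the partition, then block once on the whole batch rather than once per element (restCall and handleResult are hypothetical stand-ins):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for the REST client returning Future[Int].
def restCall(id: Int): Future[Int] = Future(id * 2)

// Body for one partition: fire all calls, then wait for the batch once.
def processPartition(ids: Iterator[Int]): List[Int] = {
  val futures = ids.map(restCall).toList       // .toList forces the calls to start
  Await.result(Future.sequence(futures), 5.minutes)
}

// On a Dataset this would be wired up as, e.g.:
//   ds.foreachPartition((it: Iterator[Int]) => processPartition(it).foreach(handleResult))
```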

