How to execute async operations (i.e. returning a Future) from map/filter/etc.?


Question

I have a DataSet.map operation that needs to pull data in from an external REST API.

The REST API client returns a Future[Int].

Is it possible to have the DataSet.map operation somehow await this Future asynchronously? Or will I need to block the thread using Await.result? Or is this just not the done thing... i.e. should I instead try and load the data held by the API into a DataSet of its own, and perform a join?

Thanks in advance!

Not a duplicate of: Spark job with Async HTTP call

Reason: This question is open to discussing how to solve the problem differently, say, using a second DataSet and a join instead. Furthermore, the linked question contains no definitive answer as to whether Spark can handle asynchronous transformations - and if it can - how they should be structured.

Answer

It's an interesting question (that I don't think is a duplicate of the other question either).

Yes, you can submit Spark jobs, which is to say that the Spark jobs are going to be executed asynchronously (leaving the main calling thread free to do whatever it wants after the call). This is SparkContext.submitJob.
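A minimal sketch of what that looks like, assuming a live `SparkContext` named `sc`; the partition function, result handler, and final result here are illustrative only:

```scala
// Assumes `sc: SparkContext` already exists (e.g. from a SparkSession).
val rdd = sc.parallelize(1 to 100, numSlices = 4)

// submitJob returns a SimpleFutureAction, which is a scala.concurrent.Future,
// so the calling thread is free again immediately after this line.
val futureAction = sc.submitJob(
  rdd,
  (it: Iterator[Int]) => it.sum,        // runs on the executors, one call per partition
  0 until rdd.getNumPartitions,         // which partitions to compute
  (index: Int, partialSum: Int) =>      // called on the driver as partitions finish
    println(s"partition $index summed to $partialSum"),
  ()                                    // overall result value (unused here)
)
```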

Yes, you can run Spark jobs simultaneously from multiple threads using the very same SparkContext, i.e. SparkContext is thread-safe.

Given the two options, you can have a thread pool (using java.util.concurrent.Executors) and execute Spark jobs that in turn execute an asynchronous action, say "pull data in from an external REST API that returns a Future[Int]."
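One way to combine the two, again assuming a live `sc` (the job body is just a placeholder computation):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// A dedicated pool keeps these long-running Spark jobs off the global
// ExecutionContext that small callbacks rely on.
implicit val jobPool: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

// Four Spark jobs submitted concurrently against the same (thread-safe) sc;
// each Future completes when its job's result comes back to the driver.
val jobs: Seq[Future[Long]] = (1 to 4).map { divisor =>
  Future {
    sc.range(0, 1000000).filter(_ % divisor == 0).count()
  }
}
```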

Now, this part has nothing to do with Spark. How you want to get notified about the result of a Future[Int] is up to you. You can Await or just register a callback to get called when a Success or a Failure happens. It's up to you and has nothing to do with Spark.
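Both options in plain Scala, with no Spark involved; the computation inside the Future stands in for the REST client call:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val f: Future[Int] = Future { 21 * 2 }   // stand-in for the REST client call

// Option 1: block the current thread until the Future completes (or times out).
val result: Int = Await.result(f, 10.seconds)

// Option 2: register a non-blocking callback instead.
f.onComplete {
  case Success(n)  => println(s"got $n")
  case Failure(ex) => println(s"failed: ${ex.getMessage}")
}
```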

What does matter is how you're going to submit or run a Spark job, since map alone won't do this. map is a transformation. I'd rather use foreachPartition instead, and make the external call there.
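A sketch of that shape, where `ds` is the Dataset and `restClient.fetch` is a hypothetical client method returning Future[Int] (both assumed, not from the original question):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

ds.foreachPartition { (rows: Iterator[Row]) =>
  // Fire one async call per row, then block once for the whole batch,
  // rather than calling Await per element.
  val calls: Seq[Future[Int]] = rows.map(row => restClient.fetch(row)).toSeq
  val results: Seq[Int] = Await.result(Future.sequence(calls), 1.minute)
  // foreachPartition is an action returning Unit, so persist or forward
  // `results` here (e.g. write to a store) rather than returning them.
}
```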

