Spark job with Async HTTP call

Question

I build an RDD from a list of URLs, and then try to fetch the data with some async HTTP calls. I need all the results before doing other computations. Ideally, I'd like to make the HTTP calls on different nodes for scaling considerations.

I did something like this:

import scala.concurrent.Await
import scala.concurrent.duration._

//init spark
val sparkContext = new SparkContext(conf)
val datas = Seq[String]("url1", "url2")

//create rdd
val rdd = sparkContext.parallelize[String](datas)

//httpCall returns Future[String]
val requests = rdd.map((url: String) => httpCall(url))

//await all results (Future.sequence may be better)
val responses = requests.map(r => Await.result(r, 10.seconds))

//print responses
responses.collect().foreach((s: String) => println(s))

//stop spark
sparkContext.stop()

This works, but the Spark job never finishes!

So I wonder what the best practices are for dealing with Futures using Spark (or Future[RDD]).

I think this use case looks pretty common, but I haven't found an answer yet.

Best regards

Answer

this use case looks pretty common

Not really, because it simply doesn't work as you (probably) expect. Since each task operates on standard Scala Iterators, these operations will be squashed together. This means that in practice all operations will be blocking. Assuming you have three URLs ["x", "y", "z"], your code will be executed in the following order:

Await.result(httpCall("x", 10.seconds))
Await.result(httpCall("y", 10.seconds))
Await.result(httpCall("z", 10.seconds))

You can easily reproduce the same behavior locally. If you want to execute your code asynchronously, you should handle this explicitly using mapPartitions:

rdd.mapPartitions(iter => {
  ??? // Submit requests
  ??? // Wait until all requests completed and return Iterator of results
})

but this is relatively tricky. There is no guarantee that all the data for a given partition fits into memory, so you'll probably need some batching mechanism as well.
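As a rough illustration of that approach, here is a minimal sketch (not from the original answer) combining mapPartitions with per-batch blocking. It assumes httpCall returns Future[String] as in the question, that an implicit ExecutionContext is in scope, and that batchSize is a hypothetical tuning knob:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val batchSize = 100 // hypothetical: tune to what one partition can hold in memory

val responses = rdd.mapPartitions { iter =>
  iter.grouped(batchSize).flatMap { batch =>
    // submit the whole batch of requests concurrently ...
    val futures = batch.map(url => httpCall(url))
    // ... then block once per batch instead of once per URL
    Await.result(Future.sequence(futures), 10.seconds)
  }
}

Each batch still blocks, but only once per batch, and memory usage is bounded by batchSize rather than by the full partition.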

All of that being said, I couldn't reproduce the problem you've described; it could be some configuration issue or a problem with httpCall itself.

On a side note, allowing a single timeout to kill the whole task doesn't look like a good idea.
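One hedged way to address that (an assumption on my part, not part of the original answer) is to wrap each blocking wait in Try, so a timed-out or failed request produces an error marker instead of failing the whole Spark task:

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

val safeResponses = rdd.map { url =>
  Try(Await.result(httpCall(url), 10.seconds)) match {
    case Success(body) => (url, Some(body)) // request succeeded
    case Failure(_)    => (url, None)       // timeout or HTTP failure; keep the task alive
  }
}

Downstream code can then decide whether to retry or drop the None entries, which matters if you really do need all results before the next computation.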
