What's the efficient way to call an HTTP request and read the InputStream in a Spark map task?
Problem description
See the code sample below:
JavaRDD<String> mapRDD = filteredRecords
        .map(new Function<String, String>() {
            @Override
            public String call(String url) throws Exception {
                BufferedReader in = null;
                URL formatURL = new URL((url.replaceAll("\"", "")).trim());
                try {
                    HttpURLConnection con = (HttpURLConnection) formatURL.openConnection();
                    in = new BufferedReader(new InputStreamReader(con.getInputStream()));
                    return in.readLine();
                } finally {
                    if (in != null) {
                        in.close();
                    }
                }
            }
        });
Here the url is an HTTP GET request, for example:
http://ip:port/cyb/test?event=movie&id=604568837&name=SID&timestamp_secs=1460494800&timestamp_millis=1461729600000&back_up_id=676700166
This piece of code is very slow. The IP and port are random and the load is distributed, so there can be around 20 different ip:port combinations; I don't see a bottleneck there.
When I comment out
in = new BufferedReader(new InputStreamReader(con.getInputStream()));
return in.readLine();
the code runs very fast. NOTE: the input data to process is 10 GB, read from S3 using Spark.
Is there anything wrong with how I am using BufferedReader or InputStreamReader, or is there an alternative? I can't use foreach in Spark because I have to get the response back from the server and need to save the JavaRDD as a text file on HDFS.
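For reference, a minimal sketch of the S3-read and HDFS-write plumbing mentioned above; the application name, bucket and HDFS paths are placeholders, not from the original post:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("http-fetch"); // placeholder app name
JavaSparkContext sc = new JavaSparkContext(conf);

// Read the 10 GB of input URLs from S3 (s3a:// requires the hadoop-aws jars on the classpath).
JavaRDD<String> filteredRecords = sc.textFile("s3a://my-bucket/input/urls.txt");

// ... build mapRDD from filteredRecords as shown above ...

// Persist the HTTP responses as plain text files on HDFS.
mapRDD.saveAsTextFile("hdfs://namenode:8020/user/spark/http-responses");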
If we use mapPartitions, the code looks something like the below:
JavaRDD<String> mapRDD = filteredRecords.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    @Override
    public Iterable<String> call(Iterator<String> tuple) throws Exception {
        final List<String> rddList = new ArrayList<String>();
        Iterable<String> iterable = new Iterable<String>() {
            @Override
            public Iterator<String> iterator() {
                return rddList.iterator();
            }
        };
        while (tuple.hasNext()) {
            URL formatURL = new URL((tuple.next().replaceAll("\"", "")).trim());
            HttpURLConnection con = (HttpURLConnection) formatURL.openConnection();
            try (BufferedReader br = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
                rddList.add(br.readLine());
            } catch (IOException ex) {
                return rddList;
            }
        }
        return iterable;
    }
});
Here too we are doing the same thing for each record, aren't we?
Recommended answer
Currently you are using the map function, which creates a URL request for each row in the partition.

You can use mapPartitions instead, which will make the code run faster because it creates the connection to the server only once, that is, only one connection per partition.
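A minimal sketch of that idea, assuming the Spark 1.x Java API (FlatMapFunction returning an Iterable) and Apache HttpClient 4.x on the executor classpath; the variable names and the choice of HttpClient are illustrative, not from the original answer:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

JavaRDD<String> responses = filteredRecords.mapPartitions(
        new FlatMapFunction<Iterator<String>, String>() {
            @Override
            public Iterable<String> call(Iterator<String> urls) throws Exception {
                // Expensive setup happens once per partition: one pooled HTTP client
                // is shared by every record in this partition.
                CloseableHttpClient client = HttpClients.createDefault();
                List<String> results = new ArrayList<String>();
                try {
                    while (urls.hasNext()) {
                        String url = urls.next().replaceAll("\"", "").trim();
                        CloseableHttpResponse response = client.execute(new HttpGet(url));
                        try {
                            // Read the whole response body (the original code reads one line).
                            results.add(EntityUtils.toString(response.getEntity()));
                        } finally {
                            response.close();
                        }
                    }
                } finally {
                    client.close();
                }
                return results;
            }
        });

The key point is that the client and its connection pool live for the whole partition, so sockets can be reused across records instead of being opened anew for every row.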