What's the efficient way to call an HTTP request and read the InputStream in a Spark map task?


Problem description

See the code sample below:

JavaRDD<String> mapRDD = filteredRecords
        .map(new Function<String, String>() {

            @Override
            public String call(String url) throws Exception {
                BufferedReader in = null;
                // Strip surrounding quotes and whitespace from the raw URL.
                URL formatURL = new URL(url.replaceAll("\"", "").trim());
                try {
                    // A new connection is opened for every single record.
                    HttpURLConnection con =
                            (HttpURLConnection) formatURL.openConnection();
                    in = new BufferedReader(
                            new InputStreamReader(con.getInputStream()));
                    return in.readLine();
                } finally {
                    if (in != null) {
                        in.close();
                    }
                }
            }
        });

Here url is an HTTP GET request, for example:

http://ip:port/cyb/test?event=movie&id=604568837&name=SID&timestamp_secs=1460494800&timestamp_millis=1461729600000&back_up_id=676700166

This piece of code is very slow. The IP and port are random and the load is distributed, so the IP can take 20 different values together with the port, so I don't see a bottleneck there.

When I comment out

in = new BufferedReader(new InputStreamReader(con.getInputStream()));

return in.readLine();

the code runs very fast. NOTE: the input data to process is 10 GB, read from S3 using Spark.

Is there anything wrong in how I am using BufferedReader or InputStreamReader, or is there any alternative? I can't use foreach in Spark because I have to get the response back from the server and need to save the JavaRDD as a textFile on HDFS.
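(For reference, the save step itself is a single call on the resulting RDD; a minimal sketch, where the HDFS path is a placeholder, not from the original post:)

// Persist one HTTP response per line as a text file on HDFS.
mapRDD.saveAsTextFile("hdfs://namenode:8020/output/responses");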

If we use mapPartitions, the code would be something like below:

JavaRDD<String> mapRDD = filteredRecords.mapPartitions(
        new FlatMapFunction<Iterator<String>, String>() {

    @Override
    public Iterable<String> call(Iterator<String> tuple) throws Exception {

        final List<String> rddList = new ArrayList<String>();
        // Lazy view over rddList, returned once the partition is processed.
        Iterable<String> iterable = new Iterable<String>() {

            @Override
            public Iterator<String> iterator() {
                return rddList.iterator();
            }
        };
        while (tuple.hasNext()) {
            URL formatURL = new URL(tuple.next().replaceAll("\"", "").trim());
            // Still one connection per record, just looped inside a partition.
            HttpURLConnection con = (HttpURLConnection) formatURL.openConnection();
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(con.getInputStream()))) {
                rddList.add(br.readLine());
            } catch (IOException ex) {
                // On the first failure, return whatever was collected so far.
                return rddList;
            }
        }
        return iterable;
    }
});

Here also we are doing the same thing for each record, aren't we?

Answer

Currently you are using a map function, which creates a URL request for each row in the partition.

You can use mapPartitions instead, which will make the code run faster, because the connection to the server is set up only once and reused for every row, i.e. one connection setup per partition rather than one per record.
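A minimal sketch of that pattern, assuming Apache HttpClient 4.x is on the executor classpath (the client choice, pooling behavior, and error handling below are illustrative, not from the original answer):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

JavaRDD<String> mapRDD = filteredRecords.mapPartitions(
        new FlatMapFunction<Iterator<String>, String>() {

    @Override
    public Iterable<String> call(Iterator<String> urls) throws Exception {
        List<String> results = new ArrayList<String>();
        // Create the client once per partition; its connection pool and
        // keep-alive handling reuse sockets across all rows.
        CloseableHttpClient client = HttpClients.createDefault();
        try {
            while (urls.hasNext()) {
                String url = urls.next().replaceAll("\"", "").trim();
                CloseableHttpResponse response = client.execute(new HttpGet(url));
                try {
                    // Read the response body as the result for this row.
                    results.add(EntityUtils.toString(response.getEntity()));
                } finally {
                    response.close();
                }
            }
        } finally {
            client.close();
        }
        return results;
    }
});

Here the expensive setup (the pooled client) happens once per partition. If a single failed request should not abort the whole partition, catch the IOException per URL and log or skip that record instead.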
