Read column from csv with java spark
Problem description
I'm trying to read a CSV file with Java and Spark.
Currently I do this:
String master = "local[2]";
String csvInput = "/home/username/Downloads/countrylist.csv";
String csvOutput = "/home/username/Downloads/countrylist";
JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> csvData = sc.textFile(csvInput, 1);
JavaRDD<List<String>> lines = csvData.map(new Function<String, List<String>>() {
    @Override
    public List<String> call(String s) {
        return new ArrayList<String>(Arrays.asList(s.split("\\s*,\\s*")));
    }
});
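As a quick, Spark-free sanity check of the split regex used above, here is a minimal plain-Java sketch (the class name SplitDemo is only for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class SplitDemo {
    // Same regex as in the map() above: a comma with optional surrounding whitespace
    static List<String> parseLine(String s) {
        return Arrays.asList(s.split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        System.out.println(parseLine("one, two, three"));
    }
}
```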
So I have every line of the CSV file as an entry in my RDD. I also wrote this method for getting a column:
public static JavaRDD<String> getColumn(JavaRDD<List<String>> data, final int index) {
    return data.flatMap(
        new FlatMapFunction<List<String>, String>() {
            public Iterable<String> call(List<String> s) {
                return Arrays.asList(s.get(index));
            }
        }
    );
}
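For reference, the same column extraction over plain in-memory lists looks like this (a hypothetical ColumnDemo class, not part of the Spark API); it also makes clear that flatMap over single-element lists behaves like an ordinary map:

```java
import java.util.ArrayList;
import java.util.List;

public class ColumnDemo {
    // Plain-collections analogue of getColumn(): take one field from every row
    static List<String> getColumn(List<List<String>> data, int index) {
        List<String> column = new ArrayList<String>();
        for (List<String> row : data) {
            column.add(row.get(index));
        }
        return column;
    }
}
```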
But later I want to apply many transformations to columns, change the position of columns, etc. So it would be easier to have an RDD filled with the COLUMNS as ArrayLists, not the LINES.
Does anyone have an idea how to achieve this? I don't want to call getColumn() n times.
It would be great if you could help me.
Explanation: My csvData looks like this:
one, two, three
four, five, six
seven, eight, nine
My lines RDD looks like this:
[one, two, three]
[four, five, six]
[seven, eight, nine]
But I want this:
[one, four, seven]
[two, five, eight]
[three, six, nine]
To do a map-reduce-based matrix transposition, which is basically what is being asked, you would proceed as follows:
Transform your lines into indexed tuples: (hint: use zipWithIndex and map)
[(1,1,one), (1,2,two), (1,3,three)]
[(2,1,four), (2,2,five), (2,3,six)]
[(3,1,seven), (3,2,eight), (3,3,nine)]
Add the column as key to each tuple: (hint: use map)
[(1,(1,1,one)), (2,(1,2,two)), (3,(1,3,three))]
[(1,(2,1,four)), (2,(2,2,five)), (3,(2,3,six))]
[(1,(3,1,seven)), (2,(3,2,eight)), (3,(3,3,nine))]
Group by key
[(1,[(3,1,seven), (1,1,one), (2,1,four)])]
[(2,[(1,2,two), (3,2,eight), (2,2,five)])]
[(3,[(2,3,six), (1,3,three), (3,3,nine)])]
Sort values back in order and remove the indexing artifacts (hint: map)
[one, four, seven]
[two, five, eight]
[three, six, nine]
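The four steps above can be sketched in plain Java over in-memory rows (class and method names are illustrative, and a rectangular input is assumed; in Spark the same shape would use zipWithIndex, map, groupByKey and a final sorting map):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TransposeDemo {
    // Mirror of the map-reduce recipe: tag each cell with (row, col),
    // group by column, sort each group by row, then strip the indices.
    static List<List<String>> transpose(List<List<String>> rows) {
        // Steps 1+2: key every cell by its column, remembering the row index
        Map<Integer, List<Map.Entry<Integer, String>>> byColumn =
                new TreeMap<Integer, List<Map.Entry<Integer, String>>>();
        for (int r = 0; r < rows.size(); r++) {
            List<String> row = rows.get(r);
            for (int c = 0; c < row.size(); c++) {
                if (!byColumn.containsKey(c)) {
                    byColumn.put(c, new ArrayList<Map.Entry<Integer, String>>());
                }
                byColumn.get(c).add(
                        new AbstractMap.SimpleEntry<Integer, String>(r, row.get(c)));
            }
        }
        // Steps 3+4: groups are keyed by column; sort by row and drop the indices
        List<List<String>> columns = new ArrayList<List<String>>();
        for (List<Map.Entry<Integer, String>> cells : byColumn.values()) {
            cells.sort((a, b) -> Integer.compare(a.getKey(), b.getKey()));
            List<String> column = new ArrayList<String>();
            for (Map.Entry<Integer, String> cell : cells) {
                column.add(cell.getValue());
            }
            columns.add(column);
        }
        return columns;
    }
}
```

In real Spark code, groupByKey gives no ordering guarantee within a group, which is exactly why the row index must be carried along and sorted on in the last step.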