How do I convert a CSV file to an RDD?
Question
I'm new to Spark. I want to perform some operations on particular data in a CSV record.
I'm trying to read a CSV file and convert it to an RDD. My further operations are based on the header provided in the CSV file.
(From comments) This is my code so far:
final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String s) {
return Arrays.asList(EOL.split(s));
}
});
final String heading=lines.first().toString();
final String[] header=heading.split(" ");
I can get the header values like this. I want to map them to each record in the CSV file.
In Java I'm using CSVReader's record.getColumnValue(Column header) to get a particular value. I need to do something similar here.
Recommended answer
A simplistic approach would be to have a way to preserve the header.
Let's say you have a file.csv like:
user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1
We can define a header class that uses a parsed version of the first row:
class SimpleCSVHeader(header:Array[String]) extends Serializable {
val index = header.zipWithIndex.toMap
def apply(array:Array[String], key:String):String = array(index(key))
}
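Since the question uses the Java API, here is a minimal sketch of the same idea in plain Java. The class name SimpleCsvHeader and its get method are hypothetical names chosen for illustration. The class implements Serializable on the assumption that it would be referenced inside functions Spark ships to executors, but the sketch itself has no Spark dependency.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java counterpart of SimpleCSVHeader: maps column names to positions.
class SimpleCsvHeader implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Map<String, Integer> index = new HashMap<>();

    // Builds the name-to-position map from the parsed first row of the file.
    SimpleCsvHeader(String[] header) {
        for (int i = 0; i < header.length; i++) {
            index.put(header[i], i);
        }
    }

    // Returns the value stored under the named column in one parsed row.
    String get(String[] row, String key) {
        Integer position = index.get(key);
        if (position == null) {
            throw new IllegalArgumentException("Unknown column: " + key);
        }
        return row[position];
    }
}
```

With the sample file above, a row parsed as {"om", "scala", "120"} would yield "om" for get(row, "user"), which is roughly what record.getColumnValue provides in the CSVReader approach.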
We can then use that header to address the data further down the road:
val csv = sc.textFile("file.csv") // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user"))
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...
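The steps above (split and trim, build the header index from the first row, filter the header out, then project columns) can be tried without a cluster. The following is a rough sketch on an in-memory list using Java streams rather than an RDD. CsvPipelineSketch, parse and usersByHits are hypothetical names, and the stream map and filter calls only stand in for the corresponding RDD operations.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only: java.util.stream stands in for the RDD here.
class CsvPipelineSketch {
    // Splits a line on commas and trims each field, like the `data` step above.
    static String[] parse(String line) {
        return Arrays.stream(line.split(",")).map(String::trim).toArray(String[]::new);
    }

    // Returns user -> hits for every data row, skipping the header row.
    // Assumes each user appears once; Collectors.toMap throws on duplicate keys.
    static Map<String, Integer> usersByHits(List<String> lines) {
        String[] header = parse(lines.get(0));
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < header.length; i++) {
            index.put(header[i], i);
        }
        int user = index.get("user");
        int hits = index.get("hits");
        return lines.stream()
            .map(CsvPipelineSketch::parse)
            .filter(row -> !row[user].equals("user")) // drop the header row
            .collect(Collectors.toMap(row -> row[user], row -> Integer.parseInt(row[hits])));
    }
}
```

Like the Scala version, this relies on a plain split(","), so it assumes no field contains a quoted comma.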
Note that the header is not much more than a simple map from a mnemonic to the array index. Pretty much all of this could be done using the ordinal position of the element in the array, like user = row(0).
PS: Welcome to Scala :-)