How do I convert a CSV file to an RDD?

Question

I'm new to Spark. I want to perform some operations on particular data in a CSV record.

I'm trying to read a CSV file and convert it to an RDD. My further operations are based on the header row provided in the CSV file.

(From comments) This is my code so far:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() { 
    @Override public Iterable<String> call(String s) { 
    return Arrays.asList(EOL.split(s)); 
    } 
});
final String heading=lines.first().toString();

I can get the header values like this:

final String[] header=heading.split(" "); 

I want to map these header values to each record in the CSV file.

In Java I'm using CSVReader's record.getColumnValue(Column header) to get a particular value. I need to do something similar here.

Recommended answer

A simple approach is to keep the header around so that columns can be looked up by name.

Let's say you have a file.csv like:

user, topic, hits
om,  scala, 120
daniel, spark, 80
3754978, spark, 1

We can define a header class that uses a parsed version of the first row:

class SimpleCSVHeader(header:Array[String]) extends Serializable {
  val index = header.zipWithIndex.toMap
  def apply(array:Array[String], key:String):String = array(index(key))
}
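
Since the question uses the Java API, a rough Java counterpart of the class above might look like the sketch below. The names `SimpleCsvHeader` and `get` are illustrative assumptions, not part of Spark or of the original answer. The unknown-column check is an addition that the Scala version does not have.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java port of the Scala SimpleCSVHeader class.
// It is Serializable so that it could be captured by a function
// that Spark ships to the executors.
class SimpleCsvHeader implements Serializable {
    // Maps each column name to its position in the row array.
    private final Map<String, Integer> index = new HashMap<>();

    SimpleCsvHeader(String[] header) {
        for (int i = 0; i < header.length; i++) {
            index.put(header[i], i);
        }
    }

    // Returns the value of column `key` in an already-split row.
    String get(String[] row, String key) {
        Integer position = index.get(key);
        if (position == null) {
            throw new IllegalArgumentException("Unknown column: " + key);
        }
        return row[position];
    }
}
```

With a header built from `{"user", "topic", "hits"}`, calling `get(row, "hits")` on the split row `{"om", "scala", "120"}` returns the string `"120"`.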

We can then use that header to address the data further down the road:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user"))
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...

Note that the header is little more than a map from a mnemonic (the column name) to an array index. Almost all of this could also be done using the ordinal position of the element in the array, for example user = row(0).
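
To make the steps concrete without a cluster, here is a plain-Java sketch that runs the same split, trim, header-lookup, filter, and map sequence on an in-memory list using `java.util.stream`. It is not Spark code. The class and method names (`CsvPipelineSketch`, `parse`, `usersByHits`) are assumptions made for illustration. With Spark, the stream steps would presumably become the corresponding `map` and `filter` calls on a `JavaRDD`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, Spark-free illustration of the header-based pipeline.
class CsvPipelineSketch {
    // Splits a line on commas and trims every field.
    static String[] parse(String line) {
        return Arrays.stream(line.split(","))
                .map(String::trim)
                .toArray(String[]::new);
    }

    // Returns user -> hits for every row except the header row.
    static Map<String, Integer> usersByHits(List<String> lines) {
        List<String[]> data = lines.stream()
                .map(CsvPipelineSketch::parse)
                .collect(Collectors.toList());
        // The first parsed line is the header; resolve names to positions once.
        List<String> header = Arrays.asList(data.get(0));
        int user = header.indexOf("user");
        int hits = header.indexOf("hits");
        return data.stream()
                // Drop the header row, as the Scala filter does.
                .filter(row -> !row[user].equals("user"))
                .collect(Collectors.toMap(
                        row -> row[user],
                        row -> Integer.parseInt(row[hits]),
                        (first, second) -> first, // keep the first value on duplicate users
                        LinkedHashMap::new));     // preserve input order
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "user, topic, hits",
                "om,  scala, 120",
                "daniel, spark, 80",
                "3754978, spark, 1");
        System.out.println(usersByHits(lines));
    }
}
```

Unlike the Scala `rows.map(...)`, which yields one pair per row, this sketch collects into a map, so a repeated user keeps only its first value.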

PS: Welcome to Scala :-)
