Parse CSV as DataFrame/DataSet with Apache Spark and Java
Question
I am new to Spark, and I want to use group-by and reduce to compute the following from a CSV file (one line per employee):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
I would like to condense the above CSV by grouping on Department, Designation and State, with additional columns for sum(costToCompany) and TotalEmployeeCount.
The result should look like this:
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
Is there any way to achieve this using transformations and actions, or should we go for RDD operations?
Recommended answer
Procedure
- Create a class (schema) to encapsulate your structure (it is not required for approach B, but it makes your code easier to read if you are using Java)
public class Record implements Serializable {
    String department;
    String designation;
    long costToCompany;
    String state;
    // constructor, getters and setters
}
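The "constructor, getters and setters" comment above leaves the bean body out. As a rough sketch only, this is what a fully written-out version could look like. `EmployeeRecord` is a hypothetical stand-in name (not from the answer), and it is declared without `public` so the snippet compiles as a single file; in a real project it would normally be a public class in its own file. As far as I know, Spark's bean-based schema inference reads properties through getters, which is why they are spelled out here.

```java
import java.io.Serializable;

// Hypothetical stand-in for the answer's Record class, written out as a JavaBean.
final class EmployeeRecord implements Serializable {
    private static final long serialVersionUID = 1L;

    private String department;
    private String designation;
    private long costToCompany;
    private String state;

    // No-arg constructor, conventional for JavaBeans
    EmployeeRecord() { }

    EmployeeRecord(String department, String designation, long costToCompany, String state) {
        this.department = department;
        this.designation = designation;
        this.costToCompany = costToCompany;
        this.state = state;
    }

    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }

    public String getDesignation() { return designation; }
    public void setDesignation(String designation) { this.designation = designation; }

    public long getCostToCompany() { return costToCompany; }
    public void setCostToCompany(long costToCompany) { this.costToCompany = costToCompany; }

    public String getState() { return state; }
    public void setState(String state) { this.state = state; }

    public static void main(String[] args) {
        EmployeeRecord r = new EmployeeRecord("Sales", "Lead", 32000L, "AP");
        System.out.println(r.getDepartment() + " " + r.getDesignation()
                + " " + r.getCostToCompany() + " " + r.getState());
    }
}
```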
- Load the CSV (or JSON) file
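import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SQLContext;

JavaSparkContext sc; // assumed to be initialized elsewhere
JavaRDD<String> data = sc.textFile("path/input.csv");
// JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified

// Map over the lines already loaded in `data` (textFile expects a path, not an RDD)
JavaRDD<Record> rdd_records = data.map(
    new Function<String, Record>() {
        public Record call(String line) throws Exception {
            // Here you can use JSON
            // Gson gson = new Gson();
            // gson.fromJson(line, Record.class);
            // Assumes the header line has already been removed from the input
            String[] fields = line.split(",");
            // costToCompany is declared as long, so parse it explicitly
            Record sd = new Record(fields[0].trim(), fields[1].trim(),
                    Long.parseLong(fields[2].trim()), fields[3].trim());
            return sd;
        }
    });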
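The sample file in the question starts with a header line and has a space after each comma, so the parsing step needs a little care. Here is a minimal plain-Java sketch (no Spark dependency) of one way to handle both. `CsvLineParser`, `Row` and `parse` are hypothetical names used only for illustration; inside Spark the same logic could sit in the body of the map function, followed by a filter that drops the null results.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the per-line parsing logic; all names are hypothetical.
final class CsvLineParser {

    /** Simple value holder mirroring the four fields used in the answer. */
    static final class Row {
        final String department;
        final String designation;
        final long costToCompany;
        final String state;

        Row(String department, String designation, long costToCompany, String state) {
            this.department = department;
            this.designation = designation;
            this.costToCompany = costToCompany;
            this.state = state;
        }
    }

    /** Parses one line; returns null for the header or any malformed line. */
    static Row parse(String line) {
        String[] f = line.split(",");
        if (f.length != 4) {
            return null; // wrong number of columns
        }
        try {
            // trim() removes the space that follows each comma in the sample file
            return new Row(f[0].trim(), f[1].trim(),
                    Long.parseLong(f[2].trim()), f[3].trim());
        } catch (NumberFormatException e) {
            return null; // header line ("costToCompany") or a non-numeric cost
        }
    }

    public static void main(String[] args) {
        String[] lines = {
            "Department, Designation, costToCompany, State",
            "Sales, Trainee, 12000, UP",
            "Sales, Lead, 32000, AP"
        };
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            Row r = parse(line);
            if (r != null) {
                rows.add(r);
            }
        }
        for (Row r : rows) {
            System.out.println(r.department + "|" + r.designation
                    + "|" + r.costToCompany + "|" + r.state);
        }
    }
}
```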
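At this point you have two approaches: A, Spark SQL (register a table and query it), and B, plain RDD operations (map to a composite key, then reduceByKey).
Approach A: Spark SQL
- Register a table (using the schema class you defined)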
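// JavaSchemaRDD is the Spark 1.0 to 1.2 Java API (obtained through JavaSQLContext);
// from Spark 1.3 on, the equivalent type is DataFrame.
JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
table.registerAsTable("record_table");
table.printSchema();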
- Query the table with the group-by query you need
// A Java string literal cannot span lines, so the query is concatenated
JavaSchemaRDD res = sqlContext.sql(
    "select department, designation, state, sum(costToCompany), count(*) "
    + "from record_table "
    + "group by department, designation, state");
Here you can also run any other query you like, using a SQL approach.
Approach B: RDD operations
- Map to pairs using a composite key: Department, Designation, State
JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
    rdd_records.mapToPair(
        new PairFunction<Record, String, Tuple2<Long, Integer>>() {
            public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
                // Composite key; the separator keeps e.g. "AB"+"C" distinct from "A"+"BC"
                String key = record.department + "," + record.designation + "," + record.state;
                // Value is (costToCompany, 1) so sums and counts can be reduced together
                return new Tuple2<String, Tuple2<Long, Integer>>(
                    key, new Tuple2<Long, Integer>(record.costToCompany, 1));
            }
        });
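One thing to watch with a composite key built by string concatenation is that different field combinations can collapse into the same key. The small plain-Java sketch below illustrates the problem and a delimiter-based fix; `CompositeKey`, `naive` and `delimited` are hypothetical names for illustration, and the comma delimiter assumes field values never contain a comma (which holds for data split on commas).

```java
// Plain-Java illustration of composite-key collisions; all names are hypothetical.
final class CompositeKey {

    /** Plain concatenation, with no separator between the fields. */
    static String naive(String department, String designation, String state) {
        return department + designation + state;
    }

    /** Concatenation with a delimiter that cannot occur inside a field. */
    static String delimited(String department, String designation, String state) {
        return String.join(",", department, designation, state);
    }

    public static void main(String[] args) {
        // Two different (department, designation) pairs that concatenate identically
        boolean naiveCollides =
                naive("HR", "Lead", "TN").equals(naive("HRL", "ead", "TN"));
        boolean delimitedCollides =
                delimited("HR", "Lead", "TN").equals(delimited("HRL", "ead", "TN"));

        System.out.println("naive keys collide: " + naiveCollides);         // true
        System.out.println("delimited keys collide: " + delimitedCollides); // false
    }
}
```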
- reduceByKey using the composite key, summing the costToCompany column and accumulating the number of records per key
JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
    records_JPRDD.reduceByKey(
        new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
            public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                                              Tuple2<Long, Integer> v2) throws Exception {
                return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
            }
        });
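To sanity-check what the map and reduceByKey steps should produce, here is a plain-Java sketch (no Spark involved) that applies the same idea to the sample rows from the question: each row becomes a (key, (cost, 1)) pair, and pairs with the same key are combined by adding both components. `CompositeKeyReduce`, `aggregate` and `SAMPLE` are hypothetical names, and `Map.merge` only stands in for reduceByKey on a single machine.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java emulation of "map to (key, (cost, 1))" followed by a per-key reduce.
final class CompositeKeyReduce {

    // Sample rows from the question: department, designation, costToCompany, state
    static final String[][] SAMPLE = {
        {"Sales", "Trainee", "12000", "UP"},
        {"Sales", "Lead", "32000", "AP"},
        {"Sales", "Lead", "32000", "LA"},
        {"Sales", "Lead", "32000", "TN"},
        {"Sales", "Lead", "32000", "AP"},
        {"Sales", "Lead", "32000", "TN"},
        {"Sales", "Lead", "32000", "LA"},
        {"Sales", "Lead", "32000", "LA"},
        {"Marketing", "Associate", "18000", "TN"},
        {"Marketing", "Associate", "18000", "TN"},
        {"HR", "Manager", "58000", "TN"}
    };

    /** Returns key -> {totalCost, employeeCount}, in first-seen key order. */
    static Map<String, long[]> aggregate(String[][] rows) {
        Map<String, long[]> acc = new LinkedHashMap<>();
        for (String[] r : rows) {
            String key = r[0] + "," + r[1] + "," + r[3];
            long[] one = {Long.parseLong(r[2]), 1L};
            // Combine two partial (sum, count) pairs, as the reduce function does
            acc.merge(key, one, (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]});
        }
        return acc;
    }

    public static void main(String[] args) {
        // Prints lines in the form Dept,Desg,state,empCount,totalCost
        aggregate(SAMPLE).forEach(
                (key, v) -> System.out.println(key + "," + v[1] + "," + v[0]));
    }
}
```

For the Sales/Lead groups this gives 2 employees and 64000 for AP, 3 and 96000 for LA, and 2 and 64000 for TN, which matches the result asked for in the question.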