使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet [英] Parse CSV as DataFrame/DataSet with Apache Spark and Java
问题描述
我是 Spark 新手,我想使用 group-by &减少从 CSV 中找到以下内容(受雇者一行):
I am new to spark, and I want to use group-by & reduce to find the following from CSV (one line by employed):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
我想通过部门、名称、状态使用带有sum(costToCompany)和TotalEmployeeCount的附加列来简化关于CSV的分组
I would like to simplify the about CSV with group by Department, Designation, State with additional columns with sum(costToCompany) and TotalEmployeeCount
应该得到如下结果:
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
有什么方法可以使用转换和操作来实现这一点.或者我们应该去 RDD 操作?
Is there any way to achieve this using transformations and actions. Or should we go for RDD operations?
推荐答案
程序
创建一个类(Schema)来封装你的结构(方法 B 不需要它,但如果你使用 Java,它会让你的代码更容易阅读)
Procedure
Create a Class (Schema) to encapsulate your structure (it’s not required for the approach B, but it would make your code easier to read if you are using Java)
public class Record implements Serializable { String department; String designation; long costToCompany; String state; // constructor , getters and setters }
加载 CVS (JSON) 文件
Loading CVS (JSON) file
JavaSparkContext sc; JavaRDD<String> data = sc.textFile("path/input.csv"); //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified JavaRDD<Record> rdd_records = sc.textFile(data).map( new Function<String, Record>() { public Record call(String line) throws Exception { // Here you can use JSON // Gson gson = new Gson(); // gson.fromJson(line, Record.class); String[] fields = line.split(","); Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]); return sd; } });
注册一个表(使用你定义的架构类)
Register a table (using the your defined Schema Class)
此时你有两种方法:
JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class); table.registerAsTable("record_table"); table.printSchema();
使用您想要的 Query-group-by 查询表
Query the table with your desired Query-group-by
JavaSchemaRDD res = sqlContext.sql(" select department,designation,state,sum(costToCompany),count(*) from record_table group by department,designation,state ");
在这里,您还可以使用 SQL 方法执行您想要的任何其他查询
Here you would also be able to do any other query you desire, using a SQL approach
使用复合键映射:
Department
,Designation
,State
JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>(){ public Tuple2<String, Tuple2<Long, Integer>> call(Record record){ Tuple2<String, Tuple2<Long, Integer>> t2 = new Tuple2<String, Tuple2<Long,Integer>>( record.Department + record.Designation + record.State, new Tuple2<Long, Integer>(record.costToCompany,1) ); return t2; }
});
reduceByKey 使用组合键,对
costToCompany
列求和,按key累加记录数reduceByKey using the composite key, summing
costToCompany
column, and accumulating the number of records by keyJavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() { public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1, Tuple2<Long, Integer> v2) throws Exception { return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2); } });
这篇关于使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!