Parse CSV as DataFrame/DataSet with Apache Spark and Java


Question

I am new to Spark, and I want to use group-by & reduce to find the following from a CSV (one line per employee):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to simplify the above CSV with a group by Department, Designation, State, with additional columns for sum(costToCompany) and TotalEmployeeCount.

The result should be like:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there any way to achieve this using transformations and actions, or should we go for RDD operations?

Answer

Procedure

    • Create a class (schema) to encapsulate your structure (it is not required for approach B, but it makes your code easier to read if you are using Java); a fuller sketch of the bean follows the snippet below.

      public class Record implements Serializable {
        String department;
        String designation;
        long costToCompany;
        String state;
        // constructor , getters and setters  
      }
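
      For completeness, a fleshed-out version of this bean might look as follows (a sketch; the public getters matter because Spark's bean-based schema inference reads properties through them):

      import java.io.Serializable;

      public class Record implements Serializable {
        String department;
        String designation;
        long costToCompany;
        String state;

        public Record(String department, String designation, long costToCompany, String state) {
          this.department = department;
          this.designation = designation;
          this.costToCompany = costToCompany;
          this.state = state;
        }

        // getters are required for sqlContext.createDataFrame(rdd, Record.class)
        public String getDepartment() { return department; }
        public String getDesignation() { return designation; }
        public long getCostToCompany() { return costToCompany; }
        public String getState() { return state; }
      }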
      


    • Load the CSV (or JSON) file

      // imports needed by the snippets below
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.DataFrame;
      import org.apache.spark.sql.SQLContext;

      SparkConf conf = new SparkConf().setAppName("csv-to-dataframe"); // app name is illustrative
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaRDD<String> data = sc.textFile("path/input.csv");
      //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
      SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified

      JavaRDD<Record> rdd_records = data.map(   // map over data, not sc.textFile(data)
        new Function<String, Record>() {
            public Record call(String line) throws Exception {
               // Here you could use JSON instead:
               // Gson gson = new Gson();
               // return gson.fromJson(line, Record.class);
               String[] fields = line.split(",");
               // trim each field (the sample rows have spaces after the commas)
               // and parse costToCompany into a long
               return new Record(fields[0].trim(), fields[1].trim(),
                                 Long.parseLong(fields[2].trim()), fields[3].trim());
            }
      });

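      Note that the sample CSV begins with a header row, which would make Long.parseLong fail; if your file has one, you may want to drop it before mapping, for example with a sketch like this (the startsWith check is illustrative), and then map over dataNoHeader instead of data:

      JavaRDD<String> dataNoHeader = data.filter(new Function<String, Boolean>() {
        public Boolean call(String line) {
          return !line.startsWith("Department"); // skip the header row
        }
      });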

    • At this point you have two approaches:


      • Approach A: register a table (using your defined schema class)

      // In Spark 1.3 use createDataFrame / registerTempTable
      // (applySchema / registerAsTable in earlier versions)
      DataFrame table = sqlContext.createDataFrame(rdd_records, Record.class);
      table.registerTempTable("record_table");
      table.printSchema();


    • Query the table with your desired group-by

      DataFrame res = sqlContext.sql(
          "select department, designation, state, sum(costToCompany), count(*) "
        + "from record_table "
        + "group by department, designation, state");


    • Here you would also be able to run any other query you need, using this SQL approach; see the sketch below for an example.
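
      For instance, a follow-up query restricted to a single designation might look like this (a sketch; the WHERE clause and the show() call are illustrative):

      DataFrame leads = sqlContext.sql(
          "select department, state, sum(costToCompany) "
        + "from record_table "
        + "where designation = 'Lead' "
        + "group by department, state");
      leads.show(); // prints the first rows of the result to stdout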


      • Approach B: map using a composite key: Department, Designation, State

      JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
        rdd_records.mapToPair(
          new PairFunction<Record, String, Tuple2<Long, Integer>>() {
            public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
              // key: composite of department + designation + state
              // value: (costToCompany, 1) so counts can be accumulated alongside the sum
              return new Tuple2<String, Tuple2<Long, Integer>>(
                record.department + record.designation + record.state,
                new Tuple2<Long, Integer>(record.costToCompany, 1));
            }
        });

      reduceByKey using the composite key, summing the costToCompany column, and accumulating the number of records by key:

      JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
        records_JPRDD.reduceByKey(
          new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
            public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                                              Tuple2<Long, Integer> v2) throws Exception {
              // add the cost sums and the employee counts pairwise
              return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
            }
        });

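      To inspect the aggregated result you could, for example, collect and print the pairs; a sketch (the output formatting is illustrative):

      for (Tuple2<String, Tuple2<Long, Integer>> t : final_rdd_records.collect()) {
        // t._1 = composite key, t._2._1 = total cost, t._2._2 = employee count
        System.out.println(t._1 + " -> totalCost=" + t._2._1 + ", empCount=" + t._2._2);
      }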

