Parse CSV as DataFrame/DataSet with Apache Spark and Java


Question

I am new to Spark, and I want to use group-by & reduce to find the following from a CSV (one line per employee):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to summarise the above CSV by grouping on Department, Designation, State, with additional columns for sum(costToCompany) and TotalEmployeeCount.

The result should look like this:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there any way to achieve this using transformations and actions? Or should we go for RDD operations?

Recommended answer

Procedure

    • Create a Class (Schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you are using Java); a fuller JavaBean sketch follows the snippet below

      public class Record implements Serializable {
        String department;
        String designation;
        long costToCompany;
        String state;
        // constructor, getters and setters (JavaBean accessors are what Spark's schema inference uses)
      }
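
      Since Spark's reflection-based schema creation (createDataFrame / applySchema) relies on JavaBean conventions, here is a minimal sketch of what the complete Record bean could look like; only the field names come from the CSV above, the rest is ordinary boilerplate:

      public class Record implements Serializable {
          private String department;
          private String designation;
          private long costToCompany;
          private String state;

          public Record() {}   // no-arg constructor, used by bean reflection

          public Record(String department, String designation, long costToCompany, String state) {
              this.department = department;
              this.designation = designation;
              this.costToCompany = costToCompany;
              this.state = state;
          }

          public String getDepartment() { return department; }
          public void setDepartment(String department) { this.department = department; }
          public String getDesignation() { return designation; }
          public void setDesignation(String designation) { this.designation = designation; }
          public long getCostToCompany() { return costToCompany; }
          public void setCostToCompany(long costToCompany) { this.costToCompany = costToCompany; }
          public String getState() { return state; }
          public void setState(String state) { this.state = state; }
      }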
      

    • Loading the CSV (JSON) file

      JavaSparkContext sc;
      JavaRDD<String> data = sc.textFile("path/input.csv");
      //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
      SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified


      // Note: if the file still contains the header row shown above, filter it out before parsing
      JavaRDD<Record> rdd_records = data.map(
        new Function<String, Record>() {
            public Record call(String line) throws Exception {
               // Here you can use JSON
               // Gson gson = new Gson();
               // gson.fromJson(line, Record.class);
               String[] fields = line.split(",");
               return new Record(fields[0].trim(), fields[1].trim(),
                                 Long.parseLong(fields[2].trim()), fields[3].trim());
            }
      });
      

    • At this point you have two approaches:

      • Register a table (using your defined Schema Class)

      DataFrame table = sqlContext.createDataFrame(rdd_records, Record.class); // Spark 1.3+
      table.registerTempTable("record_table"); // previously: applySchema(...) + registerAsTable(...)
      table.printSchema();
      

    • Query the table with your desired group-by query

      DataFrame res = sqlContext.sql(
        "select department, designation, state, sum(costToCompany), count(*) "
        + "from record_table "
        + "group by department, designation, state"
      );
      

    • Here you would also be able to run any other query you desire, using a SQL approach; a hypothetical example follows below
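
      For instance, a made-up follow-up query that keeps only the TN rows (the column aliases are illustrative, not from the original answer):

      DataFrame tnOnly = sqlContext.sql(
        "select department, designation, sum(costToCompany) as totalCost, count(*) as empCount "
        + "from record_table "
        + "where state = 'TN' "
        + "group by department, designation"
      );
      tnOnly.show();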

      • Or, with plain RDD operations (approach B), map to a pair RDD using a composite key: Department, Designation, State

      JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
        rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
          public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
            // Composite key: Department;Designation;State (a separator avoids accidental key collisions)
            return new Tuple2<String, Tuple2<Long, Integer>>(
              record.getDepartment() + ";" + record.getDesignation() + ";" + record.getState(),
              new Tuple2<Long, Integer>(record.getCostToCompany(), 1)
            );
          }
      });

      • reduceByKey using the composite key, summing the costToCompany column, and accumulating the number of records by key

      JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
        records_JPRDD.reduceByKey(
          new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
            public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1, Tuple2<Long, Integer> v2)
                throws Exception {
              return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
            }
          });
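
      To get the output in the shape asked for in the question (Dept, Desg, state, empCount, totalCost), you can map the reduced pairs back to CSV-style lines. A minimal sketch, assuming the ";"-separated composite key used above (the output path is just a placeholder):

      JavaRDD<String> output = final_rdd_records.map(
        new Function<Tuple2<String, Tuple2<Long, Integer>>, String>() {
          public String call(Tuple2<String, Tuple2<Long, Integer>> t) throws Exception {
            String[] key = t._1.split(";");   // Department;Designation;State
            return key[0] + "," + key[1] + "," + key[2] + "," + t._2._2 + "," + t._2._1;
          }
      });
      // e.g. "Sales,Lead,AP,2,64000"
      output.saveAsTextFile("path/output");   // or output.collect() for a small result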
      

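On newer Spark versions you can skip both the hand-written bean and the SQL string: Spark 2.x ships a built-in CSV reader, and the grouping can be expressed directly with DataFrame transformations. A minimal sketch of that alternative, assuming Spark 2.x and a file without the stray spaces after the commas shown above (app name and path are illustrative):

      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      import static org.apache.spark.sql.functions.*;

      SparkSession spark = SparkSession.builder().appName("csv-group-by").getOrCreate();

      // header=true uses the first line for column names, inferSchema=true guesses the types
      Dataset<Row> df = spark.read()
          .option("header", "true")
          .option("inferSchema", "true")
          .csv("path/input.csv");

      Dataset<Row> result = df.groupBy("Department", "Designation", "State")
          .agg(count(lit(1)).alias("empCount"),
               sum("costToCompany").alias("totalCost"));

      result.show();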