Apache Spark: Using folder structures to reduce run-time of analyses

Problem description

I want to optimize the run-time of a Spark application by subdividing a huge csv file into different partitions, depending on their characteristics.

E.g. I have a column with customer ids (integer, a), a column with dates (month + year, e.g. 01.2015, b), and a column with product ids (integer, c) (and more columns with product-specific data, which are not needed for the partitioning).

I want to build a folder structure like /customer/a/date/b/product/c. When a user wants to know information about products from customer X, sold in January 2016, they could load and analyse the files saved in /customer/X/date/01.2016/*.

Is there a possibility to load such folder structures via wildcards? It should also be possible to load all customers or products of a specific time range, e.g. 01.2015 till 09.2015. Is it possible to use wildcards like /customer/*/date/*.2015/product/c? Or how could a problem like this be solved?
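
For plain CSV/text input, Spark already accepts Hadoop glob patterns (e.g. *) in every component of an input path, so wildcard reads over such a layout are possible in principle. A minimal sketch, assuming the raw files live in the leaf directories /customer/<a>/date/<b>/product/<c>/ and that sc is an existing JavaSparkContext:

// Assumption: the csv files sit in the leaf product directories; sc is a JavaSparkContext.
// Hadoop glob patterns (*) are resolved in every path component.
JavaRDD<String> customerXJan2016 = sc.textFile("/customer/X/date/01.2016/product/*/*");
JavaRDD<String> productCIn2015   = sc.textFile("/customer/*/date/*.2015/product/c/*");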

I want to partition the data once, and later load only the specific files in the analyses, to reduce the run-time of these jobs (disregarding the additional work for the partitioning).

Solution: Using Parquet files

I changed my Spark application to save my data to Parquet files, and now everything works fine: I can pre-select the data by giving the folder structure. Here is my code snippet:

// Spark 1.x Java API (SQLContext / DataFrame)
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Article> goodRdd = ...

SQLContext sqlContext = new SQLContext(sc);

// Schema: the partition column ("keyStore") plus the payload column ("textArticle")
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false));

StructType schema = DataTypes.createStructType(fields);

// Convert the domain objects into Rows that match the schema
JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() {
    public Row call(Article article) throws Exception {
        return RowFactory.create(article.getKeyStore(), article.getTextArticle());
    }
});

DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// WRITE PARQUET FILES, partitioned by the first schema column ("keyStore")
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/");

// READ PARQUET FILES, pre-selecting the partition keyStore=1 via the folder structure
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/");

System.out.println("READ : " + read.count());
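
The same write pattern extends to the multi-level layout asked about in the question; a minimal sketch, assuming the schema also contained hypothetical customer, date and product columns:

// Hypothetical extension: assumes "customer", "date" and "product" are additional schema columns.
// partitionBy accepts several columns and nests the directories in the given order,
// producing paths like .../customer=<a>/date=<b>/product=<c>/part-*.parquet
storeDataFrame.write()
              .partitionBy("customer", "date", "product")
              .parquet("hdfs://hdfs-master:8020/user/test/parquet/");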

Important

Don't try this with a table that has only one column! You will get an exception when you try to call the partitionBy method!
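
A minimal sketch of that failing case (schema, RDD and path are purely illustrative): if the only column is also the partition column, no data columns remain for the Parquet files and Spark rejects the write.

// Illustrative only: a single-column schema.
List<StructField> oneField = new ArrayList<StructField>();
oneField.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
DataFrame oneColumnDf = sqlContext.createDataFrame(keyOnlyRowRdd, DataTypes.createStructType(oneField)); // keyOnlyRowRdd: hypothetical JavaRDD<Row>

// Partitioning by the only column leaves nothing to store in the files -> the write throws an exception.
oneColumnDf.write().partitionBy("keyStore").parquet("hdfs://hdfs-master:8020/user/test/parquet-one-column/");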

Recommended answer

So, in Spark you can save and read partitioned data much in the way you are looking for. However, rather than creating the path the way you have it (/customer/a/date/b/product/c), Spark will use the convention /customer=a/date=b/product=c when you save the data using:

df.write.partitionBy("customer", "date", "product").parquet("/my/base/path/")

When you need to read in the data, you need to specify the basePath option like this:

sqlContext.read.option("basePath", "/my/base/path/").parquet("/my/base/path/customer=*/date=*.2015/product=*/")

and everything following /my/base/path/ will be interpreted as columns by Spark. In the example given here, Spark would add the three columns customer, date and product to the dataframe. Note that you can use wildcards for any of the columns as you like.
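
In the Java API used in the question, such a wildcard read could look roughly like this (a sketch; the paths are the illustrative ones from above, and 42 is a made-up customer id):

// Wildcards on customer and product, date restricted to 2015; the partition
// directories after basePath show up as the columns customer, date and product.
DataFrame sales2015 = sqlContext.read()
                                .option("basePath", "/my/base/path/")
                                .parquet("/my/base/path/customer=*/date=*.2015/product=*/");

sales2015.printSchema();                                          // includes customer, date and product
sales2015.filter(sales2015.col("customer").equalTo(42)).show();   // partition columns filter like any other column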

As for reading in data in a specific time range, you should be aware that Spark uses predicate pushdown, so it will only actually load data into memory that fits the criteria (as specified by some filter transformation). But if you really want to specify the range explicitly, you can generate a list of path names and then pass that to the read function, like this:

val pathsInMyRange = List("/my/base/path/customer=*/date=01.2015/product=*",
                          "/my/base/path/customer=*/date=02.2015/product=*",
                          "/my/base/path/customer=*/date=03.2015/product=*",
                          ...,
                          "/my/base/path/customer=*/date=09.2015/product=*")

sqlContext.read.option("basePath", "/my/base/path/").parquet(pathsInMyRange:_*)
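
For completeness, the same explicit-path approach in the Java API from the question could look roughly like this (a sketch; DataFrameReader.parquet takes a varargs array of paths):

// Build the partition paths for the range 01.2015 - 09.2015 explicitly.
List<String> pathsInMyRange = new ArrayList<String>();
for (int month = 1; month <= 9; month++) {
    pathsInMyRange.add(String.format("/my/base/path/customer=*/date=%02d.2015/product=*", month));
}

DataFrame range = sqlContext.read()
                            .option("basePath", "/my/base/path/")
                            .parquet(pathsInMyRange.toArray(new String[pathsInMyRange.size()]));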

Anyway, I hope this helps :)
