How to output data with Hive-style directory structure in Scalding?


Question


We are using Scalding to do ETL and generate the output as a Hive table with partitions. Consequently, we want the directory names for partitions to be something like "state=CA" for example. We are using TemplatedTsv as follows:

pipe
   // some other ETL
   .map('STATE -> 'hdfs_state) { state: Int => "State=" + state }
   .groupBy('hdfs_state) { _.pass }
   .write(TemplatedTsv(baseOutputPath, "%s", 'hdfs_state,
          writeHeader = false,
          sinkMode = SinkMode.UPDATE,
          fields = ('all except 'hdfs_state)))


We adopt the code sample from How to bucket outputs in Scalding. Here are two issues we have:

  • except can't be resolved by IntelliJ: am I missing some imports? We don't want to enter all the fields explicitly in the "fields = ()" statement, because the fields are derived from the code inside the groupBy statement; if entered explicitly, they could easily get out of sync.
  • This approach looks too hacky: we are creating an extra column just so that the directory names can be processed by Hive/HCatalog. We are wondering what the right way to accomplish this is.
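For reference, the Hive-style partitioned layout we are after looks roughly like this (paths and part-file names are illustrative):

```
baseOutputPath/
├── State=CA/
│   └── part-00000
└── State=NY/
    └── part-00000
```

Hive/HCatalog can then discover "State" as a partition column directly from the directory names.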

Thanks a lot!

Answer


Sorry, the previous example was pseudocode. Below is a small working example with sample input data.

Note that this works only with Scalding version 0.12.0 or later.


Let's imagine we have the input below, which defines some purchase data:

user1   1384034400  6   75
user1   1384038000  6   175
user2   1383984000  48  3
user3   1383958800  48  281
user3   1384027200  9   7
user3   1384027200  9   11
user4   1383955200  37  705
user4   1383955200  37  15
user4   1383969600  36  41
user4   1383969600  36  21


It is tab-separated, and the 3rd column is a state number. Here we have integers, but for string-based states you can easily adapt.


This code will read the input and put the rows into 'State=stateid' output folder buckets.

class TemplatedTsvExample(args: Args) extends Job(args) {

  val purchasesPath = args("purchases")
  val outputPath    = args("output")

  // defines both input & output schema, you can also make separate for each of them
  val ioSchema = ('USERID, 'TIMESTAMP, 'STATE, 'PURCHASE)

  val Purchases =
     Tsv(purchasesPath, ioSchema)
     .read
     .map('STATE -> 'STATENAME) { state: Int => "State=" + state } // here you can make necessary changes
     .groupBy('STATENAME) { _.pass } // this is optional
     .write(TemplatedTsv(outputPath, "%s", 'STATENAME, false, SinkMode.REPLACE, ioSchema))
} 
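To make the bucketing logic concrete without a Hadoop cluster, here is a minimal, self-contained sketch in plain Scala (no Scalding dependency; all names are illustrative, not Scalding's API) of what the map + TemplatedTsv combination does: each row is routed into a bucket named after its Hive-style partition value.

```scala
// A standalone sketch of TemplatedTsv-style bucketing using plain
// Scala collections. This only illustrates the routing logic; the
// real job writes each bucket to its own HDFS directory.
object PartitionSketch {
  // One row of the tab-separated input: user, timestamp, state, purchase.
  case class Purchase(user: String, ts: Long, state: Int, amount: Int)

  // Mirrors the map('STATE -> 'STATENAME) step: derive the Hive-style
  // partition directory name from the state column.
  def partitionDir(state: Int): String = s"State=$state"

  // Mirrors the "%s" template in TemplatedTsv: group rows by the
  // directory name computed from their partition column.
  def bucket(rows: Seq[Purchase]): Map[String, Seq[Purchase]] =
    rows.groupBy(p => partitionDir(p.state))
}
```

For string-based states (e.g. "CA"), only `partitionDir` would change, taking a `String` instead of an `Int`.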


I hope this is helpful. Please ask me if anything is not clear.


You can find the full code here.

