How to bucket outputs in Scalding


Problem Description



I'm trying to write a pipe out to different directories such that the output is bucketed based on some id. In plain MapReduce code I would use the MultipleOutputs class and do something like this in the reducer:

protected void reduce(final SomeKey key,
    final Iterable<SomeValue> values,
    final Context context) {
  ...
  for (SomeValue value : values) {
    String bucketId = computeBucketIdFrom(...);
    multipleOutputs.write(key, value, folderName + "/" + bucketId);
  }
  ...
}

So I guess one could do it like this in Scalding:

...
  val somePipe = Csv(in, separator = "\t",
        fields = someSchema,
        skipHeader = true)
    .read

  for (i <- 0 until numberOfBuckets) {
    somePipe
      .filter('someId) { id: String => (id.hashCode % numberOfBuckets) == i }
      .write(Csv(out + "/bucket" + i,
        writeHeader = true,
        separator = "\t"))
  }

But I feel that you would end up reading the same pipe many times, and that would hurt overall performance.

Are there any other alternatives?

Thanks

Solution

Yes, of course there is a better way using TemplatedTsv.

So your code above can be written as follows:

val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
    .read
    .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))

This will write the records into separate folders under out, one folder per distinct value of 'some_id.
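For example, with hypothetical 'some_id values "us" and "uk", the part files would land under paths like:

out/us/part-00000
out/uk/part-00000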

However, you can also create integer buckets. Just change the last lines:

.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }    
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))

This will create two-digit folders such as out/00/, out/01/, and so on. You can also check the TemplatedTsv API.
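One caveat: String.hashCode can be negative, so id.hashCode % numberOfBuckets may produce negative bucket values (and folder names like out/-3/). A minimal sketch that normalizes the bucket into the range [0, numberOfBuckets), keeping the same assumed field names:

.map('some_id -> 'bucket) { id: String =>
  // (h % n + n) % n is always in [0, n), even for negative hash codes
  (id.hashCode % numberOfBuckets + numberOfBuckets) % numberOfBuckets
}
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true))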

There might be a small problem using TemplatedTsv: the reducers can generate lots of small files, which can be bad for the next job that consumes your results. Therefore, it is better to sort on the template fields before writing to disk. I wrote a blog post about it.
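A minimal sketch of that idea, reusing the assumed field names from above: grouping on the template field forces a shuffle, so each reducer sees contiguous runs of a single bucket and each template directory ends up with far fewer part files.

somePipe
  .map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }
  // shuffle on the template field so each bucket's records are co-located
  .groupBy('bucket) { _.pass }
  .write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true))

The tradeoff is that all records of one bucket go through a single reducer, so heavily skewed buckets can become stragglers.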
