Extending Hadoop's TableInputFormat to scan with a prefix used for distribution of timestamp keys


Problem description

I have an HBase table whose key is a timestamp with a one-byte random prefix, used to distribute the keys so that scans don't hotspot. I'm trying to extend TableInputFormat so that I can run a single MapReduce job on the table with a range, prepending each of the 256 possible prefixes so that every salted range within the specified timestamp range is scanned. My solution isn't working, though: it always seems to scan the last prefix (127) 256 times. Something must be shared across all the scans.
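For context, a salted row key of this shape might be produced like the sketch below. This is a hypothetical writer-side helper (the question doesn't show one), and the date-string timestamp format is borrowed from the driver code further down:

import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper illustrating the key layout described above:
// one random salt byte followed by the timestamp bytes.
public class SaltedKey {
  public static byte[] make(String timestamp) {
    byte[] ts = timestamp.getBytes();
    byte[] key = new byte[1 + ts.length];
    key[0] = (byte) ThreadLocalRandom.current().nextInt(256); // random prefix byte
    System.arraycopy(ts, 0, key, 1, ts.length);               // timestamp after the salt
    return key;
  }
}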

My code is below. Any ideas?

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context)
      throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    byte[] startRow = scan.getStartRow(), stopRow = scan.getStopRow();
    byte[] prefixedStartRow = new byte[startRow.length + 1];
    byte[] prefixedStopRow = new byte[stopRow.length + 1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    // Shift the scan to each of the 256 possible prefix bytes and collect
    // the splits the stock implementation computes for that sub-range.
    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);
      splits.addAll(super.getSplits(context));
    }

    return splits;
  }
}
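The job is set up like this (Mapper.class and Reducer.class presumably stand in for the actual job classes):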

  Configuration config = HBaseConfiguration.create();
  Job job = new Job(config, "Aggregate");
  job.setJarByClass(Aggregate.class);

  Scan scan = new Scan();
  scan.setStartRow("20120630".getBytes());
  scan.setStopRow("20120701".getBytes());
  scan.setCaching(500);
  scan.setCacheBlocks(false);

  TableMapReduceUtil.initTableMapperJob(
      "event",
      scan,
      Mapper.class,
      ImmutableBytesWritable.class,
      ImmutableBytesWritable.class,
      job,
      true,
      PrefixedTableInputFormat.class);
  TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job);

Recommended answer

You're going to need to make a deep copy of the splits in each iteration:

for (int prefix = -128; prefix < 128; prefix++) {
  prefixedStartRow[0] = (byte) prefix;
  prefixedStopRow[0] = (byte) prefix;
  scan.setStartRow(prefixedStartRow);
  scan.setStopRow(prefixedStopRow);
  setScan(scan);

  // Deep-copy each split (ReflectionUtils.copy round-trips it through its
  // Writable serialization) so it no longer shares state with the scan that
  // the next iteration mutates. Here `conf` is the job's Configuration,
  // e.g. the one available from getConf() on the input format.
  for (InputSplit subSplit : super.getSplits(context)) {
    splits.add((InputSplit) ReflectionUtils.copy(conf,
        (TableSplit) subSplit, new TableSplit()));
  }
}
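The likely mechanism, for what it's worth: the splits returned by super.getSplits() can keep references to the very prefixedStartRow/prefixedStopRow arrays the loop keeps mutating, so by submission time every split sees the last prefix (127), which matches the symptom above. Under that assumption, an alternative sketch (a hypothetical variant, not from the original answer) is to give each iteration fresh arrays instead of deep-copying the splits:

// Alternative sketch (assumes the shared state is the reused row arrays):
// fresh start/stop arrays per prefix, so splits created earlier keep theirs.
for (int prefix = -128; prefix < 128; prefix++) {
  byte[] start = java.util.Arrays.copyOf(prefixedStartRow, prefixedStartRow.length);
  byte[] stop = java.util.Arrays.copyOf(prefixedStopRow, prefixedStopRow.length);
  start[0] = (byte) prefix;
  stop[0] = (byte) prefix;
  scan.setStartRow(start);
  scan.setStopRow(stop);
  setScan(scan);
  splits.addAll(super.getSplits(context));
}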

