Extending Hadoop's TableInputFormat to scan with a prefix used for distribution of timestamp keys

Problem Description

I have an HBase table whose key is a timestamp with a one-byte random prefix, so that keys are distributed and scans don't hotspot. I'm trying to extend TableInputFormat so that I can run a single MapReduce over a timestamp range on the table, prepending each of the 256 possible prefixes so that every prefixed range within the specified timestamp bounds is scanned. My solution isn't working, though: it always seems to scan the last prefix (127) 256 times. Something must be shared across all the scans.
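
For context, a salted key of the shape described (one salt byte followed by the timestamp) can be built with HBase's Bytes utility. The sketch below shows one possibility; how the salt byte is derived is only an assumption, since the question just calls it a random byte.

// Sketch: building a salted timestamp row key of the form
// [1 salt byte][8-byte timestamp]. The salt derivation here is illustrative only.
long ts = System.currentTimeMillis();
byte salt = (byte) new java.util.Random().nextInt(256);  // random one-byte prefix
byte[] rowKey = new byte[1 + Bytes.SIZEOF_LONG];
rowKey[0] = salt;
Bytes.putLong(rowKey, 1, ts);  // org.apache.hadoop.hbase.util.Bytes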

My code is below. Any ideas?

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context)
    throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    byte startRow[] = scan.getStartRow(), stopRow[] = scan.getStopRow();
    // Leave room for a one-byte prefix in front of the configured start/stop rows.
    byte prefixedStartRow[] = new byte[startRow.length+1];
    byte prefixedStopRow[] = new byte[stopRow.length+1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    // Gather splits for every possible prefix byte (-128..127).
    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);
      splits.addAll(super.getSplits(context));
    }

    return splits;
  }
}

and

  Configuration config = HBaseConfiguration.create();
  Job job = new Job(config, "Aggregate");
  job.setJarByClass(Aggregate.class);

  Scan scan = new Scan();
  scan.setStartRow("20120630".getBytes());
  scan.setStopRow("20120701".getBytes());
  scan.setCaching(500);
  scan.setCacheBlocks(false);

  TableMapReduceUtil.initTableMapperJob(
      "event",
      scan,
      Mapper.class,
      ImmutableBytesWritable.class,
      ImmutableBytesWritable.class,
      job,
      true,
      PrefixedTableInputFormat.class);
  TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job);

Solution

You're going to need to make a deep copy of the splits in each iteration:

for (int prefix = -128; prefix < 128; prefix++) {
  prefixedStartRow[0] = (byte) prefix;
  prefixedStopRow[0] = (byte) prefix;
  scan.setStartRow(prefixedStartRow);
  scan.setStopRow(prefixedStopRow);
  setScan(scan);

  // Deep-copy each split ("conf" is the job Configuration, e.g. this.getConf()).
  for (InputSplit subSplit : super.getSplits(context)) {
    splits.add((InputSplit) ReflectionUtils.copy(conf,
        (TableSplit) subSplit, new TableSplit()));
  }
}
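
Presumably the splits returned by super.getSplits(context) end up referencing the same prefixedStartRow / prefixedStopRow arrays that the loop keeps mutating, which is why every split reflects the last prefix without the copy. Folding that copy into the override, the whole class might look like the sketch below; it assumes the Configuration for ReflectionUtils.copy (org.apache.hadoop.util.ReflectionUtils) can be taken from the format's own getConf(), and it is a sketch rather than a drop-in replacement.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ReflectionUtils;

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    byte[] startRow = scan.getStartRow(), stopRow = scan.getStopRow();
    byte[] prefixedStartRow = new byte[startRow.length + 1];
    byte[] prefixedStopRow = new byte[stopRow.length + 1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);

      // Deep-copy each split so it keeps its own start/stop rows instead of
      // sharing the byte arrays that the next iteration will overwrite.
      for (InputSplit subSplit : super.getSplits(context)) {
        splits.add(ReflectionUtils.copy(getConf(),
            (TableSplit) subSplit, new TableSplit()));
      }
    }
    return splits;
  }
}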
