Insert rows into HBase from a Storm bolt




I would like to be able to write new entries into HBase from a distributed (not local) Storm topology. There exist a few GitHub projects that provide either HBase Mappers or pre-made Storm bolts to write Tuples into HBase. These projects provide instructions for executing their samples on the LocalCluster.

The problem that I am running into with both of these projects, and with directly accessing the HBase API from the bolt, is that they all require the hbase-site.xml file to be included on the classpath. With the direct API approach, and perhaps with the GitHub ones as well, when you execute HBaseConfiguration.create(); it will try to find the information it needs from an entry on the classpath.

How can I modify the classpath for the Storm bolts to include the HBase configuration file?

Update: Using danehammer's answer, this is how I got it working.

Copy the following files into your ~/.storm directory:

  • hbase-common-0.98.0.2.1.2.0-402-hadoop2.jar
  • hbase-site.xml
  • storm.yaml: NOTE: if you do not copy storm.yaml into that directory, the storm jar command will NOT put that directory on the classpath (see the storm.py python script to check that logic for yourself; it would be nice if this were documented)

Next, in your topology class's main method get the HBase Configuration and serialize it:

final Configuration hbaseConfig = HBaseConfiguration.create();
final DataOutputBuffer databufHbaseConfig = new DataOutputBuffer();
hbaseConfig.write(databufHbaseConfig);
final byte[] baHbaseConfigSerialized = databufHbaseConfig.getData();

Pass the byte array to your spout class through the constructor. The spout class saves this byte array to a field (do not deserialize it in the constructor; I found that if the spout has a Configuration field, you will get a cannot-serialize exception when running the topology).
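The store-bytes-now, rehydrate-later pattern above can be sketched self-contained. The snippet below uses java.util.Properties as a stand-in for Hadoop's Configuration (so it runs without the HBase jars); the key name and value are illustrative, not from the original post:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

public class ConfigRoundTrip {
    public static void main(String[] args) throws IOException {
        // Stand-in for HBaseConfiguration.create(): some key/value config.
        Properties config = new Properties();
        config.setProperty("hbase.zookeeper.quorum", "zk1.example.com");

        // Serialize to a byte[] in the topology's main method -- a plain
        // byte[] field is safe to hold in a serializable spout/bolt.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        config.store(bos, null);
        byte[] serialized = bos.toByteArray();

        // Later, on the worker (i.e. in the spout's open method): rehydrate.
        Properties rehydrated = new Properties();
        rehydrated.load(new ByteArrayInputStream(serialized));
        System.out.println(rehydrated.getProperty("hbase.zookeeper.quorum"));
    }
}
```

With the real classes, `config.store`/`load` correspond to `Configuration.write(DataOutput)` and `Configuration.readFields(DataInput)`; only the raw bytes ever live in a spout field.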

In the spout's open method, deserialize the config and access the HBase table:

Configuration hBaseConfiguration = new Configuration();
ByteArrayInputStream bas = new ByteArrayInputStream(baHbaseConfigSerialized);
hBaseConfiguration.readFields(new DataInputStream(bas));
HTable tbl = new HTable(hBaseConfiguration, HBASE_TABLE_NAME);

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("YOUR_COLUMN"));

ResultScanner scnrTbl = tbl.getScanner(scan);

Now, in your nextTuple method you can use the ResultScanner to get the next row:

Result rsltWaveform = scnrTbl.next();

Extract what you want from the result, and pass those values in some serializable object to the bolts.
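A minimal sketch of such a carrier object (the class and field names are illustrative, not from the original post). Anything handed between topology components should survive serialization, and a plain Serializable POJO does; the main method below just round-trips one instance to demonstrate that:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class WaveformRecord implements Serializable {
    private static final long serialVersionUID = 1L;

    // Values pulled out of the HBase Result; names are hypothetical.
    final String rowKey;
    final byte[] value;

    WaveformRecord(String rowKey, byte[] value) {
        this.rowKey = rowKey;
        this.value = value;
    }

    public static void main(String[] args) throws Exception {
        WaveformRecord rec = new WaveformRecord("row-1", new byte[] {1, 2, 3});

        // Round-trip through Java serialization, the baseline Storm falls
        // back to (Storm may also register classes with Kryo).
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(rec);
        oos.flush();
        WaveformRecord back = (WaveformRecord) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(back.rowKey);
    }
}
```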

Solution

When you deploy a topology with the "storm jar" command, the ~/.storm folder will be on the classpath (see the Storm command-line documentation under the jar command). If you place the hbase-site.xml file (or related *-site.xml files) in that folder, HBaseConfiguration.create() during "storm jar" will find that file and correctly return an org.apache.hadoop.conf.Configuration. That Configuration then needs to be stored and serialized within your topology in order to distribute the config around the cluster.
