创建hfile时出现Spark问题-添加了一个密钥,该密钥的大小不比以前的单元大 [英] Spark issues in creating hfiles- Added a key not lexically larger than previous cell

查看:921
本文介绍了创建hfile时出现Spark问题-添加了一个密钥,该密钥的大小不比以前的单元大的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建hfiles来将其批量加载到Hbase中,即使一切看起来都很好,它仍会引发带有行键的错误。
我正在使用以下代码:

I am trying to create hfiles to do bulk load into Hbase and it keeps throwing the error with the row key even though everything looks fine. I am using the following code:

val df = sqlContext.read.format("com.databricks.spark.csv")
   .option("header", "true")
   .option("inferSchema", "true")
   .load("data.csv")

import sqlContext.implicits._

val DF2 = df.filter($"company".isNotNull)
  .dropDuplicates(Array("company"))
  .sortWithinPartitions("company").sort("company")

val rdd = DF2.flatMap(x => { 
  val rowKey = Bytes.toBytes(x(0).toString)
  for (i <- 0 to cols.length - 1) yield {
    val index = x.fieldIndex(new String(cols(i)))
    val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes
         (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, COLUMN_FAMILY, cols(i), value))
  }
})

rdd.saveAsNewAPIHadoopFile("HDFS LOcation", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], fconf)

我正在使用以下数据

company,date,open,high,low,close,volume
ABG,01-Jan-2010,11.53,11.53,11.53,11.53,0
ABM,01-Jan-2010,20.66,20.66,20.66,20.66,0
ABR,01-Jan-2010,1.99,1.99,1.99,1.99,0
ABT,01-Jan-2010,53.99,53.99,53.99,53.99,0
ABX,01-Jan-2010,39.38,39.38,39.38,39.38,0
ACC,01-Jan-2010,28.1,28.1,28.1,28.1,0
ACE,01-Jan-2010,50.4,50.4,50.4,50.4,0
ACG,01-Jan-2010,8.25,8.25,8.25,8.25,0
ADC,01-Jan-2010,27.25,27.25,27.25,27.25,0

它抛出错误为

java.io.IOException: Added a key not lexically larger than previous. Current cell = ADC/data:high/1505862570671/Put/vlen=5/seqid=0, lastCell = ADC/data:open/1505862570671/Put/vlen=5/seqid=0
    at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)
    at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:265)
    at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:992)
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:199)

我什至尝试对数据进行排序,但仍然会引发错误。

I even tried sorting the data but still the error is thrown.

推荐答案

花了几个小时后,我找到了解决方案,其根本原因是未对列进行排序。

After spending couple of hours I found the solution, rootcause is that the columns are not sorted.

由于Hfile需要按字典顺序对键值进行排序,因此在编写 HFileOutputFormat2-> AbstractHFileWriter 找到了添加了一个密钥,该密钥的大小不比以前大。当前单元格。在对列进行排序后,您已经在行级别应用了排序,它也会起作用。

Since Hfile needs keyvalue in lexicographically sorted order and in your case while writing HFileOutputFormat2->AbstractHFileWriter found Added a key not lexically larger than previous. Current cell. You have already applied sorting at row level once you sort the columns also it would work.

此处的问题带有很好的解释为什么-hbase-keyvaluesortreducer-need-to-sort-all-keyvalue

Question here with good explanation why-hbase-keyvaluesortreducer-need-to-sort-all-keyvalue.

解决方案:

//sort columns
val cols = companyDs.columns.sorted

//Rest of the code is same

val output = companyDs.rdd.flatMap(x => {
  val rowKey = Bytes.toBytes(x(0).toString)
 val hkey = new ImmutableBytesWritable(rowKey)
  for (i <- 0 to cols.length - 1) yield {
    val index = x.fieldIndex(new String(cols(i)))
    val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes
    val kv = new KeyValue(rowKey,COLUMN_FAMILY, cols(i).getBytes(),System.currentTimeMillis()+i ,x(i).toString.getBytes())
    (hkey,kv)
  }
})
output.saveAsNewAPIHadoopFile("<path>"
  , classOf[ImmutableBytesWritable], classOf[KeyValue],
  classOf[HFileOutputFormat2], config)

这篇关于创建hfile时出现Spark问题-添加了一个密钥,该密钥的大小不比以前的单元大的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆