Spark RDD find by key


Problem Description



I have an RDD transformed from HBase:

val hbaseRDD: RDD[(String, Array[String])], where tuple._1 is the rowkey and the array holds the values from HBase.

4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"]
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"]
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"]

I also have a SchemaRDD (id, date1, col1, col2, col3) transformed to

val refDataRDD: RDD[(String, Array[String])], which I will iterate over to check whether each key exists in hbaseRDD:

4929103, ["2015-05-21 10:03:44","EV01","col2","col3"]
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"]

The question is:

  • How do I check whether a key (tuple._1, e.g. "4929103") exists in hbaseRDD and get the corresponding values (tuple._2)? I can't use PairRDD's lookup function inside an rdd.filter; it throws "scala.MatchError: null", although it works outside:

    val filteredRDD = rdd.filter(sqlRow => {
      val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE")
      // if found, check if date1 of hbaseRDD < sqlRow(1)
      // else if not found, retain row
      true
    })
    

    I'm not sure that's the actual problem, though, since I also get an NPE when I switch the lookup line to:

    val sqlRowHbase = hbaseRDD.filter(row => {
    

    Note: I am doing an hbaseRDD.count before these lines, and hbaseRDD.lookup works fine outside the rdd.filter.
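For context, the lookup fails inside the filter because an RDD cannot be used within another RDD's closure; the driver-side state it depends on is null on the executors. A common workaround is to collect the smaller side to a map (and, on a real cluster, broadcast it). The sketch below imitates that with plain Scala collections standing in for the RDDs, using the sample data from the question; the names and the lexicographic date comparison are assumptions:

```scala
// Plain-Scala stand-in for the collect-and-broadcast workaround: the small
// HBase side becomes an in-memory Map, and each ref row is filtered against it.
val hbaseMap: Map[String, Array[String]] = Map(
  "4929103-ACTIVE" -> Array("4929103", "2015-05-20 10:02:44")
)
val refRows = Seq(
  Array("4929103", "2015-05-21 10:03:44"),
  Array("4929104", "2015-05-21 10:03:44")
)
val filtered = refRows.filter { sqlRow =>
  hbaseMap.get(sqlRow(0) + "-ACTIVE") match {
    // found: keep the row only if the HBase date1 is earlier
    // (lexicographic compare works for this "yyyy-MM-dd HH:mm:ss" format)
    case Some(values) => values(1) < sqlRow(1)
    // not found: retain the row
    case None => true
  }
}
```

In actual Spark code the map would come from `hbaseRDD.collectAsMap()` wrapped in `sc.broadcast(...)`, which only scales while the HBase side fits in memory.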

So basically, I am trying to "find" by key in hbaseRDD and get the row/values. Joining them is a little complicated since some values in both RDDs may be null, and which row should be retained with which data depends on many scenarios.

Solution

Assuming the set of a_id values you need to look up is contained in an RDD, I think you could use a leftOuterJoin instead of iterating and looking up each value.

I saw your comment above regarding the potentially changeable position of date1. I'm not addressing it below, though; I think it should be handled before the lookup itself, by some kind of row-specific mapping.

If I read the pseudocode correctly, you have an RDD of (id, date) and want to update it by looking up data in HBase, replacing the date if a row is found in HBase for that id and its date is earlier than the one in refData. Is that correct?

If so, assuming you have some ref data like this:

val refData = sc.parallelize(Array(
 ("4929103","2015-05-21 10:03:44"),
 ("4929104","2015-05-21 10:03:44")
))

And some row data from Hbase:

val hbaseRDD = sc.parallelize(Array(
    ("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")),
    ("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")),
    ("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44"))
))

Then you can look up each id from refData in HBase with a simple leftOuterJoin and, for each row found, update the date if necessary:

refData
  // look up each refData id in the HBase rows, re-keyed by their a_id value
  .leftOuterJoin(hbaseRDD.map { case (rowkey, Array(a_id, date1)) => (a_id, date1) })

  // update the date in refData if the date from HBase is earlier
  .map { case (id, (refDate, maybeRowDate)) => (id, chooseDate(refDate, maybeRowDate)) }
  .collect
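For reference, leftOuterJoin keeps every key from the left side and wraps the right side's value in an Option. A plain-Scala sketch of the same semantics (it assumes unique keys on the right; a real RDD leftOuterJoin also expands duplicate keys into all pairings):

```scala
// leftOuterJoin semantics on plain collections: every left key survives;
// the right value becomes Some(v) when the key is present, None otherwise.
def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Map[K, W]): Seq[(K, (V, Option[W]))] =
  left.map { case (k, v) => (k, (v, right.get(k))) }

val joined = leftOuterJoin(
  Seq(("4929103", "2015-05-21 10:03:44"), ("4929104", "2015-05-21 10:03:44")),
  Map("4929103" -> "2015-05-20 10:02:44")
)
// 4929103 pairs with Some(hbase date); 4929104 was not found, so it pairs with None
```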


def chooseDate(refDate: String, rowDate: Option[String]) = rowDate match {

  // if the row was not found in Hbase: keep the ref date
  case None => refDate

  case Some(rDate) =>
    if (true) /* replace this by first parsing both dates, then checking rowDate < refDate */
        rDate
    else
        refDate
}
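The placeholder condition above can be filled in by actually parsing the timestamps. A sketch using java.time, with the "yyyy-MM-dd HH:mm:ss" pattern assumed from the sample data:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Timestamp format taken from the sample rows; adjust if the real data differs.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

def chooseDate(refDate: String, rowDate: Option[String]): String = rowDate match {
  // row not found in HBase: keep the ref date
  case None => refDate
  // row found: take the HBase date only when it is strictly earlier
  case Some(rDate) =>
    if (LocalDateTime.parse(rDate, fmt).isBefore(LocalDateTime.parse(refDate, fmt))) rDate
    else refDate
}
```

Parsing also guards against formats where plain string comparison would give the wrong order.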
