Writing to HBase via Spark: Task not serializable


Problem description

I'm trying to write some simple data into HBase (0.96.0-hadoop2) using Spark 1.0, but I keep getting serialization problems. Here is the relevant code:

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put

object PutRawDataIntoHbase{
  def main(args: Array[String]): Unit = {
    var propFileName = "hbaseConfig.properties"
    if(args.size > 0){
      propFileName = args(0)
    }

    /** Load properties here **/
    val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
      .map(l => l.split("\t"))
      .map(a => Array("%010d".format(a(9).toInt) + "-" + a(0), a(1)))

    val tableName = prop.getProperty("hbase.table.name")
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
    hbaseConf.addResource(prop.getProperty("hbase.site.xml"))

    val myTable = new HTable(hbaseConf, tableName)
    theData.foreach { a =>
      val p = new Put(Bytes.toBytes(a(0)))
      p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
      myTable.put(p)
    }
  }
}


Running the code results in:

Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.client.HTable

Replacing the foreach with map doesn't crash, but it doesn't write anything either. Any help would be greatly appreciated.

Answer

The class HBaseConfiguration represents a pool of connections to HBase servers. Obviously, it can't be serialized and sent to the worker nodes. Since HTable uses this pool to communicate with the HBase servers, it can't be serialized either.


Basically, there are three ways to handle this problem:

1. Create the HTable on the workers, once per partition. Note the use of the foreachPartition method:

val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
  val hbaseConf = HBaseConfiguration.create()
  <... configure HBase ...>
  val myTable = new HTable(hbaseConf, tableName)
  iter.foreach { a =>
    val p = new Put(Bytes.toBytes(a(0)))
    p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    myTable.put(p)
  }
}

Note that each of the worker nodes must have access to the HBase servers and must have the required jars preinstalled or provided via ADD_JARS.

Also note that since a connection pool is opened for each partition, it is a good idea to reduce the number of partitions to roughly the number of worker nodes (with the coalesce function), as sketched below. It is also possible to share a single HTable instance on each worker node, but that is not as trivial.
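A minimal sketch of the coalesce idea, reusing tableName and hbaseColFamily from the code above; numWorkerNodes is a hypothetical value, not something from the original post:

// Coalesce to roughly one partition per worker node so that only a few
// HBase connection pools are opened. `numWorkerNodes` is an assumed value.
val numWorkerNodes = 4

theData
  .coalesce(numWorkerNodes)          // fewer partitions => fewer connection pools
  .foreachPartition { iter =>
    val hbaseConf = HBaseConfiguration.create()
    val myTable = new HTable(hbaseConf, tableName)
    try {
      iter.foreach { a =>
        val p = new Put(Bytes.toBytes(a(0)))
        p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
        myTable.put(p)
      }
    } finally {
      myTable.close()                // flush buffered puts and release the connection
    }
  }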

2. It is possible to write all the data from the RDD on a single machine, even if the data doesn't fit in memory. The details are explained in this answer: Spark: Best practice for retrieving big data from RDD to local machine (http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine/21801828#21801828).

Of course, this would be slower than a distributed write, but it is simple, doesn't bring painful serialization issues, and might be the best approach if the data size is reasonable.
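A rough sketch of that approach, assuming toLocalIterator is available in the Spark version in use and reusing the tableName and hbaseColFamily values from the original code; the HBase connection only exists on the driver, so nothing has to be serialized to the workers:

val hbaseConf = HBaseConfiguration.create()
val myTable = new HTable(hbaseConf, tableName)
try {
  // toLocalIterator streams the RDD back to the driver one partition at a time,
  // so the whole dataset never has to fit in the driver's memory at once.
  theData.toLocalIterator.foreach { a =>
    val p = new Put(Bytes.toBytes(a(0)))
    p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    myTable.put(p)
  }
} finally {
  myTable.close()
}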

3. It is possible to create a custom Hadoop OutputFormat for HBase, or use an existing one. I'm not sure whether something exists that fits your needs, but Google should help here.
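For what it's worth, HBase ships a TableOutputFormat (in org.apache.hadoop.hbase.mapreduce) that can be combined with saveAsNewAPIHadoopDataset. The outline below is a sketch of that route, not code from the original answer, and again assumes the tableName and hbaseColFamily values from the question:

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._   // brings PairRDDFunctions into scope on older Spark versions

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

theData
  .map { a =>
    val p = new Put(Bytes.toBytes(a(0)))
    p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
    // TableOutputFormat expects (key, mutation) pairs; the writer only uses the Put.
    (new ImmutableBytesWritable(Bytes.toBytes(a(0))), p)
  }
  .saveAsNewAPIHadoopDataset(job.getConfiguration)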

P.S. By the way, the map call doesn't crash because it doesn't get evaluated: RDDs are not evaluated until you invoke a function with side effects. For example, if you called theData.map(....).persist, it would crash too.
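A small, hypothetical illustration of that laziness, using the names from the question's code, just to show where the exception surfaces:

// Defining the transformation alone does not submit a job, so nothing is
// serialized yet and no exception is thrown:
val mapped = theData.map { a => myTable.put(new Put(Bytes.toBytes(a(0)))); a }

// As soon as something forces evaluation, Spark has to ship the closure
// (including the captured HTable) to the executors, and the
// NotSerializableException appears:
mapped.count()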

