Using Custom Hadoop input format for processing binary file in Spark


Problem Description



I have developed a Hadoop-based solution that processes a binary file, using the classic Hadoop MapReduce technique. The binary file is about 10 GB and is divided into 73 HDFS blocks, and the business logic, written as a map process, operates on each of these 73 blocks. We have developed a custom InputFormat and a custom RecordReader in Hadoop that return a key (IntWritable) and a value (BytesWritable) to the map function. The value is nothing but the contents of an HDFS block (binary data). The business logic knows how to read this data.
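The custom InputFormat and RecordReader themselves are not shown in the post. For readers unfamiliar with the pattern, a rough sketch of a Hadoop new-API input format that hands each split (one HDFS block in this setup) to the map function as a single BytesWritable record might look like the following. The class names and the key assignment are hypothetical placeholders, not the actual RandomAccessInputFormat referenced below.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, IntWritable}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}

// Hypothetical sketch only: delivers each split as one (IntWritable, BytesWritable) record.
class WholeSplitInputFormat extends FileInputFormat[IntWritable, BytesWritable] {
  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[IntWritable, BytesWritable] =
    new WholeSplitRecordReader
}

class WholeSplitRecordReader extends RecordReader[IntWritable, BytesWritable] {
  private var fileSplit: FileSplit = _
  private var conf: Configuration = _
  private val key = new IntWritable(0)   // real key scheme unknown; placeholder
  private val value = new BytesWritable()
  private var processed = false

  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
    fileSplit = split.asInstanceOf[FileSplit]
    conf = context.getConfiguration
  }

  override def nextKeyValue(): Boolean = {
    if (processed) return false
    val path = fileSplit.getPath
    val fs = path.getFileSystem(conf)
    val in = fs.open(path)
    try {
      // Read exactly this split's byte range into the value.
      val length = fileSplit.getLength.toInt
      val bytes = new Array[Byte](length)
      in.seek(fileSplit.getStart)
      in.readFully(bytes, 0, length)
      value.set(bytes, 0, length)
    } finally {
      in.close()
    }
    processed = true
    true
  }

  override def getCurrentKey: IntWritable = key
  override def getCurrentValue: BytesWritable = value
  override def getProgress: Float = if (processed) 1.0f else 0.0f
  override def close(): Unit = ()
}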

Now I would like to port this code to Spark. I am a beginner in Spark and could run the simple examples (word count, the Pi example), but I could not find a straightforward way to process binary files in Spark. I see two possible solutions for this use case. The first is to avoid the custom input format and record reader altogether: find an approach in Spark that creates an RDD for those HDFS blocks and use a map-like method that feeds the HDFS block content to the business logic. If this is not possible, I would like to re-use the custom input format and custom record reader through methods such as the Hadoop APIs, HadoopRDD, etc. My problem: I do not know whether the first approach is possible or not. If it is, can anyone please provide some pointers with examples? I was trying the second approach but have been highly unsuccessful. Here is the code snippet I used:

package org {

  import org.apache.hadoop.io.{BytesWritable, IntWritable}
  import org.apache.spark.{SparkConf, SparkContext}
  // RandomAccessInputFormat is the custom input format described above
  // (its import is omitted here, as in the original post).

  object Driver {
    def myFunc(key: IntWritable, content: BytesWritable): Int = {
      println(key.get())
      println(content.getSize())
      1
    }

    def main(args: Array[String]) {
      // create a Spark context
      val conf = new SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
      val sc = new SparkContext(conf)
      println(sc)
      // one (key, value) pair per input split, read by the custom record reader
      val rd = sc.newAPIHadoopFile("hdfs:///user/hadoop/myBin.dat", classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])
      val count = rd.map(x => myFunc(x._1, x._2)).reduce(_ + _)
      println("The count is *****************************" + count)
    }
  }
}

Please note that the print statement in the main method prints 73, which is the number of blocks, whereas the print statements inside the map function print 0.

Can someone tell me where I am going wrong here? I think I am not using the API the right way, but I have failed to find documentation or usage examples.
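As an aside on the "first approach" described above: Spark does not expose an RDD of raw HDFS blocks directly, but SparkContext.binaryFiles produces an RDD with one (path, PortableDataStream) record per file, which comes close if whole-file processing is acceptable. A minimal sketch, assuming a hypothetical businessLogic function standing in for the existing byte-reading logic:

// Hypothetical stand-in for the existing business logic over raw bytes.
def businessLogic(bytes: Array[Byte]): Int = bytes.length

// One record per *file* (not per HDFS block), so this only fits whole-file processing.
val binaries = sc.binaryFiles("hdfs:///user/hadoop/myBin.dat")
val total = binaries.map { case (_, stream) =>
  businessLogic(stream.toArray())   // toArray materializes the file as Array[Byte]
}.reduce(_ + _)
println("Total: " + total)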

Solution

I have made some progress on this issue. I am now using the code below, which does the job:

var hRDD = new NewHadoopRDD(sc,
  classOf[RandomAccessInputFormat],
  classOf[IntWritable],
  classOf[BytesWritable],
  job.getConfiguration())

val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }.collect()
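For context, neither the job variable nor myfuncPart is shown in the post. The job is presumably a Hadoop mapreduce Job created beforehand, and myfuncPart must match the (InputSplit, Iterator[(K, V)]) => Iterator[U] signature expected by mapPartitionsWithInputSplit. A hypothetical sketch of both:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, IntWritable}
import org.apache.hadoop.mapreduce.{InputSplit, Job}
import org.apache.hadoop.mapreduce.lib.input.FileSplit

// Hypothetical setup for the undefined `job` variable.
val job = Job.getInstance(new Configuration())

// Hypothetical stand-in for myfuncPart: called once per partition, with the
// InputSplit that produced it and an iterator over its (key, value) records.
def myfuncPart(split: InputSplit,
               iter: Iterator[(IntWritable, BytesWritable)]): Iterator[Int] = {
  val fileSplit = split.asInstanceOf[FileSplit]
  println(s"Processing split at offset ${fileSplit.getStart}, length ${fileSplit.getLength}")
  iter.map { case (_, value) =>
    // getBytes may be padded; trim to getLength to get the real payload
    val payload = java.util.Arrays.copyOf(value.getBytes, value.getLength)
    payload.length   // placeholder for the real business logic
  }
}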

However, I ended up with another error, the details of which I have posted here: Issue in accessing HDFS file inside spark map function

15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
