XML parsing in Hadoop MapReduce

Question

I have written MapReduce code to parse XML into CSV, but after running the job I find no output in my output directory. I am not sure whether the file is not being read or the output is not being written. I am new to Hadoop MapReduce.

Can you please help with this?

This is my entire code.

public class XmlParser11 {
    public static String outvalue;

    public static class XmlInputFormat1 extends TextInputFormat {
        public static final String START_TAG_KEY = "xmlinput.start";
        public static final String END_TAG_KEY = "xmlinput.end";

        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            return new XmlRecordReader();
        }

        public static class XmlRecordReader extends
                RecordReader<LongWritable, Text> {
            private byte[] startTag;
            private byte[] endTag;
            private long start;
            private long end;
            private FSDataInputStream fsin;
            private DataOutputBuffer buffer = new DataOutputBuffer();

            private LongWritable key = new LongWritable();
            private Text value = new Text();

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                System.out.println("B");
                Configuration conf = context.getConfiguration();
                startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
                endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
                FileSplit fileSplit = (FileSplit) split;

                // open the file and seek to the start of the split
                start = fileSplit.getStart();
                end = start + fileSplit.getLength();
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                fsin = fs.open(fileSplit.getPath());
                fsin.seek(start);
            }

            @Override
            public boolean nextKeyValue() throws IOException,
                    InterruptedException {
                System.out.println("C");
                if (fsin.getPos() < end) {
                    if (readUntilMatch(startTag, false)) {
                        try {
                            buffer.write(startTag);
                            if (readUntilMatch(endTag, true)) {
                                key.set(fsin.getPos());
                                value.set(buffer.getData(), 0,
                                        buffer.getLength());
                                return true;
                            }
                        } finally {
                            buffer.reset();
                        }
                    }
                }
                return false;
            }

            @Override
            public LongWritable getCurrentKey() throws IOException,
                    InterruptedException {
                return key;
            }

            @Override
            public Text getCurrentValue() throws IOException,
                    InterruptedException {
                return value;
            }

            @Override
            public void close() throws IOException {
                fsin.close();
            }

            @Override
            public float getProgress() throws IOException {
                return (fsin.getPos() - start) / (float) (end - start);
            }

            private boolean readUntilMatch(byte[] match, boolean withinBlock)
                    throws IOException {
                int i = 0;

                while (true) {
                    int b = fsin.read();
                    // end of file:
                    if (b == -1)
                        return false;
                    // save to buffer:
                    if (withinBlock)
                        buffer.write(b);
                    // check if we're matching:
                    if (b == match[i]) {
                        i++;
                        if (i >= match.length)
                            return true;
                    } else
                        i = 0;
                    // see if we've passed the stop point:
                    if (!withinBlock && i == 0 && fsin.getPos() >= end)
                        return false;
                }
            }
        }
    }

    public static class Map extends Mapper<Text, Text, Text, Text> {
        @SuppressWarnings("unchecked")
        @Override
        protected void map(Text key, Text value,
                @SuppressWarnings("rawtypes") Mapper.Context context)
                throws IOException, InterruptedException {

            String document = value.toString();
            System.out.println("‘" + document + "‘");

            XMLInputFactory xmlif = XMLInputFactory.newInstance();
            XMLStreamReader xmlr;

            try {
                xmlr = xmlif.createXMLStreamReader(new FileReader(document));
                while (xmlr.hasNext()) {
                    printEvent(xmlr);
                    xmlr.next();
                }
                xmlr.close();
                context.write(null, new Text(outvalue));
            } catch (XMLStreamException e) {
                e.printStackTrace();
            }
        }

        private void printEvent(XMLStreamReader xmlr) {
            switch (xmlr.getEventType()) {

            case XMLStreamConstants.START_ELEMENT:
                print(xmlr);
                break;

            case XMLStreamConstants.CHARACTERS:
                int start = xmlr.getTextStart();
                int length = xmlr.getTextLength();
                System.out.print(new String(xmlr.getTextCharacters(),
                        start,
                        length));
                break;
            }
        }

        private String print(XMLStreamReader xmlr) {
            if (xmlr.hasName()) {
                for (int i = 0; i < xmlr.getAttributeCount(); i++) {
                    String localName = xmlr.getLocalName();
                    if (localName != null);
                    String attName = xmlr.getAttributeLocalName(i);
                    String value = xmlr.getAttributeValue(i);
                    System.out.print(",");
                    String outvalue = localName + ":" + attName + "-" + value;
                    System.out.print(outvalue);
                }
            }
            return outvalue;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        conf.set("xmlinput.start", "<FICHER>");
        conf.set("xmlinput.end", "</FICHER>");
        Job job = new Job(conf);
        job.setJarByClass(XmlParser11.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(XmlParser11.Map.class);
        job.setNumReduceTasks(0);

        job.setInputFormatClass(XmlInputFormat1.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Here is the job output from PuTTY:

File System Counters 
        FILE: Number of bytes read=0
        FILE: Number of bytes written=120678
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1762671
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=5
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
Job Counters
        Launched map tasks=1
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=15960
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=3990
        Total vcore-seconds taken by all map tasks=3990
        Total megabyte-seconds taken by all map tasks=16343040
Map-Reduce Framework
        Map input records=0
        Map output records=0
        Input split bytes=124
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=0
        CPU time spent (ms)=1390
        Physical memory (bytes) snapshot=513449984
        Virtual memory (bytes) snapshot=4122763264
        Total committed heap usage (bytes)=2058354688
File Input Format Counters
        Bytes Read=1762547
File Output Format Counters
        Bytes Written=0

Solution

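I think the problem is in the beginning tag. Your record reader looks for the exact byte sequence <FICHER>, so if the FICHER elements in your file carry attributes (your mapper does read attributes), that sequence never appears and no record is ever produced. That would be consistent with the counters showing Bytes Read=1762547 but Map input records=0. Try setting the start tag without the closing angle bracket: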

 conf.set("xmlinput.start", "<FICHER");
 conf.set("xmlinput.end", "</FICHER>");
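
To see why dropping the closing angle bracket can matter, here is a minimal sketch that reuses the byte-matching loop from readUntilMatch on an in-memory string instead of an HDFS stream. The class name StartTagMatchDemo, the helper containsTag, and the sample XML are made up for illustration; the sample assumes the FICHER element carries attributes, which the question does not show.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original job.
class StartTagMatchDemo {

    // Same scanning idea as readUntilMatch in the question,
    // without the output buffer and without the split-end check.
    static boolean containsTag(String xml, String tag) {
        byte[] match = tag.getBytes(StandardCharsets.UTF_8);
        ByteArrayInputStream in =
                new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) {
                return false;          // end of input, tag never matched
            }
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return true;       // whole tag matched
                }
            } else {
                i = 0;                 // mismatch, start over
            }
        }
    }

    public static void main(String[] args) {
        // Assumed sample record: a FICHER element that has an attribute.
        String sample = "<ROOT><FICHER id=\"1\">text</FICHER></ROOT>";
        System.out.println(containsTag(sample, "<FICHER>")); // false
        System.out.println(containsTag(sample, "<FICHER"));  // true
    }
}
```

With "<FICHER>" the scan reaches the space after FICHER, resets, and runs to the end of the input, which in the real job would mean nextKeyValue() returns false on its first call. With "<FICHER" the match succeeds and the rest of the opening tag, attributes included, is picked up while reading up to the end tag.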

Hope this helps.
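
One more thing that may be worth checking once records do reach the mapper: map() in the question passes the record text to new FileReader(document), and FileReader treats its String argument as a file name, not as content. The sketch below parses the record text itself through a StringReader and joins the attributes in the same localName:attName-value shape that print() builds. The class name RecordToCsvDemo, the method attributesToCsv, and the sample record are hypothetical, and this is only one possible way to structure the parsing.

```java
import java.io.StringReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical demo class, not part of the original job.
class RecordToCsvDemo {

    // Parses one XML record held in memory and returns its attributes
    // as a comma-separated string of localName:attName-value entries.
    static String attributesToCsv(String record) {
        StringBuilder out = new StringBuilder();
        try {
            XMLStreamReader xmlr = XMLInputFactory.newInstance()
                    .createXMLStreamReader(new StringReader(record));
            try {
                while (xmlr.hasNext()) {
                    if (xmlr.next() == XMLStreamConstants.START_ELEMENT) {
                        for (int i = 0; i < xmlr.getAttributeCount(); i++) {
                            if (out.length() > 0) {
                                out.append(',');
                            }
                            out.append(xmlr.getLocalName()).append(':')
                               .append(xmlr.getAttributeLocalName(i)).append('-')
                               .append(xmlr.getAttributeValue(i));
                        }
                    }
                }
            } finally {
                xmlr.close();
            }
        } catch (XMLStreamException e) {
            // Rethrown unchecked to keep the sketch short.
            throw new IllegalStateException("Could not parse record", e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed sample record with one attribute per element.
        String record = "<FICHER id=\"1\"><ITEM code=\"x\"/></FICHER>";
        System.out.println(attributesToCsv(record)); // FICHER:id-1,ITEM:code-x
    }
}
```

In a mapper, the returned string could be wrapped in a Text and written with a NullWritable key. Returning the result from the method avoids relying on a shared static field such as outvalue.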
