How to process multi-line input records in Spark
Question
Each record in my input file (a very large file) is spread across multiple lines.
For example:
Id: 2
ASIN: 0738700123
title: Test tile for this product
group: Book
salesrank: 168501
similar: 5 0738700811 1567184912 1567182813 0738700514 0738700915
categories: 2
|Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484]
|Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486]
reviews: total: 12 downloaded: 12 avg rating: 4.5
2001-12-16 cutomer: A11NCO6YTE4BTJ rating: 5 votes: 5 helpful: 4
2002-1-7 cutomer: A9CQ3PLRNIR83 rating: 4 votes: 5 helpful: 5
How can I identify and process each multi-line record in Spark?
Recommended answer
I did this by implementing a custom input format and record reader.
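// The two public classes below belong in separate files
// (ParagraphInputFormat.java and ParagraphRecordReader.java); the imports cover both.
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Input format that hands each split to a ParagraphRecordReader.
public class ParagraphInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
        return new ParagraphRecordReader();
    }
}

// Record reader that returns everything up to and including endTag as one value.
public class ParagraphRecordReader extends RecordReader<LongWritable, Text> {
    private long end;                     // byte offset where this split ends
    private boolean stillInChunk = true;  // false once the last record of the split was read
    private LongWritable key = new LongWritable();
    private Text value = new Text();
    private FSDataInputStream fsin;
    private DataOutputBuffer buffer = new DataOutputBuffer();
    // Explicit charset so the delimiter bytes do not depend on the platform default.
    private byte[] endTag = "\n\r\n".getBytes(StandardCharsets.UTF_8);

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        FileSplit split = (FileSplit) inputSplit;
        Configuration conf = taskAttemptContext.getConfiguration();
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        fsin = fs.open(path);
        long start = split.getStart();
        end = split.getStart() + split.getLength();
        fsin.seek(start);
        // A split that does not start at offset 0 begins in the middle of a record:
        // skip ahead to the first endTag and leave the partial record to the previous split.
        if (start != 0) {
            readUntilMatch(endTag, false);
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (!stillInChunk) return false;
        // Read one record, including its trailing endTag, into the buffer.
        boolean status = readUntilMatch(endTag, true);
        if (!status) {
            // End of file, or the record ended at or past the split boundary:
            // this is the last record for this reader.
            stillInChunk = false;
            // End of file with nothing buffered: do not emit an empty record.
            if (buffer.getLength() == 0) return false;
        }
        value = new Text();
        value.set(buffer.getData(), 0, buffer.getLength());
        key = new LongWritable(fsin.getPos());
        buffer.reset();
        return true;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Progress is not tracked.
        return 0;
    }

    @Override
    public void close() throws IOException {
        fsin.close();
    }

    // Reads bytes until `match` has been seen. When withinBlock is true, the bytes
    // (including the match itself) are appended to the buffer. Returns false at end
    // of file or when the match ends at or past the end of the split.
    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
        int i = 0;
        while (true) {
            int b = fsin.read();
            if (b == -1) return false;
            if (withinBlock) buffer.write(b);
            if (b == match[i]) {
                i++;
                if (i >= match.length) {
                    return fsin.getPos() < end;
                }
            } else {
                // On a mismatch, the current byte may itself start a new match
                // (for example the second '\n' in "\n\n\r\n").
                i = (b == match[0]) ? 1 : 0;
            }
        }
    }
}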
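endTag is the byte sequence that marks the end of each record. Here it is "\n\r\n", which matches a blank line in a file with CRLF line endings; a file with LF-only line endings would need "\n\n" instead.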
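To see the delimiter matching in isolation, here is a minimal sketch that applies the same byte-by-byte idea to an in-memory stream, without Hadoop or Spark. The class name RecordSplitter, the method splitRecords, and the sample data are made up for illustration and are not part of the answer above. Unlike the reader above, this sketch drops the delimiter from each record, ignores split boundaries, and uses "\n\n" because the sample text has LF-only line endings.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class RecordSplitter {
    // Reads the stream byte by byte and cuts a record each time the
    // delimiter is seen; the delimiter itself is not kept in the record.
    static List<String> splitRecords(InputStream in, byte[] delimiter) throws IOException {
        List<String> records = new ArrayList<>();
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int matched = 0; // number of delimiter bytes matched so far
        int b;
        while ((b = in.read()) != -1) {
            buffer.write(b);
            if ((byte) b == delimiter[matched]) {
                matched++;
                if (matched == delimiter.length) {
                    byte[] bytes = buffer.toByteArray();
                    int recordLength = bytes.length - delimiter.length;
                    records.add(new String(bytes, 0, recordLength, StandardCharsets.UTF_8));
                    buffer.reset();
                    matched = 0;
                }
            } else {
                // Simplified restart, sufficient for delimiters such as "\n\n" or "\n\r\n".
                matched = ((byte) b == delimiter[0]) ? 1 : 0;
            }
        }
        // Whatever is left after the last delimiter is the final record.
        if (buffer.size() > 0) {
            records.add(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical two-record input separated by a blank line.
        String input = "Id: 1\ntitle: A\n\nId: 2\ntitle: B\n";
        byte[] delimiter = "\n\n".getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
        for (String record : splitRecords(in, delimiter)) {
            System.out.println("--- record ---");
            System.out.println(record);
        }
    }
}
```

The record reader above follows the same loop, but additionally has to decide which split owns a record, which is why it compares the stream position with the end of the split after every match.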