Parsing multiline records in Scala


Problem description

Here is my RDD[String]:

M1 module1
PIP a Z A
PIP b Z B
PIP c Y n4

M2 module2
PIP a I n4
PIP b O D
PIP c O n5

and so on. Basically, I need an RDD of keys (the second word on the first line of each record) whose values are the subsequent PIP lines, which can then be iterated over.

I tried the following:

val usgPairRDD = usgRDD.map(x => (x.split("\\n")(0), x))

but this gives me the following output:

(,)
(M1 module1,M1 module1)
(PIP a Z A,PIP a Z A)
(PIP b Z B,PIP b Z B)
(PIP c Y n4,PIP c Y n4)
(,)
(M2 module2,M2 module2)
(PIP a I n4,PIP a I n4)
(PIP b O D,PIP b O D)
(PIP c O n5,PIP c O n5)

Instead, I'd like the output to be:

module1, (PIP a Z A, PIP b Z B, PIP c Y n4)
module2, (PIP a I n4, PIP b O D, PIP c O n5)

What am I doing wrong? I am quite new to the Spark APIs. Thanks!

Hi @zero323,

usgRDD.take(10).foreach(x => println(x + "%%%%%%%%%"))

yields...

%%%%%%%%%
M1 module1%%%%%%%%%
PIP a Z A%%%%%%%%%
PIP b Z B%%%%%%%%%
PIP c Y n4%%%%%%%%%
%%%%%%%%%
M2 module2%%%%%%%%%
PIP a I n4%%%%%%%%%
PIP b O D%%%%%%%%%
PIP c O n5%%%%%%%%%

Hi @zero323 and @Daniel Darabos, my input is a very, very large set of many, many files (spanning TBs). Here is a sample:

BIN n4
BIN n5
BIN D
BIN E
PIT A I A
PIT B I B 
PIT C I C
PIT D O D
PIT E O E
DEF M1 module1
   PIP a Z A
   PIP b Z B
   PIP c Y n4
DEF M2 module2
   PIP a I n4
   PIP b O D
   PIP c O n5

I need all the BIN, PIT, and DEF lines (including the PIP lines below each DEF) in 3 different RDDs. Here is how I am doing this currently (from the discussion, I sense usgRDD below is computed incorrectly):

val binRDD = levelfileRDD.filter(line => line.contains("BIN"))
val pitRDD = levelfileRDD.filter(line => line.contains("PIT"))
val usgRDD = levelfileRDD.filter(line => !line.contains("BIN") && !line.contains("PIT")).flatMap(s=>s.split("DEF").map(_.trim))

I need 3 types of RDDs (at the moment) because I need to perform validation later on. For example, "n4" under "DEF M2 module2" can only exist if n4 is a BIN element. From the RDDs, I hope to derive relationships using the GraphX APIs (I have obviously not gotten to this point yet). It would be ideal if each usgPairRDD (computed from usgRDD or otherwise) printed the following; a sketch of that validation follows the sample output below.

module1, (a Z A, b Z B, c Y n4) %%%%%%%
module2, (a I n4, b O D, c O n5) %%%%%%%
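A hypothetical sketch of that validation (not in the original question): it assumes the last token of each PIP line is an endpoint that must be declared by some BIN or PIT line, and it uses the usgPairRDD built in the answer below.

val binNames = binRDD.map(_.split("\\s+")(1))   // "BIN n4"    -> "n4"
val pitNames = pitRDD.map(_.split("\\s+")(1))   // "PIT A I A" -> "A"
val declared = binNames.union(pitNames)
val referenced = usgPairRDD.flatMap { case (_, pips) =>
  pips.map(_.trim.split("\\s+").last)           // "PIP c Y n4" -> "n4"
}
// Endpoints referenced by a PIP line but never declared; empty if all valid
val undeclared = referenced.subtract(declared).distinct()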

I hope I am making sense. Apologies to the Spark gods if I am not.

Recommended answer

By default, Spark creates a single element per line. This means that in your case every record is spread over multiple elements which, as Daniel Darabos noted in the comments, can be processed by different workers.
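For illustration, here is a minimal sketch (the modules.txt path is hypothetical) of why the original map cannot regroup records: each element already holds exactly one line, so splitting on a newline is a no-op.

// Minimal sketch: sc.textFile yields one RDD element per line, so
// x.split("\\n")(0) in the question's map just returns the line itself
val lines = sc.textFile("modules.txt")  // hypothetical path
lines.take(3).foreach(println)
// M1 module1
// PIP a Z A
// PIP b Z B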

Since it looks like your data is relatively regular and records are separated by an empty line, you should be able to use newAPIHadoopFile with a custom delimiter:

import org.apache.spark.rdd.RDD
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val path: String = ???

// Use a blank line ("\n\n") as the record delimiter instead of a single newline
val conf = new org.apache.hadoop.mapreduce.Job().getConfiguration
conf.set("textinputformat.record.delimiter", "\n\n")

// Each element is now one whole multiline record
val usgRDD = sc.newAPIHadoopFile(
    path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
  .map{ case (_, v) => v.toString }

// Split each record on newlines: the header line becomes the key,
// the remaining (PIP) lines become the values
val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map(_.split("\n") match {
  case Array(x, xs @ _*) => (x, xs)
})
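If you want the key to be just the module name (the second word of the header line, e.g. module1) and values without the PIP prefix, matching the desired output above, here is a hedged follow-up sketch, not part of the original answer; it assumes every non-empty record starts with a two-word header like "M1 module1".

// Hedged follow-up: reshape (header, lines) into (moduleName, trimmed values)
val byModule: RDD[(String, Seq[String])] = usgPairRDD
  .filter { case (header, _) => header.trim.nonEmpty }  // drop empty records
  .map { case (header, pips) =>
    (header.trim.split("\\s+")(1),                      // "M1 module1" -> "module1"
     pips.map(_.trim.stripPrefix("PIP ")))              // "PIP a Z A"  -> "a Z A"
  }
// byModule now contains e.g. (module1, Seq(a Z A, b Z B, c Y n4))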
