SPARK Combining Neighbouring Records in a text file

Problem Description

Very new to SPARK.

I need to read a very large input dataset, but I fear the format of the input files would not be amenable to read on SPARK. Format is as follows:

RECORD,record1identifier
SUBRECORD,value1
SUBRECORD2,value2
RECORD,record2identifier
RECORD,record3identifier
SUBRECORD,value3
SUBRECORD,value4
SUBRECORD,value5
...

Ideally what I would like to do is pull the lines of the file into a SPARK RDD, and then transform it into an RDD that only has one item per record (with the subrecords becoming part of their associated record item).

So if the example above was read in, I'd want to wind up with an RDD containing 3 objects: [record1,record2,record3]. Each object would contain the data from their RECORD and any associated SUBRECORD entries.
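For concreteness, the target shape could be modeled along the lines of the sketch below (the case class name and fields are illustrative, not part of the original question):

case class ParsedRecord(identifier: String, subrecords: Seq[String])   // hypothetical container

// Desired outcome: an RDD[ParsedRecord] with one element per RECORD line, e.g.
// ParsedRecord("record1identifier", Seq("value1", "value2"))
// ParsedRecord("record2identifier", Seq())
// ParsedRecord("record3identifier", Seq("value3", "value4", "value5"))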

The unfortunate bit is that the only thing in this data that links subrecords to records is their position in the file, underneath their record. That means the problem is sequentially dependent and might not lend itself to SPARK.

Is there a sensible way to do this using SPARK (and if so, what could that be, what transform could be used to collapse the subrecords into their associated record)? Or is this the sort of problem one needs to do off spark?

Solution

There is a somewhat hackish way to identify the sequence of records and sub-records. This method assumes that each new "record" is identifiable in some way.

import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{sum, collect_list}
import spark.implicits._   // assumes a SparkSession named `spark` is in scope (as in spark-shell)

// Sample data in the same shape as the input file; zipWithIndex captures the
// original line order as an explicit "id" column.
val df = Seq(
  ("RECORD","record1identifier"),
  ("SUBRECORD","value1"),
  ("SUBRECORD2","value2"),
  ("RECORD","record2identifier"),
  ("RECORD","record3identifier"),
  ("SUBRECORD","value3"),
  ("SUBRECORD","value4"),
  ("SUBRECORD","value5")
).toDS().rdd.zipWithIndex.map(r => (r._1._1, r._1._2, r._2)).toDF("record", "value", "id")

// Window over the whole dataset, ordered by original line position.
val win = Window.orderBy("id")

// Flag each RECORD row with 1 (0 otherwise), then take a running sum so that a
// RECORD and the SUBRECORDs that follow it share the same "recid".
val recids = df.withColumn("newrec", ($"record" === "RECORD").cast(LongType))
  .withColumn("recid", sum($"newrec").over(win))
  .select($"recid", $"record", $"value")

// Split records from sub-records, then re-join them on the shared recid.
val recs = recids.where($"record" === "RECORD").select($"recid", $"value".as("recname"))
val subrecs = recids.where($"record" =!= "RECORD").select($"recid", $"value".as("attr"))

recs.join(subrecs, Seq("recid"), "left").groupBy("recname").agg(collect_list("attr").as("attrs")).show()

This snippet first uses zipWithIndex to identify each row in order, then adds a boolean column that is true every time a "record" is identified and false otherwise. We then cast that boolean to a long and take a running sum over it, which has the neat side-effect of labeling every record and its sub-records with a common identifier.
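To make the effect of the running sum concrete, calling recids.show() on the sample data should produce ids along these lines (reconstructed by hand from the logic above, not captured program output):

recids.show()
// Hand-reconstructed illustration of the intermediate DataFrame:
// +-----+----------+-----------------+
// |recid|    record|            value|
// +-----+----------+-----------------+
// |    1|    RECORD|record1identifier|
// |    1| SUBRECORD|           value1|
// |    1|SUBRECORD2|           value2|
// |    2|    RECORD|record2identifier|
// |    3|    RECORD|record3identifier|
// |    3| SUBRECORD|           value3|
// |    3| SUBRECORD|           value4|
// |    3| SUBRECORD|           value5|
// +-----+----------+-----------------+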

In this particular case, we then split to get the record identifiers, re-join only the sub-records, group by the record ids, and collect the sub-record values to a list.

The above snippet results in this:

+-----------------+--------------------+
|          recname|               attrs|
+-----------------+--------------------+
|record1identifier|    [value1, value2]|
|record2identifier|                  []|
|record3identifier|[value3, value4, ...|
+-----------------+--------------------+
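The example above builds the DataFrame from an in-memory Seq; to start from the actual input file, one possible adaptation (a sketch only, using a hypothetical path and assuming each line has the form TYPE,value with no commas inside the value) would be:

import spark.implicits._

// "records.txt" is a hypothetical path; split on the first comma only.
val lines = spark.read.textFile("records.txt")   // Dataset[String], one element per line

val fileDf = lines.rdd.zipWithIndex.map { case (line, idx) =>
  val Array(kind, value) = line.split(",", 2)
  (kind, value, idx)
}.toDF("record", "value", "id")
// ...then apply the same Window / running-sum / join logic as above to fileDf.

One thing to watch for at scale: Window.orderBy("id") with no partitionBy pulls all rows into a single partition, so for a very large file this step can become a bottleneck.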
