How to match/extract a multi-line pattern from a file in pyspark

Problem description

I have a huge file of RDF triplets (subject predicate object), a sample of which is shown below. The goal is to extract the items shown in bold and produce the following output:

  Item_Id | quantityAmount | quantityUnit | rank
  -----------------------------------------------
  Q31     | 24954          | Meter        | BestRank
  Q25     | 582            | Kilometer    | NormalRank

I want to extract lines that follow the pattern below:

  • The subject is given a pointer (<Q31> <prop/P1082> <Pointer_Q31-87RF> .)
  • The pointer has a ranking (<Pointer_Q31-87RF> <rank> <BestRank>) and a valuePointer (<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9>)
  • The valuePointer in turn points to its Amount (<value/cebcf9> <quantityAmount> "24954") and Unit (<value/cebcf9> <quantityUnit> <Meter>)

The usual approach is to read the file line by line, extract each of the above patterns (using sc.textFile('inFile').flatMap(lambda x: extractFunc(x))), and then combine them through different joins so that the table above is produced. Is there a better way to go about this? I am including the file sample below.

<Q31> <prop/P1082> <Pointer_Q31-87RF> .
<Pointer_Q31-87RF> <rank> <BestRank> .
<Pointer_Q31-87RF> <prop/Pointer_P1082> "+24954"^^<2001/XMLSchema#decimal> .
<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> 24954
<value/cebcf9> <quantityUnit> <Meter> .
<Q25> <prop/P1082> <Pointer_Q25-8E6C> .
<Pointer_Q25-8E6C> <rank> <NormalRank> .
<Pointer_Q25-8E6C> <prop/Pointer_P1082> "+24954"
<Pointer_Q25-8E6C> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> "582" .
<value/cebcf9> <quantityUnit> <Kilometer> .
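
For concreteness, the line-by-line-plus-joins baseline described above might be sketched roughly as follows; the extract_item_pointer / extract_rank helpers and the join layout are illustrative assumptions, not code from the question:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

def extract_item_pointer(line):
    # "<Q31> <prop/P1082> <Pointer_Q31-87RF> ." -> (pointer, item_id)
    parts = line.strip(' .').split('> <')
    if len(parts) == 3 and parts[1] == 'prop/P1082':
        return [(parts[2].rstrip('>'), parts[0].lstrip('<'))]
    return []

def extract_rank(line):
    # "<Pointer_Q31-87RF> <rank> <BestRank> ." -> (pointer, rank)
    parts = line.strip(' .').split('> <')
    if len(parts) == 3 and parts[1] == 'rank':
        return [(parts[0].lstrip('<'), parts[2].rstrip('>'))]
    return []

lines = sc.textFile('inFile')
item_by_pointer = lines.flatMap(extract_item_pointer)  # (pointer, Item_Id)
rank_by_pointer = lines.flatMap(extract_rank)          # (pointer, rank)
# ... similar extractors for quantityAmount and quantityUnit, keyed by the value pointer,
# then joined back through the (pointer, valuePointer) pairs to build the final table
joined = item_by_pointer.join(rank_by_pointer)         # (pointer, (Item_Id, rank))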

Answer

If you can use \n<Q as the delimiter to create RDD elements, then parsing the data blocks becomes a pure Python task. Below I create a function (based on your sample) to parse the block texts using regexes and retrieve the cols fields into a Row object (you might have to adjust the regexes to reflect the actual data patterns, e.g. case sensitivity, extra white spaces, etc.):

  • For each RDD element, split by '\n' (line mode).
  • Then, for each line, split by '> <' into a list y.
  • We can find rank and quantityUnit by checking y[1] and y[2], quantityAmount by checking y[1], and Item_Id by checking y[0].
  • Create a Row object by iterating over all required fields, setting the value to None for any missing field.

from pyspark.sql import Row
import re

# skipped the code to initialize SparkSession

# field names to retrieve
cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank']

def parse_rdd_element(x, cols):
    try:
        row = {}
        for e in x.split('\n'):
            y = e.split('> <')
            if len(y) < 2:
                continue
            if y[1] in ['rank', 'quantityUnit']:
                # e.g. '<Pointer_Q31-87RF> <rank> <BestRank> .' -> row['rank'] = 'BestRank'
                row[y[1]] = y[2].split(">")[0]
            else:
                # e.g. '<value/cebcf9> <quantityAmount> "582" .' -> row['quantityAmount'] = '582'
                m = re.match(r'^quantityAmount>\D*(\d+)', y[1])
                if m:
                    row['quantityAmount'] = m.group(1)
                    continue
                # e.g. '<Q31> ...' (or '31> ...' after the '\n<Q' delimiter strips '<Q') -> row['Item_Id'] = 'Q31'
                m = re.match(r'^(?:<Q)?(\d+)', y[0])
                if m:
                    row['Item_Id'] = 'Q' + m.group(1)
        # if row is not empty, fill missing fields with None; otherwise return None
        return Row(**{k: row.get(k) for k in cols}) if row else None
    except Exception:
        return None
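
As a quick sanity check outside of Spark, you can call the parser on one block of text directly. The block below is copied from the sample file, with the leading <Q stripped to mimic what a record looks like after splitting on the \n<Q delimiter (which is why the Item_Id regex makes <Q optional):

sample_block = (
    '31> <prop/P1082> <Pointer_Q31-87RF> .\n'
    '<Pointer_Q31-87RF> <rank> <BestRank> .\n'
    '<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> .\n'
    '<value/cebcf9> <quantityAmount> 24954\n'
    '<value/cebcf9> <quantityUnit> <Meter> .'
)
print(parse_rdd_element(sample_block, cols))
# expected: Row(Item_Id='Q31', quantityAmount='24954', quantityUnit='Meter', rank='BestRank')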

Set up the RDD using newAPIHadoopFile() with \n<Q as the delimiter:

rdd = spark.sparkContext.newAPIHadoopFile(
    '/path/to/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\n<Q'}
)
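
With this record delimiter, every record after the first has its leading <Q consumed by the delimiter, which is why parse_rdd_element() treats the <Q prefix as optional. Inspecting the raw record values (a quick check, assuming the sample file above) would show something like:

rdd.values().take(2)
# [u'<Q31> <prop/P1082> <Pointer_Q31-87RF> .\n<Pointer_Q31-87RF> <rank> <BestRank> .\n...',
#  u'25> <prop/P1082> <Pointer_Q25-8E6C> .\n<Pointer_Q25-8E6C> <rank> <NormalRank> .\n...']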

Use the map function to parse each RDD element into a Row object:

rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]

Convert the above RDD to a dataframe:

df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit|      rank|
+-------+--------------+------------+----------+
|    Q31|         24954|       Meter|  BestRank|
|    Q25|           582|   Kilometer|NormalRank|
+-------+--------------+------------+----------+

Some notes:

  • For better performance, pre-compile all regex patterns using re.compile() instead of calling re.match() inside the parse_rdd_element() function (see the sketch after these notes).

  • In case there could be spaces/tabs between \n and <Q, multiple blocks will end up in the same RDD element; just split each RDD element by \n\s+<Q and replace map() with flatMap().
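
A minimal sketch combining both notes; the names AMOUNT_RE, ITEM_RE, BLOCK_RE and split_blocks are illustrative, not part of the original answer:

import re

# pre-compiled once at module level; inside parse_rdd_element() you would then call
# AMOUNT_RE.match(y[1]) and ITEM_RE.match(y[0]) instead of re.match(...)
AMOUNT_RE = re.compile(r'^quantityAmount>\D*(\d+)')
ITEM_RE = re.compile(r'^(?:<Q)?(\d+)')

# if spaces/tabs can follow the '\n', one RDD element may carry several blocks;
# split them apart and switch map() to flatMap()
BLOCK_RE = re.compile(r'\n\s+<Q')

def split_blocks(text):
    return [b for b in BLOCK_RE.split(text) if b.strip()]

df = (rdd.flatMap(lambda x: split_blocks(x[1]))
         .map(lambda block: parse_rdd_element(block, cols))
         .filter(bool)
         .toDF())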

Reference: Creating Spark data structure from multiline record
