How to match/extract multi-line pattern from file in pyspark


Question

I have a huge file of RDF triplets (subject predicate object) as shown in the sample below. The goal is to extract the bold items and have the following output:

  Item_Id | quantityAmount | quantityUnit | rank
  ------------------------------------------------
  Q31     | 24954          | Meter        | BestRank
  Q25     | 582            | Kilometer    | NormalRank

I want to extract lines that follow the pattern below:

  • The subject provides a pointer (<Q31> <prop/P1082> <Pointer_Q31-87RF>)
  • The pointer has a rank (<Pointer_Q31-87RF> <rank> <BestRank>) and a valuePointer (<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9>)
  • The valuePointer in turn points to its Amount (<value/cebcf9> <quantityAmount> "24954") and Unit (<value/cebcf9> <quantityUnit> <Meter>)

The normal way is to read the file line by line, extract each of the above patterns (using sc.textFile('inFile').flatMap(lambda x: extractFunc(x))), and then combine them through different joins so that they yield the table above. Is there a better way to do this? I am including a file sample below.

<Q31> <prop/P1082> <Pointer_Q31-87RF> .
<Pointer_Q31-87RF> <rank> <BestRank> .
<Pointer_Q31-87RF> <prop/Pointer_P1082> "+24954"^^<2001/XMLSchema#decimal> .
<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> 24954
<value/cebcf9> <quantityUnit> <Meter> .
<Q25> <prop/P1082> <Pointer_Q25-8E6C> .
<Pointer_Q25-8E6C> <rank> <NormalRank> .
<Pointer_Q25-8E6C> <prop/Pointer_P1082> "+24954"
<Pointer_Q25-8E6C> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> "582" .
<value/cebcf9> <quantityUnit> <Kilometer> .
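
For reference, a minimal sketch of the line-by-line extract-and-join approach described above might look like this. It is based only on the sample; the helper extract_func, the exact predicate strings, and the key handling are assumptions about the real data:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

def extract_func(line):
    # split a triple line into (subject, predicate, object), dropping the trailing ' .'
    parts = line.strip().rstrip(' .').split(None, 2)
    return tuple(parts) if len(parts) == 3 else None

triples = sc.textFile('inFile').map(extract_func).filter(bool).cache()

# one keyed RDD per pattern of interest
item_ptr = triples.filter(lambda t: t[1] == '<prop/P1082>') \
                  .map(lambda t: (t[2], t[0].strip('<>')))      # (pointer, Item_Id)
ptr_rank = triples.filter(lambda t: t[1] == '<rank>') \
                  .map(lambda t: (t[0], t[2].strip('<>')))      # (pointer, rank)
ptr_val  = triples.filter(lambda t: t[1] == '<prop/Pointer_value/P1082>') \
                  .map(lambda t: (t[0], t[2]))                  # (pointer, valuePointer)
val_amt  = triples.filter(lambda t: t[1] == '<quantityAmount>') \
                  .map(lambda t: (t[0], t[2].strip('"')))       # (valuePointer, amount)
val_unit = triples.filter(lambda t: t[1] == '<quantityUnit>') \
                  .map(lambda t: (t[0], t[2].strip('<>')))      # (valuePointer, unit)

# stitch the pieces back together with joins, re-keying on valuePointer halfway through
result = (item_ptr.join(ptr_rank)                               # (ptr, (Item_Id, rank))
          .join(ptr_val)                                        # (ptr, ((Item_Id, rank), valPtr))
          .map(lambda kv: (kv[1][1], kv[1][0]))                 # (valPtr, (Item_Id, rank))
          .join(val_amt)                                        # (valPtr, ((Item_Id, rank), amount))
          .join(val_unit)                                       # (valPtr, (((Item_Id, rank), amount), unit))
          .map(lambda kv: (kv[1][0][0][0], kv[1][0][1], kv[1][1], kv[1][0][0][1])))
# result rows: (Item_Id, quantityAmount, quantityUnit, rank)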

Answer

If you can use \n<Q as the delimiter to create RDD elements, then parsing the data blocks becomes a pure Python task. Below I create a function (based on your sample) that parses a block of text with regexes and collects the column values into a Row object (you might have to adjust the regexes to reflect the actual data patterns, i.e. case sensitivity, extra white space, etc.):

  • For each RDD element, split by '\n' (line-mode)
  • then for each line, split by > < into a list y
  • we can find rank and quantityUnit by checking y[1] and y[2], quantityAmount by checking y[1], and Item_Id by checking y[0]
  • Create the Row object by iterating over all required fields, setting the value to None for missing fields

from pyspark.sql import Row
import re

# skipped the code to initialize SparkSession

# field names to retrieve
cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank']

def parse_rdd_element(x, cols):
    try:
        row = {}
        # each RDD element is one block; process it line by line
        for e in x.split('\n'):
            # split the triple on '> <' so the predicate ends up in y[1]
            y = e.split('> <')
            if len(y) < 2:
                continue
            if y[1] in ['rank', 'quantityUnit']:
                # the object sits in y[2], e.g. 'BestRank> .' -> 'BestRank'
                row[y[1]] = y[2].split(">")[0]
            else:
                # quantityAmount line: the number follows the predicate in y[1]
                m = re.match(r'^quantityAmount>\D*(\d+)', y[1])
                if m:
                    row['quantityAmount'] = m.group(1)
                    continue
                # first line of a block: the leading '<Q' may have been consumed
                # by the record delimiter, so it is optional in the pattern
                m = re.match(r'^(?:<Q)?(\d+)', y[0])
                if m:
                    row['Item_Id'] = 'Q' + m.group(1)
        # if row is not EMPTY, set None for missing fields
        return Row(**dict([(k, row[k]) if k in row else (k, None) for k in cols])) if row else None
    except:
        return None

Set up the RDD using newAPIHadoopFile() with \n<Q as the delimiter:

rdd = spark.sparkContext.newAPIHadoopFile(
    '/path/to/file',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': '\n<Q'}
)

Use the map function to parse each RDD element into a Row object:

rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]

Convert the above RDD to a dataframe:

df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit|      rank|
+-------+--------------+------------+----------+
|    Q31|         24954|       Meter|  BestRank|
|    Q25|           582|   Kilometer|NormalRank|
+-------+--------------+------------+----------+

Some notes:

For better performance, pre-compile all regex patterns with re.compile() before passing them to the parse_rdd_element() function.
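
A minimal sketch of what that might look like: the pattern names AMOUNT_PTN and ITEM_PTN are illustrative, not part of the original answer, and the function body is otherwise the same as above.

import re
from pyspark.sql import Row

# compile the patterns once instead of re-parsing them on every re.match() call
AMOUNT_PTN = re.compile(r'^quantityAmount>\D*(\d+)')
ITEM_PTN = re.compile(r'^(?:<Q)?(\d+)')

def parse_rdd_element(x, cols):
    try:
        row = {}
        for e in x.split('\n'):
            y = e.split('> <')
            if len(y) < 2:
                continue
            if y[1] in ['rank', 'quantityUnit']:
                row[y[1]] = y[2].split(">")[0]
            else:
                m = AMOUNT_PTN.match(y[1])      # pre-compiled pattern
                if m:
                    row['quantityAmount'] = m.group(1)
                    continue
                m = ITEM_PTN.match(y[0])        # pre-compiled pattern
                if m:
                    row['Item_Id'] = 'Q' + m.group(1)
        return Row(**dict((k, row.get(k)) for k in cols)) if row else None
    except:
        return None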

In case there could be spaces/tabs between \n and <Q, multiple blocks will end up in the same RDD element; just split each RDD element by \n\s+<Q and replace map() with flatMap().
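
A small sketch of that variant, reusing parse_rdd_element and cols from above (the block-boundary regex is just the one suggested in the note):

import re

# one RDD element may now hold several blocks; cut it apart on the block boundary
block_ptn = re.compile(r'\n\s+<Q')

df = (rdd.flatMap(lambda x: block_ptn.split(x[1]))           # one piece per <Q...> block
         .map(lambda block: parse_rdd_element(block, cols))  # parse each block as before
         .filter(bool)
         .toDF())
df.show()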

Reference: Creating a Spark data structure from a multiline record
