Approach to fix assembly_id and assembly_name column data in Spark 2.4.4

Problem description

I've been working on a data-cleaning task in Spark 2.4.4 but got stuck on the following two tasks (described in the Question section). Below are the dataframe and the question details:

1. Mount the data and read the Parquet file into a dataframe

partFitmentRawDF = sqlContext.read.parquet("/mnt/blob/devdatasciencesto/pga-parts-forecast/raw/parts-fits/")

2. Sample data

display(partFitmentRawDF)

Itemno   Assembly_id    Assembly_name
0450056   44011         OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)
0450056   135502        OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)
0450056   37884         OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)

I've done other processing to make the data look like the above, but I am stuck on the following tasks:

3. Question

a. In row 2 of the Assembly_name column there are three IDs, A02EA09CA/CB/CC, but they have been merged together. Could you please suggest how to turn this into A02EA09CA A02EA09CB A02EA09CC? Basically, each part should have its own stand-alone ID, separated by a single space. Another example of the same problem is changing DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08) to DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26 V08GB26 V08LB26 ALL OPTIONS (49VICTRANS08), or changing SUSPENSION (7043244) - S09PR6HSL/PS6HSL/HEL (49SNOWSHOCKFRONT7043244SB) to SUSPENSION (7043244) - S09PR6HSL S09PS6HSL S09PS6HEL (49SNOWSHOCKFRONT7043244SB).

b. Roll up the multiple rows of the Assembly_id and Assembly_name columns that belong to the same Itemno into a single row, and remove duplicate words.

So the following dataset

Itemno   Assembly_id    Assembly_name
0450056   44011         OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)
0450056   135502        OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)
0450056   37884         OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)

should result in the final dataset below (with no punctuation, no runs of more than one space between words, and no duplicate words):

Itemno   Assembly_id            Assembly_name
0450056  44011 135502 37884     OIL PUMP ASSEMBLY A01EA09CA 4999202399920239A06 A02EA09CA A02EA09CB A02EA09CC A01EA05CA

Could you please help me with this? Thanks in advance for your help!

Issues after testing JXC's solution

1. Concatenation issue

If the initial dataset is as follows:

itemno  fits_assembly_id    fits_assembly_name
1322660 35459               DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/PS6HSL/HEL (49SNOWDRIVECLUTCH09600TRG)

After Step-3, the output is:

+-------------------------------------------------------------------------------------------+
|temp1                                                                                      |
+-------------------------------------------------------------------------------------------+
|[DRIVE, TRAIN, CLUTCH, PRIMARY, S09PR6HSL, S09PS6HSL, S09PS6HEL, 49SNOWDRIVECLUTCH09600TRG]|
+-------------------------------------------------------------------------------------------+

The original value is S09PR6HSL/PS6HSL/HEL, so it should become S09PR6HSL S09PS6HSL S09PR6HEL, but the output is S09PR6HSL S09PS6HSL S09PS6HEL: the third item should be S09PR6HEL, not S09PS6HEL. I believe the missing prefix should always be taken from the very first string and prepended to each of the other strings after /.
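The rule described above (borrow the missing prefix from the very first ID, not from the ID rebuilt just before) can be sketched in plain Python, for example as the body of a udf. This is a minimal sketch, not the accepted answer's code: the helper name `complete_ids` is hypothetical, and it assumes each fragment after / is no longer than the first ID and contains no underscores.

```python
from typing import List


def complete_ids(token: str) -> List[str]:
    """Expand a slash-joined token such as 'S09PR6HSL/PS6HSL/HEL' into full IDs.

    Hypothetical helper: every fragment borrows its missing leading
    characters from the FIRST fragment only, never from the previous result.
    """
    parts = token.split("/")
    first = parts[0]
    full_ids = []
    for part in parts:
        # how many leading characters this fragment is missing (never negative)
        missing = max(0, len(first) - len(part))
        full_ids.append(first[:missing] + part)
    return full_ids


print(complete_ids("S09PR6HSL/PS6HSL/HEL"))
# ['S09PR6HSL', 'S09PS6HSL', 'S09PR6HEL']
```

In Spark SQL, the same non-cumulative rule could presumably be written without aggregate(), e.g. `transform(x, y -> concat(substr(x[0], 1, length(x[0]) - length(y)), y))` applied to each inner array; treat that expression as an untested assumption.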

2. Underscore replacement: this is new, because I just spotted a few rows like this. Sometimes a string after / contains underscores. In that case, each _ in the second or later string should be replaced by the letter at the same position in the very first string (with the strings aligned at their right ends). For instance, if the data is DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/PS_HSL/H_L (49SNOWDRIVECLUTCH09600TRG), it should change to DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL S09PS6HSL S09PR6HSL (49SNOWDRIVECLUTCH09600TRG). Here, in /PS_HSL/ the _ is replaced by 6, because PS_HSL lines up with PR6HSL; then S09 is prepended to make the complete ID S09PS6HSL. Basically, if a later string (after /) is shorter than the first string, take the missing leading characters from the first string to complete the ID, and wherever there is an _, take the character at the same position in the first string.

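3. Separate the substrings joined by / from the rest of the string and store them in a new column added to the same dataframe, keeping the rest of the string in the original column.

Issue after Step-0: (screenshot: https://i.stack.imgur.com/wIX62.png)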

Solution

Good question. Ideally I would go with a udf to keep things simple, but this task is a good example of using Spark SQL higher-order functions. It might be a little verbose, so I split it into 4 steps. Let me know if it works; any questions are welcome:

Step-1: convert string into array of strings

Split the string by the pattern (?:(?!/)\p{Punct}|\s)+, which matches runs of consecutive punctuation (except /) or whitespace, then filter out the EMPTY items (produced by leading/trailing delimiters). A temporary column temp1 is used to save the intermediate results.

from pyspark.sql.functions import split, expr

df1 = df.withColumn('temp1', split('Assembly_name', r'(?:(?!/)\p{Punct}|\s)+')) \
        .withColumn('temp1', expr("filter(temp1, x -> x <> '')"))

df1.select('temp1').show(truncate=False)
+-------------------------------------------------------------------------------------+
|temp1                                                                                |
+-------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06]                                |
|[OIL, PUMP, ASSEMBLY, A02EA09CA/CB/CC, 4999202399920239A06]                          |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06]                                |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26/GB26/LB26, ALL, OPTIONS, 49VICTRANS08]|
|[SUSPENSION, 7043244, S09PR6HSL/PS6HSL/HEL, 49SNOWSHOCKFRONT7043244SB]               |
+-------------------------------------------------------------------------------------+
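The Step-1 tokenizer can be mirrored in plain Python to test the pattern on sample strings without a cluster. Python's `re` has no \p{Punct}, so this sketch assumes `string.punctuation` as a stand-in (both cover the same 32 ASCII punctuation characters); the names `DELIMITER` and `tokenize` are hypothetical.

```python
import re
import string

# Python's re has no \p{Punct}; string.punctuation holds the same ASCII
# punctuation characters. Keep '/' so slash-joined IDs stay together.
PUNCT_EXCEPT_SLASH = string.punctuation.replace("/", "")
DELIMITER = re.compile("(?:[" + re.escape(PUNCT_EXCEPT_SLASH) + r"]|\s)+")


def tokenize(name: str) -> list:
    """Split on runs of punctuation (except /) or whitespace, drop empty items."""
    # the empty items come from leading/trailing delimiters, e.g. the final ')'
    return [token for token in DELIMITER.split(name) if token != ""]


print(tokenize("OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)"))
# ['OIL', 'PUMP', 'ASSEMBLY', 'A02EA09CA/CB/CC', '4999202399920239A06']
```

For the variant that also keeps underscores, remove '_' from `PUNCT_EXCEPT_SLASH` as well.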

Step-2: convert temp1 to array of arrays

Split the array items again using /, so that each part-id is on its own array item:

df2 = df1.withColumn('temp1', expr("transform(temp1, x -> split(x, '/'))"))
df2.select('temp1').show(truncate=False)
+----------------------------------------------------------------------------------------------------------+
|temp1                                                                                                     |
+----------------------------------------------------------------------------------------------------------+
|[[OIL], [PUMP], [ASSEMBLY], [A01EA09CA], [4999202399920239A06]]                                           |
|[[OIL], [PUMP], [ASSEMBLY], [A02EA09CA, CB, CC], [4999202399920239A06]]                                   |
|[[OIL], [PUMP], [ASSEMBLY], [A01EA05CA], [4999202399920239A06]]                                           |
|[[DRIVE], [TRAIN], [TRANSMISSION], [6], [SPEED], [V08AB26, GB26, LB26], [ALL], [OPTIONS], [49VICTRANS08]] |
|[[SUSPENSION], [7043244], [S09PR6HSL, PS6HSL, HEL], [49SNOWSHOCKFRONT7043244SB]]                          |
+----------------------------------------------------------------------------------------------------------+

Step-3: use aggregate to reset part-ids

The aggregate function will operate on the inner arrays:

df3 = df2.withColumn('temp1', expr("""

       flatten(
         transform(temp1, x ->
           transform(sequence(1, size(x)), i ->
             aggregate(
                 sequence(1, i)
               , x[0]
               , (acc,j) -> concat(substr(acc, 1, length(x[0])-length(x[j-1])), x[j-1])
             )
           )
         )
       )

    """))

df3.select('temp1').show(truncate=False)
+----------------------------------------------------------------------------------------------+
|temp1                                                                                         |
+----------------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06]                                         |
|[OIL, PUMP, ASSEMBLY, A02EA09CA, A02EA09CB, A02EA09CC, 4999202399920239A06]                   |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06]                                         |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26, V08GB26, V08LB26, ALL, OPTIONS, 49VICTRANS08] |
|[SUSPENSION, 7043244, S09PR6HSL, S09PS6HSL, S09PS6HEL, 49SNOWSHOCKFRONT7043244SB]             |
+----------------------------------------------------------------------------------------------+

Where:

  • transform(temp1, x -> func1(x)): iterates through each item x of the array temp1 and runs func1(x), where x is an inner array (an array of strings)
  • func1(x) is itself a transform function, which iterates through sequence(1, size(x)) and runs func2(i) on each i:

    transform(sequence(1, size(x)), i -> func2(i))
    

  • func2(i) is an aggregate function, which iterates through sequence(1, i) with the initial value x[0] and accumulates/reduces using the function:

    (acc,j) -> concat(substr(acc, 1, length(acc)-length(x[j-1])), x[j-1])
    

    Note: substr() position is 1-based and array-indexing is 0-based, thus we need x[j-1] to refer to the current array item in the above reduce/aggregate function

  • finally, run flatten() to merge the inner arrays

    This step does something like the following pseudo-loop:

    for x in temp1:
      for i in range(1, size(x)+1):
        acc = x[0]
        for j in range(1,i+1):
          acc = concat(substr(acc, 1, length(acc)-length(x[j-1])), x[j-1])
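To see why the third ID came out as S09PS6HEL, the Step-3 expression can be mirrored in plain Python. This is a local illustration only (the function name is hypothetical); it follows the SQL term by term: the accumulator starts from x[0] and is rewritten once per fragment up to i, so a later fragment keeps the prefix of the previously rebuilt ID rather than the prefix of x[0].

```python
from typing import List


def reset_part_ids_aggregate(x: List[str]) -> List[str]:
    """Plain-Python mirror of the Step-3 aggregate for one inner array x."""
    width = len(x[0])
    result = []
    # transform(sequence(1, size(x)), i -> ...)
    for i in range(1, len(x) + 1):
        acc = x[0]  # initial value of aggregate()
        # aggregate(sequence(1, i), x[0], (acc, j) -> ...)
        for j in range(1, i + 1):
            # substr(acc, 1, length(x[0]) - length(x[j-1])); clamp at 0 so a
            # longer fragment does not trigger Python's negative slicing
            keep = max(0, width - len(x[j - 1]))
            acc = acc[:keep] + x[j - 1]  # concat(..., x[j-1])
        result.append(acc)
    return result


print(reset_part_ids_aggregate(["S09PR6HSL", "PS6HSL", "HEL"]))
# ['S09PR6HSL', 'S09PS6HSL', 'S09PS6HEL']
```

When all fragments after the first have the same length (as with CB and CC), the carried-over prefix is identical to the prefix of x[0], which is why the OIL PUMP rows come out correctly.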
    

Step-4: merge and drop duplicates

df4 = df3.groupby('Itemno').agg(
      expr("concat_ws(' ', array_distinct(flatten(collect_list(temp1)))) AS Assembly_names")
    , expr("concat_ws(' ', collect_set(Assembly_id)) as Assembly_ids")
  )

Where:

  • collect_list() gathers the temp1 values (each an array of strings) of all rows in a group into an array of arrays
  • flatten() converts the above into a single array of strings
  • array_distinct() drops the duplicates
  • concat_ws() joins the array into a space-separated string

    df4.select('Assembly_names').show(truncate=False)
    +---------------------------------------------------------------------------------------+
    |Assembly_names                                                                         |
    +---------------------------------------------------------------------------------------+
    |OIL PUMP ASSEMBLY A01EA09CA 4999202399920239A06 A02EA09CA A02EA09CB A02EA09CC A01EA05CA|
    |SUSPENSION 7043244 S09PR6HSL S09PS6HSL S09PS6HEL 49SNOWSHOCKFRONT7043244SB             |
    |DRIVE TRAIN TRANSMISSION 6 SPEED V08AB26 V08GB26 V08LB26 ALL OPTIONS 49VICTRANS08      |
    +---------------------------------------------------------------------------------------+
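To check the Step-4 logic without a cluster, the grouping can be mirrored in plain Python over already tokenized rows. `merge_rows` is a hypothetical helper; it keeps the input order, whereas in Spark the order produced by collect_list() and collect_set() is not guaranteed after a shuffle, so the real output order may differ.

```python
from typing import Dict, Iterable, List, Tuple


def merge_rows(rows: Iterable[Tuple[str, str, List[str]]]) -> Dict[str, Tuple[str, str]]:
    """Group (itemno, assembly_id, tokens) rows by itemno, like Step-4."""
    tokens_by_item: Dict[str, List[str]] = {}
    ids_by_item: Dict[str, List[str]] = {}
    for itemno, assembly_id, tokens in rows:
        # collect_list(temp1) + flatten(): concatenate the token arrays
        tokens_by_item.setdefault(itemno, []).extend(tokens)
        ids_by_item.setdefault(itemno, []).append(assembly_id)

    merged = {}
    for itemno, tokens in tokens_by_item.items():
        # array_distinct(): keep the first occurrence of each token, in order
        names = " ".join(dict.fromkeys(tokens))
        # collect_set(): de-duplicate the ids (Spark promises no order here)
        ids = " ".join(dict.fromkeys(ids_by_item[itemno]))
        merged[itemno] = (ids, names)
    return merged


rows = [
    ("0450056", "44011", ["OIL", "PUMP", "ASSEMBLY", "A01EA09CA", "4999202399920239A06"]),
    ("0450056", "135502", ["OIL", "PUMP", "ASSEMBLY", "A02EA09CA", "A02EA09CB", "A02EA09CC", "4999202399920239A06"]),
    ("0450056", "37884", ["OIL", "PUMP", "ASSEMBLY", "A01EA05CA", "4999202399920239A06"]),
]
print(merge_rows(rows)["0450056"][1])
# OIL PUMP ASSEMBLY A01EA09CA 4999202399920239A06 A02EA09CA A02EA09CB A02EA09CC A01EA05CA
```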
    

UPDATE:

The first issue is simple to fix, and the fix is much easier than the existing approach (no aggregate needed). For the second one, the following solution has to iterate through each string char by char, which could be slow. If so, we might have to check using a udf.

Below are the changes:

  • Step-1: just exclude the underscore from the punctuation as well (note: if underscores show up in other places of the text, you might have to run regexp_replace first to clean them up):

    df1 = df.withColumn('temp1', split('Assembly_name', r'(?:(?![/_])\p{Punct}|\s)+')) \
            .withColumn('temp1', expr("filter(temp1, x -> x <> '')"))
    

  • Step-2: split further, turning the array of arrays into an array of arrays of arrays whose innermost arrays hold the individual characters of each string. Reverse each innermost array so the strings are aligned at their right ends, which makes the comparison easy.

    df2 = df1.withColumn('temp1', expr("transform(temp1, x -> split(x, '/'))")) \
             .withColumn('temp1', expr("transform(temp1, x -> transform(x, y -> reverse(split(y, ''))) )"))
    

  • Step-3: use transform() instead of aggregate() to reset the part-ids. For each position i, we check whether y[i] (an item of the innermost array) is NULL or an underscore, and if so replace it with the corresponding item x[0][i] from the first ID. Then we reverse the array back and use concat_ws('', ...) to convert it into a string.

    df3 = df2.withColumn('temp1', expr("""
    
       flatten(
         transform(temp1, x ->
           transform(x, y ->
             concat_ws('', 
               reverse(
                 transform(sequence(0, size(x[0])-1), i -> IF(y[i] is NULL or y[i] == '_', x[0][i], y[i]))
               )
             )
           )
         ) 
       ) 
    
    """))
    

Below is the result of the above:

df3.select('temp1').show(truncate=False)
+---------------------------------------------------------------------------------------------+
|temp1                                                                                        |
+---------------------------------------------------------------------------------------------+
|[OIL, PUMP, ASSEMBLY, A01EA09CA, 4999202399920239A06]                                        |
|[OIL, PUMP, ASSEMBLY, A02EA09CA, A02EA09CB, A02EA09CC, 4999202399920239A06]                  |
|[OIL, PUMP, ASSEMBLY, A01EA05CA, 4999202399920239A06]                                        |
|[DRIVE, TRAIN, TRANSMISSION, 6, SPEED, V08AB26, V08GB26, V08LB26, ALL, OPTIONS, 49VICTRANS08]|
|[SUSPENSION, 7043244, S09PR6HSL, S09PS6HSL, S09PR6HEL, 49SNOWSHOCKFRONT7043244SB]            |
|[DRIVE, TRAIN, CLUTCH, PRIMARY, S09PR6HSL, S09PS6HSL, S09PR6HSL, 49SNOWDRIVECLUTCH09600TRG]  |
|[DRIVE, TRAIN, CLUTCH, PRIMARY, S09PR6HSL, S09PS6HSL, S09PR6HSL, 49SNOWDRIVECLUTCH09600TRG]  |
+---------------------------------------------------------------------------------------------+

The field before processing:

df.select('Assembly_name').show(truncate=False)
+----------------------------------------------------------------------------------+
|Assembly_name                                                                     |
+----------------------------------------------------------------------------------+
|OIL PUMP ASSEMBLY - A01EA09CA (4999202399920239A06)                               |
|OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)                         |
|OIL PUMP ASSEMBLY - A01EA05CA (4999202399920239A06)                               |
|DRIVE TRAIN, TRANSMISSION (6 SPEED) - V08AB26/GB26/LB26 ALL OPTIONS (49VICTRANS08)|
|SUSPENSION (7043244) - S09PR6HSL/PS6HSL/HEL (49SNOWSHOCKFRONT7043244SB)           |
|DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/PS_HSL/H_L (49SNOWDRIVECLUTCH09600TRG)   |
|DRIVE TRAIN, CLUTCH, PRIMARY - S09PR6HSL/_S__SL/H_L (49SNOWDRIVECLUTCH09600TRG)   |
+----------------------------------------------------------------------------------+

  • Step-4: no change.
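The updated Step-3 can likewise be mirrored in plain Python to check the underscore handling locally. In this hypothetical helper, reversing a string plays the role of reverse(split(y, '')), and the bounds check stands in for Spark returning NULL for an out-of-range array index. Like the SQL version, it only produces len(x[0]) characters, so a fragment longer than the first ID would be truncated.

```python
from typing import List


def reset_part_ids_reversed(x: List[str]) -> List[str]:
    """Plain-Python mirror of the updated Step-3 for one inner array x."""
    first_rev = x[0][::-1]  # reverse(split(x[0], ''))
    result = []
    for y in x:
        y_rev = y[::-1]  # reverse(split(y, ''))
        chars = []
        # transform(sequence(0, size(x[0]) - 1), i -> ...)
        for i in range(len(first_rev)):
            missing = i >= len(y_rev)  # y[i] is NULL in Spark
            if missing or y_rev[i] == "_":
                chars.append(first_rev[i])  # take x[0][i]
            else:
                chars.append(y_rev[i])
        # reverse back and join, like concat_ws('', reverse(...))
        result.append("".join(reversed(chars)))
    return result


print(reset_part_ids_reversed(["S09PR6HSL", "PS_HSL", "H_L"]))
# ['S09PR6HSL', 'S09PS6HSL', 'S09PR6HSL']
```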

UPDATE-2: added Step-0

Step-0: pre-process the column Assembly_name, using regexp_replace + split to move the model numbers into a new column (models) and remove them from the original column Assembly_name:

from pyspark.sql.functions import regexp_replace, split

df0 = df.withColumn('new_col', split(regexp_replace('Assembly_name', r'^(.*)-\s*(\S+)(.*)$', '$1$3\0$2'),'\0')) \
    .selectExpr(
        'Itemno'
      , 'Assembly_id'
      , "coalesce(new_col[0], Assembly_name) as Assembly_name"
      , "coalesce(new_col[1], '') as models"
)

df0.show(truncate=False)
+-------+-----------+---------------------------------------------------------------+--------------------+
|Itemno |Assembly_id|Assembly_name                                                  |models              |
+-------+-----------+---------------------------------------------------------------+--------------------+
|0450056|44011      |OIL PUMP ASSEMBLY  (4999202399920239A06)                       |A01EA09CA           |
|0450056|135502     |OIL PUMP ASSEMBLY  (4999202399920239A06)                       |A02EA09CA/CB/CC     |
|0450056|37884      |OIL PUMP ASSEMBLY  (4999202399920239A06)                       |A01EA05CA           |
|0450067|12345      |DRIVE TRAIN, TRANSMISSION (6 SPEED)  ALL OPTIONS (49VICTRANS08)|V08AB26/GB26/LB26   |
|0450068|1000       |SUSPENSION (7043244)  (49SNOWSHOCKFRONT7043244SB)              |S09PR6HSL/PS6HSL/HEL|
|0450066|12345      |DRIVE TRAIN, CLUTCH, PRIMARY  (49SNOWDRIVECLUTCH09600TRG)      |S09PR6HSL/PS_HSL/H_L|
|0450069|12346      |DRIVE TRAIN, CLUTCH, PRIMARY (49SNOWDRIVECLUTCH09600TRG)       |                    |
+-------+-----------+---------------------------------------------------------------+--------------------+
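The Step-0 regular expression can be checked in plain Python before running it on the cluster. This sketch applies the same pattern with re.match instead of regexp_replace + split (the helper name `split_models` is hypothetical). Because (.*) is greedy, the models token is taken after the last hyphen that is followed by optional whitespace and at least one non-space character.

```python
import re
from typing import Tuple

# Same pattern as Step-0: group 1 = text before the hyphen,
# group 2 = the models token, group 3 = the rest of the string
MODELS_PATTERN = re.compile(r"^(.*)-\s*(\S+)(.*)$")


def split_models(name: str) -> Tuple[str, str]:
    """Return (Assembly_name without the models token, models)."""
    m = MODELS_PATTERN.match(name)
    if m is None:
        # no match: regexp_replace leaves the value unchanged and
        # coalesce(new_col[1], '') yields an empty models string
        return name, ""
    return m.group(1) + m.group(3), m.group(2)


print(split_models("OIL PUMP ASSEMBLY - A02EA09CA/CB/CC (4999202399920239A06)"))
# ('OIL PUMP ASSEMBLY  (4999202399920239A06)', 'A02EA09CA/CB/CC')
```

The double space left behind in the name matches the Assembly_name column in the table above; the later tokenizing step removes it.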

You can then process Assembly_name using RegexTokenizer and StopWordsRemover. The models column is a simplified version of the current post: you can skip Step-1 for it, but do notice the depth of the arrays.

(Note: removed S09PR6HSL/_S__SL/H_L from the last record for testing)
