Apache NiFi: How to compare multiple rows in a csv and create new column

Question

I have a CSV that looks like this:

Jc,TXF,timer,alpha,beta
15,44,55,12,33
18,87,33,111
9,87,61,29,77

Alpha and beta combined make up a city code. I want to add the name of the city to the CSV as a new column:

Jc,TXF,timer,alpha,beta,city
15,44,55,12,33,York
18,87,33,111,London
9,87,61,29,77,Sydney

I have another CSV with only the columns alpha, beta and city, which looks like this:

alpha,beta,city
12,33,York
33,111,London
29,77,Sydney

How can I achieve this using Apache NiFi? Please suggest the processors and the workflow needed to achieve this.

Solution

I see two ways of solving this.

The first is to use CsvLookupService. However, CsvLookupService only supports a single key, and you have two: alpha and beta. To use this solution, you have to concatenate both keys into a single key, such as 12_33, in both the mapping file and the incoming data.
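
To illustrate the key concatenation, here is a minimal sketch in plain Python (run outside NiFi, for example as a one-off preprocessing step) that rewrites the alpha,beta,city mapping into a single-key key,city file and appends the same composite key to the data rows. The function names, the key column name and the _ separator are hypothetical choices; how the resulting file is wired into the lookup service depends on its configuration.

```python
import csv
import io


def to_composite_key_mapping(mapping_csv, sep='_'):
    """Rewrite an alpha,beta,city mapping as key,city where key = alpha + sep + beta."""
    reader = csv.DictReader(io.StringIO(mapping_csv))
    out = io.StringIO()
    writer = csv.writer(out, lineterminator='\n')
    writer.writerow(['key', 'city'])
    for row in reader:
        writer.writerow([row['alpha'] + sep + row['beta'], row['city']])
    return out.getvalue()


def add_composite_key_column(data_csv, sep='_'):
    """Append a key column (alpha + sep + beta) to every data row."""
    reader = csv.DictReader(io.StringIO(data_csv))
    fieldnames = reader.fieldnames + ['key']
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator='\n')
    writer.writeheader()
    for row in reader:
        # hypothetical composite key, e.g. "12_33"
        row['key'] = row['alpha'] + sep + row['beta']
        writer.writerow(row)
    return out.getvalue()
```

For example, to_composite_key_mapping turns the mapping row 12,33,York into 12_33,York, and add_composite_key_column turns the data row 15,44,55,12,33 into 15,44,55,12,33,12_33.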

The second is to use the ExecuteScript processor. This one is better because you don't have to modify your source data. Strategy:

  1. Split the CSV text into lines
  2. Enrich each line with the city column by looking up the alpha and beta keys in the mapping file
  3. Merge the individual lines into a single CSV file.
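
The three steps can be checked outside NiFi with a plain-Python sketch. It is a stand-in under stated assumptions, not NiFi code: split_with_header imitates a header-per-split behavior with one data line per split, enrich looks the city up in a dictionary keyed by the (alpha, beta) pair instead of rescanning the mapping file for every line, and merge keeps a single header line. All function names are hypothetical.

```python
import csv
import io


def split_with_header(csv_text):
    """Step 1: split CSV text into chunks of header + one data line."""
    lines = [l for l in csv_text.strip().splitlines() if l.strip()]
    header, rows = lines[0], lines[1:]
    return [header + '\n' + row for row in rows]


def load_lookup(mapping_text):
    """Index the alpha,beta,city mapping by the (alpha, beta) pair."""
    reader = csv.DictReader(io.StringIO(mapping_text))
    return {(r['alpha'], r['beta']): r['city'] for r in reader}


def enrich(chunk, lookup):
    """Step 2: append the city column to the header and the city value to the data line."""
    header, line = chunk.split('\n')
    fields = line.split(',')
    key = (fields[3], fields[4])  # alpha and beta are the 4th and 5th columns
    if key not in lookup:
        raise KeyError('No match found for alpha=%s, beta=%s' % key)
    return header + ',city\n' + line + ',' + lookup[key]


def merge(chunks):
    """Step 3: merge enriched chunks into one CSV with a single header line."""
    header = chunks[0].split('\n')[0]
    body = [c.split('\n')[1] for c in chunks]
    return '\n'.join([header] + body) + '\n'


data = "Jc,TXF,timer,alpha,beta\n15,44,55,12,33\n9,87,61,29,77\n"
mapping = "alpha,beta,city\n12,33,York\n33,111,London\n29,77,Sydney\n"
lookup = load_lookup(mapping)
print(merge([enrich(c, lookup) for c in split_with_header(data)]))
```

A row with fewer than five fields, such as 18,87,33,111 in the sample above, would raise an IndexError at fields[4], so it is worth validating the input first.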

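Overall flow: GenerateFlowFile, then SplitText, then ExecuteScript, then a merge into a single CSV file.

GenerateFlowFile: supplies the input CSV as flowfile content.

SplitText: set Line Split Count to 1 and Header Line Count to 1, so that each split contains the header line plus one data line.

For the ExecuteScript processor, set python as the Script Engine and provide the following Script Body: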

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import csv

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # flowfile content is CSV text with two lines: the header and one data line
        # split by newline to get access to each individual line
        lines = IOUtils.toString(inputStream, StandardCharsets.UTF_8).strip().split('\n')
        header = lines[0].strip()
        line = lines[1].strip()
        # split the data line to get access to the alpha (index 3) and beta (index 4) values
        lineSplit = line.split(',')

        city = None
        # fetch the mapping CSV file
        with open('/home/nifi/mapping.csv', 'r') as mapping:
            # Go through the mapping file
            # item[0] -> alpha
            # item[1] -> beta
            # item[2] -> city
            # See if you find alpha and beta on the line content
            for item in csv.reader(mapping, delimiter=','):
                # skip blank lines, which csv.reader returns as empty lists
                if len(item) >= 3 and item[0] == lineSplit[3] and item[1] == lineSplit[4]:
                    city = item[2]
                    break

        if city is None:
            # no mapping entry matched; the exception routes the flowfile to failure
            raise Exception('No match found.')

        # the result contains the header line with the additional city column,
        # followed by the original data line with the city value appended
        result = header + ',city\n' + line + ',' + city
        outputStream.write(bytearray(result.encode('utf-8')))
# end class

flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        session.transfer(flowFile, REL_FAILURE)

The comments in the script describe each step. The file /home/nifi/mapping.csv has to be available on the machine running NiFi and readable by the NiFi process. If you want to learn more about the ExecuteScript processor, refer to the ExecuteScript Cookbook. Finally, merge all the lines into a single CSV file:

Set a CSV reader and writer, leaving their default properties. Adjust the MergeRecord properties (such as Minimum Number of Records and Maximum Number of Records) to control how many lines end up in each resulting CSV file. The result is the original CSV with the additional city column.
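
The record-count limits of the merge step can be pictured with another plain-Python emulation: enriched header-plus-row chunks are grouped into files of at most max_records data lines, each with one header line. This is a simplified model, not how NiFi implements merging; it covers only a maximum-record cap and ignores the other conditions under which NiFi completes a merge. merge_in_batches is a hypothetical name.

```python
def merge_in_batches(enriched_chunks, max_records):
    """Group header+row chunks into CSV texts of at most max_records data lines,
    each keeping a single header line."""
    if max_records < 1:
        raise ValueError('max_records must be at least 1')
    files = []
    for start in range(0, len(enriched_chunks), max_records):
        batch = enriched_chunks[start:start + max_records]
        # every chunk carries the same header; keep it once per output file
        header = batch[0].split('\n')[0]
        rows = [c.split('\n', 1)[1] for c in batch]
        files.append('\n'.join([header] + rows) + '\n')
    return files
```

With three enriched chunks and max_records set to 2, this yields two CSV texts: one with two data lines and one with the remaining line.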
