Apache NiFi: How to compare multiple rows in a CSV and create a new column

Question

I have a CSV file that looks like this:

Jc,TXF,timer,alpha,beta
15,44,55,12,33
18,87,33,111
9,87,61,29,77

Together, alpha and beta make up a city code. I want to add the name of the city to the CSV as a new column:

Jc,TXF,timer,alpha,beta,city
15,44,55,12,33,York
18,87,33,111,London
9,87,61,29,77,Sydney

I have another CSV with only the columns alpha, beta, and city, which looks like this:

alpha,beta,city
12,33,York
33,111,London
29,77,Sydney

How can I achieve this with Apache NiFi? Please suggest the processors and the workflow needed.

Answer

I see two ways of solving this.

The first is to use a CsvLookupService. However, CsvLookupService supports only a single lookup key, and you have two: alpha and beta. To use this solution, you have to concatenate both keys into a single key, such as 12_33.
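
A minimal sketch of that preprocessing step follows. It is written in plain Python 3 and runs outside NiFi. It rewrites the alpha,beta,city mapping into a two-column key,city file that a single-key lookup can use. The function name `build_composite_key_mapping`, the `key` column name and the `_` separator are illustrative assumptions. They are not names that CsvLookupService requires. The incoming records would still need the same concatenated key before a lookup could match.

```python
import csv
import io


def build_composite_key_mapping(mapping_csv, sep="_"):
    """Turn alpha,beta,city CSV text into key,city CSV text, key = alpha + sep + beta.

    Hypothetical helper: the "key" column name and "_" separator are illustrative.
    """
    reader = csv.DictReader(io.StringIO(mapping_csv))
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["key", "city"])
    for row in reader:
        # Concatenate both lookup columns into one composite key, e.g. "12_33"
        writer.writerow([row["alpha"] + sep + row["beta"], row["city"]])
    return out.getvalue()


mapping = "alpha,beta,city\n12,33,York\n33,111,London\n29,77,Sydney\n"
print(build_composite_key_mapping(mapping), end="")
# key,city
# 12_33,York
# 33_111,London
# 29_77,Sydney
```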

The second is to use an ExecuteScript processor. This is the better option, because you don't have to modify your source data. The strategy:

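  1. Split the CSV text into individual lines.
  2. Enrich each line with a city column by looking up its alpha and beta keys in the mapping file.
  3. Merge the lines back into a single CSV file.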
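
The three steps can be sketched in plain Python 3 outside NiFi. This helps when you want to check the logic before writing the ExecuteScript body. It is an illustrative sketch, not the answer's script. The helper names are hypothetical. The lookup uses a dict keyed on (alpha, beta) instead of scanning the mapping file for every row, and columns are addressed by header name rather than by position. Like the script, it splits on plain commas, so it assumes there are no quoted fields. The usage example uses the two sample rows that contain all five fields.

```python
import csv
import io


def load_mapping(mapping_csv):
    """Build a dict (alpha, beta) -> city from CSV text with header alpha,beta,city."""
    return {(r["alpha"], r["beta"]): r["city"]
            for r in csv.DictReader(io.StringIO(mapping_csv))}


def split_rows(data_csv):
    """Step 1: like SplitText with one header line, return header + one data row per chunk."""
    lines = [line for line in data_csv.splitlines() if line.strip()]
    header, rows = lines[0], lines[1:]
    return [header + "\n" + row for row in rows]


def enrich(chunk, mapping):
    """Step 2: append the city found for the row's (alpha, beta) pair."""
    header, row = chunk.split("\n")
    fields = dict(zip(header.split(","), row.split(",")))
    # Raises KeyError when the pair is not in the mapping
    city = mapping[(fields["alpha"], fields["beta"])]
    return header + ",city\n" + row + "," + city


def merge(chunks):
    """Step 3: join the enriched chunks into one CSV text with a single header."""
    header = chunks[0].split("\n")[0]
    rows = [chunk.split("\n")[1] for chunk in chunks]
    return "\n".join([header] + rows) + "\n"


data = "Jc,TXF,timer,alpha,beta\n15,44,55,12,33\n9,87,61,29,77\n"
mapping = load_mapping("alpha,beta,city\n12,33,York\n33,111,London\n29,77,Sydney\n")
print(merge([enrich(chunk, mapping) for chunk in split_rows(data)]), end="")
# Jc,TXF,timer,alpha,beta,city
# 15,44,55,12,33,York
# 9,87,61,29,77,Sydney
```

A dict lookup costs one hash probe per row. Re-reading the mapping file for every split, as the script does, is simpler to deploy but slower for large mappings.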

Overall flow:

GenerateFlowFile:

SplitText:

In SplitText, set Line Split Count to 1 and Header Line Count to 1, so that each split contains the header line plus a single data row. For the ExecuteScript processor, set python (Jython) as the Script Engine and provide the following script body:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import csv

# The mapping file must exist on every NiFi node that runs this processor
MAPPING_FILE = '/home/nifi/mapping.csv'

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # flowfile content is CSV text with two lines: header and one data row
        # splitlines() also copes with a trailing newline and \r\n line endings
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        lines = [line for line in text.splitlines() if line.strip()]
        if len(lines) != 2:
            raise Exception('Expected a header and exactly one data row, got %d lines' % len(lines))
        header, row = lines[0], lines[1]

        # split the data row to get access to the alpha and beta values
        lineSplit = row.split(',')
        alpha, beta = lineSplit[3].strip(), lineSplit[4].strip()

        # Go through the mapping file
        # item[0] -> alpha, item[1] -> beta, item[2] -> city
        city = None
        with open(MAPPING_FILE, 'r') as mapping:
            for item in csv.reader(mapping, delimiter=','):
                if len(item) >= 3 and item[0].strip() == alpha and item[1].strip() == beta:
                    city = item[2].strip()
                    break

        if city is None:
            raise Exception('No mapping found for alpha=%s, beta=%s' % (alpha, beta))

        # the result contains the header with the additional city column,
        # followed by the data row enriched with the city name
        result = header + ',city\n' + row + ',' + city + '\n'
        outputStream.write(bytearray(result.encode('utf-8')))
# end class

flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except Exception as e:
        log.error('Failed to add city column: ' + str(e))
        session.transfer(flowFile, REL_FAILURE)

See the comments for a detailed description of the script. The file /home/nifi/mapping.csv has to be available on your NiFi instance, or on every node if you run a cluster. If you want to learn more about the ExecuteScript processor, refer to the ExecuteScript Cookbook. Finally, merge all the lines into a single CSV file:

Set a CSV reader and a CSV writer on the MergeRecord processor and leave their default properties. Adjust the MergeRecord properties to control how many lines end up in each resulting CSV file.
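
The following plain Python 3 sketch shows how a record limit on the merge shapes the output. It groups enriched header-plus-row chunks into CSV texts of at most `max_records` rows each and writes the header once per file. `merge_in_batches` is a hypothetical helper. It models only the record-count limit. It ignores the other ways a merge processor decides that a bin is complete, such as a maximum bin age.

```python
def merge_in_batches(chunks, max_records):
    """Group header+row chunks into CSV texts of at most max_records rows each.

    Hypothetical helper: models only a record-count limit, not bin age or size.
    """
    if max_records < 1:
        raise ValueError("max_records must be at least 1")
    files = []
    for start in range(0, len(chunks), max_records):
        batch = chunks[start:start + max_records]
        # Keep the header from the first chunk, then one data row per chunk
        header = batch[0].split("\n")[0]
        rows = [chunk.split("\n")[1] for chunk in batch]
        files.append("\n".join([header] + rows) + "\n")
    return files


chunks = [
    "Jc,TXF,timer,alpha,beta,city\n15,44,55,12,33,York",
    "Jc,TXF,timer,alpha,beta,city\n9,87,61,29,77,Sydney",
]
print(len(merge_in_batches(chunks, max_records=1)))     # 2
print(len(merge_in_batches(chunks, max_records=1000)))  # 1
```

With `max_records=1`, each file holds one row. With a limit larger than the number of chunks, everything ends up in a single file.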
