Modify CSV with Apache NiFi

Question

I'm receiving a rather odd .csv file from the FetchFTP processor. It looks like this:

Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)
Ingka,190530,1_flight_0119,947535,25.06.2020,Auditorius SE,101304,Smart Phone,Flight_EK_Auditorius_Video_mobile,27353235,https://www.ikea.com/promo/wifi?utm_source=Auditorius&utm_medium=Video_mobile,0,0,0,0
Ingka,190530,1_flight_0119,947535,28.06.2020,Between Exchange SE,124598,PC,Flight_IQP_Between_Exchange_Banner_728x90_DCO,27359134,,0,0,0,0
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,

I can't save it to a database in this format. What I want to do:

  1. Remove this useless header block:

Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

  2. Remove this useless footer:

Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,

  3. Change the header names.

From: Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)

To: advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,site,site_id,device,placement_name,placement_id,url,clicks,imps,total_record_imps,total_view_imps
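Lowercasing and replacing spaces alone cannot turn `Date` into `report_date` or `* Served Impressions` into `imps`, so the renaming in step 3 is easiest to express as an explicit lookup table. A minimal plain-Python sketch, assuming the header row arrives exactly as shown above; `HEADER_MAP` and `rename_header` are hypothetical names for illustration, not part of NiFi:

```python
# Hypothetical lookup table: report header name -> desired database column name
HEADER_MAP = {
    "Advertiser Name": "advertiser_name",
    "Advertiser ID": "advertiser_id",
    "Campaign Name": "campaign_name",
    "Campaign ID": "campaign_id",
    "Date": "report_date",
    "Site Name": "site",
    "Site ID": "site_id",
    "Device Type": "device",
    "Placement Name": "placement_name",
    "Placement ID": "placement_id",
    "Clickthrough URL": "url",
    "* Clicks": "clicks",
    "* Served Impressions": "imps",
    "* Total Recordable Impressions (IAB)": "total_record_imps",
    "* Total Viewable Impressions (IAB)": "total_view_imps",
}


def rename_header(header_line):
    """Rename every comma-separated column of the header row via HEADER_MAP.

    Raises KeyError for a column the mapping does not know about.
    """
    # Preserve whatever line ending the input used ('\n', '\r\n' or none)
    body = header_line.rstrip("\r\n")
    ending = header_line[len(body):]
    columns = [HEADER_MAP[name.strip()] for name in body.split(",")]
    return ",".join(columns) + ending
```

Raising `KeyError` on an unknown column is a deliberate choice: if the report layout changes, the cleanup fails instead of silently writing mismatched column names.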

Are there any tools in Apache NiFi that can do this?

Recommended answer

Your data needs some cleanup before it is valid CSV. You can use the ExecuteScript or ExecuteStreamCommand processor to run a data-cleaning script (in Python, for example) that converts the incoming data into your desired format.
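For the ExecuteStreamCommand route, the processor pipes the flowfile content to the command's standard input and, unless Output Destination Attribute is set, turns the command's standard output into a new flowfile on the `output stream` relationship. A minimal sketch of such a script, assuming the header and footer lines start exactly as in the sample report; the file name `clean_report.py` and the function names are hypothetical:

```python
import sys

# Assumed markers, taken from the sample report in the question
HEADER_PREFIX = "Advertiser Name,Advertiser ID,"
FOOTER_PREFIX = "Data was updated last on"


def clean_report(lines):
    """Keep the lines from the header row up to, but not including, the footer.

    Returns an empty list if the header row is never found.
    """
    table = []
    in_table = False
    for line in lines:
        if not in_table:
            # Skip the report preamble until the real header row appears
            if not line.startswith(HEADER_PREFIX):
                continue
            in_table = True
        # Everything from the first footer line onwards is dropped
        if line.startswith(FOOTER_PREFIX):
            break
        table.append(line)
    return table


def main():
    if sys.stdin.isatty():
        # Started interactively: there is no flowfile content to read
        sys.stderr.write("pipe the report into stdin\n")
        return
    lines = sys.stdin.readlines()
    table = clean_report(lines)
    if lines and not table:
        # Non-empty input without the expected header: exit with a non-zero status
        sys.stderr.write("header row not found\n")
        sys.exit(1)
    sys.stdout.writelines(table)


if __name__ == "__main__":
    main()
```

It could be wired up with Command Path set to `python3` and Command Arguments set to the script's path. When the script exits with a non-zero status, ExecuteStreamCommand routes its output to the `nonzero status` relationship instead of `output stream`.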

The code snippet below (header standardization and data cleanup) should give you an idea of how to access flowfile content from an ExecuteScript processor configured with Python as its script engine:

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        # Set to True once the CSV header row has been found
        self.header_found = False

    def process(self, inputStream, outputStream):
        with wrap(inputStream) as f:
            lines = f.readlines()

        outer_new_value_list = []
        is_csv_data = False
        for csv_row in lines:
            if not is_csv_data:
                # Skip the report preamble until the real header row appears
                if csv_row.startswith("Advertiser Name,Advertiser ID,"):
                    is_csv_data = True
                else:
                    continue
            # Stop at the first footer line
            if csv_row.startswith("Data was updated last on"):
                break
            outer_new_value_list.append(csv_row)

        if outer_new_value_list:
            self.header_found = True
            # Normalize the header row: drop the '* ' prefixes first, then replace
            # spaces and hyphens with '_' and lowercase, e.g. '* Clicks' -> 'clicks'
            outer_new_value_list[0] = (outer_new_value_list[0]
                                       .replace('* ', '').replace('*', '')
                                       .replace(' ', '_').replace('-', '_')
                                       .lower())
        else:
            # No header row found: write the original content back unchanged
            outer_new_value_list = lines

        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(outer_new_value_list)


# end class
flowFile = session.get()
if flowFile is not None:
    callback = PyStreamCallback()
    flowFile = session.write(flowFile, callback)
    if callback.header_found:
        session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
    else:
        # Unexpected layout: route to failure for inspection
        session.transfer(flowFile, ExecuteScript.REL_FAILURE)
# implicit return at the end
