Modify CSV with Apache NiFi

Problem description

I'm receiving a rather odd .csv file from the FetchFTP processor. It looks like this:

Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)
Ingka,190530,1_flight_0119,947535,25.06.2020,Auditorius SE,101304,Smart Phone,Flight_EK_Auditorius_Video_mobile,27353235,https://www.ikea.com/promo/wifi?utm_source=Auditorius&utm_medium=Video_mobile,0,0,0,0
Ingka,190530,1_flight_0119,947535,28.06.2020,Between Exchange SE,124598,PC,Flight_IQP_Between_Exchange_Banner_728x90_DCO,27359134,,0,0,0,0
Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,

I can't save it into the database in this format. What I want:

  1. Remove this useless header block:

Agency Name:IKEA,,,,,,,,,,,,,,
Advertiser Name: Ingka,,,,,,,,,,,,,,
Campaign Name:All,,,,,,,,,,,,,,
Date Resolution:Days,,,,,,,,,,,,,,
Campaign Dates:N/A,,,,,,,,,,,,,,
Report Date Range:Last X Days (25.06.2020 - 01.07.2020),,,,,,,,,,,,,,
Report Creation Date: 02.07.2020 5:26:18 (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,

You must save the report locally to create a pivot table based on the report data.,,,,,,,,,,,,,,

,,,,,,,,,,,,,,,

  2. Remove this useless footer:

Data was updated last on 7/2/2020 12:00:00 AM (GMT -5 Eastern Standard Time),,,,,,,,,,,,,,
Viewability mode is set per individual campaign,,,,,,,,,,,,,,

  3. Change the header names.

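From: Advertiser Name,Advertiser ID,Campaign Name,Campaign ID,Date,Site Name,Site ID,Device Type,Placement Name,Placement ID,Clickthrough URL,* Clicks,* Served Impressions,* Total Recordable Impressions (IAB),* Total Viewable Impressions (IAB)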

To: advertiser_name,advertiser_id,campaign_name,campaign_id,report_date,site,site_id,device,placement_name,placement_id,url,clicks,imps,total_record_imps,total_view_imps

Are there any tools in Apache NiFi to achieve this?

Recommended answer

Your data needs some cleanup before it is valid CSV. You can use the ExecuteScript or ExecuteStreamCommand processor to run a data-cleaning script (in Python, say) that transforms the incoming data into your desired format.
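
For the ExecuteStreamCommand route, the processor pipes the flowfile content to the command's standard input and, by default, uses the command's standard output as the new content. Below is a minimal sketch of the cleaning logic in plain CPython 3, not the answer's own code. It assumes the header row always starts with `Advertiser Name,Advertiser ID,`, the footer with `Data was updated last on`, and that the columns always arrive in the order shown in the sample; `clean_report`, `HEADER_PREFIX`, `FOOTER_PREFIX` and `TARGET_HEADER` are hypothetical names.

```python
import csv
import io

# Assumed markers, taken from the sample report in the question
HEADER_PREFIX = "Advertiser Name,Advertiser ID,"
FOOTER_PREFIX = "Data was updated last on"

# Column names requested in the question, in the report's column order
TARGET_HEADER = [
    "advertiser_name", "advertiser_id", "campaign_name", "campaign_id",
    "report_date", "site", "site_id", "device", "placement_name",
    "placement_id", "url", "clicks", "imps", "total_record_imps",
    "total_view_imps",
]


def clean_report(text):
    """Return only the data table of a raw report, with the renamed header."""
    lines = text.splitlines()
    # Locate the real header row; everything before it is preamble
    start = next((i for i, line in enumerate(lines)
                  if line.startswith(HEADER_PREFIX)), None)
    if start is None:
        raise ValueError("header row not found")

    rows = []
    for line in lines[start + 1:]:
        if line.startswith(FOOTER_PREFIX):
            break  # the footer and anything after it is discarded
        if line.strip():
            rows.append(line)

    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(TARGET_HEADER)
    # Parse data rows with the csv module so quoted fields with commas are handled
    for record in csv.reader(rows):
        if len(record) != len(TARGET_HEADER):
            raise ValueError("unexpected column count: %d" % len(record))
        writer.writerow(record)
    return out.getvalue()
```

To call it from ExecuteStreamCommand, you could save it as a script (for example a hypothetical `clean_report.py`) that ends with `import sys; sys.stdout.write(clean_report(sys.stdin.read()))`, then set Command Path to your Python 3 interpreter and Command Arguments to the script path. Replacing the whole header with a fixed list keeps the target names exact, at the cost of breaking if the report's column order ever changes.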

The snippet below (header standardization and data cleanup) shows how to access flowfile content from an ExecuteScript processor configured with Python (Jython) as the script engine:

# Jython script for ExecuteScript (Script Engine: python)
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap

HEADER_PREFIX = "Advertiser Name,Advertiser ID,"
FOOTER_PREFIX = "Data was updated last on"


# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the whole flowfile content as a list of lines
        with wrap(inputStream) as f:
            lines = f.readlines()

        outer_new_value_list = []
        is_csv_data = False
        for csv_row in lines:
            if not is_csv_data:
                # Skip the preamble until the real header row shows up
                if csv_row.startswith(HEADER_PREFIX):
                    is_csv_data = True
                else:
                    continue
            # Everything from the footer onwards is dropped
            if csv_row.startswith(FOOTER_PREFIX):
                break
            outer_new_value_list.append(csv_row)

        # Standardize the header: drop "* ", spaces/hyphens -> "_", lowercase.
        # The guard avoids an IndexError when no header row was found.
        if outer_new_value_list:
            outer_new_value_list[0] = (outer_new_value_list[0]
                                       .replace('* ', '')
                                       .replace(' ', '_')
                                       .replace('-', '_')
                                       .lower())
        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(outer_new_value_list)


# end class
flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, ExecuteScript.REL_SUCCESS)
# implicit return at the end
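
A generic `.replace` chain on the header only approximates the names asked for in the question: it turns `Date` into `date` rather than `report_date` and `Clickthrough URL` into `clickthrough_url` rather than `url`. For the exact target names, one option is an explicit lookup table. The sketch below is hypothetical (`HEADER_MAP` and `rename_header` are illustrative names), assumes the header row contains no quoted commas, and sticks to syntax that Jython 2.7 also accepts so it could sit alongside the ExecuteScript script.

```python
# Hypothetical mapping from the report's column names to the requested names
HEADER_MAP = {
    "Advertiser Name": "advertiser_name",
    "Advertiser ID": "advertiser_id",
    "Campaign Name": "campaign_name",
    "Campaign ID": "campaign_id",
    "Date": "report_date",
    "Site Name": "site",
    "Site ID": "site_id",
    "Device Type": "device",
    "Placement Name": "placement_name",
    "Placement ID": "placement_id",
    "Clickthrough URL": "url",
    "* Clicks": "clicks",
    "* Served Impressions": "imps",
    "* Total Recordable Impressions (IAB)": "total_record_imps",
    "* Total Viewable Impressions (IAB)": "total_view_imps",
}


def rename_header(header_line):
    """Map each column name through HEADER_MAP, keeping the line ending.

    An unknown column raises KeyError, so a changed report layout fails
    loudly instead of silently producing unexpected column names.
    """
    body = header_line.rstrip("\r\n")
    ending = header_line[len(body):]
    names = [HEADER_MAP[name.strip()] for name in body.split(",")]
    return ",".join(names) + ending
```

Inside `process`, the header line would then be set with `outer_new_value_list[0] = rename_header(outer_new_value_list[0])`. Mapping name by name, rather than replacing the whole line, also copes with columns that arrive in a different order.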
