How to pass flow files to the Execute Python script and use attributes & NiFi variables to store that file?


Question

I am a rookie at both NiFi and Python and I need your help to pass the FlowFile attribute value to the script. The script converts a nested JSON file into CSV. When I run the script locally, it works.

How can I pass the FlowFile name to src_json and tgt_csv?

Thanks,

Rosa

```python
import pandas as pd
import json
from pandas.io.json import json_normalize

src_json = "C:/Users/name/Documents/Filename.json"
tgt_csv = "C:/Users/name/Documents/Filename.csv"

jfile = open(src_json)
jdata = json.load(jfile)

# ...rest of the code...
```

Recommended answer

You have a few options to accomplish this task.

  1. As Arun211 pointed out, there is an existing ConvertRecord processor which largely accomplishes this task. If your nested JSON is a problem or you have other reasons for wanting to do this in a Python script, continue below.
  2. If you have an existing Python script which performs this task as shown above, you'll need to invoke it from NiFi while providing the data to the script. You can use:
     1. ExecuteScript (better for prototyping) and InvokeScriptedProcessor (more performant for production tasks) allow you to run Python (actually Jython) scripts inside the NiFi instance. This gives you direct access to some convenience methods and functionality. However, because Jython cannot handle natively-compiled Python libraries, you will not be able to use pandas in this code. See here for instructions on configuring this processor and here for why pandas will not work.
     2. If you need pandas for some functionality, you'll need to save the script as a Python file on the local file system and invoke it as a shell command using ExecuteStreamCommand (if you need to provide input to this processor) or ExecuteProcess (if it's the first processor in your flow). ExecuteProcess essentially runs a shell command like `python my_python_script_with_pandas.py -somearg`. ExecuteStreamCommand runs `python my_python_script_with_pandas.py` with the flowfile content as STDIN and captures whatever the script writes to STDOUT as the resulting flowfile content.
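
The STDIN/STDOUT contract described for ExecuteStreamCommand can be tried locally before wiring anything into NiFi. The following is a minimal sketch, not a NiFi API: `run_like_stream_command` is a hypothetical helper that only mimics the behaviour by piping bytes into a child process, and `CHILD_SCRIPT` is a one-line stand-in for `my_python_script_with_pandas.py`.

```python
import subprocess
import sys


def run_like_stream_command(command, flowfile_content):
    """Pipe bytes to a command's STDIN and return what it wrote to STDOUT.

    Hypothetical local stand-in for ExecuteStreamCommand, not a NiFi API.
    """
    completed = subprocess.run(
        command,
        input=flowfile_content,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,  # raise CalledProcessError on a non-zero exit status
    )
    return completed.stdout


# Stand-in for my_python_script_with_pandas.py: echoes STDIN in upper case.
CHILD_SCRIPT = "import sys; sys.stdout.write(sys.stdin.read().upper())"

result = run_like_stream_command([sys.executable, "-c", CHILD_SCRIPT], b'{"a": 1}')
print(result)
```

If a script behaves correctly under this kind of harness, reading everything it needs from STDIN and writing only the result to STDOUT, it should fit the way ExecuteStreamCommand is described above. Anything else the script prints to STDOUT (debug messages, for example) would end up in the flowfile content as well.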

Currently your script is looking for the incoming JSON file in a static file location, and putting the resulting CSV in another static file location. You will need to change the script to do one of the following:

  1. Read those paths from command-line arguments and pass those in the relevant processor property in the processor you select. These properties can be populated from flowfile attributes, so you could do something like Command Arguments: -inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file} or any combination thereof. Your script would then read the -inputfile and -outputfile arguments to determine the paths.
  2. Read the incoming data directly from STDIN (example here). Then process the JSON data, convert it to CSV, and write it to STDOUT. NiFi will consume this data, put it as the content of the resulting flowfile, and send it to the next processor(s) in your flow.
  3. The prior two options keep your Python script independent from NiFi; it is unaware of any of the "flowfile" constructs. This option will make it NiFi-specific, but allow further functionality (see option 2.1 above). To write Python code that reads and writes directly from/to the flowfile content, see this example of ExecuteScript processor handling flowfile content in Python.
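
Options 1 and 2 can live in the same script. The sketch below is one possible shape, under several assumptions: the `-inputfile` and `-outputfile` flags follow the example arguments above, both are optional, and the script falls back to STDIN/STDOUT when they are omitted. To stay free of third-party dependencies it uses a small hand-written `flatten` helper (hypothetical, not from the original script) instead of pandas; with pandas installed, `pandas.json_normalize` could play that role. It also assumes the JSON is a single object or a list of objects.

```python
import argparse
import contextlib
import csv
import io
import json
import sys


def flatten(record, prefix=""):
    """Turn nested dicts into one flat dict with dotted keys."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


def convert(src, dst):
    """Read JSON (an object or a list of objects) from src, write CSV to dst."""
    text = src.read()
    if not text.strip():
        return  # empty input: write nothing
    data = json.loads(text)
    records = data if isinstance(data, list) else [data]
    rows = [flatten(record) for record in records]
    # Union of keys in first-seen order, so every row fits the header.
    columns = list(dict.fromkeys(key for row in rows for key in row))
    writer = csv.DictWriter(dst, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)


def main(argv, stdin=None, stdout=None):
    parser = argparse.ArgumentParser(description="Convert nested JSON to CSV.")
    parser.add_argument("-inputfile", help="JSON path; omit to read STDIN")
    parser.add_argument("-outputfile", help="CSV path; omit to write STDOUT")
    args = parser.parse_args(argv)
    with contextlib.ExitStack() as stack:
        if args.inputfile:
            src = stack.enter_context(open(args.inputfile, encoding="utf-8"))
        else:
            src = stdin if stdin is not None else sys.stdin
        if args.outputfile:
            dst = stack.enter_context(
                open(args.outputfile, "w", encoding="utf-8", newline="")
            )
        else:
            dst = stdout if stdout is not None else sys.stdout
        convert(src, dst)


# Local demo without NiFi: in-memory streams stand in for STDIN and STDOUT.
demo_out = io.StringIO()
main([], stdin=io.StringIO('{"id": 7, "user": {"name": "Rosa"}}'), stdout=demo_out)
print(demo_out.getvalue())
```

For use from NiFi, the demo lines at the bottom would be replaced with a call such as `main(sys.argv[1:])`. Keeping the conversion in `convert` and the path handling in `main` means the same logic serves both options: paths taken from flowfile attributes via the processor's arguments, or flowfile content streamed through STDIN and STDOUT.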
