How to pass flow files to the Execute Python script and use attributes & NiFi variables to store that file?
Question
I am a rookie at both NiFi and Python and I need your help to pass the Flow File attribute value to the script. The script is converting a nested json into csv. When I run the script locally it works.
How can I pass the FlowFile name to src_json and tgt_csv?
Thanks,
Rosa
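```python
import pandas as pd
import json
# In pandas >= 1.0 this function is also available as pd.json_normalize
from pandas.io.json import json_normalize

# Hard-coded input and output locations (the part that needs to change)
src_json = "C:/Users/name/Documents/Filename.json"
tgt_csv = "C:/Users/name/Documents/Filename.csv"

# Load the nested JSON document
with open(src_json) as jfile:
    jdata = json.load(jfile)

# ...rest of the code...
```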
Recommended answer
You have a few options to accomplish this task.
- As Arun211 pointed out, there is an existing `ConvertRecord` processor which largely accomplishes this task. If your nested JSON is a problem or you have other reasons for wanting to do this in a Python script, continue below.
- If you have an existing Python script which performs this task as shown above, you'll need to invoke it from NiFi while providing the data to the script. You can use:
  - `ExecuteScript` (better for prototyping) and `InvokeScriptedProcessor` (more performant for production tasks) allow you to run Python (actually Jython) scripts inside the NiFi instance. This gives you direct access to some convenience methods and functionality. However, because Jython cannot handle natively-compiled Python libraries, you will not be able to use `pandas` in this code. See here for instructions on configuring this processor and here for why `pandas` will not work.
  - If you need `pandas` for some functionality, you'll need to save the script as a Python file on the local file system and invoke it as a shell command using `ExecuteStreamCommand` (if you need to provide input to this processor) or `ExecuteProcess` (if it's the first processor in your flow). These processors essentially run a shell command like `python my_python_script_with_pandas.py -somearg` (in `ExecuteProcess`) or `python my_python_script_with_pandas.py` with the flowfile content as `STDIN` (in `ExecuteStreamCommand`) and the output of `STDOUT` captured as the resulting flowfile content.
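To get a feel for the `STDIN`/`STDOUT` contract before wiring anything into NiFi, you can approximate it locally. The sketch below is only an analogy, not how NiFi is implemented: the helper name `run_like_stream_command` is made up for illustration, and the demo command is a stand-in for `python my_python_script_with_pandas.py`.

```python
import subprocess
import sys


def run_like_stream_command(command, flowfile_content):
    """Run `command`, feeding flowfile_content (bytes) on STDIN.

    Returns the bytes the command wrote to STDOUT, which plays the
    role of the outgoing flowfile content.
    """
    completed = subprocess.run(
        command,
        input=flowfile_content,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=True,  # raise if the script exits with a non-zero status
    )
    return completed.stdout


if __name__ == "__main__":
    # Stand-in for the real script: a one-liner that upper-cases
    # whatever arrives on STDIN and writes it to STDOUT.
    demo_command = [
        sys.executable,
        "-c",
        "import sys; sys.stdout.write(sys.stdin.read().upper())",
    ]
    print(run_like_stream_command(demo_command, b'{"name": "rosa"}'))
```

If your script behaves correctly when driven this way, it should be a reasonable candidate for `ExecuteStreamCommand`.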
Currently your script is looking for the incoming JSON file in a static file location, and putting the resulting CSV in another static file location. You will need to change the script to do one of the following:
- Read those paths from command-line arguments and pass them in the relevant property of the processor you select. These properties can be populated from flowfile attributes, so you could set Command Arguments to something like `-inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file}` or any combination thereof. Your script would then read the `-inputfile` and `-outputfile` arguments to determine the paths.
- Read the incoming data directly from `STDIN` (example here). Then process the JSON data, convert it to CSV, and return it via `STDOUT`. NiFi will consume this data, put it as the content of the resulting flowfile, and send it to the next processor(s) in your flow.
- The prior two options keep your Python script independent from NiFi; it is unaware of any of the "flowfile" constructs. A third option is to make the script NiFi-specific, which allows further functionality (see the `ExecuteScript` / `InvokeScriptedProcessor` option above). To write Python code that reads and writes directly from/to the flowfile content, see this example of an `ExecuteScript` processor handling flowfile content in Python.
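The first two options can live in one script: use the paths if they are given, and fall back to `STDIN`/`STDOUT` otherwise. Here is a minimal sketch under some assumptions. The `flatten` and `convert` helpers are hypothetical and use only the standard library so the example runs anywhere; in your real script the `pandas` / `json_normalize` logic would take their place. The `-inputfile` and `-outputfile` names simply mirror the arguments used above.

```python
import argparse
import csv
import json
import sys


def flatten(obj, prefix=""):
    """Flatten nested dicts into one level, joining keys with dots."""
    flat = {}
    for key, value in obj.items():
        name = prefix + key
        if isinstance(value, dict):
            flat.update(flatten(value, name + "."))
        else:
            flat[name] = value
    return flat


def convert(src, dst):
    """Read JSON (an object or a list of objects) from src, write CSV to dst."""
    data = json.load(src)
    records = data if isinstance(data, list) else [data]
    rows = [flatten(record) for record in records]
    # The union of all keys, in first-seen order, becomes the CSV header.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    writer = csv.DictWriter(dst, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)


def main(argv=None):
    parser = argparse.ArgumentParser(description="Convert nested JSON to CSV")
    parser.add_argument("-inputfile", help="JSON path; omit to read STDIN")
    parser.add_argument("-outputfile", help="CSV path; omit to write STDOUT")
    args = parser.parse_args(argv)

    src = open(args.inputfile, encoding="utf-8") if args.inputfile else sys.stdin
    dst = (open(args.outputfile, "w", encoding="utf-8", newline="")
           if args.outputfile else sys.stdout)
    try:
        convert(src, dst)
    finally:
        # Only close the files this function opened itself.
        if src is not sys.stdin:
            src.close()
        if dst is not sys.stdout:
            dst.close()


# To use this as the script NiFi invokes, end the file with:
#     if __name__ == "__main__":
#         main()
```

With no arguments the script acts as a plain filter, which is the shape `ExecuteStreamCommand` expects. One thing to double-check in the processor configuration: as far as I know, `ExecuteStreamCommand` separates Command Arguments with its Argument Delimiter property (`;` by default) rather than with spaces.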