How to pass flow files to the Execute Python script and use attributes & NiFi variables to store that file?
Question
I am a rookie at both NiFi and Python, and I need your help passing a flowfile attribute value to my script. The script converts nested JSON into CSV, and it works when I run it locally.
How can I pass the FlowFile name to src_json and tgt_csv?
Thanks,
Rosa
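```python
import json

import pandas as pd
from pandas import json_normalize  # pandas.io.json.json_normalize is deprecated since pandas 1.0

# Hard-coded local paths: these are what should come from NiFi instead
src_json = "C:/Users/name/Documents/Filename.json"
tgt_csv = "C:/Users/name/Documents/Filename.csv"

# A context manager closes the file handle once the JSON is loaded
with open(src_json) as jfile:
    jdata = json.load(jfile)

# ...rest of the code...
```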
Recommended answer
You have a few options to accomplish this task.
- As Arun211 pointed out, there is an existing `ConvertRecord` processor that largely accomplishes this task. If your nested JSON is a problem, or you have other reasons for wanting to do this in a Python script, continue below.
- If you have an existing Python script that performs this task, as shown above, you'll need to invoke it from NiFi while providing the data to the script. You can use:
  - `ExecuteScript` (better for prototyping) and `InvokeScriptedProcessor` (more performant for production tasks) allow you to run Python (actually Jython) scripts inside the NiFi instance. This gives you direct access to some convenience methods and functionality. However, because Jython cannot handle natively compiled Python libraries, you will not be able to use `pandas` in this code. See here for instructions on configuring this processor and here for why `pandas` will not work.
  - If you need `pandas` for some functionality, you'll need to save the script as a Python file on the local file system and invoke it as a shell command using `ExecuteStreamCommand` (if you need to provide input to this processor) or `ExecuteProcess` (if it's the first processor in your flow). These processors essentially run a shell command like `python my_python_script_with_pandas.py -somearg` (in `ExecuteProcess`) or `python my_python_script_with_pandas.py` with the flowfile content as `STDIN` (in `ExecuteStreamCommand`), and capture the output on `STDOUT` as the resulting flowfile content.
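A minimal sketch of the `STDIN`/`STDOUT` approach, assuming the incoming flowfile holds either one JSON object or an array of objects. To keep it runnable without `pandas`, the hypothetical `flatten` helper is a simplified, standard-library stand-in for `pandas.json_normalize` (not the same function), and `json_to_csv` is likewise a name chosen for illustration.

```python
import csv
import io
import json


def flatten(record, parent_key="", sep="."):
    """Flatten nested dicts into dotted column names, roughly what
    pandas.json_normalize does with its default sep='.'.

    Lists are kept as JSON text in a single column rather than exploded
    into rows (json_normalize's record_path feature is not reproduced).
    """
    flat = {}
    for key, value in record.items():
        column = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, column, sep))
        elif isinstance(value, list):
            flat[column] = json.dumps(value)
        else:
            flat[column] = value
    return flat


def json_to_csv(text):
    """Convert a JSON object, or an array of objects, to CSV text."""
    if not text.strip():
        return ""  # empty flowfile in, empty flowfile out
    data = json.loads(text)
    records = data if isinstance(data, list) else [data]
    rows = [flatten(record) for record in records]
    # Header = union of all column names in first-seen order;
    # DictWriter fills missing columns with "" (its default restval).
    fieldnames = list(dict.fromkeys(col for row in rows for col in row))
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()
```

In the script that `ExecuteStreamCommand` runs, the entry point would then be `if __name__ == "__main__": sys.stdout.write(json_to_csv(sys.stdin.read()))` (with `import sys`), so the flowfile content arrives on `STDIN` and the CSV written to `STDOUT` becomes the new flowfile content. If you keep `pandas`, `pd.json_normalize(json.loads(text)).to_csv(index=False)` could take the place of the hand-written `flatten`.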
Currently your script is looking for the incoming JSON file in a static file location, and putting the resulting CSV in another static file location. You will need to change the script to do one of the following:
- Read those paths from command-line arguments and pass them in the relevant property of the processor you select. These properties can be populated from flowfile attributes, so you could set Command Arguments to something like:
  `-inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file}`
  or any combination thereof. Your script would then read the `-inputfile` and `-outputfile` arguments to determine the paths. (In `ExecuteStreamCommand`, Command Arguments are split on `;` by default, per its Argument Delimiter property, so there the same arguments would be written `-inputfile;/path/to/some_existing_file.json;-outputfile;${flowfile_attribute_named_output_file}`.)
- Read the incoming data directly from `STDIN` (example here). Then process the JSON data, convert it to CSV, and write it to `STDOUT`. NiFi will consume this output, use it as the content of the resulting flowfile, and send it to the next processor(s) in your flow.
- The prior two options keep your Python script independent of NiFi; it is unaware of any "flowfile" constructs. This option makes it NiFi-specific but allows further functionality (see the `ExecuteScript`/`InvokeScriptedProcessor` option above, including its Jython limitation that rules out `pandas`). To write Python code that reads and writes the flowfile content directly, see this example of an `ExecuteScript` processor handling flowfile content in Python.
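A minimal sketch of the command-line-argument option, assuming the question's script is wrapped in a `main` function. The `-inputfile`/`-outputfile` names come from the Command Arguments example above; `parse_args`, `main`, and the final `pd.json_normalize(...).to_csv(...)` line are assumptions standing in for the question's elided code.

```python
import argparse
import json


def parse_args(argv=None):
    """Read the input/output paths from the command line.

    argparse accepts single-dash long option names such as -inputfile;
    the parsed values are exposed as args.inputfile and args.outputfile.
    """
    parser = argparse.ArgumentParser(description="Convert nested JSON to CSV")
    parser.add_argument("-inputfile", required=True, help="JSON file to read")
    parser.add_argument("-outputfile", required=True, help="CSV file to write")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    # Imported here rather than at the top only so that parse_args can be
    # tried out on a machine without pandas installed.
    import pandas as pd

    # src_json / tgt_csv now come from NiFi instead of being hard-coded
    src_json, tgt_csv = args.inputfile, args.outputfile
    with open(src_json) as jfile:
        jdata = json.load(jfile)
    # Hypothetical stand-in for the question's "...rest of the code...":
    # flatten the nested JSON and write it out as CSV.
    pd.json_normalize(jdata).to_csv(tgt_csv, index=False)
```

Saved at a hypothetical location such as `/path/to/convert.py` and ending with `if __name__ == "__main__": main()`, it could be configured in `ExecuteStreamCommand` with Command Path `python` and Command Arguments `/path/to/convert.py;-inputfile;/path/to/some_existing_file.json;-outputfile;/path/to/output/${filename:substringBeforeLast('.')}.csv`, where `${filename}` is the flowfile's standard `filename` attribute and the `;` separators match that processor's default Argument Delimiter.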