How to pass flow files to the Execute Python script and use attributes & NiFi variables to store that file?


Problem Description

I am a rookie at both NiFi and Python, and I need your help passing a FlowFile attribute value to my script. The script converts a nested JSON file into CSV; when I run it locally, it works.

How can I pass the FlowFile name to src_json and tgt_csv?

Thanks,

Rosa

```python
import json

import pandas as pd
from pandas import json_normalize  # pandas.io.json.json_normalize is deprecated

src_json = "C:/Users/name/Documents/Filename.json"
tgt_csv = "C:/Users/name/Documents/Filename.csv"

# Load the nested JSON; the file is closed automatically afterwards
with open(src_json) as jfile:
    jdata = json.load(jfile)

# ...rest of the code...
```

Recommended Answer

You have a few options to accomplish this task.

  1. As Arun211 pointed out, there is an existing ConvertRecord processor which largely accomplishes this task. If your nested JSON is a problem or you have other reasons for wanting to do this in a Python script, continue below.
  2. If you have an existing Python script which performs this task as shown above, you'll need to invoke it from NiFi while providing the data to the script. You can use:
     1. ExecuteScript (better for prototyping) and InvokeScriptedProcessor (more performant for production tasks) allow you to run Python (actually Jython) scripts inside the NiFi instance. This gives you direct access to some convenience methods and functionality. However, because Jython cannot load natively compiled Python libraries, you will not be able to use pandas in this code. See here for instructions on configuring this processor and here for why pandas will not work.
     2. If you need pandas, save the script as a Python file on the local file system and invoke it as a shell command using ExecuteStreamCommand (if you need to provide input to this processor) or ExecuteProcess (if it's the first processor in your flow). These processors essentially run a shell command: `python my_python_script_with_pandas.py -somearg` in ExecuteProcess, or `python my_python_script_with_pandas.py` with the flowfile content on STDIN in ExecuteStreamCommand. In both cases, whatever the script writes to STDOUT is captured as the content of the resulting flowfile.
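
A minimal sketch of a script suited to ExecuteStreamCommand, which pipes the flowfile content to STDIN and captures STDOUT as described above. It assumes the flowfile content is a single JSON object or an array of objects. To keep it free of the pandas dependency, it swaps in a small standard-library flattener (nested keys joined with dots, lists kept as JSON strings) in place of `pandas.json_normalize`, so its column naming and list handling will not match pandas exactly. The function names are hypothetical.

```python
import csv
import io
import json
import sys


def flatten(obj, prefix=""):
    """Flatten nested dicts into one dict whose keys are joined with dots.

    Lists are stored as JSON strings; this covers only part of what
    pandas.json_normalize does.
    """
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        elif isinstance(value, list):
            flat[name] = json.dumps(value)
        else:
            flat[name] = value
    return flat


def json_to_csv(text):
    """Convert JSON text (one object or a list of objects) to CSV text."""
    if not text.strip():
        return ""  # empty flowfile content -> empty output
    data = json.loads(text)
    records = data if isinstance(data, list) else [data]
    rows = [flatten(record) for record in records]
    # Collect every column name in first-seen order so records with
    # missing fields still line up (DictWriter fills gaps with "").
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return out.getvalue()


def main():
    # ExecuteStreamCommand writes the flowfile content to this process's
    # STDIN and captures STDOUT as the outgoing flowfile content.
    sys.stdout.write(json_to_csv(sys.stdin.read()))


if __name__ == "__main__" and not sys.stdin.isatty():
    # Only read STDIN when it is piped in, so running the file
    # interactively does not hang waiting for input.
    main()
```

In ExecuteStreamCommand you would point Command Path at the Python executable and Command Arguments at this script's path; the CSV written to STDOUT then becomes the new flowfile content.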

Currently your script is looking for the incoming JSON file in a static file location, and putting the resulting CSV in another static file location. You will need to change the script to do one of the following:

  1. Read those paths from command-line arguments and pass them in the relevant property of the processor you select. These properties can be populated from flowfile attributes, so you could set something like Command Arguments: `-inputfile /path/to/some_existing_file.json -outputfile ${flowfile_attribute_named_output_file}`, or any combination thereof. Your script would then read the `-inputfile` and `-outputfile` arguments to determine the paths.
  2. Read the incoming data directly from STDIN (example here). Then process the JSON data, convert it to CSV, and return it via STDOUT. NiFi will consume this data, use it as the content of the resulting flowfile, and send it to the next processor(s) in your flow.
  3. The prior two options keep your Python script independent from NiFi; it is unaware of any of the "flowfile" constructs. This option will make it NiFi-specific, but allow further functionality (see option 2.1 above). To write Python code that reads and writes directly from/to the flowfile content, see this example of ExecuteScript processor handling flowfile content in Python.
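
For option 1, a minimal sketch of a script that takes its paths from `-inputfile` and `-outputfile` via `argparse`, replacing the hard-coded `src_json` and `tgt_csv` from the question. The `convert` helper is a hypothetical placeholder that writes one CSV row per top-level object and serializes nested values as JSON strings; the question's own pandas-based logic would go there instead.

```python
import argparse
import csv
import json
import sys


def build_parser():
    parser = argparse.ArgumentParser(description="Convert a JSON file to CSV")
    # argparse accepts single-dash long option names such as -inputfile
    parser.add_argument("-inputfile", required=True, help="path of the JSON file to read")
    parser.add_argument("-outputfile", required=True, help="path of the CSV file to write")
    return parser


def convert(src_json, tgt_csv):
    """Hypothetical placeholder: one CSV row per top-level JSON object.

    Nested dicts/lists are written as JSON strings; the question's
    pandas-based flattening would replace this body.
    """
    with open(src_json, encoding="utf-8") as jfile:
        jdata = json.load(jfile)
    records = jdata if isinstance(jdata, list) else [jdata]
    # Union of keys across records, in first-seen order
    fieldnames = list(dict.fromkeys(key for record in records for key in record))
    with open(tgt_csv, "w", newline="", encoding="utf-8") as cfile:
        writer = csv.DictWriter(cfile, fieldnames=fieldnames)
        writer.writeheader()
        for record in records:
            writer.writerow({
                key: json.dumps(value) if isinstance(value, (dict, list)) else value
                for key, value in record.items()
            })


def main(argv=None):
    args = build_parser().parse_args(argv)
    # These replace the hard-coded src_json / tgt_csv paths from the question
    convert(args.inputfile, args.outputfile)


if __name__ == "__main__":
    if len(sys.argv) > 1:
        main()
    else:
        # Without arguments, show usage rather than exiting with an argparse error
        build_parser().print_usage()
```

Because `filename` is a standard flowfile attribute, an expression such as `-outputfile /some/output/dir/${filename}.csv` (directory hypothetical) is one way to derive the target path from the FlowFile name. ExecuteStreamCommand splits Command Arguments on its Argument Delimiter property, which defaults to `;`, so a space-separated argument list may need `;` between arguments instead.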
