Python Script using ExecuteStreamCommand

Question

After doing my best to find previous questions and examples relevant to this one, and still not finding the answers I'm looking for, I figured I would submit a question myself.

ExecuteStreamCommand seems like the perfect processor for me for the following reasons:

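  • I am able to execute any Python script and avoid Jython (in a similar way to ExecuteScript). Jython is not an option for me.
  • I can ingest FlowFiles. This is necessary because my script consumes the output of a previous processor. I also like the idea of keeping the data "under NiFi management".
  • It writes an "execution status", which is useful for routing.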

In a nutshell, what I'm trying to do with ExecuteStreamCommand is:

  • Ingest the output of a previous processor (to be exact, a Scrapy spider that outputs a text file with JSON lines).
  • Call a Python script (e.g. python3 my_script.py).
  • Load the ingested FlowFile in my Python script.
  • Select the content of the FlowFile.
  • Operate on the content of the FlowFile within Python.
  • Output either an updated version of the original FlowFile or create a new one.
  • Continue my NiFi flow with the updated/new FlowFile.

For clarity's sake, what I currently don't understand is:

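  • How to call the Python script from the ExecuteStreamCommand processor.
  • How to load the FlowFile from within Python.
  • How to update or create a new FlowFile in Python.
  • How to output the updated FlowFile from Python back to NiFi.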

I have come across various examples for ExecuteScript, but unfortunately these don't translate directly to ExecuteStreamCommand.

Thank you in advance. Any advice is appreciated.

Recommended answer

In your question you say you need to invoke the Python script without using the InvokeScriptedProcessor or ExecuteScript processors because you can't use Jython. Given that requirement, you should still be able to accomplish your goal. While it requires some familiarity with the framework, all of this information is from the ExecuteStreamCommand documentation.

Your "I currently don't understand" section:

How to call the Python script from the ExecuteStreamCommand processor

  • In your ExecuteStreamCommand processor, configure the Command Arguments and Command Path properties as follows:

  • Command Arguments: any flags or args, delimited by ; (e.g. /path/to/my_script.py)
  • Command Path: /path/to/python3
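To try a script outside NiFi before wiring it into the flow, the invocation described above can be approximated locally. This is only a rough sketch, not how NiFi itself is implemented: the function name run_like_execute_stream_command is made up for illustration, and the inline upper-casing script is a stand-in for /path/to/my_script.py.

```python
import subprocess
import sys


def run_like_execute_stream_command(command_path, command_arguments, content,
                                    delimiter=";"):
    """Approximate the invocation locally (hypothetical helper, not a NiFi API).

    command_path      -> the executable, as in the Command Path property
    command_arguments -> one delimited string, as in Command Arguments
    content           -> bytes standing in for the incoming flowfile content
    """
    # Split the single argument string on the delimiter to build argv.
    argv = [command_path] + [a for a in command_arguments.split(delimiter) if a]
    # Feed the content to the child's STDIN and capture what it writes to STDOUT.
    result = subprocess.run(argv, input=content, stdout=subprocess.PIPE)
    return result.returncode, result.stdout


# Stand-in for /path/to/my_script.py: an inline script that upper-cases STDIN.
inline_script = "import sys\nsys.stdout.write(sys.stdin.read().upper())"
status, output = run_like_execute_stream_command(
    sys.executable, "-c;" + inline_script, b"abc")
print(status, output)
```

With a real script, the call would look like run_like_execute_stream_command("/path/to/python3", "/path/to/my_script.py", content), mirroring the two properties above.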

How to load the FlowFile from within Python

  • The flowfile content will be passed via STDIN, so in your Python script, process that data the same way you would normally process STDIN.

How to update or create a new FlowFile in Python

  • NiFi handles the flowfile creation in the framework. Any data your Python script writes to STDOUT will become the content of the resulting flowfile, which is routed to the output stream relationship of the ExecuteStreamCommand processor. Your script does not need any awareness of "flowfiles" in this case. If you were instead using the InvokeScriptedProcessor or ExecuteScript processors, you could use the NiFi scripting API, which is automatically injected into the scripts, to create or update the flowfile object.

How to output the updated FlowFile from Python back to NiFi

  • Again, simply write the desired flowfile content to STDOUT from your script, and (given a return status code of 0) NiFi will generate a new flowfile with that content. If you set the Output Destination Attribute property of ExecuteStreamCommand to a non-null value, NiFi will instead update the existing flowfile with a new attribute of that name containing the output of the script.
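Putting the STDIN and STDOUT points together, a script for this flow could look roughly like the sketch below. It assumes each input line is a JSON object (as with the Scrapy JSON-lines output mentioned in the question); the transform step and the "processed" field are hypothetical placeholders for whatever the real script does.

```python
import io
import json
import sys


def transform(record):
    """Hypothetical per-record change: tag each record as processed."""
    record["processed"] = True
    return record


def process(stdin, stdout):
    """Read JSON lines from stdin and write transformed JSON lines to stdout.

    Returns the exit status the script should finish with: 0 on success,
    1 if any line is not valid JSON.
    """
    for line in stdin:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as err:
            # Diagnostics go to STDERR so they stay out of the output content.
            print(f"bad input line: {err}", file=sys.stderr)
            return 1
        stdout.write(json.dumps(transform(record)) + "\n")
    return 0


# In my_script.py, the last line would be:
#     sys.exit(process(sys.stdin, sys.stdout))
# Here an in-memory stream stands in for the flowfile content instead.
demo_out = io.StringIO()
demo_status = process(io.StringIO('{"title": "a"}\n{"title": "b"}\n'), demo_out)
```

Returning a non-zero status on bad input matters because, as noted above, the new flowfile content is described for a return status code of 0; the exit status is the script's only channel for signalling failure back to the processor.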
