Generating multiple flow files using the NiFi ExecuteScript processor

Question

I am working on a NiFi flow that receives a JSON document with multiple key-value pairs. I am using the ExecuteScript processor with Python (Jython).

My goal is to create various URLs based on the JSON keys. The keys are numerical and look like this:

keys = [10200, 10201, 10202, ...]

I want 3 types of URL, and they should look like this:

http://google.com/10200
http://bing.com/10200
http://yahoo.com/10200
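The key-to-URL mapping itself is plain string building. A minimal sketch, assuming only the three prefixes shown above; URL_PREFIXES and urls_for_key are hypothetical names for illustration, not part of NiFi:

```python
# Hypothetical helper: the three URL prefixes come from the question,
# the function name and the site labels are illustrative only.
URL_PREFIXES = {
    "google": "http://google.com/",
    "bing": "http://bing.com/",
    "yahoo": "http://yahoo.com/",
}


def urls_for_key(key):
    """Return a dict of site name -> URL for one numerical key."""
    # str() lets the helper accept int keys (as in the keys list)
    # as well as str keys (as produced by json.loads for object keys).
    return {site: prefix + str(key) for site, prefix in URL_PREFIXES.items()}


if __name__ == "__main__":
    for key in [10200, 10201, 10202]:
        print(urls_for_key(key))
```

Each key yields exactly three URLs, which is what one outgoing flow file per key would carry as attributes.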

I am trying to loop through my keys[] and create 3 specific URLs for each numerical key it contains. For each key, I am trying to:

read a numerical key from the list --> create 3 URLs --> emit a flow file

...then read the next numerical key in the list and keep looping.
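The loop described above can be tried in plain Python first, outside NiFi, with each dict standing in for the attributes of one outgoing flow file. SAMPLE_JSON, attributes_per_key and the google/bing/yahoo attribute names are assumptions made for this sketch:

```python
import json

# Illustrative input: the question only says the document has numerical keys,
# so the values here are placeholders.
SAMPLE_JSON = '{"10200": "a", "10201": "b", "10202": "c"}'

URL_PREFIXES = [("google", "http://google.com/"),
                ("bing", "http://bing.com/"),
                ("yahoo", "http://yahoo.com/")]


def attributes_per_key(json_text):
    """Parse a JSON object and build one attribute dict per top-level key.

    Each returned dict stands in for the attributes of one flow file.
    """
    obj = json.loads(json_text)
    results = []
    # sorted() gives a deterministic (lexicographic) order; plain dict
    # iteration order is not guaranteed in Jython 2.7.
    for numerical_key in sorted(obj.keys()):
        attrs = {}
        for name, prefix in URL_PREFIXES:
            # numerical_key is already a string here (a JSON object key).
            attrs[name] = prefix + numerical_key
        results.append(attrs)
    return results


if __name__ == "__main__":
    for attrs in attributes_per_key(SAMPLE_JSON):
        print(attrs)
```

Note the three distinct attribute names: reusing one name for all three URLs would leave only the last value on the flow file.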

I have the following code, but when I give it a JSON flow file it does not do anything. Can someone tell me what I am doing wrong?

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):

    def __init__(self):
        self.parentFlowFile = None

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        flowfiles_list = []

        outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8')))

        for numerical_key in obj.keys():
            # create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes
            flowFile = session.create(self.parentFlowFile)
            if flowFile is not None:
                # the content "does not matter", so only attributes are set
                # (session.write() expects a callback, not a string)
                flowFile = session.putAttribute(flowFile, "google", "http://google.com/" + numerical_key)
                flowFile = session.putAttribute(flowFile, "bing", "http://bing.com/" + numerical_key)
                flowFile = session.putAttribute(flowFile, "yahoo", "http://yahoo.com/" + numerical_key)
                flowfiles_list.append(flowFile)

        for flow in flowfiles_list:
            session.transfer(flow, REL_SUCCESS)
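One detail worth checking in the code above: the keys look numerical, but JSON object keys are always strings, so json.loads() returns them as text and the "http://google.com/" + numerical_key concatenation works. A plain integer (like those in the keys list) would need str() first. A quick check in plain Python, with sample data made up for the example:

```python
import json

# JSON object keys are always strings, so json.loads() returns them as text
# (unicode in Jython 2.7, str in Python 3) even when they look numeric.
obj = json.loads('{"10200": "a", "10201": "b"}')
key = sorted(obj.keys())[0]

# String concatenation therefore works on the parsed keys...
url = "http://google.com/" + key

# ...but an actual int, like those in keys = [10200, 10201, 10202, ...],
# raises TypeError when concatenated and must be converted with str().
try:
    _ = "http://google.com/" + 10200
    int_concat_failed = False
except TypeError:
    int_concat_failed = True

url_from_int = "http://google.com/" + str(10200)
print(url, int_concat_failed, url_from_int)
```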

Answer

Good question; this is a nuance of the callback approach to the flow file API. You've created a subclass of StreamCallback, but your script never retrieves an input flow file (with session.get()) and never passes an instance of your class to session.write() to overwrite that flow file's content, so ModJSON.process() is never called.
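To illustrate the point outside NiFi, here is a minimal pure-Python sketch. FakeSession and UpperCase are hypothetical stand-ins invented for this example; they only mimic the shape of ProcessSession.get()/write() and of a StreamCallback. The point is that process() runs only when the session is handed an instance of the callback class:

```python
import io


class FakeSession(object):
    """Hypothetical stand-in for NiFi's ProcessSession (not the real API)."""

    def __init__(self, queued_contents):
        # Each queued "flow file" is a dict holding its content as bytes.
        self._queue = [{"content": c} for c in queued_contents]

    def get(self):
        # Like ProcessSession.get(): returns None when nothing is queued.
        return self._queue.pop(0) if self._queue else None

    def write(self, flow_file, callback):
        # The session, not your script, opens the streams and calls
        # callback.process(inputStream, outputStream).
        out = io.BytesIO()
        callback.process(io.BytesIO(flow_file["content"]), out)
        return {"content": out.getvalue()}


class UpperCase(object):
    """Hypothetical StreamCallback-like class that upper-cases the content."""

    def __init__(self):
        self.calls = 0

    def process(self, input_stream, output_stream):
        self.calls += 1
        output_stream.write(input_stream.read().upper())


session = FakeSession([b'{"10200": "a"}'])
callback = UpperCase()

# Defining the class, or even instantiating it, does not run process().
calls_before = callback.calls

# Only get() followed by write(flow_file, callback) makes the session call it.
flow_file = session.get()
if flow_file is not None:
    flow_file = session.write(flow_file, callback)

print(calls_before, callback.calls, flow_file["content"])
```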

Try this after the definition of your ModJSON class:

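originalFlowFile = session.get()
if originalFlowFile is not None:
    callback = ModJSON()
    # give the callback the input flow file so session.create(parent) has a parent
    callback.parentFlowFile = originalFlowFile
    originalFlowFile = session.write(originalFlowFile, callback)
    session.remove(originalFlowFile)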

This gets an input flow file (session.get() returns None when nothing is queued, in which case the script does nothing on that run) and then calls your StreamCallback, via session.write(), to overwrite the contents of that flow file. In my example you then discard the input flow file, so if that is the right behavior for your use case, you don't need to write to it at all: extend InputStreamCallback instead of StreamCallback, remove the "outputStream" parameter from the process() method along with the outputStream.write() call, and call session.read(originalFlowFile, ModJSON()) instead of session.write(). Note that read() does not return a flow file, so don't assign its result to originalFlowFile.

In your example, once you add my snippet above, you overwrite the input content with the output of json.dumps() and also create and transfer new flow files, all to the same relationship (success). That could cause problems downstream if the two kinds of flow file don't have the same format, which is why I added session.remove(). If you need the original flow file to go to a different relationship than the rest, consider InvokeScriptedProcessor instead of ExecuteScript, since ExecuteScript only has the success and failure relationships. If you don't need the input flow file after processing (once the URL attributes have been added), follow my suggestion above. If all of them can go to the same relationship (success), replace my session.remove() with

session.transfer(originalFlowFile, REL_SUCCESS)
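Both options can be sketched outside NiFi with a mock session. Everything here is hypothetical: FakeSession only imitates the shape of ProcessSession's get/read/create/putAttribute/transfer/remove calls, ReadKeys plays the role of an InputStreamCallback subclass (in a real Jython script it would extend org.apache.nifi.processor.io.InputStreamCallback), and relationships are plain strings rather than REL_SUCCESS. Creating the child flow files after read() returns, rather than inside process(), is a choice made for this sketch. keep_original=False corresponds to session.remove(); keep_original=True corresponds to also transferring the original to success:

```python
import io
import json

URL_PREFIXES = [("google", "http://google.com/"),
                ("bing", "http://bing.com/"),
                ("yahoo", "http://yahoo.com/")]


class FakeSession(object):
    """Hypothetical stand-in for NiFi's ProcessSession (not the real API).

    Records where each flow file ends up so the routing can be inspected.
    """

    def __init__(self, content):
        self._queue = [{"content": content, "attributes": {}}]
        self.transferred = []  # (flow_file, relationship) pairs
        self.removed = []

    def get(self):
        # Returns None when nothing is queued, like ProcessSession.get().
        return self._queue.pop(0) if self._queue else None

    def read(self, flow_file, callback):
        # Read-only access: the callback gets an input stream and nothing else.
        callback.process(io.BytesIO(flow_file["content"]))

    def create(self, parent):
        # In this mock a child starts with empty content and a copy of the
        # parent's attributes.
        return {"content": b"", "attributes": dict(parent["attributes"])}

    def putAttribute(self, flow_file, key, value):
        flow_file["attributes"][key] = value
        return flow_file

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file, relationship))

    def remove(self, flow_file):
        self.removed.append(flow_file)


class ReadKeys(object):
    """Plays the role of an InputStreamCallback subclass: it only reads."""

    def __init__(self):
        self.keys = []

    def process(self, input_stream):
        obj = json.loads(input_stream.read().decode("utf-8"))
        self.keys = sorted(obj.keys())


def run(session, keep_original):
    original = session.get()
    if original is None:
        return
    reader = ReadKeys()
    session.read(original, reader)
    # One child flow file per key, each carrying the three URL attributes.
    for key in reader.keys:
        child = session.create(original)
        for name, prefix in URL_PREFIXES:
            child = session.putAttribute(child, name, prefix + key)
        session.transfer(child, "success")
    if keep_original:
        session.transfer(original, "success")  # everything goes out on success
    else:
        session.remove(original)  # discard the input flow file


if __name__ == "__main__":
    s = FakeSession(b'{"10200": 1, "10201": 2}')
    run(s, keep_original=False)
    for ff, rel in s.transferred:
        print(rel, ff["attributes"])
```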

Check my ExecuteScript cookbook article (part 2 of 3) for more examples of these use cases in Jython (and other languages) :)
