Generating multiple flowfiles using the NiFi ExecuteScript processor

Question

I am working on a NiFi flow that receives a JSON document containing multiple key-value pairs. I am using the ExecuteScript processor with Python.

My goal is to create several URLs based on the JSON keys. The keys are numeric and look like this:

keys = [10200, 10201, 10202, ...]

I want three types of URL, which should look like this:

http://google.com/10200
http://bing.com/10200
http://yahoo.com/10200

I am trying to loop through keys[] and create three specific URLs for each numeric key it contains. My code is meant to:

read a numeric key from the list --> create 3 URLs --> emit a flow file

...and then read the next numeric key in the list and keep looping.

Here is my code. When I give it a JSON flowfile, it currently does nothing. Can someone please tell me what I am doing wrong?

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):

  def __init__(self):
        self.parentFlowFile = None
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    flowfiles_list = [] 

    outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8')))


    for numerical_key in obj.keys():
      # create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes 
      flowFile = session.create(self.parentFlowFile)
      if (flowFile != None):
        flowFile = session.write(flowFile, "Does not matter")
        flowFile = session.putAttribute(flowFile, "google", "http://google.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://bing.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://yahoo.com/"+ numerical_key)
        flowfiles_list.append(flowFile)

    for flow in flowfiles_list:
      session.transfer(flow, REL_SUCCESS)

Recommended answer

Good question; this is a nuance of the callback approach to the flow file API. You've created a subclass of StreamCallback, but you never retrieve an input flow file, and you never pass an instance of your class to session.write() to overwrite its content.

Try this after the definition of your ModJSON class:

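originalFlowFile = session.get()
if originalFlowFile is not None:
    # run ModJSON.process() against the content of the flow file just fetched
    originalFlowFile = session.write(originalFlowFile, ModJSON())
    # drop the input flow file; only the newly created ones are transferred
    session.remove(originalFlowFile)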

This fetches an input flow file if one is queued (session.get() returns None otherwise, hence the check), then calls your StreamCallback to overwrite its contents. In my example the input flow file is then discarded. If that is the right behavior for your use case, and you are not using outputStream for anything, you can extend InputStreamCallback instead of StreamCallback: replace StreamCallback with InputStreamCallback, remove the outputStream.write() call, drop the "outputStream" parameter from process(), and call session.read() instead of session.write().
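As a rough illustration of the InputStreamCallback route, here is a minimal sketch. The names ReadKeys, parse_keys, url_attributes and URL_BASES are made up for this example, and the attribute names (google, bing, yahoo) are an assumption about what the question intended. The child flow files are created after the callback returns, which is a choice made here to keep session calls out of process(), not something NiFi is known to require. The import fallback and the NameError guard exist only so the pure-Python helpers can be tried outside NiFi.

```python
import json

try:
    # These imports resolve only inside NiFi's Jython engine.
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import InputStreamCallback
except ImportError:
    # Fallback so the pure-Python helpers can be tried outside NiFi.
    IOUtils = StandardCharsets = None
    InputStreamCallback = object

# (attribute name, URL prefix) pairs; the names are illustrative assumptions.
URL_BASES = [("google", "http://google.com/"),
             ("bing", "http://bing.com/"),
             ("yahoo", "http://yahoo.com/")]


def parse_keys(text):
    """Return the top-level keys of a JSON object, sorted for stable order."""
    return sorted(json.loads(text).keys())


def url_attributes(key):
    """Build one distinctly named URL attribute per site for a single key."""
    return dict((name, "%s%s" % (base, key)) for name, base in URL_BASES)


class ReadKeys(InputStreamCallback):
    """Read-only callback: remembers the keys and writes nothing."""

    def __init__(self):
        self.keys = []

    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        self.keys = parse_keys(text)


try:
    session  # bound by ExecuteScript; undefined anywhere else
except NameError:
    session = None

if session is not None:
    original = session.get()
    if original is not None:
        reader = ReadKeys()
        session.read(original, reader)
        for key in reader.keys:
            child = session.create(original)  # child inherits the parent's attributes
            for name, url in url_attributes(key).items():
                child = session.putAttribute(child, name, url)
            session.transfer(child, REL_SUCCESS)
        session.remove(original)  # the input flow file is discarded
```

Each key yields one flow file carrying three separate attributes. The code in the question stores all three URLs under the same "google" attribute, so only the last value would survive.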

In your example, once you add my snippet above, you overwrite the input content with your json.dumps() call and also create and transfer new flow files, all to the same relationship (success). That could cause problems downstream if they are not all in the same format, which is why I added the session.remove(). If you need the original flow file to go out a different relationship than the rest, consider InvokeScriptedProcessor rather than ExecuteScript. If you don't care about the input flow file once processing (adding the URL attributes) is done, follow my suggestion above. If they can all go out the same relationship (success), replace my session.remove() with:

session.transfer(originalFlowFile, REL_SUCCESS)
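For the case where everything goes to success, a minimal sketch might look like the following. KeysOnly and keys_as_json are hypothetical names chosen for this example, and the distinct attribute names are an assumption about the intent of the question. The original flow file's content is overwritten with the list of keys, as the json.dumps() call in the question does, and the original is transferred alongside the new flow files. The import fallback and the NameError guard are there only so the helper can be exercised outside NiFi.

```python
import json

try:
    # These imports resolve only inside NiFi's Jython engine.
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback
except ImportError:
    # Fallback so keys_as_json() can be tried outside NiFi.
    IOUtils = StandardCharsets = None
    StreamCallback = object


def keys_as_json(text):
    """Return (sorted keys, JSON text listing those keys)."""
    keys = sorted(json.loads(text).keys())
    return keys, json.dumps(keys)


class KeysOnly(StreamCallback):
    """Overwrites the content with the list of keys and remembers them."""

    def __init__(self):
        self.keys = []

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        self.keys, summary = keys_as_json(text)
        outputStream.write(bytearray(summary.encode('utf-8')))


try:
    session  # bound by ExecuteScript; undefined anywhere else
except NameError:
    session = None

if session is not None:
    original = session.get()
    if original is not None:
        callback = KeysOnly()
        original = session.write(original, callback)
        for key in callback.keys:
            child = session.create(original)
            child = session.putAttribute(child, "google", "http://google.com/%s" % key)
            child = session.putAttribute(child, "bing", "http://bing.com/%s" % key)
            child = session.putAttribute(child, "yahoo", "http://yahoo.com/%s" % key)
            session.transfer(child, REL_SUCCESS)
        # the rewritten original goes out the same relationship as the children
        session.transfer(original, REL_SUCCESS)
```

Downstream processors then receive one flow file holding the key list plus one flow file per key, so they need to tolerate both shapes.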

See my ExecuteScript cookbook article (part 2 of 3) for more examples of these use cases in Jython (and other languages) :)
