NiFi中的Python ExecuteScript:转换流文件属性&内容 [英] Python ExecuteScript in NiFi: Transform flowfile attributes & content

查看:125
本文介绍了NiFi中的Python ExecuteScript:转换流文件属性&内容的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在NiFi中创建一个Python脚本:

I am trying to create a Python script in NiFi that:

  1. 从传入的流文件中读取一些属性
  2. 读取流文件的json内容&提取特定字段
  3. 将属性写入传出的流文件
  4. 用脚本中创建的新内容覆盖传入的流文件(例如,返回新json的API调用),然后将其发送到SUCCESS关系,或者删除旧的流文件并使用所需的内容创建新的流文件.

到目前为止我所做的事情:

What i ve done so far:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback

class OutputWrite(OutputStreamCallback, obj):

def __init__(self):
    self.obj = obj

def process(self, outputStream):

    outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))

###end class###

flowfile = session.get()

if flowfile != None:

**#1) Get flowfile attributes**

    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }

    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    **#2)Get flowfile content**

    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)

    records = json_content['result']['count']
    pages = records/10000

    **#3) Write flowfile attributes**

    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    **#API operations: output_json with desired data**

    output_json = {some data}

    **#4) Write final JSON data to output flowfile**

    flowfile = session.write(flowfile, OutputWrite(output_json))

    session.transfer(flowfile, REL_SUCCESS)
    session.commit()

我的问题是我找不到在OutputStreamCallback类中将对期望的output_json对象的引用作为参数传递的方法.关于如何解决这个问题或更好的方法的任何想法?

My problem is that i can't find a way to pass a reference to the desired output_json object as an argument in the OutputStreamCallback class. Any ideas on how to resolve this or maybe a better approach?

在这种情况下,在类的流程函数中执行所有API操作也许更容易,但是然后我如何才能访问流程函数中的传入流文件属性(需要会话或流文件对象)?

Is it maybe easier to perform all API operations in this case within the process function of the class, but then how do i get access to the incoming flowfile attributes within the process function (requires a session or a flowfile object) ?

任何帮助,不胜感激!

推荐答案

我在下面提供了示例Python代码,该代码允许自定义 PyStreamCallback 类,该类实现了将流文件内容中的JSON转换为逻辑的逻辑.马特·伯吉斯的博客文章,但我鼓励您考虑对 UpdateAttribute EvaluateJSONPath 使用本机处理器来执行相关活动,并且仅在执行NiFi任务特别需要的地方使用自定义代码开箱即用.

I've included example Python code below which allows for a custom PyStreamCallback class which implements logic to transform JSON in the flowfile content from Matt Burgess' blog article on the topic, but I would encourage you to consider using native processors for UpdateAttribute and EvaluateJSONPath to perform the relevant activities and only use custom code where it is specifically needed to perform a task that NiFi doesn't handle out of the box.

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    newObj = {
          "Range": 5,
          "Rating": obj['rating']['primary']['value'],
          "SecondaryRatings": {}
        }
    for key, value in obj['rating'].iteritems():
      if key != "primary":
        newObj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}

    outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8'))) 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)

更新:

要在回调中访问流文件的属性,只需将其作为参数传递给构造函数,将其存储为字段,然后在 process 方法内对其进行引用.这是一个非常简单的示例,将属性 my_attr 的值连接到传入的流文件内容并将其写回:

To access the attributes of the flowfile within the callback, simply pass it as an argument to the constructor, store it as a field, and reference it within the process method. Here is a very simple example that concatenates the value of attribute my_attr to the incoming flowfile content and writes it back:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self, flowfile):
        self.ff = flowfile
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        text += self.ff.getAttribute('my_attr')
        outputStream.write(bytearray(text.encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile,PyStreamCallback(flowFile))
    session.transfer(flowFile, REL_SUCCESS)

传入流文件:

--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
    Value: '30'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '1690494181462176'
Key: 'my_attr'
    Value: 'This is an attribute value.'
Key: 'path'
    Value: './'
Key: 'uuid'
    Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.

去向流文件:

--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
    Value: '57'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '1690494181462176'
Key: 'my_attr'
    Value: 'This is an attribute value.'
Key: 'path'
    Value: './'
Key: 'uuid'
    Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.This is an attribute value.

这篇关于NiFi中的Python ExecuteScript:转换流文件属性&内容的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆