如何使用 ExecuteScript(以 python 作为脚本引擎)进行添加数字的练习?【新手用户尝试学习NiFi】 [英] How to use ExecuteScript (with python as a script engine) for an exercise to add numbers? [Novice user trying to learn NiFi]

查看:26
本文介绍了如何使用 ExecuteScript(以 python 作为脚本引擎)进行添加数字的练习?【新手用户尝试学习NiFi】的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对 NiFi 比较陌生,不确定如何正确执行以下操作.我想使用 ExecuteScript 处理器(脚本引擎:python)来执行以下操作(请仅在 python 中):

I am relatively new to NiFi and am not sure how to do the following correctly. I would like to use ExecuteScript processor (script engine: python) to do the following (only in python please):

1) 有一个包含以下信息的 CSV 文件(第一行是标题):

1) There is a CSV file containing the following information (the first row is the header):

first,second,third
1,4,9
7,5,2
3,8,7

2) 我想找到单个行的总和并生成一个带有修改标题的最终文件.最终文件应如下所示:

2) I would like to find the sum of individual rows and generate a final file with a modified header. The final file should look like this:

first,second,third,total
1,4,9,14
7,5,2,14
3,8,7,18

对于python脚本,我写道:

For the python script, I wrote:

def summation(first,second,third):
    numbers = first + second + third
    return numbers
flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, summation())

但它不起作用,我不知道如何解决这个问题.谁能告诉我如何解决这个问题?

But it does not work and I am not sure how to fix this. Can anyone provide me an understanding on how to approach this problem?

NiFi 流程:

The NiFi flow:

谢谢

推荐答案

您的脚本没有按照您的意愿运行.有几种方法可以解决这个问题:

Your script is not doing what you would like it to do. There are a couple approaches to this problem:

  1. 使用迭代 CSV 内容中的行的脚本一次对整个流文件进行操作
  2. 将 CSV 内容中的行视为记录",并使用处理单行的脚本对每条记录进行操作

我将对您的脚本进行更改以立即处理整个流文件内容;您可以阅读有关 Record* 处理器的更多信息 这里这里,和此处.

I will provide changes to your script to handle the entire flowfile content at once; you can read more about the Record* processors here, here, and here.

这是一个执行您期望的操作的脚本.请注意差异以查看我更改内容的位置(此脚本当然可以变得更高效和简洁;演示正在发生的事情很冗长,而且我不是 Python 专家).

Here is a script which performs the action you expect. Note the differences to see where I changed things (this script could certainly be made more efficient and concise; it is verbose to demonstrate what is happening, and I am not a Python expert).

import json
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processor.io import StreamCallback

# This PyStreamCallback class is what the processor will use to ingest and output the flowfile content
class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
      try:
        # Get the provided inputStream into a format where you can read lines
        reader = BufferedReader(InputStreamReader(inputStream))
        # Set a marker for the first line to be the header
        isHeader = True        
        try:
          # A holding variable for the lines
          lines = []
          # Loop indefinitely
          while True:
            # Get the next line
            line = reader.readLine()
            # If there is no more content, break out of the loop
            if line is None:
              break
            # If this is the first line, add the new column
            if isHeader:
              header = line + ",total"
              # Write the header line and the new column
              lines.append(header)
              # Set the header flag to false now that it has been processed
              isHeader = False
            else:
              # Split the line (a string) into individual elements by the ',' delimiter
              elements = self.extract_elements(line)
              # Get the sum (this method is unnecessary but shows where your "summation" method would go)
              sum = self.summation(elements)
              # Write the output of this line
              newLine = ",".join([line, str(sum)])
              lines.append(newLine)

          # Now out of the loop, write the output to the outputStream
          output = "\n".join([str(l) for l in lines])
          outputStream.write(bytearray(output.encode('utf-8')))

        finally:
            if reader is not None:
                reader.close()

      except Exception as e:
        log.warn("Exception in Reader")
        log.warn('-' * 60)
        log.warn(str(e))
        log.warn('-' * 60)
        raise e
        session.transfer(flowFile, REL_FAILURE)

  def extract_elements(self, line):
    # This splits the line on the ',' delimiter and converts each element to an integer, and puts them in a list
    return [int(x) for x in line.split(',')]

  # This method replaces your "summation" method and can accept any number of inputs, not just 3
  def summation(self, list):
    # This returns the sum of all items in the list
    return sum(list)


flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  session.transfer(flowFile, REL_SUCCESS)

我的流程的结果(在 GenerateFlowFile 处理器中使用您的输入):

Result from my flow (using your input in a GenerateFlowFile processor):

2018-07-20 13:54:06,772 INFO [Timer-Driven Process Thread-5] o.a.n.processors.standard.LogAttribute LogAttribute[id=b87f0c01-0164-1000-920e-799647cb9b48] logging for flow file StandardFlowFileRecord[uuid=de888571-2947-4ae1-b646-09e61c85538b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1532106928567-1, container=default, section=1], offset=2499, length=51],offset=0,name=470063203212609,size=51]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Fri Jul 20 13:54:06 EDT 2018'
Key: 'lineageStartDate'
    Value: 'Fri Jul 20 13:54:06 EDT 2018'
Key: 'fileSize'
    Value: '51'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '470063203212609'
Key: 'path'
    Value: './'
Key: 'uuid'
    Value: 'de888571-2947-4ae1-b646-09e61c85538b'
--------------------------------------------------
first,second,third,total
1,4,9,14
7,5,2,14
3,8,7,18

这篇关于如何使用 ExecuteScript(以 python 作为脚本引擎)进行添加数字的练习?【新手用户尝试学习NiFi】的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆