使用executescript处理器更新csv值在apache-nifi中失败 [英] Update csv value using executescript processor fails in apache-nifi

查看:256
本文介绍了使用executescript处理器更新csv值在apache-nifi中失败的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试从流文件读取并使用csv中的默认值更新记录值.为此,我已经在ExecuteScript处理器中使用了以下python代码.

I try to read from a flowfile and update a record value using default value in csv. To that I have used ExecuteScript processor with following python code in it.

import sys
import re
import traceback
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
from java.lang import Class
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter

flowfile = session.get()
record = flowfile.getAttribute('record_type')

if record == '0':
    flowfile = session.putAttribute(flowfile,'record_type', 'NEW_USER')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
elif record == '1':
    flowfile = session.putAttribute(flowfile,'record_type', 'OLD_USER')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
else:
    flowfile = session.putAttribute(flowfile,'record_type', 'IGNORE')
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

writer.flush()
writer.close()
reader.close()

我的csv看起来像

id,record_type
1,0
2,1
3,2
4,0

结果应为:

id,record_type
1,NEW_USER
2,OLD_USER
3,IGNORE
4,NEW_USER

我收到以下错误消息:

AttributeError:"NoneType"对象在其中没有属性"getAttribute" 第13行的脚本

AttributeError : 'NoneType' object has no attribute 'getAttribute' in script at line number 13

它说record = flowfile.getAttribute('record_type')这是错误的..

我不知道如何解决此问题,因为我对python不好.

I have no idea how to solve this as I am not good with python.

推荐答案

那不是python,但根据作者的评论,可能很时髦.

that,s not python, but according to comment from author could be groovy.

通过以下代码使用ExecuteGroovyScript处理器:

use ExecuteGroovyScript processor with following code:

def ff=session.get()
if(!ff)return

def map = [
    '0': 'NEW_USER',
    '1': 'OLD_USER',
]

ff.write{rawIn, rawOut->
    rawOut.withWriter("UTF-8"){w->
        rawIn.withReader("UTF-8"){r->
            int rowNum = 0
            //iterate lines from input stream and split each with coma
            r.splitEachLine( ',' ){row->
                if(rowNum>0){
                    //if not a header line then substitute value using map
                    row[1] = map[ row[1] ] ?: 'IGNORE'
                }
                //join and write row to output writer
                w << row.join(',') << '\n'
                rowNum++
            }
        }
    }
}

REL_SUCCESS << ff

这篇关于使用executescript处理器更新csv值在apache-nifi中失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆