使用executescript处理器更新csv值在apache-nifi中失败 [英] Update csv value using executescript processor fails in apache-nifi
问题描述
我尝试从流文件读取并使用csv中的默认值更新记录值.为此,我已经在ExecuteScript
处理器中使用了以下python代码.
I try to read from a flowfile and update a record value using default value in csv. To that I have used ExecuteScript
processor with following python code in it.
import sys
import re
import traceback
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil
from java.lang import Class
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter
flowfile = session.get()
record = flowfile.getAttribute('record_type')
if record == '0':
flowfile = session.putAttribute(flowfile,'record_type', 'NEW_USER')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
elif record == '1':
flowfile = session.putAttribute(flowfile,'record_type', 'OLD_USER')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
else:
flowfile = session.putAttribute(flowfile,'record_type', 'IGNORE')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
writer.flush()
writer.close()
reader.close()
我的csv看起来像
id,record_type
1,0
2,1
3,2
4,0
结果应为:
id,record_type
1,NEW_USER
2,OLD_USER
3,IGNORE
4,NEW_USER
我收到以下错误消息:
AttributeError:"NoneType"对象在其中没有属性"getAttribute" 第13行的脚本
AttributeError : 'NoneType' object has no attribute 'getAttribute' in script at line number 13
它说record = flowfile.getAttribute('record_type')
这是错误的..
我不知道如何解决此问题,因为我对python
不好.
I have no idea how to solve this as I am not good with python
.
推荐答案
那不是python,但根据作者的评论,可能很时髦.
that,s not python, but according to comment from author could be groovy.
通过以下代码使用ExecuteGroovyScript处理器:
use ExecuteGroovyScript processor with following code:
def ff=session.get()
if(!ff)return
def map = [
'0': 'NEW_USER',
'1': 'OLD_USER',
]
ff.write{rawIn, rawOut->
rawOut.withWriter("UTF-8"){w->
rawIn.withReader("UTF-8"){r->
int rowNum = 0
//iterate lines from input stream and split each with coma
r.splitEachLine( ',' ){row->
if(rowNum>0){
//if not a header line then substitute value using map
row[1] = map[ row[1] ] ?: 'IGNORE'
}
//join and write row to output writer
w << row.join(',') << '\n'
rowNum++
}
}
}
}
REL_SUCCESS << ff
这篇关于使用executescript处理器更新csv值在apache-nifi中失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!