Nifi:如何在一个线程(处理器)中对文件进行写入和检查操作? [英] Nifi: How to make write and check operations in file in one thread (processor)?
问题描述
如何只写一个点(处理器或服务)来写文件并使其在单线程中工作,在我的情况下,我有这样的工作流程executioncript1(带写操作的单线程处理器) - > updateAttribute-> InvokeHttpProcessor - >执行脚本1(单线程处理器与检查操作(它是第一个处理器))我已经尝试了下面的代码,但它也没有成功实现既不会例外,
<强>我应该改变什么?
这是我的代码:
File file = new File(C:/Users/Desktop/test/conf.xml);
String content =;
BufferedReader s;
BufferedWriter w;
RandomAccessFile ini =新的RandomAccessFile(文件,rwd);
FileLock lock = ini.getChannel()。lock();
尝试{
def flowFile = session.get();
if(flowFile == null){
String sCurrentLine;
s = new BufferedReader(Channels.newReader(ini.getChannel(),UTF-8)); $($ s
$ b)
}
ini.seek(0);
$ b $ def flowFile1 = session.create()
flowFile1 = session.putAttribute(flowFile1,filename,conf.xml);
session.write(flowFile1,new StreamCallback(){
@Override
public void process(InputStream inputStream1,OutputStream outputStream)throws IOException {
outputStream.write content.getBytes(StandardCharsets.UTF_8))
}
});
session.transfer(flowFile1,REL_SUCCESS);
def xml = new XmlParser()。parseText(content);
xml。'**'。findAll {it.name()=='run'}。each {it.replaceBody'false'}
def newxml = XmlUtil.serialize(xml);
String data = newxml;
if(!data.isEmpty()){
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),UTF-8));
w.write(data);
lock.release();
w.close();
}
}
else {
def serviceName = flowFile.getAttribute ('服务名称');
def date = flowFile.getAttribute('filename')。substring(0,10);
if(serviceName =='Decl'){
def xml = new XmlParser()。parseText(content)
for(int i = 0; i< names.size(); i ++ ){
date = names.get(i).substring(0,10);
xml.RS.Decl.details.findAll({p - >
p.runAs [0] .text()==false&&& p.start [0] .text ()== date.toString()
})。each({p - >
p.start [0] .value = addDays(p.start [0] .text())
p.runAs [0] .value =true
})
}
def newXml = groovy.xml.XmlUtil.serialize(xml)
data = newXml.toString()
if(!data.isEmpty()){
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),UTF-8));
w.write(data);
lock.release();
w.close();
$ b else if(serviceName =='TaxyFee'){
def xml = new XmlParser()。parseText(content)
for(int i = 0; i< names.size(); i ++){
date = names.get(i).substring(0,10);
xml.RS.TaxyFee.details.findAll({p - >
p.runAs [0] .text()==false&&& p.start [0] .text ()== date.toString()
})。each({p - >
p.start [0] .value = addDays(p.start [0] .text())
p.runAs [0] .value =true
})
}
def newXml = groovy.xml.XmlUtil.serialize(xml)
data = newXml.toString()
if(!data.isEmpty()){
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(),UTF-8));
w.write(data);
lock.release();
w.close();
$ b}
}
}
} catch(FileNotFoundException e){
//e.printStackTrace();
TimeUnit.SECONDS.sleep(50000);
} catch(IOException e){
e.printStackTrace();
} catch(OverlappingFileLockException e){TimeUnit.SECONDS.sleep(50000);
lock.release();
} catch(Exception e){
e.printStackTrace();
} finally {
//lock.release();
ini.close();
处理器属性并发任务= 1
的选项卡。
所以只有一个处理器实例会同时运行。
https://nifi.apache.org/docs/nifi-docs /html/user-guide.html#scheduling-tab
How to make only one point (Processor or Service) that will write the file and make it working in single thread, in my case i have workflow like this executescript1(single thread processor with write operations)->updateAttribute->InvokeHttpProcessor->executescript1(single thread processor with check operations (it is the first processor)) i have tried the code below but it nor fulfills sucessfully neither trows exception,
WHAT SHOULD I CHANGE?
here is my code:
File file = new File("C:/Users/Desktop/test/conf.xml");
String content = "";
BufferedReader s;
BufferedWriter w;
RandomAccessFile ini= new RandomAccessFile(file, "rwd");
FileLock lock= ini.getChannel().lock();
try {
def flowFile=session.get();
if(flowFile==null){
String sCurrentLine;
s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
while ((sCurrentLine = s.readLine()) != null) {
content += sCurrentLine;
}
ini.seek(0);
def flowFile1=session.create()
flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml");
session.write(flowFile1, new StreamCallback() {
@Override
public void process(InputStream inputStream1, OutputStream outputStream) throws IOException {
outputStream.write(content.getBytes(StandardCharsets.UTF_8))
}
});
session.transfer(flowFile1,REL_SUCCESS);
def xml = new XmlParser().parseText(content);
xml.'**'.findAll{it.name() == 'run'}.each{ it.replaceBody 'false'}
def newxml=XmlUtil.serialize(xml);
String data =newxml;
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
w.write(data);
lock.release();
w.close();
}
}
else{
def serviceName=flowFile.getAttribute('serviceName');
def date=flowFile.getAttribute('filename').substring(0,10);
if(serviceName=='Decl'){
def xml = new XmlParser().parseText(content)
for(int i=0;i<names.size();i++) {
date = names.get(i).substring(0, 10);
xml.RS.Decl.details.findAll({ p ->
p.runAs[0].text() == "false" && p.start[0].text() == date.toString()
}).each({ p ->
p.start[0].value = addDays( p.start[0].text())
p.runAs[0].value = "true"
})
}
def newXml= groovy.xml.XmlUtil.serialize( xml )
data = newXml.toString()
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
w.write(data);
lock.release();
w.close();
}
}
else if(serviceName=='TaxyFee'){
def xml = new XmlParser().parseText(content)
for(int i=0;i<names.size();i++) {
date = names.get(i).substring(0, 10);
xml.RS.TaxyFee.details.findAll({ p ->
p.runAs[0].text() == "false" && p.start[0].text() == date.toString()
}).each({ p ->
p.start[0].value = addDays( p.start[0].text())
p.runAs[0].value = "true"
})
}
def newXml= groovy.xml.XmlUtil.serialize( xml )
data = newXml.toString()
if (!data.isEmpty()) {
ini.setLength(0);
w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
w.write(data);
lock.release();
w.close();
}
}
}
} catch (FileNotFoundException e) {
//e.printStackTrace();
TimeUnit.SECONDS.sleep(50000);
} catch (IOException e) {
e.printStackTrace();
} catch(OverlappingFileLockException e){ TimeUnit.SECONDS.sleep(50000);
lock.release();
} catch (Exception e) {
e.printStackTrace();
} finally {
//lock.release();
ini.close();
}
set on the "scheduling" tab of the processor properties Concurrent Tasks = 1
.
so only one instance of the processor will be running at the same time.
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-tab
这篇关于Nifi:如何在一个线程(处理器)中对文件进行写入和检查操作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!