NiFi: How to perform write and check operations on a file in one thread (processor)?


Problem description



How can I have a single point (a processor or a service) that writes the file, and make it work in a single thread? In my case the workflow looks like this: executescript1 (single-threaded processor with write operations) -> updateAttribute -> InvokeHttpProcessor -> executescript1 (the same first processor, single-threaded, with check operations). I have tried the code below, but it neither completes successfully nor throws an exception.

What should I change?

Here is my code:

    File file = new File("C:/Users/Desktop/test/conf.xml");
    String content = "";
    BufferedReader s;
    BufferedWriter w;
    RandomAccessFile ini = new RandomAccessFile(file, "rwd");
    FileLock lock = ini.getChannel().lock();

    try {
        def flowFile = session.get();
        if (flowFile == null) {
            String sCurrentLine;
            s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));

            while ((sCurrentLine = s.readLine()) != null) {
                content += sCurrentLine;
            }
            ini.seek(0);

            def flowFile1 = session.create()
            flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml");
            session.write(flowFile1, new StreamCallback() {
                @Override
                public void process(InputStream inputStream1, OutputStream outputStream) throws IOException {
                    outputStream.write(content.getBytes(StandardCharsets.UTF_8))
                }
            });

            session.transfer(flowFile1, REL_SUCCESS);

            def xml = new XmlParser().parseText(content);
            xml.'**'.findAll { it.name() == 'run' }.each { it.replaceBody 'false' }
            def newxml = XmlUtil.serialize(xml);

            String data = newxml;

            if (!data.isEmpty()) {
                ini.setLength(0);
                w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                w.write(data);
                lock.release();
                w.close();
            }
        } else {
            def serviceName = flowFile.getAttribute('serviceName');
            def date = flowFile.getAttribute('filename').substring(0, 10);
            if (serviceName == 'Decl') {
                def xml = new XmlParser().parseText(content)
                for (int i = 0; i < names.size(); i++) {
                    date = names.get(i).substring(0, 10);
                    xml.RS.Decl.details.findAll({ p ->
                        p.runAs[0].text() == "false" && p.start[0].text() == date.toString()
                    }).each({ p ->
                        p.start[0].value = addDays(p.start[0].text())
                        p.runAs[0].value = "true"
                    })
                }
                def newXml = groovy.xml.XmlUtil.serialize(xml)

                data = newXml.toString()
                if (!data.isEmpty()) {
                    ini.setLength(0);
                    w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                    w.write(data);
                    lock.release();
                    w.close();
                }
            } else if (serviceName == 'TaxyFee') {
                def xml = new XmlParser().parseText(content)
                for (int i = 0; i < names.size(); i++) {
                    date = names.get(i).substring(0, 10);
                    xml.RS.TaxyFee.details.findAll({ p ->
                        p.runAs[0].text() == "false" && p.start[0].text() == date.toString()
                    }).each({ p ->
                        p.start[0].value = addDays(p.start[0].text())
                        p.runAs[0].value = "true"
                    })
                }
                def newXml = groovy.xml.XmlUtil.serialize(xml)

                data = newXml.toString()
                if (!data.isEmpty()) {
                    ini.setLength(0);
                    w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                    w.write(data);
                    lock.release();
                    w.close();
                }
            }
        }
    } catch (FileNotFoundException e) {
        //e.printStackTrace();
        TimeUnit.SECONDS.sleep(50000);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (OverlappingFileLockException e) {
        TimeUnit.SECONDS.sleep(50000);
        lock.release();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //lock.release();
        ini.close();
    }

Solution

On the "Scheduling" tab of the processor's configuration, set Concurrent Tasks = 1.

That way only one instance of the processor runs at any given time.

https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-tab
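
As a complement to the scheduling setting, the read-modify-write step from the question can be sketched in plain Java. This is only an illustration under stated assumptions, not the answer author's method and not NiFi API: the class name LockedFileUpdate, the method update, and the JVM_GUARD monitor are hypothetical names introduced here. The sketch holds the lock until the new content has been written and flushed, and it adds an in-JVM monitor because a java.nio FileLock is held on behalf of the whole JVM and does not by itself serialize threads inside one process.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.UnaryOperator;

public class LockedFileUpdate {
    // Hypothetical guard: serializes threads inside this JVM.
    // FileLock alone only protects against other processes.
    private static final Object JVM_GUARD = new Object();

    /** Reads the file, applies the transform, and writes the result back under one lock. */
    public static String update(Path path, UnaryOperator<String> transform) throws IOException {
        synchronized (JVM_GUARD) {
            // try-with-resources releases the lock first, then closes the channel.
            try (FileChannel channel = FileChannel.open(path,
                     StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
                 FileLock lock = channel.lock()) {
                // Read the whole file into memory (assumes a small config file).
                ByteBuffer in = ByteBuffer.allocate((int) channel.size());
                while (in.hasRemaining() && channel.read(in) != -1) {
                    // keep reading until the buffer is full or EOF is reached
                }
                in.flip();
                String before = StandardCharsets.UTF_8.decode(in).toString();
                String after = transform.apply(before);

                // Replace the content while the lock is still held.
                channel.truncate(0);
                channel.position(0);
                ByteBuffer out = ByteBuffer.wrap(after.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    channel.write(out);
                }
                channel.force(true); // flush to disk before the lock is released
                return after;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("conf", ".xml");
        Files.write(tmp, "<run>true</run>".getBytes(StandardCharsets.UTF_8));
        System.out.println(update(tmp, s -> s.replace("true", "false")));
        Files.delete(tmp);
    }
}
```

In the question's script the lock is released before the BufferedWriter is closed, so the buffered data may reach the file only after the lock is gone; keeping the write and flush inside the locked region, as in this sketch, avoids that window.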
