Apache Nifi的AbstractProcessor的onTrigger()方法中的零流文件 [英] Zero flow files in onTrigger() method of AbstractProcessor of Apache Nifi

查看:184
本文介绍了Apache Nifi的AbstractProcessor的onTrigger()方法中的零流文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为Apache NiFi开发自定义处理器.我已经创建了处理器的nar并将其放在nifi的lib文件夹中并启动了nifi.我已经在eclipse中设置了远程调试器,并在onTrigger()的第一行启用了断点.调试时,我在nifi管道中一次运行一个处理器.我可以在自定义处理器的输入队列中找到单个流文件,但是我的自定义处理器未收到任何流文件.当我启动自定义处理器时,它会在onTrigger()方法内达到断点.在Thie方法中,当我这样做时:

public class MyCustomProc extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        List<FlowFile> flowFiles = session.get(5000);
        if (flowFiles == null || flowFiles.size() == 0) {
            return;
        }
        //...

flowFiles的大小为零!!! 我无法猜测应该朝哪个方向查找原因.有什么提示可以诊断出来吗?

修改

Stacktrace

2019-05-02 18:08:09,456 ERROR [Timer-Driven Process Thread-10] c.c.product.module.submodule.MyCustomProcessor MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

PS1::此方法立即从if的内部返回,这给了我以下异常:

org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord transfer relationship not specified

由于流文件位于我的自定义处理器的输入队列中,因此该异常会一直持续发生.

PS2:,尽管我不确定这是否是问题的根源,但我在apps.log中遇到以下错误:

2019-05-02 18:17:32,394 ERROR [Timer-Driven Process Thread-4] o.a.nifi.groups.StandardProcessGroup Unable to synchronize StandardProcessGroup[identifier=d25747e6-719e-3ed9-c6c5-56794af6555c] with Flow Registry because Process Group was placed under Version Control using Flow Registry with identifier 80016ab0-bfab-152b-ffff-ffffc441867c but cannot find any Flow Registry with this identifier

解决方案

有时会得到零流量文件是正常现象,这就是为什么处理器在开始时会进行检查的原因.

FlowFileHandlingException意味着从会话(从get或create获得)流文件,并且该流文件没有被转移到任何地方并且没有被删除,因此从根本上讲它是无法解释的.仅从if语句的开头返回是不可能的,因此其余的处理器代码正在执行并产生此错误.您尚未提供其余代码,因此我们看不到问题.

第二个问题是不言而喻的.您有一个受版本控制的进程组,但是用于启动版本控制的注册表客户端已不复存在.我不知道您是如何创建这种情况的,因为我相信UI/API不会允许您删除具有受版本控制的活动流的注册表客户端,但是您应该能够在进程组上停止版本控制.

I am developing a custom processor for Apache NiFi. I have created nar of my processor and put it in the lib folder of nifi and started the nifi. I have setup the remote debugger in eclipse and enabled breakpoint on first line of onTrigger(). While debugging I am running one processor at a time in my nifi pipeline. I can find single flow file in the input queue of my custom processor, however my custom processor is not receiving any flow file. When I start my custom processor, it hits breakpoint inside onTrigger() method. Inside thie method, when I do:

public class MyCustomProc extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        List<FlowFile> flowFiles = session.get(5000);
        if (flowFiles == null || flowFiles.size() == 0) {
            return;
        }
        //...

flowFiles turns out to be of size zero!!! I am not able to guess in which direction should I check to find why this is happening. Any hint how I can diagnose this?

Edit

Stacktrace

2019-05-02 18:08:09,456 ERROR [Timer-Driven Process Thread-10] c.c.product.module.submodule.MyCustomProcessor MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

PS1: This method returns immediately from inside if's body, which gives me following exception:

org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord transfer relationship not specified

This exceptions keeps recurring forever, since flow file in the input queue of my custom processor.

PS2: I am getting following error in the apps.log, though I am unsure if this is the source of the problem:

2019-05-02 18:17:32,394 ERROR [Timer-Driven Process Thread-4] o.a.nifi.groups.StandardProcessGroup Unable to synchronize StandardProcessGroup[identifier=d25747e6-719e-3ed9-c6c5-56794af6555c] with Flow Registry because Process Group was placed under Version Control using Flow Registry with identifier 80016ab0-bfab-152b-ffff-ffffc441867c but cannot find any Flow Registry with this identifier

解决方案

It is normal behavior to sometimes get zero flow files, which is why processors have the check that you have at the beginning.

The FlowFileHandlingException means that a flow file was obtained from the session, either from get or create, and that flow file was not transferred anywhere and was not removed, so basically it is unaccounted for. This could not happen from just returning at the beginning in that if statement, so the rest of the processor code is executing and producing this error. You haven't provided the rest of the code so we can't see the problem.

The second issue is fairly self-explanatory. You have a process group under version control, but the registry client that was used to start version control somehow no longer exists. I don't know how you created this scenario because I believe the UI/API won't let you delete a registry client that has active flows under version control, but you should be able to stop version control on the process group.

这篇关于Apache Nifi的AbstractProcessor的onTrigger()方法中的零流文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆