Apache Nifi 的 AbstractProcessor 的 onTrigger() 方法中的零流文件 [英] Zero flow files in onTrigger() method of AbstractProcessor of Apache Nifi

查看:33
本文介绍了Apache Nifi 的 AbstractProcessor 的 onTrigger() 方法中的零流文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为 Apache NiFi 开发自定义处理器.我已经创建了我的处理器的 nar 并将其放在 nifi 的 lib 文件夹中并启动了 nifi.我已经在 Eclipse 中设置了远程调试器并在 onTrigger() 的第一行启用了断点.在调试时,我在 nifi 管道中一次运行一个处理器.我可以在自定义处理器的输入队列中找到单个流文件,但是我的自定义处理器没有收到任何流文件.当我启动我的自定义处理器时,它在 onTrigger() 方法中遇到断点.在 thie 方法中,当我这样做时:

public class MyCustomProc extends AbstractProcessor {@覆盖public void onTrigger(final ProcessContext context, final ProcessSession session) 抛出 ProcessException {列表flowFiles = session.get(5000);if (flowFiles == null || flowFiles.size() == 0) {返回;}//...

flowFiles 结果是大小为零!!!我无法猜测应该检查哪个方向以找出发生这种情况的原因.任何提示我如何诊断?

编辑

堆栈跟踪

2019-05-02 18:08:09,456 错误 [Timer-Driven Process Thread-10] ccproduct.module.submodule.MyCustomProcessor MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce9866[id=016a1008-8956-1dbf-bd66-993e0ce98668] 由于 org.apache.nifi.processor.exception.FlowFileHandlingException 无法处理:StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f=StaflaiCresourceClaimCresourceCresourceC[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] 传输关系未指定;回滚会话:{}org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=155680041681]offset=155680041681=261,length=591447],offset=0,name=188149730353200,size=591447] 传输关系未指定在 org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)在 org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)在 org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)在 org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)在 org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)在 org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)在 org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)在 java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)在 java.util.concurrent.FutureTask.runAndReset(Unknown Source)在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(来源不明)在 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(未知来源)在 java.util.concurrent.ThreadPoolExecutor.runWorker(来源不明)在 java.util.concurrent.ThreadPoolExecutor$Worker.run(未知来源)在 java.lang.Thread.run(未知来源)

PS1:此方法立即从 if 的主体内部返回,这给了我以下异常:

org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord 传输关系未指定

这个异常永远重复出现,因为我的自定义处理器的输入队列中的流文件.

PS2:我在 apps.log 中收到以下错误,但我不确定这是否是问题的根源:

2019-05-02 18:17:32,394 ERROR [Timer-Driven Process Thread-4] oanifi.groups.StandardProcessGroup 无法与 StandardProcessGroup[identifier=d25747e6-719e-3ed9-c6c5-565594af] 同步流程注册表,因为流程组使用具有标识符 80016ab0-bfab-152b-ffff-ffffc441867c 的流程注册表置于版本控制之下,但找不到任何具有此标识符的流程注册表

解决方案

有时获得零流文件是正常行为,这就是为什么处理器会进行您在开始时进行的检查.

FlowFileHandlingException 表示从会话中获取了一个流文件,无论是通过 get 还是 create,并且该流文件没有传输到任何地方也没有被删除,所以基本上它是下落不明的.这不会发生在 if 语句的开头,因此处理器代码的其余部分正在执行并产生此错误.您尚未提供其余代码,因此我们无法发现问题.

第二个问题是不言自明的.您有一个受版本控制的进程组,但用于启动版本控制的注册表客户端已不复存在.我不知道您是如何创建此场景的,因为我相信 UI/API 不会让您删除在版本控制下具有活动流的注册表客户端,但您应该能够停止对进程组的版本控制.

I am developing a custom processor for Apache NiFi. I have created nar of my processor and put it in the lib folder of nifi and started the nifi. I have setup the remote debugger in eclipse and enabled breakpoint on first line of onTrigger(). While debugging I am running one processor at a time in my nifi pipeline. I can find single flow file in the input queue of my custom processor, however my custom processor is not receiving any flow file. When I start my custom processor, it hits breakpoint inside onTrigger() method. Inside thie method, when I do:

public class MyCustomProc extends AbstractProcessor {

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        List<FlowFile> flowFiles = session.get(5000);
        if (flowFiles == null || flowFiles.size() == 0) {
            return;
        }
        //...

flowFiles turns out to be of size zero!!! I am not able to guess in which direction should I check to find why this is happening. Any hint how I can diagnose this?

Edit

Stacktrace

2019-05-02 18:08:09,456 ERROR [Timer-Driven Process Thread-10] c.c.product.module.submodule.MyCustomProcessor MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] MyCustomProcessor[id=016a1008-8956-1dbf-bd66-993e0ce98668] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified; rolling back session: {}
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=408fbb3d-7cc2-48bc-be8f-6d0afdbddaf2,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556800468726-1, container=default, section=1], offset=261, length=591447],offset=0,name=188149730353200,size=591447] transfer relationship not specified
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:251)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:321)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

PS1: This method returns immediately from inside if's body, which gives me following exception:

org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord transfer relationship not specified

This exceptions keeps recurring forever, since flow file in the input queue of my custom processor.

PS2: I am getting following error in the apps.log, though I am unsure if this is the source of the problem:

2019-05-02 18:17:32,394 ERROR [Timer-Driven Process Thread-4] o.a.nifi.groups.StandardProcessGroup Unable to synchronize StandardProcessGroup[identifier=d25747e6-719e-3ed9-c6c5-56794af6555c] with Flow Registry because Process Group was placed under Version Control using Flow Registry with identifier 80016ab0-bfab-152b-ffff-ffffc441867c but cannot find any Flow Registry with this identifier

解决方案

It is normal behavior to sometimes get zero flow files, which is why processors have the check that you have at the beginning.

The FlowFileHandlingException means that a flow file was obtained from the session, either from get or create, and that flow file was not transferred anywhere and was not removed, so basically it is unaccounted for. This could not happen from just returning at the beginning in that if statement, so the rest of the processor code is executing and producing this error. You haven't provided the rest of the code so we can't see the problem.

The second issue is fairly self-explanatory. You have a process group under version control, but the registry client that was used to start version control somehow no longer exists. I don't know how you created this scenario because I believe the UI/API won't let you delete a registry client that has active flows under version control, but you should be able to stop version control on the process group.

这篇关于Apache Nifi 的 AbstractProcessor 的 onTrigger() 方法中的零流文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆