Batch processing flowfiles in Apache NiFi


Question

I have written a custom NiFi processor which tries to batch-process input flowfiles.

However, it seems it is not behaving as expected. Here is what is happening:

I copy-paste some files onto a server. FetchFromServerProcessor fetches those files from the server and puts them in queue1. MyCustomProcessor reads files in batches from queue1. I have a batchSize property defined on MyCustomProcessor, and inside its onTrigger() method I get all flowfiles from queue1 for the current batch by doing the following:

session.get(context.getProperty(batchSize).asInteger())

The first line of onTrigger() creates a timestamp and adds it to all the flowfiles, so every file in the batch should carry the same timestamp. However, that is not happening: usually the first flowfile gets one timestamp and the rest of the flowfiles get another.
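Stripped of the NiFi API, the intended behavior is simply "capture one timestamp before the loop, then apply it to every record in the batch." A plain-Java sketch of that idea (the `stampBatch` helper is hypothetical, not NiFi code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchTimestamp {
    // Capture the timestamp ONCE, before iterating, so every record in the
    // batch receives the identical value.
    static List<Map<String, String>> stampBatch(List<Map<String, String>> batch) {
        final String ts = String.valueOf(System.currentTimeMillis()); // captured once
        for (Map<String, String> record : batch) {
            record.put("timestamp", ts);
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Map<String, String>> batch = new ArrayList<>();
        for (int i = 0; i < 3; i++) batch.add(new LinkedHashMap<>());
        stampBatch(batch);
        // All records share the same timestamp value
        System.out.println(batch.get(0).get("timestamp").equals(batch.get(2).get("timestamp")));
    }
}
```

The point is that identical timestamps only prove the records were processed in the same invocation; the problem described below is that the records arrive across two invocations.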

It seems that when FetchFromServerProcessor fetches the first file from the server and puts it in queue1, MyCustomProcessor gets triggered and takes everything currently in the queue. At that point there happens to be only a single file, which is picked up as the sole member of this batch. By the time MyCustomProcessor has processed that file, FetchFromServerProcessor has fetched the remaining files from the server and put them in queue1. So after processing the first file, MyCustomProcessor takes everything in queue1 and forms a second batch, whereas I want all files picked up in a single batch.

How can I avoid two batches being formed? I see that people discuss wait-notify in this context: 1, 2. But I am not able to make quick sense of those posts. Can someone give me minimal steps to achieve this using the Wait/Notify processors, or point me to a minimal tutorial with a step-by-step procedure for using them? Also, is the wait-notify pattern the standard approach to the batching problem I explained, or is there another standard way to get this done?

Answer

It sounds as if this batch size is the required count of incoming flowfiles to CustomProcessor, so why not write your CustomProcessor#onTrigger() as follows:

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final ComponentLog logger = getLogger();
    // Try to get n flowfiles from incoming queue
    final Integer desiredFlowfileCount = context.getProperty(batchSize).asInteger();
    final int queuedFlowfileCount = session.getQueueSize().getObjectCount();
    if (queuedFlowfileCount < desiredFlowfileCount) {
        // There are not yet n flowfiles queued up, so don't try to run again immediately
        if (logger.isDebugEnabled()) {
            logger.debug("Only {} flowfiles queued; waiting for {}", new Object[]{queuedFlowfileCount, desiredFlowfileCount});
        }
        context.yield();
        return;
    }

    // If we're here, we do have at least n queued flowfiles
    List<FlowFile> flowfiles = session.get(desiredFlowfileCount);

    try {
        // TODO: Perform work on all flowfiles
        flowfiles = flowfiles.stream().map(f -> session.putAttribute(f, "timestamp", "my static timestamp value")).collect(Collectors.toList());
        session.transfer(flowfiles, REL_SUCCESS);

        // If extending AbstractProcessor, this is handled for you and you don't have to explicitly commit
        session.commit();
    } catch (Exception e) {
        logger.error("Helpful error message");
        if (logger.isDebugEnabled()) {
            logger.error("Further stacktrace: ", e);
        }
        // Penalize the flowfiles if appropriate (also done for you if extending AbstractProcessor and an exception is thrown from this method)
        session.rollback(true);
        //  --- OR ---
        // Transfer to failure if they can't be retried
        session.transfer(flowfiles, REL_FAILURE);
    }
}
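The gating logic in the snippet above reduces to: do nothing (yield) until at least n items are queued, then drain exactly n. It can be sketched without the NiFi API as follows (the `drainIfReady` helper is hypothetical; `List.of()` requires Java 9+):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BatchGate {
    // Drain exactly n items, but only once at least n are queued;
    // otherwise return an empty batch (analogous to context.yield()).
    static List<String> drainIfReady(Queue<String> queue, int n) {
        if (queue.size() < n) {
            return List.of(); // not enough queued yet: do no work this trigger
        }
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            batch.add(queue.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<String> q = new ArrayDeque<>(List.of("a"));
        System.out.println(drainIfReady(q, 3)); // [] - only one file queued so far
        q.addAll(List.of("b", "c"));
        System.out.println(drainIfReady(q, 3)); // [a, b, c] - threshold reached
    }
}
```

This is why the early-return-plus-yield check prevents the "batch of one" behavior described in the question: the first trigger sees fewer than n flowfiles and does nothing, so the single early arrival stays queued until its siblings catch up.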

If the Java 8 stream syntax is unfamiliar, it can be replaced with:

for (int i = 0; i < flowfiles.size(); i++) {
    // Write the same timestamp value onto all flowfiles
    FlowFile f = flowfiles.get(i);
    flowfiles.set(i, session.putAttribute(f, "timestamp", "my timestamp value"));
}

The semantics of penalization (telling the processor to delay performing work on a specific flowfile) versus yielding (telling the processor to wait some period of time before attempting any work again) are important.
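The distinction can be modeled as two separate time checks: penalization attaches a not-before time to one flowfile while the rest keep flowing, whereas yield attaches a not-before time to the whole processor. A toy model of the scheduling decision (not the actual NiFi implementation):

```java
public class PenalizeVsYield {
    // Penalization: one item becomes ineligible until its notBefore time passes.
    static boolean itemEligible(long notBeforeMillis, long nowMillis) {
        return nowMillis >= notBeforeMillis;
    }

    // Yield: the entire component is skipped until its yieldUntil time passes.
    static boolean componentRunnable(long yieldUntilMillis, long nowMillis) {
        return nowMillis >= yieldUntilMillis;
    }

    public static void main(String[] args) {
        long now = 1_000;
        // A penalized flowfile (notBefore = now + 30s) is skipped, others still flow:
        System.out.println(itemEligible(now + 30_000, now)); // false
        System.out.println(itemEligible(now, now));          // true
        // A yielded processor does no work at all until the yield expires:
        System.out.println(componentRunnable(now + 1_000, now)); // false
    }
}
```

In the onTrigger() above, yield is the right tool: no individual flowfile misbehaved; the processor as a whole simply has nothing useful to do yet.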

You probably also want the @TriggerSerially annotation on your custom processor to ensure multiple threads are not running concurrently, which could otherwise introduce a race condition.
