NiFi: Parallel and concurrent execution with ExecuteStreamCommand


Question

Currently, I have NiFi running on an edge node that has 4 cores. Say I have 20 incoming flowfiles and I set Concurrent tasks to 10 for the ExecuteStreamCommand processor: does that mean I get only concurrent execution, or both concurrent and parallel execution?

Recommended answer

In this case you get concurrency and parallelism, as noted in the Apache NiFi User Guide (emphasis added):

Next, the Scheduling Tab provides a configuration option named Concurrent tasks. This controls how many threads the Processor will use. Said a different way, this controls how many FlowFiles should be processed by this Processor at the same time. Increasing this value will typically allow the Processor to handle more data in the same amount of time. However, it does this by using system resources that then are not usable by other Processors. This essentially provides a relative weighting of Processors — it controls how much of the system’s resources should be allocated to this Processor instead of other Processors. This field is available for most Processors. There are, however, some types of Processors that can only be scheduled with a single Concurrent task.

If there are locking issues or race conditions with the command you are invoking, this could be problematic, but if they are independent, you are only limited by JVM scheduling and hardware performance.

Response to a question in the comments, too long to fit in a comment:

Question:

Thanks Andy. When there are 4 cores, can I assume that there will be 4 parallel executions, within which multiple threads would be running to handle the 10 concurrent tasks? In the best case, how are these 20 flowfiles executed in the scenario I mentioned? – John

Response:

John, JVM thread handling is a fairly complex topic, but yes, in general there would be n+C JVM threads, where C is some constant (main thread, VM thread, GC threads) and n is a number of "individual" threads created by the flow controller to execute the processor tasks. JVM threads map 1:1 to native OS threads, so on a 4 core system with 10 processor threads running, you would have "4 parallel executions". My belief is that at a high level, your OS would use time slicing to cycle through the 10 threads 4 at a time, and each thread would process ~2 flowfiles.

Again, very rough idea (assume 1 flowfile = 1 unit of work = 1 second):

Cores | Threads | Flowfiles/thread | Relative time
  1   |    1    |         20       |      20 s      (normal)
  4   |    1    |         20       |      20 s      (wasting 3 cores)
  1   |    4    |          5       |      20 s      (time slicing 1 core for 4 threads)
  4   |    4    |          5       |       5 s      (1:1 thread to core ratio)
  4   |   10    |          2       |       5+x s    (see execution table below)
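The "Relative time" column can be reproduced with a small back-of-the-envelope model. This is only a sketch in Python (not NiFi code), assuming that at most min(cores, threads) flowfiles make progress at any moment and that thread-switching overhead (the "+x") is zero. The function name `relative_time` and its parameters are made up for illustration.

```python
def relative_time(cores, threads, flowfiles=20, seconds_per_flowfile=1.0):
    """Idealized wall-clock time for the whole batch.

    Assumption: only min(cores, threads) flowfiles can make progress at once,
    and context-switch overhead is ignored.
    """
    parallel_slots = min(cores, threads)
    return flowfiles * seconds_per_flowfile / parallel_slots


# Same (cores, threads) pairs as the rows of the table above.
for cores, threads in [(1, 1), (4, 1), (1, 4), (4, 4), (4, 10)]:
    print(f"{cores} cores, {threads:2d} threads: {relative_time(cores, threads):.0f} s")
```

Under this model, going from 4 to 10 threads on 4 cores does not shorten the total time, because the number of cores is already the limiting factor.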

If we are assuming each core can handle one thread, and each thread can handle 1 flowfile per second, and each thread gets 1 second of uninterrupted operation (obviously not real), the execution sequence might look like this:

Flowfiles A-T
Cores α, β, γ, δ
Threads 1-10
Time/thread 1 s

Time | Core α | Core β | Core γ | Core δ
  0  |   1/A  |   2/B  |   3/C  |   4/D
  1  |   5/E  |   6/F  |   7/G  |   8/H
  2  |   9/I  |  10/J  |   1/K  |   2/L
  3  |   3/M  |   4/N  |   5/O  |   6/P
  4  |   7/Q  |   8/R  |   9/S  |  10/T

In 5 seconds, all 10 threads have executed twice, each completing 2 flowfiles.

However, assume the thread scheduler only assigns each thread a cycle of .5 seconds each iteration (again, not a realistic number, just to demonstrate). The execution pattern then would be:

Flowfiles A-T
Cores α, β, γ, δ
Threads 1-10
Time/thread .5 s

Time | Core α | Core β | Core γ | Core δ
  0  |   1/A  |   2/B  |   3/C  |   4/D
 .5  |   5/E  |   6/F  |   7/G  |   8/H
  1  |   9/I  |  10/J  |   1/A  |   2/B
1.5  |   3/C  |   4/D  |   5/E  |   6/F
  2  |   7/G  |   8/H  |   9/I  |  10/J
2.5  |   1/K  |   2/L  |   3/M  |   4/N
  3  |   5/O  |   6/P  |   7/Q  |   8/R
3.5  |   9/S  |  10/T  |   1/K  |   2/L
  4  |   3/M  |   4/N  |   5/O  |   6/P
4.5  |   7/Q  |   8/R  |   9/S  |  10/T

In this case, the total execution time is the same (* there is some overhead from the thread switching) but specific flowfiles take "longer" (total time from 0, not active execution time) to complete. For example, flowfiles C and D are not complete until time=2 in the second scenario, but are complete at time=1 in the first.
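Both execution tables can be regenerated with a toy round-robin simulation. The sketch below is Python, not how the OS or JVM scheduler actually works. It assumes strict round-robin order, a fixed time slice (`quantum`), exactly 1 second of work per flowfile, and zero switching cost. The `simulate` function and all of its parameters are hypothetical names chosen for this illustration.

```python
from collections import deque
from string import ascii_uppercase


def simulate(quantum, cores=4, threads=10, flowfiles=20, work=1.0):
    """Toy round-robin scheduler: returns {flowfile: completion time in s}."""
    names = ascii_uppercase[:flowfiles]
    # Thread i handles flowfiles i, i + threads, ... (thread 1 gets A, then K).
    queues = [deque(names[i::threads]) for i in range(threads)]
    remaining = {name: work for name in names}
    ready = deque(t for t in range(threads) if queues[t])
    done, now = {}, 0.0
    while ready:
        # Up to `cores` threads run in parallel for one time slice.
        running = [ready.popleft() for _ in range(min(cores, len(ready)))]
        now += quantum
        for t in running:
            name = queues[t][0]
            remaining[name] -= quantum
            if remaining[name] <= 1e-9:
                done[name] = now          # flowfile finished at end of this slice
                queues[t].popleft()
            if queues[t]:
                ready.append(t)           # thread goes to the back of the line
    return done


for quantum in (1.0, 0.5):
    finished = simulate(quantum)
    print(f"quantum={quantum}: C done at {finished['C']} s, "
          f"all done at {max(finished.values())} s")
```

With these assumptions the total time is 5 s for both slice lengths, while flowfile C finishes at 1 s with 1-second slices and at 2 s with half-second slices, which is the effect described above.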

To be honest, the OS and JVM have people much smarter than me working on this, as does our project (luckily), so there are gross over-simplifications here and in general I would recommend you let the system worry about hyper-optimizing the threading. I would not think setting the concurrent tasks to 10 would yield vast improvements over setting it to 4 in this case. You can read more about JVM threading here and here.

I just did a quick test in my local 1.5.0 development branch -- I connected a simple GenerateFlowFile running with 0 sec schedule to a LogAttribute processor. The GenerateFlowFile immediately generates so many flowfiles that the queue enables the back pressure feature (pausing the input processor until the queue can drain some of the 10,000 waiting flowfiles). I stopped both and re-ran this, giving the LogAttribute processor more concurrent tasks. By setting the LogAttribute concurrent tasks to 2:1 of the GenerateFlowFile, the queue never built up past about 50 queued flowfiles.

tl;dr Setting your concurrent tasks to the number of cores you have should be sufficient.

Update 2:

Checked with one of our resident JVM experts and he mentioned two things to note:

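  1. The command is not solely CPU limited; if it is I/O heavy, more concurrent tasks may be beneficial.
  2. The maximum number of concurrent tasks for the entire flow controller is set to 10 by default.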
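The first point can be illustrated with a quick experiment. This is a rough Python stand-in, not ExecuteStreamCommand itself: `fake_io_command` is a hypothetical placeholder that just sleeps, the way a thread waiting on disk or network would, and the worker counts are arbitrary.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_command(_):
    # Stand-in for an I/O-heavy external command: the thread waits
    # and uses almost no CPU while it does so.
    time.sleep(0.05)


def elapsed(workers, tasks=8):
    """Wall-clock seconds to run `tasks` fake commands on `workers` threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(fake_io_command, range(tasks)))
    return time.perf_counter() - start


print(f"2 workers: {elapsed(2):.2f} s, 8 workers: {elapsed(8):.2f} s")
```

Because the waiting threads do not occupy a core, the run with more workers should finish sooner regardless of core count. A purely CPU-bound command would not show this benefit.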
