NiFi: Parallel and concurrent execution with ExecuteStreamCommand


Question



Currently, I have NiFi running on an edge node that has 4 cores. Say I have 20 incoming flowfiles and I set concurrent tasks to 10 for the ExecuteStreamCommand processor. Does that mean I get only concurrent execution, or both concurrent and parallel execution?

Solution

In this case you get both concurrency and parallelism, as noted in the Apache NiFi User Guide:

Next, the Scheduling Tab provides a configuration option named Concurrent tasks. This controls how many threads the Processor will use. Said a different way, this controls how many FlowFiles should be processed by this Processor at the same time. Increasing this value will typically allow the Processor to handle more data in the same amount of time. However, it does this by using system resources that then are not usable by other Processors. This essentially provides a relative weighting of Processors — it controls how much of the system’s resources should be allocated to this Processor instead of other Processors. This field is available for most Processors. There are, however, some types of Processors that can only be scheduled with a single Concurrent task.

If the command you are invoking has locking issues or race conditions, this could be problematic, but if the invocations are independent, you are limited only by JVM scheduling and hardware performance.
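To make the concurrency/parallelism distinction concrete outside NiFi, here is a minimal Python sketch. It is not NiFi's implementation. A pool of 10 worker threads plays the role of 10 concurrent tasks, and each worker pipes one flowfile's content through an external command, loosely mirroring how ExecuteStreamCommand streams content to a child process. The command (a child Python process that upper-cases stdin) and the names `process_flowfile` and `run_all` are hypothetical. Each thread spends most of its time waiting on its child process, and Python releases the GIL during that wait, so the OS is free to run up to one child process per core in parallel.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from typing import List

# Hypothetical stand-in for the command ExecuteStreamCommand would run:
# a child Python process that upper-cases whatever arrives on stdin.
COMMAND = [sys.executable, "-c",
           "import sys; sys.stdout.write(sys.stdin.read().upper())"]


def process_flowfile(content: str) -> str:
    """Pipe one flowfile's content through the external command."""
    result = subprocess.run(COMMAND, input=content, capture_output=True,
                            text=True, check=True)
    return result.stdout


def run_all(flowfiles: List[str], concurrent_tasks: int = 10) -> List[str]:
    # Each worker thread blocks (GIL released) while its child process runs;
    # the OS spreads those child processes over however many cores exist.
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as pool:
        return list(pool.map(process_flowfile, flowfiles))


if __name__ == "__main__":
    flowfiles = [f"flowfile {chr(ord('A') + i)}" for i in range(20)]  # A..T
    outputs = run_all(flowfiles, concurrent_tasks=10)
    print(outputs[0])   # FLOWFILE A
    print(outputs[-1])  # FLOWFILE T
```

On a 4-core machine, at most 4 of these child processes can be executing instructions at any instant (parallelism), while all 10 threads are in progress at once (concurrency).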

Response to a question in the comments (too long to post as a comment):

Question:

Thanks Andy. When there are 4 cores, can I assume that there will be 4 parallel executions, within which multiple threads run to handle the 10 concurrent tasks? In the best case, how are these 20 flowfiles executed in the scenario I mentioned? – John

Response:

John, JVM thread handling is a fairly complex topic, but yes, in general there would be n+C JVM threads, where C is some constant (main thread, VM thread, GC threads) and n is the number of "individual" threads created by the flow controller to execute the processor tasks. JVM threads map 1:1 to native OS threads, so on a 4-core system with 10 processor threads running, you would have "4 parallel executions". My belief is that, at a high level, your OS would use time slicing to cycle through the 10 threads 4 at a time, and each thread would process ~2 flowfiles.

Again, very rough idea (assume 1 flowfile = 1 unit of work = 1 second):

Cores | Threads | Flowfiles/thread | Relative time
  1   |    1    |         20       |      20 s      (normal)
  4   |    1    |         20       |      20 s      (wasting 3 cores)
  1   |    4    |          5       |      20 s      (time slicing 1 core for 4 threads)
  4   |    4    |          5       |       5 s      (1:1 thread to core ratio)
  4   |   10    |          2       |       5+x s    (see execution table below)
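The back-of-envelope arithmetic behind this table reduces to two one-line formulas. This sketch uses the same idealized assumptions as the table: every flowfile costs the same CPU time, the work splits evenly across threads, and the switching overhead "x" is ignored. The function names are illustrative only.

```python
def flowfiles_per_thread(flowfiles: int, threads: int) -> float:
    # Assumes the flowfiles are divided evenly among the threads.
    return flowfiles / threads


def relative_time(flowfiles: int, cores: int, threads: int,
                  secs_per_flowfile: float = 1.0) -> float:
    # Only min(cores, threads) threads can actually run at the same instant;
    # thread-switching overhead (the "x" in the last row) is ignored.
    return flowfiles * secs_per_flowfile / min(cores, threads)


for cores, threads in [(1, 1), (4, 1), (1, 4), (4, 4), (4, 10)]:
    print(f"{cores} core(s), {threads:2d} threads: "
          f"{flowfiles_per_thread(20, threads):4.1f} flowfiles/thread, "
          f"{relative_time(20, cores, threads):4.1f} s")
```

The `min(cores, threads)` term is why going from 4 to 10 threads on 4 cores does not reduce the time in this model: the extra threads only share the same 4 cores.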

If we assume each core can handle one thread, each thread can handle 1 flowfile per second, and each thread gets 1 second of uninterrupted operation (obviously not realistic), the execution sequence might look like this:

Flowfiles A - T

Cores α, β, γ, δ

Threads 1 - 10

Time slice per thread 1 s

Time | Core α | Core β | Core γ | Core δ
  0  |   1/A  |   2/B  |   3/C  |   4/D
  1  |   5/E  |   6/F  |   7/G  |   8/H
  2  |   9/I  |  10/J  |   1/K  |   2/L
  3  |   3/M  |   4/N  |   5/O  |   6/P
  4  |   7/Q  |   8/R  |   9/S  |  10/T

In 5 seconds, all 10 threads have executed twice, each completing 2 flowfiles.

However, assume the thread scheduler only gives each thread a 0.5-second time slice per turn (again, not a realistic number, just to demonstrate). The execution pattern would then be:

Flowfiles A - T

Cores α, β, γ, δ

Threads 1 - 10

Time slice per thread 0.5 s

Time | Core α | Core β | Core γ | Core δ
  0  |   1/A  |   2/B  |   3/C  |   4/D
 .5  |   5/E  |   6/F  |   7/G  |   8/H
  1  |   9/I  |  10/J  |   1/A  |   2/B
1.5  |   3/C  |   4/D  |   5/E  |   6/F
  2  |   7/G  |   8/H  |   9/I  |  10/J
2.5  |   1/K  |   2/L  |   3/M  |   4/N
  3  |   5/O  |   6/P  |   7/Q  |   8/R
3.5  |   9/S  |  10/T  |   1/K  |   2/L
  4  |   3/M  |   4/N  |   5/O  |   6/P
4.5  |   7/Q  |   8/R  |   9/S  |  10/T

In this case, the total execution time is the same (ignoring the overhead of thread switching), but specific flowfiles take "longer" to complete, measured as total time from 0 rather than active execution time. For example, flowfiles C and D are not complete until time=2 in the second scenario, but are complete at time=1 in the first.
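Both execution tables can be reproduced with a small round-robin simulation. This is a toy model under the answer's stated assumptions, not how an OS or the JVM really schedules threads: flowfiles are statically assigned to threads (thread 1 gets A and K, thread 2 gets B and L, and so on), all four cores switch in lockstep, and a thread that still has work goes to the back of a single ready queue. `simulate` is a hypothetical helper.

```python
from collections import deque


def simulate(num_threads=10, cores=4, flowfiles="ABCDEFGHIJKLMNOPQRST",
             work_per_flowfile=1.0, time_slice=1.0):
    """Toy lockstep round-robin: returns (schedule, completion_times)."""
    # Static assignment: thread 1 gets A and K, thread 2 gets B and L, ...
    pending = {t: deque(flowfiles[t - 1::num_threads])
               for t in range(1, num_threads + 1)}
    remaining = {t: work_per_flowfile for t in pending}  # work left on current flowfile
    ready = deque(t for t in pending if pending[t])
    schedule, completed, now = [], {}, 0.0
    while ready:
        # Each slice, up to `cores` threads from the front of the queue run.
        running = [ready.popleft() for _ in range(min(cores, len(ready)))]
        schedule.append((now, [(t, pending[t][0]) for t in running]))
        now += time_slice
        for t in running:
            remaining[t] -= time_slice
            if remaining[t] <= 1e-9:            # current flowfile finished
                completed[pending[t].popleft()] = now
                remaining[t] = work_per_flowfile
            if pending[t]:                      # still has work: requeue
                ready.append(t)
    return schedule, completed


for slice_len in (1.0, 0.5):
    _, done = simulate(time_slice=slice_len)
    print(f"slice {slice_len} s: all done at {max(done.values())} s, "
          f"C at {done['C']} s, D at {done['D']} s")
# slice 1.0 s: all done at 5.0 s, C at 1.0 s, D at 1.0 s
# slice 0.5 s: all done at 5.0 s, C at 2.0 s, D at 2.0 s
```

The `schedule` list holds the same (time, thread/flowfile) rows as the two tables, so shrinking the slice leaves the makespan unchanged in this model while delaying individual completions.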

To be honest, the OS and JVM have people much smarter than me working on this, as does our project (luckily), so there are gross over-simplifications here and in general I would recommend you let the system worry about hyper-optimizing the threading. I would not think setting the concurrent tasks to 10 would yield vast improvements over setting it to 4 in this case. You can read more about JVM threading here and here.

I just did a quick test in my local 1.5.0 development branch. I connected a simple GenerateFlowFile processor, running on a 0 sec schedule, to a LogAttribute processor. GenerateFlowFile immediately generates so many flowfiles that the queue enables back pressure (pausing the input processor until the queue can drain some of the 10,000 waiting flowfiles). I stopped both and re-ran the test, giving the LogAttribute processor more concurrent tasks. With LogAttribute given twice as many concurrent tasks as GenerateFlowFile (a 2:1 ratio), the queue never built up past about 50 queued flowfiles.

tl;dr Setting your concurrent tasks to the number of cores you have should be sufficient.

Update 2:

Checked with one of our resident JVM experts and he mentioned two things to note:

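  1. The command may not be solely CPU-bound; if it is I/O-heavy, more concurrent tasks than cores may be beneficial.
  2. The flow controller's thread pool, shared by all processors, is limited to 10 threads by default (the Maximum Timer Driven Thread Count setting), so concurrent tasks across the whole flow are capped at 10 unless you raise it.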
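The first point can be illustrated with a sketch in which each task only waits, using a `time.sleep` as a stand-in for hypothetical disk or network I/O, instead of computing. A waiting thread does not occupy a core, so adding concurrent tasks beyond the core count still shortens wall-clock time. The 0.05 s delay and the helper names are assumptions for illustration, not measurements of ExecuteStreamCommand.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_bound_task(_):
    # Hypothetical I/O wait (disk/network); sleeping uses no CPU, so the
    # waiting thread does not hold a core.
    time.sleep(0.05)


def wall_time(concurrent_tasks, n_flowfiles=20):
    """Seconds to push n_flowfiles through a pool of concurrent_tasks threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrent_tasks) as pool:
        list(pool.map(io_bound_task, range(n_flowfiles)))
    return time.perf_counter() - start


if __name__ == "__main__":
    for tasks in (4, 10):
        print(f"{tasks:2d} concurrent tasks: {wall_time(tasks):.2f} s")
```

With 20 flowfiles, 4 tasks need about five rounds of waiting (roughly 0.25 s) while 10 tasks need about two (roughly 0.1 s), whatever the machine's core count. For purely CPU-bound commands that advantage largely disappears once tasks exceed cores, which is why the tl;dr suggests matching the core count.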
