How can I pass a flowfile attribute to a controller service?


Question

Hello Internet Hive Mind!

I need to query AWS Athena with NiFi; however, I need to change the staging directory (the S3 bucket and folder where the results will be saved) for each query sent.

But the s3_staging_dir property has to be set on the DBCPConnectionPool controller service. How can I change the value of that property for each flowfile? Apparently it can't be done with Expression Language alone.

Thanks!

Recommended answer

I'm not sure about the nature of your flow, where each query depends on a different staging directory, but there are a couple of things to keep in mind.

  1. The DBCPConnectionPool controller service does allow dynamic properties that evaluate Expression Language, but that evaluation is performed when the controller service is enabled, so "once" per start/stop.
  2. The dynamic properties on the controller service do not evaluate flowfile attributes.
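
To make the two points above concrete, here is a toy model in Python. It is not NiFi code: `ToyConnectionPool`, `on_enabled`, and `get_connection_properties` are hypothetical names, and the `${...}` handling is a simplified stand-in for Expression Language in which unknown names resolve to an empty string. It only illustrates why a value resolved at enable time cannot see per-flowfile attributes.

```python
import re

# Matches simple ${name} references; real Expression Language is far richer.
_EL = re.compile(r"\$\{([^}]+)\}")


def evaluate(expression, variables):
    """Substitute ${name} references from `variables`; unknown names become ''."""
    return _EL.sub(lambda m: variables.get(m.group(1), ""), expression)


class ToyConnectionPool:
    """Hypothetical stand-in for a controller service; not NiFi's API."""

    def __init__(self, dynamic_properties):
        self._raw = dict(dynamic_properties)
        self.resolved = {}

    def on_enabled(self, environment):
        # Evaluated once, when the service is enabled. No flowfile exists at
        # this point, so only environment-level variables are visible.
        self.resolved = {k: evaluate(v, environment) for k, v in self._raw.items()}

    def get_connection_properties(self, flowfile_attributes=None):
        # Flowfile attributes are ignored: the values were fixed at enable time.
        return dict(self.resolved)


pool = ToyConnectionPool({"s3_staging_dir": "s3://${bucket}/${query.id}/"})
pool.on_enabled({"bucket": "results"})
props = pool.get_connection_properties({"query.id": "q-42"})
print(props["s3_staging_dir"])
```

In this model `${bucket}` is resolved because it is known when the service is enabled, while `${query.id}` comes out empty no matter which flowfile later asks for a connection.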

From the documentation:

Dynamic Properties:

Dynamic Properties allow the user to specify both the name and value of a property.

...

Specifies a property name and value to be set on the JDBC connection(s). If Expression Language is used, evaluation will be performed upon the controller service being enabled. Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties. Supports Expression Language: true

Because you require a different S3 staging directory on every request, I think you would need to pursue one of the following options:

  1. File a Jira requesting native Athena support in NiFi (thoroughly explain why the existing DBCPConnectionPool doesn't support your use case).
  2. Extend the DBCPConnectionPool controller service with your own AthenaConnectionPool controller service. There are many tutorials for building your own NiFi components, but the NiFi Developer Guide > Developing Controller Services is the best place to start. You can make a controller service that does evaluate incoming flowfile attributes when evaluating Expression Language, but you will need to trigger this manually, because controller services do not have an @OnTrigger phase in their lifecycle. If you also write a custom processor, you can invoke some "re-evaluate" method on the controller service from the processor's onTrigger() method, but existing processors will not call it. Alternatively, you could theoretically put a high-frequency refresher in the controller service itself using executors, but this will definitely affect performance.
  3. Create multiple DBCPConnectionPool instances and SQL processors, one set per staging directory (feasible for roughly 1 to 3 directories, abysmal beyond that).
  4. Use the ExecuteStreamCommand processor with awscli to execute the queries using the command-line tool. This deprives you of the NiFi-native SQL tools but allows custom queries on every invocation, because ExecuteStreamCommand can interpret the flowfile-specific attributes and use them in the query.
  5. Re-evaluate your flow design and see if there is a way to perform the queries without allowing arbitrary S3 staging directories on individual query execution.
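
For option 4, a rough sketch of the command that would be built per flowfile might look like the following. The attribute names `athena.query` and `athena.staging.dir` are made up for illustration; the `aws athena start-query-execution` subcommand and its `--query-string` and `--result-configuration OutputLocation=...` options are the parts taken from the AWS CLI. Treat this as a sketch under those assumptions, not a tested flow.

```python
import shlex


def athena_cli_args(attributes):
    """Build argv for `aws athena start-query-execution` from flowfile attributes."""
    # Both attribute names are hypothetical; use whatever your flow sets upstream.
    staging_dir = attributes["athena.staging.dir"]
    query = attributes["athena.query"]
    if not staging_dir.startswith("s3://"):
        raise ValueError(f"staging dir must be an s3:// URI: {staging_dir!r}")
    return [
        "aws", "athena", "start-query-execution",
        "--query-string", query,
        # Per-query result location, which is the value that cannot vary
        # per flowfile when it is set on the connection pool.
        "--result-configuration", f"OutputLocation={staging_dir}",
    ]


args = athena_cli_args({
    "athena.query": "SELECT 1",
    "athena.staging.dir": "s3://my-bucket/run-1/",
})
# Shell-quoted form, handy for logging or for testing the command by hand.
print(shlex.join(args))
```

In ExecuteStreamCommand the same pieces would go into the Command Arguments property, with Expression Language references to the attributes in place of the Python lookups. Note that `start-query-execution` only submits the query and returns an execution ID; the results still have to be picked up afterwards, for example from the staging location in S3.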
