How can I pass a flowfile attribute to a controller service?


Question


Hello Internet Hive Mind!


I need to query AWS Athena with nifi, however I need to change the staging directory (the S3 bucket & folder where the results will be saved) for each query sent.


But the s3_staging_dir property has to be set on the DBCPConnectionPool Controller Service. How can I change the value of that property for each different flow file? Apparently it can't be fetched by expression language alone.

Thanks!

Answer


I'm not sure of the nature of your flow where each query depends on a different staging directory, but there are a couple of things to keep in mind.

  1. The DBCPConnectionPool controller service does allow dynamic properties which evaluate expression language, but that expression language evaluation is performed when the controller service is enabled, so "once" per start/stop.
  2. The dynamic properties on the controller service do not evaluate flowfile attributes.


From Apache NiFi DBCPConnectionPool documentation:

Dynamic Properties:


Dynamic Properties allow the user to specify both the name and value of a property.

...


Specifies a property name and value to be set on the JDBC connection(s). If Expression Language is used, evaluation will be performed upon the controller service being enabled. Note that no flow file input (attributes, e.g.) is available for use in Expression Language constructs for these properties. Supports Expression Language: true


Because of your requirement that the S3 staging directory is different on every request, I think in this case, you would need to pursue one of the following options:

  1. File a Jira requesting native Athena support in NiFi (thoroughly explain why the existing DBCPConnectionPool doesn't support your use case)
  2. Extend the DBCPConnectionPool controller service with your own AthenaConnectionPool controller service. There are many tutorials for building your own NiFi components, but the NiFi Developer Guide > Developing Controller Services is the best place to start. You can make a controller service which does evaluate incoming flowfile attributes when performing expression language execution, but you will need to manually trigger this, as controller services do not have an @OnTrigger phase of their lifecycle. If you also write a custom processor, you can invoke some "re-evaluate" method in the controller service from the onTrigger() method of the processor, but existing processors will not call this. Instead, you could theoretically put a high frequency refresher in the controller service itself using executors, but this will definitely affect performance
  3. Create multiple DBCPConnectionPool instances and SQL processors for each staging directory (feasible on the order of 1 - 3, otherwise abysmal)
  4. Use the ExecuteStreamCommand processor with awscli to execute the queries using the command-line tool. This deprives you of the NiFi native SQL tools but allows custom queries on every invocation because ExecuteStreamCommand can interpret the flowfile-specific attributes and use them in the query
  5. Re-evaluate your flow design and see if there is a way to perform the queries without allowing for arbitrary S3 staging directories on individual query execution
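The core idea behind option 2 — deriving the JDBC connection URL from flowfile attributes per request, rather than once when the service is enabled — can be sketched outside the NiFi API. This is a minimal illustration, not a working controller service: the attribute name `s3.staging.dir`, the default staging location, and the Athena endpoint below are assumptions for the example, and a real `AthenaConnectionPool` would additionally have to manage connection lifecycle through the NiFi controller service interfaces.

```java
import java.util.Map;

// Minimal sketch of the per-request URL derivation a custom
// AthenaConnectionPool-style service would perform. The attribute
// name, default bucket, and endpoint are hypothetical examples.
public class AthenaUrlBuilder {

    static String buildUrl(Map<String, String> flowFileAttributes) {
        // Fall back to a default staging directory when the flowfile
        // does not carry one (illustrative default, not from NiFi).
        String staging = flowFileAttributes.getOrDefault(
                "s3.staging.dir", "s3://default-bucket/athena-results/");
        return "jdbc:awsathena://athena.us-east-1.amazonaws.com:443"
                + ";s3_staging_dir=" + staging;
    }

    public static void main(String[] args) {
        System.out.println(
                buildUrl(Map.of("s3.staging.dir", "s3://my-bucket/query-1/")));
    }
}
```

In the custom-processor variant of option 2, the processor's `onTrigger()` would call such a method with the incoming flowfile's attributes before borrowing a connection, which is exactly the step the stock DBCPConnectionPool cannot perform.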

