NiFi GenerateTableFetch does not store state per database.name

Question

I am testing NiFi to replace our current ingestion setup, which imports data from multiple MySQL shards of a table and stores it in HDFS.

I am using GenerateTableFetch and ExecuteSQL to achieve this.

Each incoming flowfile has a database.name attribute, which DBCPConnectionPoolLookup uses to select the relevant shard.

The issue: say I have two shards to pull data from, shard_1 and shard_2, for the table accounts, with updated_at set as the Maximum Value Columns. State is not stored for table@updated_at per shard; there is only one entry per table in state.

When I check Data Provenance, I see the shard_2 flowfile being dropped without being passed to ExecuteSQL. My guess is that the shard_1 query runs first, and when the shard_2 query arrives, its records are checked against shard_1's updated_at; since that returns nothing, the flowfile is dropped.
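The suspected behavior can be reproduced with a toy model. This is only a sketch of the guess above, not NiFi's actual code: the function `fetch_new_rows`, the plain dictionary standing in for processor state, the simplified `table@column` key, and the sample rows and dates are all assumptions made for illustration.

```python
from datetime import datetime


def fetch_new_rows(state, table, max_col, rows):
    """Return rows newer than the stored maximum, then advance the maximum."""
    key = f"{table}@{max_col}"  # assumption: the key carries no database name
    last_seen = state.get(key)
    new_rows = [r for r in rows if last_seen is None or r[max_col] > last_seen]
    if new_rows:
        state[key] = max(r[max_col] for r in new_rows)
    return new_rows


# Hypothetical data: the newest row in shard_2 is older than the one in shard_1.
shard_1 = [{"id": 1, "updated_at": datetime(2018, 9, 2)}]
shard_2 = [{"id": 7, "updated_at": datetime(2018, 9, 1)}]

state = {}
first = fetch_new_rows(state, "accounts", "updated_at", shard_1)
second = fetch_new_rows(state, "accounts", "updated_at", shard_2)

print(len(first), len(second))  # 1 0
print(list(state))              # ['accounts@updated_at']
```

In this model the second call finds nothing, because shard_1 already advanced the single shared maximum. That matches the symptom of the shard_2 flowfile being dropped.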

Has anyone faced this issue? Or am I missing something?

Recommended answer

The ability to choose different databases via DBCPConnectionPoolLookup was added after the scheme for storing state in the database fetch processors (e.g. QueryDatabaseTable, GenerateTableFetch), so that state is not keyed by database. Also, how to get the database name differs between RDBMS drivers: it might be in the DatabaseMetaData or the ResultSetMetaData, possibly in getCatalog() or getSchema(), or in neither.

I have written NIFI-5590 to cover this improvement.
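To illustrate what per-database state would change, here is a minimal sketch in which the state key also carries the value of the database.name attribute. It is not necessarily how NIFI-5590 is implemented: the helper names `state_key` and `fetch_new_rows_per_db`, the key layout, and the sample rows are hypothetical.

```python
def state_key(database, table, max_col):
    """Build a state key that is distinct for each database (hypothetical layout)."""
    return f"{database}@{table}@{max_col}"


def fetch_new_rows_per_db(state, database, table, max_col, rows):
    """Return rows newer than this database's stored maximum, then advance it."""
    key = state_key(database, table, max_col)
    last_seen = state.get(key)
    new_rows = [r for r in rows if last_seen is None or r[max_col] > last_seen]
    if new_rows:
        state[key] = max(r[max_col] for r in new_rows)
    return new_rows


# Hypothetical rows; ISO-8601 strings compare correctly as plain strings.
shard_1_rows = [{"id": 1, "updated_at": "2018-09-02T00:00:00"}]
shard_2_rows = [{"id": 7, "updated_at": "2018-09-01T00:00:00"}]

state = {}
from_shard_1 = fetch_new_rows_per_db(state, "shard_1", "accounts", "updated_at", shard_1_rows)
from_shard_2 = fetch_new_rows_per_db(state, "shard_2", "accounts", "updated_at", shard_2_rows)

print(len(from_shard_1), len(from_shard_2))  # 1 1
print(sorted(state))
```

With one key per database, each shard tracks its own maximum updated_at, so the older rows in shard_2 are no longer filtered out by shard_1's value.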
