NiFi GenerateTableFetch does not store state per database.name
Problem description
I am testing out NiFi to replace our current ingestion setup, which imports data from multiple MySQL shards of a table and stores it in HDFS.
I am using GenerateTableFetch and ExecuteSQL to achieve this.
Each incoming flow file will have a database.name attribute, which is used by DBCPConnectionPoolLookup to select the relevant shard.
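The routing described above can be pictured as a map lookup keyed by the flow file's database.name attribute. This is a simplified, hypothetical Java model and not DBCPConnectionPoolLookup's source. Pools are represented by plain name strings, and the pool names and the error handling are assumptions. Note that the attribute only decides which connection is used. It does not by itself change the key under which the fetch processor stores its state.

```java
import java.util.Map;

// Simplified, hypothetical model of attribute-based pool selection; not NiFi source code.
class PoolLookupModel {

    static final String DATABASE_NAME_ATTRIBUTE = "database.name";

    // Returns the pool registered under the flow file's database.name value.
    static String selectPool(Map<String, String> registeredPools, Map<String, String> attributes) {
        String databaseName = attributes.get(DATABASE_NAME_ATTRIBUTE);
        if (databaseName == null || databaseName.isEmpty()) {
            throw new IllegalArgumentException("flow file has no " + DATABASE_NAME_ATTRIBUTE + " attribute");
        }
        String pool = registeredPools.get(databaseName);
        if (pool == null) {
            throw new IllegalArgumentException("no connection pool registered for " + databaseName);
        }
        return pool;
    }

    public static void main(String[] args) {
        // Hypothetical registrations: database name -> connection pool name.
        Map<String, String> pools = Map.of("shard_1", "shard_1-pool", "shard_2", "shard_2-pool");
        System.out.println(selectPool(pools, Map.of("database.name", "shard_2"))); // shard_2-pool
    }
}
```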
The issue is this: say I have two shards to pull data from, shard_1 and shard_2, for the table accounts, and I have updated_at as the Maximum Value Columns. GenerateTableFetch does not store state for table@updated_at per shard; there is only one entry per table in state.
When I check Data Provenance, I see the shard_2 flow file being dropped without being passed to ExecuteSQL. My guess is that the shard_1 query executes first. When the shard_2 query comes in, its records are checked against shard_1's updated_at, and since that check returns nothing, the flow file is dropped.
Has anyone faced this issue? Or am I missing something?
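The suspected behaviour can be illustrated with a simplified model of max-value state. This is a hypothetical Java sketch, not NiFi's implementation. The fetch method, the in-memory map standing in for the processor's state, and the per-database key format shard_1.accounts@updated_at are all assumptions made for illustration. With one shared key per table, shard_2 is compared against shard_1's maximum and produces no fetch. With a key that includes the database name, each shard keeps its own maximum.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of GenerateTableFetch-style max-value tracking; not NiFi source code.
class MaxValueStateModel {

    // Returns true if the shard has rows newer than the stored maximum for stateKey
    // (i.e. a fetch would be generated) and records the new maximum.
    // Returns false when nothing is newer, which corresponds to the dropped flow file.
    static boolean fetch(Map<String, Long> state, String stateKey, List<Long> updatedAt) {
        Long stored = state.get(stateKey); // null on the first run: every row is new
        long newMax = Long.MIN_VALUE;
        int newRows = 0;
        for (long ts : updatedAt) {
            if (stored == null || ts > stored) {
                newRows++;
                newMax = Math.max(newMax, ts);
            }
        }
        if (newRows == 0) {
            return false;
        }
        state.put(stateKey, newMax);
        return true;
    }

    public static void main(String[] args) {
        List<Long> shard1 = List.of(100L, 200L); // updated_at values in shard_1.accounts
        List<Long> shard2 = List.of(50L, 150L);  // updated_at values in shard_2.accounts

        // One entry per table, as observed in the question.
        Map<String, Long> shared = new HashMap<>();
        System.out.println(fetch(shared, "accounts@updated_at", shard1)); // true
        System.out.println(fetch(shared, "accounts@updated_at", shard2)); // false: 50 and 150 are not > 200

        // Hypothetical per-database keys: each shard tracks its own maximum.
        Map<String, Long> perDatabase = new HashMap<>();
        System.out.println(fetch(perDatabase, "shard_1.accounts@updated_at", shard1)); // true
        System.out.println(fetch(perDatabase, "shard_2.accounts@updated_at", shard2)); // true
    }
}
```

The model ignores paging and partition sizes. It only shows why a single shared maximum hides rows in a shard whose timestamps trail another shard's.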
Recommended answer
The ability to choose different databases via DBCPConnectionPoolLookup was added after the scheme for storing state in the database fetch processors (e.g., QueryDatabaseTable, GenerateTableFetch) was designed, so that state is not keyed by database. Also, how to get the database name differs between RDBMS drivers: it might be in the DatabaseMetaData or the ResultSetMetaData, possibly in getCatalog() or getSchema(), or in neither.
I have written NIFI-5590 to cover this improvement.
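The driver-dependent lookup of the database name mentioned in the answer could look roughly like a fallback chain over standard JDBC calls: Connection.getCatalog() first, then Connection.getSchema(). This is a hypothetical sketch, not what NIFI-5590 implements. The class name, the order of the fallbacks, and the stub Connection built with java.lang.reflect.Proxy (used only so the example runs without a real driver) are assumptions.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper: which call yields the database name depends on the JDBC driver.
class DatabaseNameResolver {

    // Tries Connection.getCatalog(), then Connection.getSchema(); returns null if neither helps.
    static String resolve(Connection conn) {
        try {
            String catalog = conn.getCatalog();
            if (catalog != null && !catalog.isEmpty()) {
                return catalog;
            }
        } catch (SQLException e) {
            // Treat a failing getCatalog() like a missing value and try the schema instead.
        }
        try {
            String schema = conn.getSchema();
            if (schema != null && !schema.isEmpty()) {
                return schema;
            }
        } catch (SQLException | AbstractMethodError e) {
            // getSchema() arrived in JDBC 4.1; older drivers may not implement it.
        }
        return null;
    }

    // Stub Connection for the demo: answers only getCatalog() and getSchema().
    static Connection stub(String catalog, String schema) {
        return (Connection) Proxy.newProxyInstance(
                DatabaseNameResolver.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, methodArgs) -> {
                    switch (method.getName()) {
                        case "getCatalog":
                            return catalog;
                        case "getSchema":
                            return schema;
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(resolve(stub("shard_1", null))); // shard_1 (catalog-style driver)
        System.out.println(resolve(stub(null, "public")));  // public (schema-style driver)
        System.out.println(resolve(stub(null, null)));      // null
    }
}
```

Returning null rather than throwing would let a caller fall back to a table@column key when no database name can be determined.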