Apache NiFi/Cassandra - how to load CSV into a Cassandra table

Question

I have various CSV files arriving several times per day, storing time-series data from sensors that are part of sensor stations. Each CSV is named after the sensor station and sensor ID it comes from, for instance "station1_sensor2.csv". At the moment, the data is stored like this:

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;

I have created a Cassandra table to store them and to be able to query them for various identified tasks. The Cassandra table looks like this:

cqlsh> CREATE KEYSPACE data WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};

cqlsh> USE data;

cqlsh:data> CREATE TABLE sensor_data (
        station_id text, // id of the station
        sensor_id text,  // id of the sensor
        tps timestamp,   // timestamp of the measurement
        val float,       // measured value
        PRIMARY KEY ((station_id, sensor_id), tps)
        );
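To make the primary key concrete, here is a toy in-memory model of this table in Python. It is not a Cassandra client and ignores replication and everything else; the class and method names are hypothetical. It shows the two properties that matter when loading the CSV. First, rows are grouped by the partition key (station_id, sensor_id) and ordered by tps within a partition. Second, an INSERT with an already existing full primary key silently overwrites that row (an upsert).

```python
from collections import defaultdict
from datetime import datetime


class SensorDataModel:
    """Toy in-memory model of sensor_data (not a Cassandra client)."""

    def __init__(self):
        # partition key (station_id, sensor_id) -> {tps: val}
        self.partitions = defaultdict(dict)

    def insert(self, station_id, sensor_id, tps, val):
        # Upsert: a row with the same (station_id, sensor_id, tps) is replaced.
        self.partitions[(station_id, sensor_id)][tps] = val

    def select(self, station_id, sensor_id):
        # Reads need the full partition key; rows come back sorted by tps,
        # like the default ascending clustering order.
        return sorted(self.partitions.get((station_id, sensor_id), {}).items())


table = SensorDataModel()
t = datetime(2016, 5, 4, 3, 2, 1)
table.insert("station1", "sensor2", t, 0.1234)
table.insert("station1", "sensor2", t, 0.2345)  # same primary key: overwrites
print(table.select("station1", "sensor2"))
```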

I would like to use Apache NiFi to automatically store the data from the CSV files into this Cassandra table, but I can't find an example or pattern showing how to do it properly. I have tried to use the "PutCassandraQL" processor, but I am struggling without any clear example. So any help on how to execute a Cassandra INSERT query with Apache NiFi to get the data into the table would be appreciated!

Solution

TL;DR I have a NiFi 1.0 template to accomplish this on Gist and in the NiFi Wiki.

NiFi encourages a very modular design, so let's break this down into smaller tasks. I'll describe a possible flow and explain what each processor is used for in your use case:

  1. Read in the CSV file. This can be done with GetFile, or preferably ListFile -> FetchFile. In my example I am using a scripting processor to create a flow file in-line, containing your example data from above. This makes my template portable for others to use.

  2. Parse the filename to get the station and sensor fields. This uses NiFi Expression Language to take the part of the filename before the underscore (for the station) and the part after the underscore, minus the CSV extension (for the sensor).

  3. Split the single CSV flow file into one flow file per line. This is done so we can create individual CQL INSERT statements later.

  4. Extract the column values from each line. I used ExtractText and a regular expression for this; if you have very complicated logic, you may want to check out a scripting processor such as ExecuteScript.

  5. Alter the timestamp. IIRC, CQL doesn't accept microseconds in timestamp literals. You can either try to parse out the microseconds (probably best done in an ExecuteScript processor) or just re-format the timestamp. Note that in my example, because the microseconds couldn't be parsed, re-formatting truncates all fractional seconds.

  6. Build a CQL INSERT statement. At this point the data (in my template, anyway) is all in flow file attributes, so the original content can be replaced with a CQL INSERT statement, which is how PutCassandraQL expects it. You could keep the data in attributes (using UpdateAttribute to name them correctly; see the PutCassandraQL docs) and use a prepared statement, but IMHO it's simpler to write an explicit CQL statement. At the time of writing, PutCassandraQL does not cache PreparedStatements, so doing it that way is actually less performant right now.

  7. Execute the CQL statements with PutCassandraQL.
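Here is a plain-Python sketch of steps 2 to 4 for checking the parsing logic outside NiFi. It is not NiFi code. The function names and the regular expression `^([^;]*);([^;]*);` are assumptions and do not come from the template. The filename split mirrors Expression Language along the lines of `${filename:substringBefore('_')}` for the station and `${filename:substringAfter('_'):substringBeforeLast('.')}` for the sensor. The attribute naming follows my reading of the ExtractText documentation: a property named `columns` yields `columns` for the first capture group, plus `columns.0` for the whole match and `columns.1`, `columns.2`, ... for each group.

```python
import re

# Assumed regex for lines like "2016-05-04 03:02:01.001000+0000;0;"
LINE_RE = re.compile(r"^([^;]*);([^;]*);")


def parse_filename(filename):
    # Step 2: "station1_sensor2.csv" -> ("station1", "sensor2")
    station, _, rest = filename.partition("_")
    sensor = rest.rsplit(".", 1)[0]
    return station, sensor


def split_lines(content):
    # Step 3: one record per non-empty line (SplitText with Line Split Count 1)
    return [line for line in content.splitlines() if line.strip()]


def extract_columns(line):
    # Step 4: mimic ExtractText for a property named "columns"
    m = LINE_RE.match(line)
    if m is None:
        return None  # ExtractText would route the flow file to "unmatched"
    attrs = {"columns": m.group(1)}
    for i in range(LINE_RE.groups + 1):
        attrs[f"columns.{i}"] = m.group(i)
    return attrs


def csv_to_records(filename, content):
    # Steps 2-4 chained: one attribute dict per CSV line
    station, sensor = parse_filename(filename)
    records = []
    for line in split_lines(content):
        attrs = extract_columns(line)
        if attrs is not None:
            attrs["station.name"] = station
            attrs["sensor.name"] = sensor
            records.append(attrs)
    return records
```

Each returned dict roughly corresponds to the attributes of one split flow file at the point where the timestamp is altered.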

I haven't gone into detail about the names of my attributes and such, but by the time the flow gets to ReplaceText, I have the following attributes:

  • station.name: Contains the name of the station parsed from the filename
  • sensor.name: Contains the name of the sensor parsed from the filename
  • tps: Contains the updated timestamp value
  • columns.2: Contains (presumably) the value of the sensor reading
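About the tps value: the sample timestamps differ only in their fractional part (.001, .002 and .003 seconds). If the fractional seconds are truncated, all three readings get the same tps. Because tps is the last component of the primary key, each INSERT then overwrites the previous row, and only one reading survives. The sketch below assumes you want to keep millisecond precision, which is the precision a CQL timestamp stores. It parses the six-digit fraction and re-emits three digits. The function name is hypothetical. I'm also assuming that Cassandra accepts a literal of the form `2016-05-04 03:02:01.001+0000`, so check that against your Cassandra version. This is CPython 3 code. ExecuteScript's Python engine is Jython, so the code would need adapting there.

```python
from datetime import datetime, timezone

# Input format of the sample CSV, e.g. "2016-05-04 03:02:01.001000+0000"
IN_FMT = "%Y-%m-%d %H:%M:%S.%f%z"


def to_cql_timestamp(raw):
    """Hypothetical helper: re-emit a microsecond timestamp with ms precision."""
    ts = datetime.strptime(raw, IN_FMT).astimezone(timezone.utc)
    millis = ts.microsecond // 1000  # keep milliseconds, drop the rest
    return ts.strftime("%Y-%m-%d %H:%M:%S") + f".{millis:03d}+0000"


# prints 2016-05-04 03:02:01.001+0000, then the .002 and .003 variants
for raw in ("2016-05-04 03:02:01.001000+0000",
            "2016-05-04 03:02:01.002000+0000",
            "2016-05-04 03:02:01.003000+0000"):
    print(to_cql_timestamp(raw))
```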

The ReplaceText processor then sets the content to the following (using Expression Language to fill in the values):

insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${columns.2})
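To see what PutCassandraQL will receive, the sketch below imitates that ReplaceText step in Python. It is a hypothetical stand-in, not NiFi's implementation. It only understands plain `${name}` references and uses the attribute name `columns.2` from the list above. It also adds two safeguards that the template itself does not have: it doubles single quotes inside string values (CQL's escape for `'`), and it checks that the reading parses as a number before placing it in the unquoted `val` slot.

```python
import re

# The ReplaceText template from the answer, using the attribute name columns.2
TEMPLATE = ("insert into sensor_data (station_id, sensor_id, tps, val) "
            "values ('${station.name}', '${sensor.name}', '${tps}', ${columns.2})")
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def cql_string(value):
    # In a CQL string literal, a single quote is escaped by doubling it.
    return value.replace("'", "''")


def render_insert(attrs):
    # Hypothetical stand-in for ReplaceText: plain ${name} lookups only.
    float(attrs["columns.2"])  # raises ValueError if the reading is not numeric
    return PLACEHOLDER.sub(lambda m: cql_string(attrs[m.group(1)]), TEMPLATE)


print(render_insert({
    "station.name": "station1",
    "sensor.name": "sensor2",
    "tps": "2016-05-04 03:02:01.002+0000",
    "columns.2": "0.1234",
}))
```

If you use the prepared-statement approach from step 6 instead, my understanding of the PutCassandraQL documentation is that the content uses `?` placeholders and each parameter is read from attributes named `cql.args.N.type` and `cql.args.N.value`. Check the processor docs for the exact type names it accepts.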

Hopefully that helps, please let me know if you have any questions or issues. Cheers!
