Apache Nifi / Cassandra-如何将CSV加载到Cassandra表中 [英] Apache Nifi/Cassandra - how to load CSV into Cassandra table

查看:155
本文介绍了Apache Nifi / Cassandra-如何将CSV加载到Cassandra表中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我每天要收到几次不同的CSV文件,它们存储来自传感器的时间序列数据,这些传感器是传感器站的一部分。每个CSV均以其来自的传感器站和传感器ID命名,例如 station1_sensor2.csv。目前,数据的存储方式如下:

 > cat station1_sensor2.csv 
2016-05-04 03:02:01.001000 + 0000; 0;
2016-05-04 03:02:01.002000 + 0000; 0.1234;
2016-05-04 03:02:01.003000 + 0000; 0.2345;

我创建了一个Cassandra表来存储它们并能够查询它们以完成各种已确定的任务。 Cassandra表看起来像这样:

  cqlsh>使用复制= {'class':'SimpleStrategy','replication_factor':3}来创建KEYSPACE数据; 

创建表sensor_data(
station_id文本,//工作站的ID
sensor_id文本,//传感器的ID
tps时间戳,//时间戳测量
val浮动,//测量值
主键((station_id,sensor_id),tps)
);

我想使用Apache Nifi将CSV中的数据自动存储到此Cassandra表中,但是我找不到正确的例子或方案。我试图使用 PutCassandraQL处理器,但是我在没有任何明确示例的情况下仍在努力。因此,对于如何使用Apache Nifi执行Cassandra put查询以将数据插入表中的任何帮助,将不胜感激!

解决方案

TL; DR我有一个NiFi 1.0模板可以在


  1. 读取CSV文件。可以使用GetFile,最好是ListFile-> FetchFile。在我的示例中,我使用脚本处理器内联创建流文件,其中包含来自上方的示例数据。


  2. 解析文件名以获取测站和传感器字段。它使用 NiFi表达式语言来获取零件下划线之前(用于工作站)和下划线之后(减去CSV扩展名)的传感器文件名。


  3. 将单个CSV流文件拆分为一个流每行文件。这样做是为了以后我们可以创建单个CQL INSERT语句。


  4. 从每行提取列值。我为此使用了ExtractText和一个正则表达式,如果您有非常复杂的逻辑,则可能需要检出脚本处理器,例如 ExecuteScript


  5. 更改时间戳记。 IIRC,CQL不接受时间戳文字的微秒。您可以尝试解析微秒(最好在ExecuteScript处理器中完成),也可以只是重新格式化时间戳。请注意,由于无法解析微秒,因此重新格式化会导致所有小数秒都被截断。


  6. 构建CQL INSERT声明。此时,数据(无论如何在我的模板中)全部在流文件属性中,原始内容可以用CQL INSERT语句替换(这是 PutCassandraQL 期望)。您可以将数据保留在属性中(使用UpdateAttribute正确命名它们,请参阅PutCassandraQL文档)并使用准备好的语句,但是恕我直言,编写显式CQL语句更简单。在撰写本文时,PutCassandraQL尚未缓存PreparedStatements,因此现在这样做的性能实际上较低。


  7. 使用以下命令执行CQL语句PutCassandraQL。


我没有详细介绍属性名称等信息,但是当流程到达ReplaceText时,我具有以下属性:




  • station.name:包含从文件名解析的站点的名称

  • sensor.name:包含从文件名解析的传感器的名称

  • tps:包含更新的时间戳值

  • columns.2 :(大概)包含传感器读数的值



ReplaceText将内容设置为以下内容(使用表达式语言以填充值):

 插入sensor_data(station_id,sensor_id,tps,val)值('$ { station.name}, $ {sensor.name}, $ {tps},$ {column.2 })

希望能有所帮助,如果您有任何疑问或问题,请告诉我。干杯!


I have various CSV files incoming several times per day, storing timeseries data from sensors, which are parts of sensors stations. Each CSV is named after the sensor station and sensor id from which it is coming from, for instance "station1_sensor2.csv". At the moment, data is stored like this :

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;

I have created a Cassandra table to store them and to be able to query them for various identified tasks. The Cassandra table looks like this :

cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

        CREATE TABLE sensor_data (
        station_id text, // id of the station
        sensor_id text,  // id of the sensor
        tps timestamp,   // timestamp of the measure
        val float,       // measured value
        PRIMARY KEY ((station_id, sensor_id), tps)
        );

I would like to use Apache Nifi to automatically store the data from the CSV into this Cassandra Table, but I can't find example or scheme to do it right. I have tried to use the "PutCassandraQL" processor, but I am struggling without any clear example. So, any help on how to execute a Cassandra put query with Apache Nifi to insert the data into the table would be appreciated !

解决方案

TL;DR I have a NiFi 1.0 template to accomplish this on Gist and in the NiFi Wiki.

NiFi encourages very modular design, so let's break this down into smaller tasks, I'll describe a possible flow and explain what each processor is used for in terms of your use case:

  1. Read in the CSV file. This can be done with GetFile, or preferably ListFile -> FetchFile. In my example I am using a scripting processor to create a flow file in-line, containing your example data from above. This makes my template portable for others to use.

  2. Parse the filename to get the station and sensor fields. This uses NiFi Expression Language to get the parts of the filename before the underscore (for station) and after the underscore (minus the CSV extension) for sensor.

  3. Split the single CSV flow file into one flow file per line. This is done so we can create individual CQL INSERT statements later.

  4. Extract the column values from each line. I used ExtractText and a regular expression for this, if you have very complicated logic you may want to check out a scripting processor such as ExecuteScript.

  5. Alter the timestamp. IIRC, CQL doesn't accept microseconds on timestamp literals. You can either try to parse out the microseconds (might best be done in an ExecuteScript processor) or just re-format the timestamp. Note that "re-formatting", since the microseconds couldn't be parsed, causes all fractional seconds to be truncated in my example.

  6. Build a CQL INSERT statement. At this point the data (in my template anyway) is all in flow file attributes, the original content can be replaced with a CQL INSERT statement (which is the way PutCassandraQL expects it). You can keep the data in attributes (using UpdateAttribute to name them correctly, see the PutCassandraQL doc) and use a prepared statement, but IMHO it's simpler to write an explicit CQL statement. At the time of this writing, PutCassandraQL is not caching PreparedStatements, so it is actually less performant right now to do things that way.

  7. Execute the CQL statements with PutCassandraQL.

I didn't go into detail as far as the names of my attributes and such, but by the time the flow gets to ReplaceText, I have the following attributes:

  • station.name: Contains the name of the station parsed from the filename
  • sensor.name: Contains the name of the sensor parsed from the filename
  • tps: Contains the updated timestamp value
  • columns.2: Contains (presumably) the value of the sensor reading

The ReplaceText sets the content to the following (using Expression Language to fill in the values):

insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})

Hopefully that helps, please let me know if you have any questions or issues. Cheers!

这篇关于Apache Nifi / Cassandra-如何将CSV加载到Cassandra表中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆