Apache Nifi / Cassandra - how to load CSV into Cassandra table

I have various CSV files that arrive several times a day, storing time-series data from sensors that belong to sensor stations. Each CSV is named after the sensor station and the identifier of the sensor it comes from, for example "station1_sensor2.csv". Currently, the data is stored as follows:

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;

I created a Cassandra table to store the data and to be able to query it for various identified tasks. The Cassandra table looks like this:

cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

        USE data;

        CREATE TABLE sensor_data (
        station_id text, // id of the station
        sensor_id text,  // id of the sensor
        tps timestamp,   // timestamp of the measure
        val float,       // measured value
        PRIMARY KEY ((station_id, sensor_id), tps)
        );
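To make the primary key concrete: `(station_id, sensor_id)` is the partition key and `tps` is the clustering column, so all rows from one CSV file land in the same partition, ordered by timestamp, and inserting a row whose full key already exists replaces it instead of adding a new one. The following is a minimal in-memory Python model of that behaviour, for illustration only; `SensorDataModel` is a hypothetical class, not a Cassandra API, and timestamps are kept as plain strings for simplicity.

```python
from collections import defaultdict


class SensorDataModel:
    """Toy model of sensor_data: partitions keyed by (station_id, sensor_id), rows keyed by tps.

    Assumption: this only mimics CQL's key structure and upsert semantics;
    it is not a Cassandra client.
    """

    def __init__(self):
        # partition key -> {clustering key (tps) -> val}
        self.partitions = defaultdict(dict)

    def insert(self, station_id, sensor_id, tps, val):
        # A CQL INSERT with an existing primary key overwrites that row (upsert).
        self.partitions[(station_id, sensor_id)][tps] = val

    def select(self, station_id, sensor_id):
        # Rows of one partition come back ordered by the clustering column (ASC by default).
        partition = self.partitions.get((station_id, sensor_id), {})
        return sorted(partition.items())


if __name__ == "__main__":
    table = SensorDataModel()
    table.insert("station1", "sensor2", "2016-05-04 03:02:01.001+0000", 0.0)
    table.insert("station1", "sensor2", "2016-05-04 03:02:01.002+0000", 0.1234)
    # Same primary key again: replaces the previous value instead of adding a row.
    table.insert("station1", "sensor2", "2016-05-04 03:02:01.002+0000", 0.9999)
    print(table.select("station1", "sensor2"))
```

This is why timestamp precision matters for this table: two measurements that end up with the same `tps` for the same station and sensor collapse into one row.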

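I would like to use Apache NiFi to automatically store the CSV data in this Cassandra table, but I can't find an example or scheme showing how to do it right. I tried the "PutCassandraQL" processor, but I'm struggling without a clear example. So any help on how to execute a Cassandra insert query with Apache NiFi to load the data into the table would be appreciated!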

TL;DR: I have a NiFi 1.0 template that does this, available as a Gist and on the NiFi Wiki.

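NiFi encourages a very modular design, so let's break this down into smaller tasks. I'll describe a possible flow and explain what each processor is used for in your use case: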

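  • Read in the CSV file. This can be done with GetFile or, preferably, ListFile → FetchFile. In my example I use a scripting processor to generate a flow file in-line containing your sample data from above. This makes the template portable for others to use.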

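  • Parse the filename to get the station and sensor fields. This uses NiFi Expression Language to take the part of the filename before the underscore (for the station) and the part after the underscore (minus the CSV extension) for the sensor.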

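  • Split the single CSV file into one flow file per line. This is done so that individual CQL INSERT statements can be created later.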

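  • Extract the column values from each line. I used ExtractText with a regular expression for this; if you have very complicated logic, you may want to check out a scripting processor such as ExecuteScript.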

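  • Alter the timestamp. IIRC, CQL doesn't accept microseconds in timestamp literals. You can either try to parse out the microseconds (best done in an ExecuteScript processor) or simply reformat the timestamp. Note that, because I couldn't parse the microseconds, "reformatting" in this example truncated all fractional seconds.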

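  • Build a CQL INSERT statement. At this point the data (in my template, anyway) is all in flow file attributes, so the original content can be replaced with a CQL INSERT statement, which is how PutCassandraQL expects it. You could instead keep the data in attributes (using UpdateAttribute to name them correctly; see the PutCassandraQL documentation) and use a prepared statement, but IMHO it's simpler to write an explicit CQL statement. At the time of writing, PutCassandraQL does not cache PreparedStatements, so doing it that way is actually less performant right now.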

  • Execute the CQL statements with PutCassandraQL.
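The per-line parsing done by the steps above (filename, split, extract, timestamp) can be approximated in plain Python to check the logic before wiring up processors. This is only a sketch: `parse_filename`, `to_millis`, `flow_attributes` and the regular expression are hypothetical stand-ins for the Expression Language, SplitText, ExtractText and timestamp steps, and the attribute names mirror those used in the INSERT template. Instead of dropping all fractional seconds, `to_millis` keeps milliseconds, which Cassandra's `timestamp` type can store; truncating to whole seconds would give the three sample rows the same primary key, so they would overwrite one another.

```python
import re
from datetime import datetime

# Hypothetical stand-in for the ExtractText regex: "<timestamp>;<value>;".
# Group 1 is the timestamp, group 2 the value; with a dynamic property named
# "column", ExtractText would expose these roughly as column.1 and column.2.
LINE_RE = re.compile(r"^([^;]+);([^;]*);?$")


def parse_filename(filename):
    """'station1_sensor2.csv' -> ('station1', 'sensor2'), like substringBefore/After('_')."""
    stem = filename[: -len(".csv")] if filename.endswith(".csv") else filename
    station, _, sensor = stem.partition("_")
    return station, sensor


def to_millis(ts):
    """Keep millisecond precision: '...01.001000+0000' -> '...01.001+0000'."""
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f%z")
    millis = dt.microsecond // 1000  # drop sub-millisecond digits only
    return dt.strftime("%Y-%m-%d %H:%M:%S.") + f"{millis:03d}" + dt.strftime("%z")


def flow_attributes(filename, content):
    """Yield one attribute dict per CSV line, as the flow would hold before ReplaceText."""
    station, sensor = parse_filename(filename)
    for line in content.splitlines():  # SplitText: one flow file per line
        line = line.strip()
        if not line:
            continue
        match = LINE_RE.match(line)  # ExtractText
        if match is None:
            continue  # in NiFi this line would go to the "unmatched" relationship
        yield {
            "station.name": station,
            "sensor.name": sensor,
            "tps": to_millis(match.group(1)),
            "column.2": match.group(2),
        }


if __name__ == "__main__":
    sample = (
        "2016-05-04 03:02:01.001000+0000;0;\n"
        "2016-05-04 03:02:01.002000+0000;0.1234;\n"
        "2016-05-04 03:02:01.003000+0000;0.2345;\n"
    )
    for attrs in flow_attributes("station1_sensor2.csv", sample):
        print(attrs)
```

For `station1_sensor2.csv`, the first line yields `tps` = `2016-05-04 03:02:01.001+0000` and `column.2` = `0`, so each row keeps a distinct timestamp.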

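I didn't go into detail on the names of my attributes and such, but by the time the flow reaches ReplaceText, it has the following attributes: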

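  • station.name: contains the station identifier parsed from the filename
  • sensor.name: contains the sensor identifier parsed from the filename
  • tps: contains the updated timestamp value
  • column.2: contains the value of the second column (the first being the timestamp)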

ReplaceText sets the flow file content to the following (using Expression Language to fill in the values):

insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
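ReplaceText evaluates these Expression Language references against each flow file's attributes. A rough Python equivalent for simple `${attribute}` references follows, assuming the attribute values already hold the strings produced by the earlier steps. `render` and `PLACEHOLDER` are illustrative names; real Expression Language also supports functions, which this sketch ignores, and treating a missing attribute as an empty string is an assumption here.

```python
import re

TEMPLATE = (
    "insert into sensor_data (station_id, sensor_id, tps, val) "
    "values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})"
)

# Matches a bare attribute reference such as ${station.name}.
# Function calls like ${x:substringBefore('_')} are deliberately not handled.
PLACEHOLDER = re.compile(r"\$\{([^}:]+)\}")


def render(template, attributes):
    """Replace each ${name} with attributes[name] (empty string if missing)."""
    return PLACEHOLDER.sub(lambda m: attributes.get(m.group(1), ""), template)


if __name__ == "__main__":
    attrs = {
        "station.name": "station1",
        "sensor.name": "sensor2",
        "tps": "2016-05-04 03:02:01.001+0000",
        "column.2": "0",
    }
    print(render(TEMPLATE, attrs))
```

Plain substitution does not escape quotes: CQL writes a single quote inside a string literal as two single quotes (''), which only matters if station or sensor identifiers can contain one.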

Hopefully that helps; please let me know if you have any questions or issues. Cheers!

Source: https://habr.com/ru/post/1652816/