I am trying to make my code more efficient, since I have to process billions of rows of data in Cassandra. Right now I pull the data with a plain Java loop over the DataStax driver's ResultSet and put it into a format I am familiar with (a Guava Multimap) before handing it to Spark for the manipulation. I would like to replace this Multimap loop with direct Spark manipulation of the Cassandra table through the Spark Cassandra Connector, in order to save time and make everything more efficient. I would really appreciate any code suggestions for this. Here is my existing code, followed by a rough sketch of what I think I am after:
Statement stmt = new SimpleStatement(
        "SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
stmt.setFetchSize(2000000);
ResultSet results = session.execute(stmt);

// collect (channel -> power) pairs; channel_end and increment are constants defined elsewhere
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results) {
    double start_frequency = row.getDouble("Start_Frequency");
    float power = row.getFloat("Power");
    double bandwidth = row.getDouble("Bandwidth");
    // walk every channel and keep it if it falls inside this row's frequency window
    for (double channel = 1.6000E8; channel <= channel_end; channel += increment) {
        if (channel >= start_frequency && channel <= (start_frequency + bandwidth)) {
            data.put(channel, power);
        }
    }
}
// flatten the multimap into Value beans so Spark can aggregate per channel
List<Value> values = data.asMap().entrySet()
        .stream()
        .flatMap(x -> x.getValue()
                .stream()
                .map(y -> new Value(x.getKey(), y)))
        .collect(Collectors.toList());

sqlContext.createDataFrame(sc.parallelize(values), Value.class)
        .groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();
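For reference, this is the rough, untested shape I have in mind, based on my reading of the connector's DataFrame support: load the table directly into a DataFrame, join it against a small DataFrame of channels on the frequency-window condition, and aggregate per channel before writing back. The Channel bean and the join-based expansion are just my own guesses, so please correct me if there is a better way:

import static org.apache.spark.sql.functions.*;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SaveMode;

// tiny bean for the channel axis, analogous to my Value class (my own invention)
public static class Channel implements Serializable {
    private double channel;
    public Channel() {}
    public Channel(double channel) { this.channel = channel; }
    public double getChannel() { return channel; }
    public void setChannel(double channel) { this.channel = channel; }
}

// read the Cassandra table straight into a DataFrame via the connector,
// skipping the driver loop and the Multimap entirely
DataFrame measurements = sqlContext.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "SB1000_49552019")
        .option("table", "Measured_Value")
        .load()
        .select(col("Start_Frequency"), col("Bandwidth"), col("Power"));

// build the list of channels once on the driver (channel_end and increment as above)
List<Channel> channelList = new ArrayList<>();
for (double channel = 1.6000E8; channel <= channel_end; channel += increment) {
    channelList.add(new Channel(channel));
}
DataFrame channels = sqlContext.createDataFrame(sc.parallelize(channelList), Channel.class);

// non-equi join: keep (channel, power) pairs where the channel falls inside the
// row's frequency window, then aggregate per channel and write back to Cassandra
measurements
        .join(channels, col("channel").geq(col("Start_Frequency"))
                .and(col("channel").leq(col("Start_Frequency").plus(col("Bandwidth")))))
        .groupBy(col("channel"))
        .agg(min("Power"), max("Power"), avg("Power"))
        .write().mode(SaveMode.Append)
        .option("keyspace", "model")
        .option("table", "results")
        .format("org.apache.spark.sql.cassandra")
        .save();

I realize the range condition means Spark may fall back to a broadcast or cartesian join, but since the channel list is small I am hoping broadcasting it is acceptable. Is this roughly the right direction, or is there a better way to do the channel expansion directly on the Cassandra data?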