I am trying to convert a MultiMap holding billions of data values to a Spark DataFrame in order to run calculations on it and then write the results to a Cassandra table.
I generate the multimap with the query and Cassandra loop below. I would gladly accept suggestions if there is a better way to get this data into a DataFrame and process it than looping over every row.
Updated code with the answer:
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
stmt.setFetchSize(1000);
ResultSet results = session.execute(stmt);

Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results) {
    double start_frequency = row.getDouble("Start_Frequency");
    float power = row.getFloat("Power");
    double bandwidth = row.getDouble("Bandwidth");
    // record this power reading for every channel the measurement covers;
    // channel_end and increment are defined elsewhere in my code
    for (double channel = 1.6000E8; channel <= channel_end; channel += increment) {
        if (channel >= start_frequency && channel <= (start_frequency + bandwidth)) {
            data.put(channel, power);
        }
    }
}
// flatten the multimap into one Value object per (channel, power) pair
List<Value> values = data.asMap().entrySet()
        .stream()
        .flatMap(x -> x.getValue()
                .stream()
                .map(y -> new Value(x.getKey(), y)))
        .collect(Collectors.toList());
sqlContext.createDataFrame(sc.parallelize(values), Value.class)
        .groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();
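For reference, these are the imports I believe the snippet above relies on (assuming the DataStax Java driver and Guava; adjust to your versions):

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import org.apache.spark.sql.SaveMode;
import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.min;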
public class Value implements Serializable {
    private Double channel;
    private Float power;

    public Value(Double channel, Float power) {
        this.channel = channel;
        this.power = power;
    }

    public void setChannel(Double channel) { this.channel = channel; }
    public void setPower(Float power) { this.power = power; }
    public Double getChannel() { return channel; }
    public Float getPower() { return power; }

    @Override
    public String toString() {
        return "[" + channel + "," + power + "]";
    }
}
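As far as I can tell, Spark infers the DataFrame schema from this bean's getters, so the resulting columns are channel and power.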
The multimap holds {Double} = [Float] entries, i.e. there can be several Float values for each Double key.
Example:
{1.50E8=[10, 20], 1.51E8=[-10, -13, -14, -15], 1.52E8=[-10, -11]}
I need to use Spark to get the min, max, and average value for each key. For example, for the first key, 1.50E8, that would be min 10, max 20, avg 15.
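Worked through for the whole sample above, the grouped result should be:

channel   min(power)   max(power)   avg(power)
1.50E8    10           20           15.0
1.51E8    -15          -10          -13.0
1.52E8    -11          -10          -10.5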
As far as I understand, the aggregation itself should look something like this:
queryMV.groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();
The problem is how to get the multimap into a DataFrame in Java; I could not find documentation or examples for this.
I could build the rows one by one with a for loop, but with this amount of data that would be slow and inefficient, and the multimap itself is only an intermediate step. A sketch of a loop-free alternative follows.
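For what it's worth, here is a minimal sketch of the loop-free direction I have in mind: let the spark-cassandra-connector load the table as a DataFrame and do the channel expansion with flatMap on the executors, so the driver-side multimap disappears entirely. CHANNEL_START, CHANNEL_END and INCREMENT are hypothetical stand-ins for the constants used above, and I am assuming a Spark 1.x API (DataFrame, Iterable-returning flatMap):

// needs org.apache.spark.api.java.JavaRDD, org.apache.spark.sql.DataFrame,
// and java.util.ArrayList in addition to the imports listed earlier
DataFrame mv = sqlContext.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "SB1000_49552019")
        .option("table", "Measured_Value")
        .load()
        .select("Start_Frequency", "Power", "Bandwidth");

// expand each measurement into one Value per covered channel, on the executors
JavaRDD<Value> values = mv.javaRDD().flatMap(row -> {
    double start_frequency = row.getDouble(row.fieldIndex("Start_Frequency"));
    float power = row.getFloat(row.fieldIndex("Power"));
    double bandwidth = row.getDouble(row.fieldIndex("Bandwidth"));
    List<Value> out = new ArrayList<>();
    for (double channel = CHANNEL_START; channel <= CHANNEL_END; channel += INCREMENT) {
        if (channel >= start_frequency && channel <= start_frequency + bandwidth) {
            out.add(new Value(channel, power));
        }
    }
    return out; // Spark 1.x flatMap takes an Iterable; 2.x would need an Iterator
});

sqlContext.createDataFrame(values, Value.class)
        .groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();

Whether Power and Bandwidth come back as float and double here is an assumption on my part; if the connector maps them differently, the getFloat/getDouble calls would need to change to match.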