Spark Streaming: guaranteeing a specific window start time

I use Spark Streaming to read data from Kinesis with the Structured Streaming framework; my connection is as follows:

val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streams", streamName)
  .option("endpointUrl", endpointUrl)
  .option("initialPositionInStream", "earliest")
  .option("format", "json")
  .schema(<my-schema>)
  .load

The data comes from several IoT devices, each with a unique identifier. I need to aggregate the data by this identifier and by a tumbling window over the timestamp field, as shown below:

val aggregateData = kinesis
    .groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
    .agg(...)

The problem I am facing is that I have to guarantee that every window starts at a round time (e.g. 00:00:00, 00:15:00, and so on). I also need a guarantee that only rows belonging to complete 15-minute windows will be output to my sink. What I am doing now is:

val query = aggregateData
    .writeStream
      .foreach(postgreSQLWriter)
      .outputMode("update")
      .start()
      .awaitTermination()

where postgreSQLWriter is a StreamWriter I created to insert each row into a PostgreSQL DBMS. How can I make sure my windows are exactly 15 minutes long and start at round 15-minute marks?

Question 1: To make windows start at specific times, Spark's window function takes one more parameter, a start-time offset, which shifts each window by the given amount. For example:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute"))

This creates windows 22 minutes long that slide every 1 minute, starting with an offset of 15 minutes.

So the first windows would be:

window1: 8:15 (8:00 plus the 15-minute offset) to 8:37 (8:15 plus the 22-minute duration)
window2: 8:16 (previous window start plus the 1-minute slide) to 8:38 (22-minute duration again)
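
Applied to the question's 15-minute case, here is a minimal sketch of the same idea. It is an assumption built on the question's own names (kinesis, uid, timestamp), not code from the original answer; the count column is added here to prepare for the filtering step below:

import org.apache.spark.sql.functions.{count, window}

// window(timeColumn, windowDuration, slideDuration, startTime):
// equal duration and slide give tumbling windows, and a "0 minutes"
// startTime keeps every boundary on a round 15-minute mark
// (00:00:00, 00:15:00, 00:30:00, ...).
val windowed = kinesis
  .groupBy($"uid", window($"timestamp", "15 minutes", "15 minutes", "0 minutes"))
  .agg(count($"uid").as("count"))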

Question 2: To output only complete 15-minute windows, add a count of the events in each window. Assuming one event arrives per minute, a complete window contains exactly 15 records, so you can filter on that count before writing.

Something like:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count"))

Then, in writeStream, filter for windows whose count equals 15:

aggregatedData.filter($"count" === 15).writeStream.format(...).outputMode("complete").start()
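
Tying this back to the question's original pipeline, a sketch of the filtered write might look as follows; it reuses the postgreSQLWriter sink from the question and the windowed frame sketched above, and still assumes one event per device per minute:

// Emit only windows that have received all 15 per-minute events,
// then push each surviving row through the question's PostgreSQL sink.
val query = windowed
  .filter($"count" === 15)
  .writeStream
  .foreach(postgreSQLWriter)
  .outputMode("update")
  .start()

query.awaitTermination()

In update mode a window is re-emitted whenever its count changes, so a row should reach the sink once its 15th event arrives; complete mode, as in the answer above, would re-emit the whole result table on every trigger instead.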

Source: https://habr.com/ru/post/1686063/

