I have the following code:

```scala
import org.apache.spark.sql.streaming.Trigger

val jdf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

jdf.createOrReplaceTempView("table")

val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")

resultdf.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()
```
and I get the following exception:

```
org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner, ('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key
```
I changed the code to this:

```scala
import org.apache.spark.sql.streaming.Trigger

val jdf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

val jdf1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1")

val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")

resultdf.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()
```
This works, but I don't think it is the solution I am looking for: it only makes the self-join possible in raw SQL by opening a second stream on the same topic, i.e., by reading an additional copy of the data. Is there a way to express this self-join over a single source?
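One possibility I have been considering, in case it helps frame the question: the failure above looks like the known limitation where stream-stream self-joins were not supported before Spark 2.4 (SPARK-23406). Assuming Spark 2.4+, a sketch of a self-join over a single source using the DataFrame API with aliases (rather than a temp view and raw SQL) might look like this — `spark` here is the usual `SparkSession` from the shell, and the topic/broker values are the same placeholders as above:

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

// Single streaming source — no second copy of the data is read.
val jdf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

// Alias the same streaming DataFrame twice and join it with itself.
// The aliases disambiguate the join columns the way "as x" / "as y" did in SQL.
val resultdf = jdf.as("x")
  .join(jdf.as("y"), col("x.offset") === col("y.offset"))

resultdf.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()
```

Whether this analyzes cleanly seems to depend on the Spark version; on 2.3 and earlier I would expect the same `AnalysisException` as in my SQL attempt.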