How to do a self-join in Spark 2.3.0? What is the correct syntax?

I have the following code

    import org.apache.spark.sql.streaming.Trigger

    val jdf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "join_test")
      .option("startingOffsets", "earliest")
      .load()

    jdf.createOrReplaceTempView("table")

    val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")

    resultdf.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime(1000))
      .start()

and I get the following exception

    org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
    'Project [*]
    +- 'Join Inner, ('x.offset = 'y.offset)
       :- SubqueryAlias x
       :  +- SubqueryAlias table
       :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
       +- SubqueryAlias y
          +- SubqueryAlias table
             +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]

I changed the code to this

    import org.apache.spark.sql.streaming.Trigger

    val jdf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "join_test")
      .option("startingOffsets", "earliest")
      .load()

    val jdf1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "join_test")
      .option("startingOffsets", "earliest")
      .load()

    jdf.createOrReplaceTempView("table")
    jdf1.createOrReplaceTempView("table1")

    val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")

    resultdf.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime(1000))
      .start()

And it works. However, I do not think this is the solution I am looking for. I want to be able to do a self-join using raw SQL, without creating an additional copy of the data as the code above does. Is there any other way?

2 answers

This is a known issue and will be fixed in 2.4.0. See https://issues.apache.org/jira/browse/SPARK-23406. For now, the workaround is simply to avoid joining the same DataFrame object with itself.
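A minimal sketch of that workaround, assuming a spark-shell session where `spark` is already defined and the same broker and topic as in the question. The helper name `kafkaStream` and the view names `left_side` and `right_side` are made up for illustration; they are not from the original post.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

// Hypothetical helper: every call builds a separate streaming DataFrame,
// so the two sides of the join are never the same DataFrame object.
def kafkaStream(spark: SparkSession): DataFrame =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "join_test")
    .option("startingOffsets", "earliest")
    .load()

// Register each independent stream under its own view name.
kafkaStream(spark).createOrReplaceTempView("left_side")
kafkaStream(spark).createOrReplaceTempView("right_side")

// Raw SQL join over the two views, matching rows on the Kafka offset.
val resultdf = spark.sql(
  "select * from left_side as x inner join right_side as y on x.offset = y.offset")

resultdf.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()
```

This is the same approach as the second snippet in the question, so the topic is still read twice. The helper only removes the duplicated reader configuration.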


You can use the DataFrame API join function instead of SQL syntax:

 jdf.as("df1").join(jdf.as("df2"), $"df1.offset" === $"df2.offset", "inner") 

Source: https://habr.com/ru/post/1266963/

