toDF does not compile although import sqlContext.implicits._ is used

My Spark Scala code does not compile when I use toDF to convert an RDD to a DataFrame. I tried both Spark 2.0.0 and 1.6.2, and the problem is the same in both versions. Below are my POM file and a code snippet:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.test</groupId>
    <artifactId>test_service</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.10.6</scala.version>
        <spark.version>2.0.0</spark.version>
        <jackson.version>2.8.3</jackson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.10</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.10</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.sedis</groupId>
            <artifactId>sedis_2.10</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.lambdaworks</groupId>
            <artifactId>jacks_2.10</artifactId>
            <version>2.3.3</version>
        </dependency>
        <dependency>
            <groupId>com.github.nscala-time</groupId>
            <artifactId>nscala-time_2.10</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.53</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <id>build-a</id>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>org.test.Runner1</mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <finalName>runner1</finalName>
                        </configuration>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>build-b</id>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>org.test.Runner2</mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <finalName>runner2</finalName>
                        </configuration>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Configure maven-compiler-plugin to use the desired Java version -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- Use build-helper-maven-plugin to add Scala source and test source directories -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.10</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Use scala-maven-plugin for Scala support -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <!-- Need to specify this explicitly, otherwise plugin won't be called when doing eg mvn compile -->
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The part of the code that produces the error (rdd is an RDD[Map[String,Any]]):

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

myDStream.foreachRDD { rdd =>
  val features = rdd.map(line => line.keySet).first().toList
  val myDF = filtered.toDF(features: _*) // toDF is not recognized!!!
}
2 answers

Expected. An RDD[Map[String,Any]] cannot be converted to a DataFrame: the toDF conversion brought in by import sqlContext.implicits._ is only defined for RDDs of primitives, Strings, and Product types (case classes and tuples), and there is no such mapping for Any values.
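One common way around this limitation, not spelled out in the answer above, is to skip toDF entirely and pair an RDD[Row] with an explicit schema via createDataFrame. The following is only a sketch: mapsToDF is a made-up helper name, and it assumes every map shares the same key set and that rendering the Any values as strings is acceptable.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical helper: builds a DataFrame without toDF by pairing Rows
// with an explicit schema. Assumes all maps share one key set and that
// stringifying the Any values is acceptable.
def mapsToDF(sqlContext: SQLContext, rdd: RDD[Map[String, Any]]): DataFrame = {
  val features = rdd.first().keySet.toList // column names
  val schema = StructType(features.map(name => StructField(name, StringType, nullable = true)))
  val rows = rdd.map(m => Row(features.map(k => String.valueOf(m(k))): _*))
  sqlContext.createDataFrame(rows, schema)
}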


Create an object or case class with fields holding both the key and the value, then map the RDD's elements into instances of it so that you can pass an RDD[YourCaseClass] to the .toDF function.
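A minimal sketch of that suggestion, where KV is a made-up case class name, the values are stringified (there is no Encoder for Any), and sc and myDStream are taken from the question. Note that the case class must be defined at the top level, not inside a method, or toDF will fail with a missing TypeTag:

import org.apache.spark.sql.SQLContext

// KV is a hypothetical name; define it at the top level so that a
// TypeTag is available for the toDF conversion.
case class KV(key: String, value: String)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

myDStream.foreachRDD { rdd =>
  // Flatten each Map[String, Any] into one KV per entry; toDF now
  // resolves because KV is a Product type with TypeTag support.
  val kvDF = rdd.flatMap(_.map { case (k, v) => KV(k, String.valueOf(v)) }).toDF()
  kvDF.show()
}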


Source: https://habr.com/ru/post/1012348/

