I have an RDD with elements of different types, and I want to count them by type. For example, the code below works correctly.
scala> val rdd = sc.parallelize(List(1, 2.0, "abc"))
rdd: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.filter{case z:Int => true; case _ => false}.count
res0: Long = 1

scala> rdd.filter{case z:String => true; case _ => false}.count
res1: Long = 1
Now, if the elements are user-defined types, the code below does not work as expected.
scala> class TypeA extends Serializable // this is the base class
defined class TypeA

scala> case class TypeB(id:Long) extends TypeA // derived class 1
defined class TypeB

scala> case class TypeC(name:String) extends TypeA // derived class 2
defined class TypeC

scala> val rdd1 = sc.parallelize(List(TypeB(123), TypeC("jack"), TypeB(456))) // create an RDD with elements of different types
rdd1: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[3] at parallelize at <console>:29

scala> rdd1.count // total size is correct
res2: Long = 3

scala> rdd1.filter{case z:TypeB => true; case _ => false}.count // why 0?
res3: Long = 0

scala> rdd1.filter{case z:TypeC => true; case _ => false}.count // again 0?
res4: Long = 0

scala> rdd1.filter{case z:TypeA => true; case _ => false}.count // only the base class matches?
res5: Long = 3
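As a sanity check, here is the same filtering logic on a plain Scala `List` instead of an RDD (no Spark involved; `xs`, `bCount`, and `cCount` are names I made up for this sketch). The pattern match itself produces the expected counts here, so the match syntax is not the problem:

```scala
// Same class hierarchy as in the spark-shell session above
class TypeA extends Serializable
case class TypeB(id: Long) extends TypeA
case class TypeC(name: String) extends TypeA

val xs: List[TypeA] = List(TypeB(123), TypeC("jack"), TypeB(456))

// Same type-based predicates as the RDD filters
val bCount = xs.count { case _: TypeB => true; case _ => false }
val cCount = xs.count { case _: TypeC => true; case _ => false }

println(s"TypeB: $bCount, TypeC: $cCount") // TypeB: 2, TypeC: 1
```

Since the identical match works on a local collection, whatever is going wrong seems specific to how the RDD elements are handled, not to the pattern match.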
Did I miss something? Help me please!