I am working on a custom Spark data source and want the schema to include a column that is a primitive byte array.
My problem is that the bytes in the resulting array come back boxed: the output is of type WrappedArray$ofRef, so each byte is represented as a java.lang.Object. Although I can work around this, I am concerned about the computational and memory overhead, which is critical for my application. I just need primitive arrays!
The following is a minimal example demonstrating this behavior.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

class DefaultSource extends SchemaRelationProvider with DataSourceRegister {

  override def shortName(): String = "..."

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType = new StructType()
  ): BaseRelation = {
    new DefaultRelation(sqlContext)
  }
}

class DefaultRelation(val sqlContext: SQLContext)
    extends BaseRelation with PrunedFilteredScan {

  // The single column is declared as an array of bytes.
  override def schema: StructType = {
    StructType(
      Array(
        StructField("key", ArrayType(ByteType))
      )
    )
  }

  override def buildScan(
      requiredColumnNames: Array[String],
      filterArr: Array[Filter]
  ): RDD[Row] = {
    testRDD
  }

  // A single row whose only value is a primitive Array[Byte].
  def testRDD: RDD[Row] = sqlContext.sparkContext.parallelize(
    List(
      Row(
        Array[Byte](1)
      )
    )
  )
}
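Note that the RDD itself still holds a primitive array. A quick sanity check against the testRDD defined above (a sketch, run against the same SQLContext) prints the primitive array class, which suggests the boxing happens later, during the DataFrame conversion:

  val relation = new DefaultRelation(sqlContext)
  // The value stored in the Row is still a plain primitive byte array at this point.
  println(relation.testRDD.first()(0).getClass)  // class [B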
I use this example data source as follows:
def schema = StructType(Array(StructField("key", ArrayType(ByteType))))

val rows = sqlContext
  .read
  .schema(schema)
  .format("testdatasource")
  .load()
  .collect()

println(rows(0)(0).getClass)
Then it generates the following output:
class scala.collection.mutable.WrappedArray$ofRef
So each element of the WrappedArray is a java.lang.Object (in fact a java.lang.Byte) rather than a primitive byte. Is there a way, either at the RDD level or through the data source API, to get primitive byte arrays back directly? Any suggestions would be appreciated; the workaround I currently use is sketched below.
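The workaround is roughly the following (a sketch, assuming the collected value can be read as a Seq[Byte], as the output above indicates):

  // The collected column value is a WrappedArray$ofRef of boxed bytes.
  val boxed: Seq[Byte] = rows(0).getAs[Seq[Byte]](0)
  // Copy and unbox it back into a primitive Array[Byte] (an extra allocation and copy per row).
  val primitive: Array[Byte] = boxed.toArray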