Writing to HBase from Spark: an existential type puzzle

I am trying to write a Spark job that writes its output to HBase. As far as I can tell, the right way to do this is to use the method saveAsHadoopDataset on org.apache.spark.rdd.PairRDDFunctions, which requires that my RDD consist of pairs.
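For HBase, the pair type is typically (ImmutableBytesWritable, Put). As a minimal sketch of what building such an RDD might look like (the helper name and the "cf"/"col" column names are made up, and put.add is the HBase 0.96-era call, later renamed addColumn):

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// Hypothetical helper: turn (rowKey, value) strings into the
// (ImmutableBytesWritable, Put) pairs that saveAsHadoopDataset expects.
def toHBasePairs(rows: RDD[(String, String)]): RDD[(ImmutableBytesWritable, Put)] =
  rows.map { case (key, value) =>
    val put = new Put(Bytes.toBytes(key))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(key)), put)
  }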

The method saveAsHadoopDataset requires a JobConf, and this is what I am trying to build. According to this link, one thing I have to set on my JobConf is the output format (in fact, it does not work without it), like

jobConfig.setOutputFormat(classOf[TableOutputFormat])

The problem is that this does not seem to compile, because TableOutputFormat is generic, even though it ignores its type parameter. So I tried various combinations, for example

jobConfig.setOutputFormat(classOf[TableOutputFormat[Unit]])
jobConfig.setOutputFormat(classOf[TableOutputFormat[_]])

but in every case I get an error

required: Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]]

Now, as far as I can tell, Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]] translates to Class[T] forSome { type T <: org.apache.hadoop.mapred.OutputFormat[_, _] }. Here I think I have a problem, because:

  • Class is invariant
  • TableOutputFormat[T] <: OutputFormat[T, Mutation], but
  • I am not sure how existential types interact with subtyping in the requirement T <: OutputFormat[_, _]

Is there a way to get a subtype of OutputFormat[_, _] from TableOutputFormat? The problem seems to be related to the differences between generics in Java and Scala. What can I do about it?
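As a plain-Scala illustration of the invariance point and of how an existential upper bound behaves (nothing HBase-specific here):

val cs: Class[String] = classOf[String]
// val ca: Class[AnyRef] = cs   // does not compile: Class is invariant
val ce: Class[_ <: AnyRef] = cs // compiles: String conforms to the existential bound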

EDIT

It turns out that the situation is even more subtle. I tried defining my own method in the REPL:

def foo(x: Class[_ <: OutputFormat[_, _]]) = x

and I can actually call it with

foo(classOf[TableOutputFormat[Unit]])

or even

foo(classOf[TableOutputFormat[_]])

for that matter, and it typechecks. But I still cannot call

jobConf.setOutputFormat(classOf[TableOutputFormat[_]])

The original Java signature of setOutputFormat is void setOutputFormat(Class<? extends OutputFormat> theClass). How can I call it from Scala?


Strangely, the following works for me 100% of the time. Are you sure you are importing the right class? This is what I use:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat

val conf = HBaseConfiguration.create()

// Note: this is the old-API org.apache.hadoop.hbase.mapred.TableOutputFormat,
// which is not generic, so classOf[TableOutputFormat] takes no type parameter.
val jobConfig: JobConf = new JobConf(conf, this.getClass)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable) // outputTable: the target table name
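With this JobConf in hand, the save itself would be along these lines (a sketch: pairs stands for an RDD[(ImmutableBytesWritable, Put)], e.g. built by the hypothetical toHBasePairs above; on pre-1.3 Spark you also need import org.apache.spark.SparkContext._ for the PairRDDFunctions implicit):

pairs.saveAsHadoopDataset(jobConfig)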

These are the dependencies I use:

"org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",
"org.apache.hbase" % "hbase-client" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-common" % "0.96.1.1-cdh5.0.0", 

"org.apache.hbase" % "hbase-hadoop-compat" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-it" % "0.96.1.1-cdh5.0.0", /
"org.apache.hbase" % "hbase-hadoop2-compat" % "0.96.1.1-cdh5.0.0",

"org.apache.hbase" % "hbase-prefix-tree" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-protocol" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-server" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-shell" % "0.96.1.1-cdh5.0.0", 

"org.apache.hbase" % "hbase-testing-util" % "0.96.1.1-cdh5.0.0", 
"org.apache.hbase" % "hbase-thrift" % "0.96.1.1-cdh5.0.0",

Alternatively, instead of import org.apache.hadoop.hbase.mapred.TableOutputFormat, use the new-API class and configure the Job rather than the JobConf:

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
...
val hConf = HBaseConfiguration.create()

val job = Job.getInstance(hConf)
val jobConf = job.getConfiguration
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
// the new-API TableOutputFormat takes the key type as its type parameter
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
...
rdd.saveAsNewAPIHadoopDataset(jobConf)
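The distinction the two answers turn on can be summarized as follows (class shapes as of the HBase 0.96 line used above; worth re-checking against the javadocs for other versions):

// Two different classes share the name TableOutputFormat:
//
//   org.apache.hadoop.hbase.mapred.TableOutputFormat          (old API, not generic)
//     -- an org.apache.hadoop.mapred.OutputFormat[ImmutableBytesWritable, Put]
//
//   org.apache.hadoop.hbase.mapreduce.TableOutputFormat[KEY]  (new API, generic)
//     -- an org.apache.hadoop.mapreduce.OutputFormat[KEY, Mutation]
//
// JobConf.setOutputFormat expects the mapred interface, so only the first one
// fits it; the second goes with Job.setOutputFormatClass, as in this answer.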

Source: https://habr.com/ru/post/1540335/

