Is it possible to integrate a Spark streaming flow with HBase directly, without going through Kafka?
Yes, it is possible; we did the same without using Kafka. See the JavaHBaseStreamingBulkPutExample example below.
package org.apache.hadoop.hbase.spark.example.hbasecontext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
final public class JavaHBaseStreamingBulkPutExample {
private JavaHBaseStreamingBulkPutExample() {}
public static void main(String[] args) {
if (args.length < 4) {
System.out.println("JavaHBaseBulkPutExample " +
"{host} {port} {tableName}");
return;
}
String host = args[0];
String port = args[1];
String tableName = args[2];
SparkConf sparkConf =
new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
tableName + ":" + port + ":" + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
JavaStreamingContext jssc =
new JavaStreamingContext(jsc, new Duration(1000));
JavaReceiverInputDStream<String> javaDstream =
jssc.socketTextStream(host, Integer.parseInt(port));
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.streamBulkPut(javaDstream,
TableName.valueOf(tableName),
new PutFunction());
} finally {
jsc.stop();
}
}
public static class PutFunction implements Function<String, Put> {
private static final long serialVersionUID = 1L;
public Put call(String v) throws Exception {
String[] part = v.split(",");
Put put = new Put(Bytes.toBytes(part[0]));
put.addColumn(Bytes.toBytes(part[1]),
Bytes.toBytes(part[2]),
Bytes.toBytes(part[3]));
return put;
}
}
}
source
share