Spark Streaming with Hbase

I am trying to get data into HBase. In all the tutorials I have found, HBase data goes through Kafka first. Is it possible to integrate Spark Streaming with HBase directly, without including Kafka in the chain? Thank you.

+4
source share
1 answer

Is it possible to integrate Spark Streaming and HBase directly, without involving Kafka?

Yes, it is possible — we did the same thing without using Kafka. See the JavaHBaseStreamingBulkPutExample below:

package org.apache.hadoop.hbase.spark.example.hbasecontext;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.spark.JavaHBaseContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * This is a simple example of BulkPut with Spark Streaming
 */
final public class JavaHBaseStreamingBulkPutExample {

  private JavaHBaseStreamingBulkPutExample() {}

  public static void main(String[] args) {
    if (args.length < 4) {
      System.out.println("JavaHBaseBulkPutExample  " +
              "{host} {port} {tableName}");
      return;
    }

    String host = args[0];
    String port = args[1];
    String tableName = args[2];

    SparkConf sparkConf =
            new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
                    tableName + ":" + port + ":" + tableName);

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    try {
      JavaStreamingContext jssc =
              new JavaStreamingContext(jsc, new Duration(1000));

      JavaReceiverInputDStream<String> javaDstream =
              jssc.socketTextStream(host, Integer.parseInt(port));

      Configuration conf = HBaseConfiguration.create();

      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

      hbaseContext.streamBulkPut(javaDstream,
              TableName.valueOf(tableName),
              new PutFunction());
    } finally {
      jsc.stop();
    }
  }

  public static class PutFunction implements Function<String, Put> {

    private static final long serialVersionUID = 1L;

    public Put call(String v) throws Exception {
      String[] part = v.split(",");
      Put put = new Put(Bytes.toBytes(part[0]));

      put.addColumn(Bytes.toBytes(part[1]),
              Bytes.toBytes(part[2]),
              Bytes.toBytes(part[3]));
      return put;
    }

  }
}
+3
source

Source: https://habr.com/ru/post/1666508/


All Articles