
Java Sample Code

Function Description

In a Spark application, you can use Spark to call Hive APIs to operate on a Hive table and write the analysis result of the Hive table data to an HBase table.
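
This sample assumes a Hive table named person with a string column name and an integer column account, and an HBase table named table2 with a column family cf whose cid column stores the value to be updated (these names are taken from the sample code below). The following is a minimal, hypothetical preparation sketch for such tables; it is not part of the sample project, so adapt the schemas and cluster settings to your environment.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

public class PrepareTables {
  public static void main(String[] args) throws Exception {
    // Create the Hive table "person" with the columns that the sample reads (name, account).
    SparkConf conf = new SparkConf().setAppName("PrepareTables");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    HiveContext sqlContext = new HiveContext(jsc);
    sqlContext.sql("CREATE TABLE IF NOT EXISTS person (name STRING, account INT)");
    jsc.stop();

    // Create the HBase table "table2" with the column family "cf" that the sample updates.
    Configuration hbaseConf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
         Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf("table2");
      if (!admin.tableExists(tableName)) {
        HTableDescriptor descriptor = new HTableDescriptor(tableName);
        descriptor.addFamily(new HColumnDescriptor("cf"));
        admin.createTable(descriptor);
      }
    }
  }
}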

Sample Code

The following code snippet is an example. For the complete code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Read data from the Hive table, look up the corresponding record in the HBase table by row key,
 * add the two values, and write the sum back to the HBase table.
 */
public class SparkHivetoHbase {

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      printUsage();
    }

    // Use the Spark API to obtain table data.
    SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc);
    DataFrame dataFrame = sqlContext.sql("select name, account from person");

    // Traverse every partition of the Hive table data and update the corresponding records in the HBase table.
    // If the data volume is small, you can use the foreach() method instead.
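    // The ZooKeeper quorum address of the HBase cluster is passed in as the first command-line argument.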
    final String zkQuorum = args[0];
    dataFrame.toJavaRDD().foreachPartition(
      new VoidFunction<Iterator<Row>>() {
        public void call(Iterator<Row> iterator) throws Exception {
          hBaseWriter(iterator, zkQuorum);
        }
      }
    );

    jsc.stop();
  }

  /**
   * Update records in the HBase table on the executor.
   *
   * @param iterator Partition data from the Hive table.
   * @param zkQuorum ZooKeeper quorum address of the HBase cluster.
   */
  private static void hBaseWriter(Iterator<Row> iterator, String zkQuorum) throws IOException {
    // Read the HBase table.
    String tableName = "table2";
    String columnFamily = "cf";
    Configuration conf = HBaseConfiguration.create();
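    // ZooKeeper client port and quorum used to connect to HBase; adjust them to match your cluster.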
    conf.set("hbase.zookeeper.property.clientPort", "24002");
    conf.set("hbase.zookeeper.quorum", zkQuorum);
    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf(tableName));
      List<Row> table1List = new ArrayList<Row>();
      List<Get> rowList = new ArrayList<Get>();
      while (iterator.hasNext()) {
        Row item = iterator.next();
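        // Use the first column (name) of the Hive row as the HBase row key.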
        Get get = new Get(Bytes.toBytes(item.getString(0)));
        table1List.add(item);
        rowList.add(get);
      }
      // Obtain the records in the HBase table.
      Result[] resultDataBuffer = table.get(rowList);
      // Modify records in the HBase table.
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        // HBase record corresponding to the current Hive row.
        Result resultData = resultDataBuffer[i];
        if (!resultData.isEmpty()) {
          // Obtain the account value (second column) from the Hive row.
          int hiveValue = table1List.get(i).getInt(1);
          // Obtain the HBase table value based on the column family and column.
          String hbaseValue = Bytes.toString(resultData.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("cid")));
          Put put = new Put(Bytes.toBytes(table1List.get(i).getString(0)));
          // Calculate the result.
          int resultValue = hiveValue + Integer.valueOf(hbaseValue);
          // Set the result to the Put object.
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }
      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  private static void printUsage() {
    System.out.println("Usage: {zkQuorum}");
    System.exit(1);
  }
}
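
The HBase Connection is created inside foreachPartition because it cannot be serialized and shipped from the driver, so each partition running on an executor opens and closes its own connection. After the job finishes, you can check the updated values with the HBase client API; the following is a minimal, hypothetical verification sketch (the row key "name1" and the ZooKeeper hosts are placeholders for values from your environment):

// Hypothetical verification sketch: read one updated record back from table2.
String zkQuorum = "zkHost1,zkHost2,zkHost3"; // placeholder; use your cluster's ZooKeeper hosts
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort", "24002");
conf.set("hbase.zookeeper.quorum", zkQuorum);
try (Connection connection = ConnectionFactory.createConnection(conf);
     Table table = connection.getTable(TableName.valueOf("table2"))) {
  // The row key is a name value from the Hive table; "name1" is a placeholder.
  Result result = table.get(new Get(Bytes.toBytes("name1")));
  String value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cid")));
  System.out.println("cf:cid = " + value);
}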