Updated on 2022-06-01 GMT+08:00

Scala Sample Code

Function Description

In Spark applications, use Streaming to call Kafka APIs to obtain data and write data analysis results to an HBase table.

Sample Code

The following code snippets are used as an example. For complete codes, see com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase.

/**
  * Run a Streaming job. Read data from HBase table1 based on the value, sum two data records, and update the new data in the HBase table1.
  */
object SparkOnStreamingToHbase {
  def main(args: Array[String]) {
    if (args.length < 4) {
      printUsage
    }

    val Array(checkPointDir, topics, brokers, zkQuorum) = args
    val sparkConf = new SparkConf().setAppName("DirectStreamToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Set the CheckPoint directory of Streaming.
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }

    val columnFamily = "cf"
    val zkClientPort = "24002"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )

    val topicArr = topics.split(",")
    val topicSet = topicArr.toSet
    // map(_._1) is the key of the message, and map(_._2) is the value of the message.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
    lines.foreachRDD(rdd => {
      // Partitions run on the executor.
      rdd.foreachPartition(iterator => hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily))
    })

    ssc.start()
    ssc.awaitTermination()
  }

  
  /**
   * Write data to the executor.
   * @param iterator Message
   * @param zkClientPort
   * @param zkQuorum
   * @param columnFamily
   */
  def hBaseWriter(iterator: Iterator[String], zkClientPort: String, zkQuorum: String, columnFamily: String): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", zkClientPort)
    conf.set("hbase.zookeeper.quorum", zkQuorum)
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf("table1"))
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row.getBytes)
        rowList.add(get)
      }
      // Obtain data in table1.
      val resultDataBuffer = table.get(rowList)
      // Set data in table1.
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        val row = iteratorArray(i)
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
          // Obtain the old value based on the column family and column.
          val aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(Bytes.toBytes(row))
          // Calculate the result.
          val resultValue = row.toInt + aCid.toInt
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }
  

  private def printUsage {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}")
    System.exit(1)
  }
}