Updated on 2022-06-01 GMT+08:00

Scala Sample Code

Function Description

In a Spark application, use Spark Streaming to call Kafka APIs to obtain word records. Group the records by word to count the number of records of each word, and write the result data to Kafka 0-10.

Sample Code for Streaming to Read Kafka 0-10

The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.

For a normal cluster, comment out line 60 (shown below) in com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.scala:

"security.protocol" -> "SASL_PLAINTEXT",

/**
  * Counts the words read from one or more Kafka topics with Spark Streaming
  * and prints the running total per word.
  *
  * Command-line arguments, in order:
  * <checkPointDir> is the Spark Streaming checkpoint directory.
  * <brokers> is the Kafka bootstrap server list, used to obtain metadata.
  * <topics> is a comma-separated list of one or more Kafka topics to be consumed.
  * <batchTime> is the duration (in seconds) of one Spark Streaming batch.
  */
object SecurityKafkaWordCount {

  /**
    * Entry point: validates the argument count, builds the streaming context
    * and runs it until it is terminated.
    *
    * @param args <checkPointDir> <brokers> <topics> <batchTime>
    */
  def main(args: Array[String]): Unit = {
    // createContext destructures exactly four arguments; report a usage error
    // here instead of letting it fail with a scala.MatchError.
    if (args.length != 4) {
      System.err.println(
        "Usage: SecurityKafkaWordCount <checkPointDir> <brokers> <topics> <batchTime>")
      System.exit(1)
    }

    val ssc = createContext(args)

    // Start the Streaming system.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Builds the StreamingContext and wires up the word-count pipeline.
    *
    * @param args exactly four elements: checkpoint directory, Kafka brokers,
    *             comma-separated topics, and batch duration in seconds
    * @return the configured (not yet started) StreamingContext
    */
  def createContext(args : Array[String]) : StreamingContext = {
    val Array(checkPointDir, brokers, topics, batchSize) = args

    // Create a Streaming startup environment.
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(batchSize.toLong))

    // Set the CheckPoint directory of Streaming.
    // This is mandatory because updateStateByKey (used below) keeps state across batches.
    ssc.checkpoint(checkPointDir)

    // Obtain the set of topics to subscribe to.
    val topicSet = topics.split(",").toSet

    // Kafka consumer properties. The last three entries are the SASL/Kerberos settings;
    // for a normal cluster, comment out the "security.protocol" entry.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "DemoConsumer",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka",
      "kerberos.domain.name" -> "hadoop.hadoop.com"
    )

    val locationStrategy = LocationStrategies.PreferConsistent
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)

    // Create a direct kafka stream using brokers and topics.
    // Receive data from Kafka and generate the corresponding DStream.
    val stream = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategy)

    // Treat each record value as one word and pair it with an initial count of 1.
    val tf = stream.map(record => (record.value, 1L))

    // Count the words within each batch, then fold the per-batch counts
    // into a running total per word.
    val wordCounts = tf.reduceByKey(_ + _)
    val totalCounts = wordCounts.updateStateByKey(updataFunc)
    totalCounts.print()
    ssc
  }

  /**
    * State update function for updateStateByKey: adds the counts of the current
    * batch to the previously accumulated total.
    *
    * @param values counts for one word in the current batch
    * @param state  total accumulated so far, or None for a word seen for the first time
    * @return the new running total
    */
  def updataFunc(values : Seq[Long], state : Option[Long]) : Option[Long] =
    Some(values.sum + state.getOrElse(0L))
}

Sample Code for Streaming to Write to Kafka 0-10

The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.DstreamKafkaWriter.

  • You are advised to use the new API createDirectStream to develop applications instead of the old API createStream. The old API can still be used, but the new API provides better performance and stability.
  • The sample code exists only in mrs-sample-project-1.6.0.zip.
/**
 * Writes a fixed set of test messages to a Kafka topic through a Spark Streaming DStream.
 *
 * Parameter description (in command-line order):
 * <checkPointDir> is the checkPoint directory. It is parsed but not used by this sample.
 * <brokers> is the Kafka address for obtaining metadata.
 * <topic> is the Kafka topic that the messages are written to.
 */
object DstreamKafkaWriterTest1 {

  /**
   * Entry point: validates the arguments, builds a queue-backed DStream of test
   * messages and writes it to Kafka.
   *
   * @param args <checkPointDir> <brokers> <topic>
   */
  def main(args: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets

    // Exactly three arguments are destructured below, so require exactly three here
    // (the previous check for four rejected the documented invocation).
    if (args.length != 3) {
      System.err.println("Usage: DstreamKafkaWriterTest <checkPointDir> <brokers> <topic>")
      System.exit(1)
    }

    // checkPointDir is kept for command-line compatibility; nothing below reads it.
    val Array(checkPointDir, brokers, topic) = args
    val sparkConf = new SparkConf().setAppName("KafkaWriter")

    // Enter the properties of Kafka.
    // NOTE(review): the deserializer, group.id and auto.offset.reset entries look like
    // consumer-side settings; confirm whether writeToKafka needs them.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer",
      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
      "group.id" -> "dstreamKafkaWriterFt",
      "auto.offset.reset" -> "latest"
    )

    // Create the context of Streaming with a 500 ms batch interval.
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))
    val sentData = Seq("kafka_writer_test_msg_01", "kafka_writer_test_msg_02", "kafka_writer_test_msg_03")

    // Create an RDD queue holding a single RDD with the test messages.
    val sent = new mutable.Queue[RDD[String]]()
    sent.enqueue(ssc.sparkContext.makeRDD(sentData))

    // Create a DStream for writing data.
    val wStream = ssc.queueStream(sent)

    // Use the writeToKafka API to write data to Kafka. Each message becomes a record
    // without a key; the value is encoded explicitly as UTF-8 so the bytes do not
    // depend on the platform default charset.
    wStream.writeToKafka(kafkaParams,
      (x: String) => new ProducerRecord[String, Array[Byte]](topic, x.getBytes(StandardCharsets.UTF_8)))

    // Start the context of Streaming.
    ssc.start()
    ssc.awaitTermination()
  }
}