
Java Sample Code

Function Description

In a Spark application, Streaming is used to call Kafka APIs to obtain word records. The word records are then classified to count the number of occurrences of each word, and the result is written to Kafka 0-10.

Sample Code for Reading Data from Kafka 0-10 with Streaming

The following code snippets are used as an example. For the complete code, see com.huawei.bigdata.spark.examples.SecurityKafkaWordCount. For how to obtain the sample code, see Obtaining a Sample Project.

For a normal cluster, comment out line 78 (shown below) in the com.huawei.bigdata.spark.examples.SecurityKafkaWordCount class:

kafkaParams.put("security.protocol", "SASL_PLAINTEXT");

/**
  * Consumes messages from one or more topics in Kafka.
  * <checkPointDir> is the Spark Streaming checkpoint directory.
  * <brokers> is the Kafka broker list used for bootstrapping; it is only used to obtain metadata.
  * <topics> is a list of one or more Kafka topics to be consumed.
  * <batchTime> is the duration (in seconds) of one Spark Streaming batch.
 */
public class SecurityKafkaWordCount
{
  public static void main(String[] args) throws Exception {
    JavaStreamingContext ssc = createContext(args);

    // Start the Streaming system.
    ssc.start();
    try {
      ssc.awaitTermination();
    } catch (InterruptedException e) {
      // Ignore the interruption raised when the application is stopped.
    }
  }

  private static JavaStreamingContext createContext(String[] args) throws Exception {
    String checkPointDir = args[0];
    String brokers = args[1];
    String topics = args[2];
    String batchSize = args[3];

    // Create a Streaming startup environment.
    SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(Long.parseLong(batchSize) * 1000));

    // Set the checkpoint directory of Streaming.
    // This is mandatory because the stateful operation updateStateByKey is used.
    ssc.checkpoint(checkPointDir);

    // Obtain a list of topics used by Kafka.
    String[] topicArr = topics.split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", brokers);
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "DemoConsumer");
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name", "kafka");
    kafkaParams.put("kerberos.domain.name", "hadoop.hadoop.com");

    LocationStrategy locationStrategy = LocationStrategies.PreferConsistent();
    ConsumerStrategy<String, String> consumerStrategy = ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams);

    // Create a direct kafka stream using brokers and topics.
    // Receive data from Kafka and generate the corresponding DStream.
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy);

    // Extract the value (message body) from each Kafka record.
    JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
      @Override
      public String call(ConsumerRecord<String, String> record) throws Exception {
        return record.value();
      }
    });

    // Count the occurrences of each word and keep a running total across batches.
    JavaPairDStream<String, Integer> wordCounts = lines.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    }).updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          @Override
          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            int out = 0;
            if (state.isPresent()) {
              out += state.get();
            }
            for (Integer v : values) {
              out += v;
            }
            return Optional.of(out);
          }
        });

    // Print the result.
    wordCounts.print();
    return ssc;
  }
}
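
The sample above only prints the result. If, as described in Function Description, the counts also need to be written back to Kafka, a minimal sketch using the standard Kafka producer API (org.apache.kafka.clients.producer.KafkaProducer and ProducerRecord) inside foreachRDD could look as follows. The output topic name "outputTopic" and the producer settings are assumptions, wordCounts and brokers are the variables from the sample above, and a security cluster would additionally need the SASL settings shown earlier:

    // Hypothetical sketch: write each batch of word counts to a Kafka topic.
    wordCounts.foreachRDD(rdd -> rdd.foreachPartition(records -> {
      Map<String, Object> producerConf = new HashMap<>();
      producerConf.put("bootstrap.servers", brokers);
      producerConf.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      producerConf.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      // Create one producer per partition so nothing non-serializable is shipped to executors.
      try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerConf)) {
        while (records.hasNext()) {
          Tuple2<String, Integer> wordCount = records.next();
          producer.send(new ProducerRecord<>("outputTopic", wordCount._1(), wordCount._2().toString()));
        }
      }
    }));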

Sample Code for Writing Data to Kafka 0-10 with Streaming

The following code snippets are used as an example. For the complete code, see com.huawei.bigdata.spark.examples.DstreamKafkaWriter.

  • You are advised to use the new API createDirectStream to develop applications instead of the old API createStream. The old API can still be used, but the new API provides better performance and stability; the two call forms are sketched after this list.
  • The sample code exists only in mrs-sample-project-1.6.0.zip.
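
For reference, a minimal sketch contrasting the two call forms mentioned in the first item above. The legacy, receiver-based createStream comes from the Kafka 0-8 integration package org.apache.spark.streaming.kafka, and the ZooKeeper quorum string and topic-to-thread map shown with it are placeholder assumptions; the direct form is the one already used in the samples in this section:

    // Old API (receiver-based): consumes topics through a ZooKeeper quorum.
    Map<String, Integer> topicThreads = new HashMap<>();
    topicThreads.put("topic1", 1);
    JavaPairReceiverInputDStream<String, String> oldStream =
        org.apache.spark.streaming.kafka.KafkaUtils.createStream(ssc, "zkHost:2181", "DemoConsumer", topicThreads);

    // New API (direct): reads from Kafka brokers directly, as in the samples in this section.
    JavaInputDStream<ConsumerRecord<String, String>> newStream =
        org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(ssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams));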
/**
 * Parameter description: 
 * <checkPointDir> is the checkpoint directory.
 * <brokers> is the Kafka broker list used to obtain metadata.
 * <topic> is the Kafka topic to which data is written.
 */
public class JavaDstreamKafkaWriter {

  public static void main(String[] args) throws InterruptedException {

    if (args.length != 3) {
      System.err.println("Usage: JavaDstreamKafkaWriter <checkPointDir> <brokers> <topic>");
      System.exit(1);
    }

    String checkPointDir = args[0];
    String brokers = args[1];
    String topic = args[2];

    SparkConf sparkConf = new SparkConf().setAppName("KafkaWriter");

    // Set the Kafka properties.
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", brokers);
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("group.id", "dstreamKafkaWriterFt08");
    kafkaParams.put("auto.offset.reset", "smallest");

    // Create the Java Spark Streaming context.
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));

    // Prepare the data to be written to Kafka.
    List<String> sentData = new ArrayList<String>();
    sentData.add("kafka_writer_test_msg_01");
    sentData.add("kafka_writer_test_msg_02");
    sentData.add("kafka_writer_test_msg_03");

    // Create a Java RDD queue.
    Queue<JavaRDD<String>> sent = new LinkedList<>();
    sent.add(ssc.sparkContext().parallelize(sentData));

    // Create a Java DStream for writing the data.
    JavaDStream<String> wStream = ssc.queueStream(sent);
    // Write the data to Kafka.
    JavaDStreamKafkaWriterFactory.fromJavaDStream(wStream).writeToKafka(
        JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala(),
        new Function<String, ProducerRecord<String, byte[]>>() {
          public ProducerRecord<String, byte[]> call(String s) {
            return new ProducerRecord<String, byte[]>(topic, s.getBytes());
          }
        });

    ssc.start();
    ssc.awaitTermination();
  }
}