
Java Sample Code

Function Description

In a Flink application, call the API of the flink-connector-kafka module to produce and consume data.

To interconnect with Kafka in security mode, the kafka-client-xx.x.x.jar file provided by MRS is required before application development. You can obtain the JAR file from the MRS client directory.
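The sample commands shown later pass the security settings as flink run arguments. As a minimal sketch (not part of the sample project; the class name, topic, broker address, truststore path, and password are placeholders taken from the command examples), the same Kafka client properties can also be set on a Properties object and handed to the connector directly. Import paths may vary slightly with the Flink version; for example, SimpleStringSchema also exists under org.apache.flink.api.common.serialization in newer releases.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Hypothetical helper class, not part of the sample project.
public class SecureKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker list and security settings, mirroring the SASL_SSL command example.
        props.setProperty("bootstrap.servers", "10.91.8.218:21009");
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        props.setProperty("ssl.truststore.location", "/home/truststore.jks"); // placeholder path
        props.setProperty("ssl.truststore.password", "huawei");               // placeholder password

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read from the topic used in the command examples and print each record.
        DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer010<>("topic-test", new SimpleStringSchema(), props));
        stream.print();
        env.execute();
    }
}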

Sample Code

The following example shows the main logic code of the Kafka Producer and Kafka Consumer.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.flink.example.kafka.ReadFromKafka.

// Kafka Producer code
public class WriteIntoKafka {
     public static void main(String[] args) throws Exception {
     // Print the command reference for flink run. 
       System.out.println("use command as: ");
       System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
       System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
       System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
       System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
       System.out.println("******************************************************************************************");
       System.out.println("<topic> is the kafka topic name");
       System.out.println("<bootstrap.servers> is the ip:port list of brokers");
       System.out.println("******************************************************************************************");
      // Construct the execution environment.
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // Set parallelism.
       env.setParallelism(1);
       // Parse the running parameters.
       ParameterTool paraTool = ParameterTool.fromArgs(args);
      // Construct a StreamGraph and write the data generated from self-defined sources to Kafka.
       DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
       messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"),
           new SimpleStringSchema(),
           paraTool.getProperties()));
       // Invoke execute to trigger the execution.
       env.execute();
     }
     // Custom source that generates a message every second.
     public static class SimpleStringGenerator implements SourceFunction<String> {
       private static final long serialVersionUID = 2174904787118597072L;
       boolean running = true;
       long i = 0;
       @Override
       public void run(SourceContext<String> ctx) throws Exception {
         while (running) {
           ctx.collect("element-" + (i++));
           Thread.sleep(1000);
         }
       }
       @Override
       public void cancel() {
         running = false;
       }
     }
}
// Kafka Consumer code
public class ReadFromKafka {
     public static void main(String[] args) throws Exception {
     // Print the command reference for flink run. 
       System.out.println("use command as: ");
       System.out.println("./bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka" +
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
       System.out.println("./bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka" +
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
       System.out.println("./bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka" +
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
       System.out.println("./bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka" +
           " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");
      // Construct the execution environment.
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // Set parallelism.
       env.setParallelism(1);
       // Parse the running parameters.
       ParameterTool paraTool = ParameterTool.fromArgs(args);
       // Construct a StreamGraph, read data from Kafka, and print each record on a new line.
       DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
           new SimpleStringSchema(),
           paraTool.getProperties()));
       messageStream.rebalance().map(new MapFunction<String, String>() {
         @Override
         public String map(String s) throws Exception {
           return "Flink says " + s + System.getProperty("line.separator");
         }
       }).print();
       // Invoke execute to trigger the execution. 
       env.execute();
     }
}
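In both programs, paraTool.getProperties() hands every --key value pair from the command line straight to the Kafka connector as a client property, which is why the security options in the commands above need no extra parsing code. The following minimal, self-contained sketch (not part of the sample project; the argument values are just the ones from the command examples) illustrates that behavior.

import java.util.Properties;

import org.apache.flink.api.java.utils.ParameterTool;

// Hypothetical demo class, not part of the sample project.
public class ParameterToolDemo {
    public static void main(String[] args) {
        // Simulate the arguments passed to flink run in the examples above.
        String[] sampleArgs = {
            "--topic", "topic-test",
            "--bootstrap.servers", "10.91.8.218:21005"
        };
        ParameterTool paraTool = ParameterTool.fromArgs(sampleArgs);
        Properties props = paraTool.getProperties();
        // Every "--key value" pair becomes an entry in the Properties object,
        // which the sample code passes directly to the Kafka connector.
        System.out.println(props.getProperty("topic"));             // topic-test
        System.out.println(props.getProperty("bootstrap.servers")); // 10.91.8.218:21005
    }
}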