更新时间:2022-07-19 GMT+08:00

Old Consumer API使用样例

功能介绍

每一个Consumer实例都属于一个Consumer group,每一条消息只会被同一个Consumer group里的一个Consumer实例消费(不同的Consumer group可以同时消费同一条消息)。

下面代码片段在com.huawei.bigdata.kafka.example.Old_Consumer类中,作用在于订阅指定Topic的消息。(注意:旧Consumer API仅支持访问未设置ACL的Topic,安全接口说明见安全接口说明

样例代码

Old Consumer API线程run方法中的消费逻辑

/**  *启动执行Consumer,订阅Kafka上指定topic消息。  */  
 public void run() 
 { 
   LOG.info("Consumer: start."); 

   Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
   topicCountMap.put(topic, new Integer(1)); 
   Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
   List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

   LOG.info("Consumerstreams size is : " + streams.size()); 

   for (KafkaStream<byte[], byte[]> stream : streams) 
   { 
     ConsumerIterator<byte[], byte[]> it = stream.iterator(); 

     while (it.hasNext()) 
     { 
       LOG.info("Consumer: receive " + new String(it.next().message()) + " from " + topic); 
     } 
   } 

   LOG.info("Consumer End."); 
 }