springBoot2.x集成kafka

Diana ·
更新时间:2024-11-14
· 876 次阅读

最近在做一个物联网paas项目,实时设备预警结合kafka流式实时计算的特定,采用springBoot结合kafka来完成
kafka环境搭建——kafka基本命令及环境搭建

依赖 org.springframework.kafka spring-kafka 配置参数 #kafka配置 #broker地址,多个使用英文逗号分隔 spring.kafka.bootstrap-servers=127.0.0.1:9092 #指定序列号与反序列化 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #要使用消费者必须指定group-id spring.kafka.consumer.group-id=server spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.auto-commit-interval=1000 #如果消费的topic不存在,是否在启动时抛出异常,false为不抛出 spring.kafka.listener.missing-topics-fatal=false #kafka定时检查的通配符,发现新分区需要5分钟的间隔 #默认的offset-reset是latest,导致分配分区前的间隔时间内,新分区的消息会丢失 #设置为earliest从头消费该分区数据,即使新分区,也会消费间隔时间内产生的旧消息 spring.kafka.consumer.auto-offset-reset=earliest 生产者

自动注入springboot自动配置的KafkaTemplate,其中包含几个常用的生产方法,key可有可无,主要是为了优化分区,相同key(不为null)的消息会分配到相同分区

//指定topic和value public ListenableFuture<SendResult> send(String topic, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord(topic, data); return this.doSend(producerRecord); } //指定topic和key和value public ListenableFuture<SendResult> send(String topic, K key, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord(topic, key, data); return this.doSend(producerRecord); } //指定topic和value和key以及分区 public ListenableFuture<SendResult> send(String topic, Integer partition, K key, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord(topic, partition, key, data); return this.doSend(producerRecord); } //指定topic和value和key以及分区,还有消息创建时间戳 public ListenableFuture<SendResult> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) { ProducerRecord producerRecord = new ProducerRecord(topic, partition, timestamp, key, data); return this.doSend(producerRecord); }

如果想要发送指定时间戳而不指定分区,只需要在使用send(String topic, Integer partition, Long timestamp, K key, @Nullable V data)时,传递partition为null

@Component public class Producer{ @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String key, String message, Long timestamp, Integer partition){ kafkaTemplate.send(topic,partition,timestamp,key, message); } } 消费者

在方法上打KafkaListener以开启一个消费者订阅,topics属性用于订阅单个主题,使用topicPatter可以使用通配符订阅多个主题,方法参数类型为ConsumerRecord

@Component public class Consumer { @KafkaListener(topicPattern ="test.*.*") public void recordListener(ConsumerRecord record){ record.topic();//return topic name record.key();//return key record.value();//return value } } 外部生产问题

以下是外部生产代码,外部生产者创建的主题,springBoot集成的kafka并不能立即分区并消费其消息。springBoot默认是5分钟一次去检测新主题并分区,所以应该配置spring.kafka.consumer.auto-offset-reset=earliest才能保证在检测间隔中产生的新分区的消息不丢失

public static void main(String[] args) { Properties properties = new Properties(); properties.put("bootstrap.servers","127.0.0.1:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id","ss"); KafkaProducer kafkaProducer = new KafkaProducer(properties); kafkaProducer.send(new ProducerRecord("test.a.b","key","val"),(metaData, exception)->{ System.out.println(metaData.topic()); if(exception!=null){ exception.printStackTrace(); } }); kafkaProducer.flush();//没有flush不会发送 }
作者:歪歪梯



springboot kafka

需要 登录 后方可回复, 如果你还没有账号请 注册新账号