I have recently been working on an IoT PaaS project. Real-time device alerting is a natural fit for Kafka's streaming, real-time computation model, so I implemented it with Spring Boot and Kafka.
For setting up the Kafka environment, see: Kafka basic commands and environment setup
Maven dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configuration parameters
#Kafka configuration
#Broker addresses; separate multiple addresses with commas
spring.kafka.bootstrap-servers=127.0.0.1:9092
#Specify the deserializers (consumer) and serializers (producer)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#A group-id must be set in order to use a consumer
spring.kafka.consumer.group-id=server
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
#Whether to throw an exception at startup if a subscribed topic does not exist; false means do not throw
spring.kafka.listener.missing-topics-fatal=false
#With a wildcard (pattern) subscription, the consumer only re-checks topic metadata periodically,
#so discovering a new partition can take up to 5 minutes
#The default auto-offset-reset is latest, so messages written to a new partition during the
#interval before it is assigned would be lost
#earliest consumes the partition from the beginning, so even on a new partition the messages
#produced during the discovery interval are still consumed
spring.kafka.consumer.auto-offset-reset=earliest
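If the up-to-5-minute discovery delay is too long for your use case, the underlying Kafka client refresh interval can be shortened. A minimal sketch, assuming Spring Boot's spring.kafka.consumer.properties.* pass-through for raw client properties (the 30-second value is an arbitrary example):

#Optional: refresh topic metadata every 30s instead of the 5-minute default,
#so pattern-subscribed consumers discover new topics/partitions sooner
spring.kafka.consumer.properties.metadata.max.age.ms=30000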
Producer
Spring Boot auto-configures a KafkaTemplate that can be injected directly. It provides several commonly used send methods. The key is optional; it mainly serves partitioning: messages with the same non-null key are routed to the same partition.
// Specify topic and value
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
    return this.doSend(producerRecord);
}
// Specify topic, key and value
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
    return this.doSend(producerRecord);
}
// Specify topic, partition, key and value
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
    return this.doSend(producerRecord);
}
// Specify topic, partition, key and value, plus the record creation timestamp
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) {
    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
    return this.doSend(producerRecord);
}
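The returned ListenableFuture can be used to react to the send result asynchronously. A minimal sketch, assuming spring-kafka 2.x (where send() still returns a Spring ListenableFuture) and the injected template from the Producer component below; topic, key and value are placeholders:

// Attach success/failure callbacks to the asynchronous send result
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test.a.b", "key", "val");
future.addCallback(
        result -> System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset()),
        ex -> ex.printStackTrace()); // invoked if the send fails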
To send with a specific timestamp without pinning a partition, call send(String topic, Integer partition, Long timestamp, K key, @Nullable V data) and pass null for partition.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Thin wrapper around the auto-configured template; partition and timestamp may be null
    public void send(String topic, String key, String message, Long timestamp, Integer partition) {
        kafkaTemplate.send(topic, partition, timestamp, key, message);
    }
}
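For illustration, a hypothetical call that supplies the current timestamp while leaving partition selection to Kafka (the topic name, key and payload are made-up values):

// null partition: Kafka picks the partition from the key hash
producer.send("test.a.b", "device-001", "{\"temperature\": 25}", System.currentTimeMillis(), null);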
Consumer
Annotate a method with @KafkaListener to start a consumer subscription. The topics attribute subscribes to specific topics by name, while topicPattern subscribes to every topic matching a wildcard pattern. The method parameter type is ConsumerRecord.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @KafkaListener(topicPattern = "test.*.*")
    public void recordListener(ConsumerRecord<String, String> record) {
        record.topic(); // returns the topic name
        record.key();   // returns the key
        record.value(); // returns the value
    }
}
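As a contrast with the pattern subscription above, here is a sketch of subscribing to a fixed topic with the topics attribute; the topic name and groupId value are placeholders (groupId, when set, overrides spring.kafka.consumer.group-id for this listener):

@KafkaListener(topics = "test.a.b", groupId = "server")
public void fixedTopicListener(ConsumerRecord<String, String> record) {
    System.out.println(record.value()); // process each record from the fixed topic
}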
The external-producer problem
Below is the external producer code. When a topic is created by an external producer, the Kafka consumer integrated into Spring Boot cannot be assigned its partitions and start consuming immediately: by default it only re-checks for new topics once every 5 minutes. Therefore spring.kafka.consumer.auto-offset-reset=earliest should be configured to guarantee that messages written to newly created partitions during the detection interval are not lost.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExternalProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        // A producer only needs serializers; deserializers and group.id are consumer settings
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        kafkaProducer.send(new ProducerRecord<>("test.a.b", "key", "val"), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println(metadata.topic());
            }
        });
        kafkaProducer.flush(); // without flush() (or close()) buffered records may never be sent before the JVM exits
    }
}
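One way to sidestep the discovery gap altogether is to declare the topic on the Spring Boot side so it already exists before any external producer writes to it. A minimal sketch, assuming spring-kafka's KafkaAdmin support for NewTopic beans (the topic name, partition count, and replication factor are arbitrary examples):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // The auto-configured KafkaAdmin creates this topic at startup if it does not exist yet
    @Bean
    public NewTopic deviceAlertTopic() {
        return new NewTopic("test.a.b", 3, (short) 1); // 3 partitions, replication factor 1
    }
}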