Spring Boot集成kafka

Spring Boot对kafka提供了自动配置(auto configuration)。使用用Spring Boot只需要做很少的配置即可集成对kafka的访问。

pom配置

1、继承spring-boot-starter-parent

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.5.9.RELEASE</version>
  <relativePath/>
 </parent>

对于一个spring boot项目来说,最好继承于spring-boot-starter-parent。它会帮我们统一管理spring用到的相关的包的版本。

2、添加spring-boot-start和spring-kafka的依赖

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter</artifactId>
</dependency>

这里不需要设置依赖包的版本,spring-boot-starter-parent已经帮我们添加了版本的管理。

application.properties配置

在application.properties里提供了很多kafka的配置,配置项是以spring.kafka.为前缀,所有的配置项可以在org.springframework.boot.autoconfigure.kafka.KafkaProperties找到。

如配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

发送信息

spring-kafka提供了KafkaTemplate的一个接口,可以用它来发送消息。spring boot会自动构建KafkaTemplate对象,使用kafkaTemplate只需要使用@Autowired注入。

示例

@Component
public class MessageSender{

    @Autowired
	private final KafkaTemplate template;

    public void send() {
      this.template.send("myTopic", "message1");
      this.template.send("myTopic", "message2");
      this.template.send("myTopic", "message3");
    }

}

接收消息

在接收消息端,只需要在指定Bean的方法上添加注解@KafkaListener,我们就可以监听Kafka的消息。

@Component
public class MessageReceiver {

	@KafkaListener(topics = "myTopic")
	public void processMessage(String content) {
		// ...
	}
}

Spring boot会自动创建KafkaListenerContainerFactory。如果要定义KafkaListen,可以在application.properties设置,相关配置项如下:

spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode. See the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.
spring.kafka.listener.poll-timeout= # Timeout to use when polling the consumer.
spring.kafka.listener.type=single # Listener type.

可以看到,基于Spring boot可以很简单就完成了kafka的集成。

版权声明:著作权归作者所有。

相关推荐

Spring Cloud Stream集成Kafka

这里演示使用Spring Boot ,Spring Cloud集成Kafka来实现一个简单的实时流系统。添加依赖可以在https://start.spring.io创建一个基于spring boot的maven项目。需要添加的主要依赖:spring-cloud-stream以及spring-cloud-starter-stream-kafka,如下:<

Spring Boot集成RabbitMQ发送接收JSON

在Spring Boot 集成RabbitMQ一文中介绍了如何集成RabbitMQ。默认情况下发送的消息是转换为字节码,这里介绍一下如何发送JSON数据。ObjectMapper最简单发送JSON数据的方式是把对象使用ObjectMapper等JSON工具类把对象转换为JSON格式,然后发送。如下:@Autowired private ObjectMapper&nbs

Spring Boot 集成RabbitMQ

此处假设已经安装好了RabbitMQ,主要讲述使用Spring Boot如何集成RabbitMQ。添加依赖在Maven的pom.xml添加rabbitmq的starter依赖,内容如下:<?xml version="1.0" encoding="UTF-8"?> <project xmlns=&quo

Spring Boot设置服务器的端口

Spring Boot默认内嵌的web服务器为tomcat,端口为8080。如果想修改内嵌tomcat插件的端口有三种方式:在application.properties里修改在java的启动脚本里设置在java代码里设置在application.properties设置server.port=9999 如果是application.yml,设置如下:server:  &nbs

Spring Boot:日志集成

Java日志框架Java有好几个日志框架,我们有时在选择Java日志框架时会有点迷惑。下面几个是常用的日志框架Common Logging:Apache最早提供的日志的门面接口。避免和具体的日志方案直接耦合。SLF4J(Simple Logging Facade for JAVA):和Common Logging一样,是一个门面框架,是对