Spring Boot 集成RabbitMQ

此处假设已经安装好了RabbitMQ,主要讲述使用Spring Boot如何集成RabbitMQ。

添加依赖

在Maven的pom.xml添加rabbitmq的starter依赖,内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sivalabs</groupId>
    <artifactId>springboot-rabbitmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RC1</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

</project>

RabbitMQ配置

RabbitMQ需要配置Queue,Exchange和Binding响应的Bean。

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig  
{
    public static final String QUEUE_ORDERS = "orders-queue";
    public static final String EXCHANGE_ORDERS = "orders-exchange";

    @Bean
    Queue ordersQueue() {
        return QueueBuilder.durable(QUEUE_ORDERS).build();
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(QUEUE_DEAD_ORDERS).build();
    }

    @Bean
    Exchange ordersExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_ORDERS).build();
    }

    @Bean
    Binding binding(Queue ordersQueue, TopicExchange ordersExchange) {
        return BindingBuilder.bind(ordersQueue).to(ordersExchange).with(QUEUE_ORDERS);
    }
}

例子里定义了名为orders-queue的队列和名为orders-exchange的Exchange,以及定义了orders-queue与orders-exchange的绑定,其中设定router key为orders-queue。任何以orders-queue为router key都会发送到队列orders-queue。

在application.properties配置RabbitMQ服务器信息:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

环境已经配置好,接着是应用。应用分为两部分:发送方OrderMessageSender 和接收方OrderMessageListener 。

消息发送OrderMessageSender

这里新建一个Bean OrderMessageSender作为发送方,它将用来发送消息到orders-exchange。

模型Order

public class Order implements Serializable {
    private String orderNumber;
    private String productId;
    private double amount;

    //setters & getters
}

用于传递数据的数据模型需要序列化。

RabbitTemplate 

OrderMessageSender主要是使用RabbitTemplate来发送消息。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderMessageSender {
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public OrderMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendOrder(Order order) {
        this.rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_ORDERS, order);
    }
}

调用rabbitTemplate.convertAndSend()发送消息,它接收两个参数:

  • routerKey:第一个参数路由的键,此router key为配置里定义绑定时设置的值。
  • data:第二个参数为发送消息载体

默认情况下,Spring Boot使用org.springframework.amqp.support.converter.SimpleMessageConverter来序列化数据为字节码。

监听消息OrderMessageListener

接着是在消息消费端实现消息的监听,以便接收消息。这里订单消息监听类为OrderMessageListener 。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderMessageListener {

    static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);

    @RabbitListener(queues = RabbitConfig.QUEUE_ORDERS)
    public void processOrder(Order order) {
        logger.info("Order Received: "+order);
    }
}

在相应的处理方法上添加注解@RabbitListener,并指定监听的队列。

至此就完成了一个简单消息的发送和接收。

版权声明:著作权归作者所有。

相关推荐

Spring Cloud Stream集成Kafka

这里演示使用Spring Boot ,Spring Cloud集成Kafka来实现一个简单的实时流系统。添加依赖可以在https://start.spring.io创建一个基于spring boot的maven项目。需要添加的主要依赖:spring-cloud-stream以及spring-cloud-starter-stream-kafka,如下:<

Spring Boot集成RabbitMQ发送接收JSON

在Spring Boot 集成RabbitMQ一文中介绍了如何集成RabbitMQ。默认情况下发送的消息是转换为字节码,这里介绍一下如何发送JSON数据。ObjectMapper最简单发送JSON数据的方式是把对象使用ObjectMapper等JSON工具类把对象转换为JSON格式,然后发送。如下:@Autowired private ObjectMapper&nbs

Spring Boot设置服务器的端口

Spring Boot默认内嵌的web服务器为tomcat,端口为8080。如果想修改内嵌tomcat插件的端口有三种方式:在application.properties里修改在java的启动脚本里设置在java代码里设置在application.properties设置server.port=9999 如果是application.yml,设置如下:server:  &nbs

Spring Boot集成kafka

Spring Boot对kafka提供了自动配置(auto configuration)。使用用Spring Boot只需要做很少的配置即可集成对kafka的访问。pom配置1、继承spring-boot-starter-parent<parent>   <groupId>org.springframework.boot&l

Spring Boot:日志集成

Java日志框架Java有好几个日志框架,我们有时在选择Java日志框架时会有点迷惑。下面几个是常用的日志框架Common Logging:Apache最早提供的日志的门面接口。避免和具体的日志方案直接耦合。SLF4J(Simple Logging Facade for JAVA):和Common Logging一样,是一个门面框架,是对