1. 消息中间件简介
  1. 消息即是信息的载体,为了方便消息的生产者和消费者都能明白,它在传递时需要一定的数据格式(即消息协议),根据消息送达的实时性,它分为即时消息和延迟消息两类。
  2. 即时消息,关注消息送达的实时性,如HTTP、RPC请求等
  3. 延迟消息,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列
  4. 实现消息队列的组件称之为消息中间件。对于消息中间件,关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统,也通常用于服务与服务解耦。常见的角色大致也就有Producer(生产者发送消息)、Consumer(消费者订阅消息)在这里插入图片描述
    2. RabbitMQ简介
    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
    详见RabbitMQ官网:点这里
    3. RabbitMQ应用场景
  5. 异步处理
  6. 应用解耦
  7. 流量削锋
  8. 日志处理
  9. 消息通讯
    详细应用场景介绍:点这里
    4. RabbitMQ消息类型
    • simple,简单模式
      一条消息对应一个消费者
      在这里插入图片描述
    • work,工作模式
      一条消息可以被多个消费者尝试接,但是最终只能有一个消费者能获取
      在这里插入图片描述
    • Pub/Sub,订阅模式
      一条消息可以被多个消费者同时获取,
      生产者将消息发送到交换机,消费者将自己对应的队列注册到交换机,
      当发送消息后,所有注册的队列的消费者都可以收到消息
      在这里插入图片描述
    • routing,路由模式
      生产者将消息发送到了 type 为 direct 模式的交换机
      消费者的队列再将自己绑定到路由的时候会给自己绑定一个 key
      只有消费者发送对应 key 格式的消息时候 队列才会收到消息
      在这里插入图片描述
    • topic,通配符模式
      将路由键和某模式进行匹配。
      此时队列需要绑定到一个模式上。
      符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
      因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
      在这里插入图片描述
    • RPC模式
      客户端启动后,创建一个独有的回调队列,某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息。服务器端等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中,客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用
      在这里插入图片描述
      5. RabbitMQ的两种应答模式
      当我们发送消息后,服务端如何知道消息已经被消费,在RabbitMQ里有两种模式:
  • 自动模式。不管消费者获取到消息后是否是成功处理消息,服务端都认为是成功的

  • 手动模式。消费者获取到消息后,服务器会将消息标记为不可用,等待消费者反馈,如果不反馈,则一直标记为不可用

开启手动应答,默认为自动应答

1
2
3
# 开启ACK手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
6. rabbitmq channel参数详解
  • channel.exchangeDeclare()
    1
    Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException;

type:有direct、fanout、topic三种
durable:true、false true:服务器重启会保留下来Exchange,不保证重启后消息还在
autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange

  • channel.basicPublish()
    1
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

在topic exchange做消息转发用

  • channel.basicAck()
    1
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:该消息的index
multiple:是否批量,true:将一次性ack所有小于deliveryTag的消息。

  • channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true)
    1
    2
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
    throws IOException;

deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列

  • channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false)
    1
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息


7. 简单应用(topic模式)
  • 配置交换器Exchange(topic模式)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 声明交换机,支持持久化.
    * rabbitmq常用几种exchange,比如direct, fanout, topic,可根据具体业务需求配置
    * 命名规范参考 scm3.services,scm3.services.retry,scm3.services.failed
    * @return the exchange
    */
    @Bean
    public TopicExchange emailExchange() {
    return (TopicExchange)ExchangeBuilder.topicExchange("emailExchange").durable(true).build();
    }
  • 配置队列Queue

    1
    2
    3
    4
    5
    6
    7
    /**
    * 声明队列,默认持久化
    */
    @Bean
    public Queue emailQueue() {
    return new Queue("emailQueue");
    }
  • 绑定交换器与队列

    1
    2
    3
    4
    5
    6
    7
    8
    /**
    * 绑定交换器与队列
    */
    @Bean
    public Binding topicQueueBinding(@Qualifier("emailQueue") Queue queue,
    @Qualifier("emailExchange") TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("email.topic.*");
    }
  • 生产者发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package com.bhy702.website;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    /**
    * @author: Hongyuan Bai
    * @create: 2019-06-27 13:45:43
    * @description:
    */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMQTest {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public static final String EXCHANGE = "emailExchange";
    public static final String ROUTING_KEY = "email.topic.queue";

    @Test
    public void test() {

    for (int i = 0; i < 5; i++){
    rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,"message:"+(i+1));
    }
    }

    }
  • 消费者处理消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    package com.bhy702.website.common.rabbitmq;

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.mail.javamail.JavaMailSender;
    import org.springframework.stereotype.Component;
    import java.io.IOException;


    /**
    * @author: Hongyuan Bai
    * @create: 2019-06-27 13:38:46
    * @description:
    */
    @Slf4j
    @Component
    public class RabbitConsumer2 {

    @Autowired
    private JavaMailSender javaMailSender;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "emailQueue")
    public void sendMail(Message message, Channel channel) throws IOException {
    try {
    //消费者处理信息
    System.out.println(message.getBody());
    //处理完成手动应答
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    } catch (Exception e) {
    //拒绝信息,重新加入队列
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    //拒绝信息,丢弃,视场景而定
    //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    e.printStackTrace();
    }
    }

    }

8. RabbitMQ生产者消息确认

rabbitMQ有重试机制,配置properties文件可开启

1
2
# 开启rabbitmq的生产端重试机制,默认是false,默认重试3次
spring.rabbitmq.template.retry.enabled=true

当生产者发送消息给rabbitmq服务器时,消息是否真正的到达了服务器?为了保证生产者发送的消息能够可靠的发送到服务器(即消息落地),rabbitmq提供了两种方式:

  • 通过事务实现
  • 通过发送方确认机制(publisher confirm)实现

由于事务方式非常影响rabbitmq的性能,所以这里介绍第二种实现

ConfirmCallback接口

ConfirmCallback是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

ReturnCallback接口

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。

先配置properties配置文件,开启回调

1
2
3
4
# 开启生产者发送确认(ConfirmCallback接口)
spring.rabbitmq.publisher-confirms=true
# 开启生产者发送失败退回(ReturnCallback接口)
spring.rabbitmq.publisher-returns=true

在RabbitMQConfig中配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Bean
public AmqpTemplate amqpTemplate(){

// 必须开启回调才会生效
rabbitTemplate.setMandatory(true);

// 消息确认,需要配置 spring.rabbitmq.publisher-confirms = true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

//根据返回的状态,生产者可以处理失败与成功的相应信息,比如发送失败,可重发,转发或者存入日志等
//if(ack){
// correlationData.getId()为message唯一标识,需要生产者发送message时传入自定义的correlationData才能获取到,否则为null
// //do something
//}else{
// correlationData.getId()
// //do something
//}

//此处只做打印,不对生产者发送失败的信息处理
System.out.println("------------------------------------------------");
System.out.println("ConfirmCallBackListener:correlationData=" + correlationData + ",ack=" + ack + ",cause=" + cause);
System.out.println("------------------------------------------------");

});

// 消息发送失败返回到队列中,需要配置spring.rabbitmq.publisher-returns = true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

System.out.println("------------------------------------------------");
System.out.println("ReturnCallBackListener:message=" + new String(message.getBody()) + ",replyCode=" + replyCode + ",replyText=" + replyText + ",exchange=" + exchange + ",routingKey=" + routingKey);
System.out.println("------------------------------------------------");

});

return rabbitTemplate;
}

9. Spring Boot连接RabbitMQ集群
1
spring.rabbitmq.host = ip1:port1,ip2:port2,ip3:port3

要保证消息全部被消费者消费,就涉及到RabbitMQ的消息重试,死信队列,延迟重试队列等,可参考另一篇博客点这里