消息的消费确认实现原理:

当消费者的消息消费异常时,消息进入延迟重试队列,待超时后重新发送到重试队列指定的死信队列,死信队列重新消费信息,如果又出现死信情况,继续进入延时重试队列,依次循环,当重试超过3次后,消息进入失败队列等待相应的消费者特殊处理或人工处理。

死信队列

死信队列中(dead letter)死信的消息来源:

  1. 消息被拒绝(basic.reject或basic.nack)并且requeue=false

  2. 消息TTL过期(x-message-ttl参数值即为超时时间)

  3. .队列达到最大长度(队列满了,无法再添加数据到mq中)

普通队列指定死信队列的相关参数:

  • x-dead-letter-exchange: 用来设置死信后发送的交换机

  • x-dead-letter-routing-key:用来设置死信的routingKey

  • x-message-ttl: 单位毫秒,消息、队列的生命周期。超时后消息会变成dead message

延迟队列

延迟队列存储的对象为对应的延时消息,等待指定时间后,消费者才拿到这个消息进行消费。RabbitMQ本身没有直接支持的延迟队列功能,我们可以设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息成为dead letter(死信),进入死信队列,将死信队列指定为普通的消费队列即可实现延迟消费。


声明交换器

  • 邮件交换器 emailExchange

    1
    2
    3
    4
    @Bean
    public TopicExchange emailExchange() {
    return (TopicExchange)ExchangeBuilder.topicExchange("emailExchange").durable(true).build();
    }
  • 重试交换器 retryExchange

    1
    2
    3
    4
    @Bean
    public TopicExchange retryExchange() {
    return (TopicExchange)ExchangeBuilder.topicExchange("retryExchange").durable(true).build();
    }
  • 失败交换器 faildExchange

    1
    2
    3
    4
    @Bean
    public TopicExchange faildExchange() {
    return (TopicExchange)ExchangeBuilder.topicExchange("faildExchange").durable(true).build();
    }

声明队列

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 死信队列 交换机标识符
*/
private static final String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
/**
* 死信队列交换机routing-key标识符
*/
private static final String X_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 死信队列消息的超时时间枚举
*/
private static final String X_MESSAGE_TTL = "x-message-ttl";
  • 邮件队列 emailQueue

    1
    2
    3
    4
    @Bean
    public Queue emailQueue() {
    return new Queue("emailQueue");
    }
  • 重试队列 retryQueue(此处我指定了其死信队列为emailQueue队列)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    @Bean
    public Queue retryQueue() {
    Map args = new ConcurrentHashMap<>(3);
    // 将消息重新投递到emailExchange中
    args.put(DEAD_LETTER_QUEUE_KEY, "emailExchange");
    args.put(DEAD_LETTER_ROUTING_KEY, "email.topic.retry");
    //消息在队列中延迟30s后超时,消息会重新投递到x-dead-letter-exchage对应的队列中,routingkey为自己指定
    args.put(X_MESSAGE_TTL, 30 * 1000);
    return QueueBuilder.durable("retryQueue").withArguments(args).build();
    }
  • 失败队列 faildQueue

    1
    2
    3
    4
    @Bean
    public Queue faildQueue() {
    return QueueBuilder.durable("faildQueue").build();
    }

绑定响应的交换器与队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public Binding topicQueueBinding(@Qualifier("emailQueue") Queue queue,
@Qualifier("emailExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("email.topic.*");
}
@Bean
public Binding retryDirectBinding(@Qualifier("retryQueue") Queue queue,
@Qualifier("retryExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("email.retry.*");
}
@Bean
public Binding failDirectBinding(@Qualifier("faildQueue") Queue queue,
@Qualifier("faildExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("email.faild.*");
}

配置生产者消息确认回调(具体处理逻辑需按业务情况,此处只输出了信息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Bean
public AmqpTemplate amqpTemplate(){
// 在消息没有被路由到合适的队列情况下,Broker会将消息返回给生产者,
// 为true时如果Exchange根据类型和消息Routing Key无法路由到一个合适的Queue存储消息,
// Broker会调用Basic.Return回调给handleReturn(),再回调给ReturnCallback,将消息返回给生产者。
// 为false时,丢弃该消息
rabbitTemplate.setMandatory(true);

// 消息确认,需要配置 spring.rabbitmq.publisher-confirms = true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

//根据返回的状态,生产者可以处理失败与成功的相应信息,比如发送失败,可重发,转发或者存入日志等
//if(ack){
// correlationData.getId()为message唯一标识,需要生产者发送message时传入自定义的correlationData才能获取到,否则为null
// //do something
//}else{
// correlationData.getId()
// //do something
//}

//此处只做打印,不对生产者发送失败的信息处理
System.out.println("------------------------------------------------");
System.out.println("ConfirmCallBackListener:correlationData=" + correlationData + ",ack=" + ack + ",cause=" + cause);
System.out.println("------------------------------------------------");

});

// 消息发送失败返回到队列中,需要配置spring.rabbitmq.publisher-returns = true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

System.out.println("------------------------------------------------");
System.out.println("ReturnCallBackListener:message=" + new String(message.getBody()) + ",replyCode=" + replyCode + ",replyText=" + replyText + ",exchange=" + exchange + ",routingKey=" + routingKey);
System.out.println("------------------------------------------------");

});

return rabbitTemplate;
}

生产者发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.bhy702.website;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
* @author: Hongyuan Bai
* @create: 2019-06-27 13:45:43
* @description:
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {

@Autowired
private AmqpTemplate rabbitTemplate;

public static final String EXCHANGE = "emailExchange";
public static final String ROUTING_KEY = "email.topic.queue";

@Test
public void test() {

for (int i = 100; i < 105; i++){
rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,"message:"+(i+1));
}
}

}

消费者消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.bhy702.website.common.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import java.io.IOException;


/**
* @author: Hongyuan Bai
* @create: 2019-06-27 13:38:46
* @description:
*/
@Slf4j
@Component
public class RabbitConsumer2 {

@Autowired
private JavaMailSender javaMailSender;

@Autowired
private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = "emailQueue")
public void sendMail(Message message, Channel channel) throws IOException {

try {
/** 手动抛出异常,测试消息重试 */
int i = 1 / 0;

} catch (Exception e) {

long retryCount = getRetryCount(message.getMessageProperties());

if (retryCount >= 3) {

/** 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理 */
try {
rabbitTemplate.convertAndSend("faildExchange","email.faild.myfaild", message);
log.info("消费者消费消息在重试3次后依然失败,将消息发送到faild队列,发送消息:" + new String(message.getBody()));
} catch (Exception e1) {
log.error("消息在发送到faild队列的时候报错:" + e1.getMessage() + ",原始消息:"+ new String(message.getBody()));
}

} else {

try {
/** 重试次数不超过3次,则将消息发送到重试队列等待重新被消费(重试队列延迟超时后信息被发送到相应死信队列重新消费,即延迟消费)*/
rabbitTemplate.convertAndSend("retryExchange", "email.retry.myRetry", message);
log.info("消费者消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第" + (retryCount+1) + "次重试");
} catch (Exception e1) {
log.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息");
}
}
} finally {
/**
* 无论消费成功还是消费失败,都要手动进行ack,因为即使消费失败了,也已经将消息重新投递到重试队列或者失败队列
* 如果不进行ack,生产者在超时后会进行消息重发,如果消费者依然不能处理,则会存在死循环
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

/**
* 获取消息被重试的次数
*/
public long getRetryCount(MessageProperties messageProperties) {
Long retryCount = 0L;
if (null != messageProperties) {
List> deaths = messageProperties.getXDeathHeader();
if(deaths != null && deaths.size()>0){
Map death = (Map)deaths.get(0);
retryCount = (Long) death.get("count");
}

}
return retryCount;
}

}

properties配置文件中RabbitMQ相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
spring.application.name=rabbitMQ-test
spring.rabbitmq.host=RabbitMQ服务器的IP地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

# 开启发布者发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发布者发送失败退回
spring.rabbitmq.publisher-returns=true

# 开启ACK手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual

# 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,
# 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者
spring.rabbitmq.template.mandatory=true

# 开启rabbitmq的生产端重试机制,默认是false,默认重试3次
spring.rabbitmq.template.retry.enabled=true

# 开启rabbitmq的消费端重试机制,默认是false,默认重试3次
# spring.rabbitmq.listener.simple.retry.enabled=true
# 设置重试的次数
# spring.rabbitmq.listener.simple.retry.max-attempts=5

测试效果展示:

消息进入emailQueue邮件队列后,出现消费异常会进入retryQueue重试队列等待延迟重试

在这里插入图片描述

消费异常的消息全部进入重试队列等待延迟重试,30秒后消息超时会转发到指定的死信队列,也就是回到emailQueue邮件队列

在这里插入图片描述

经过3次重试后,消息仍然未被消费,消息进入faildQueue失败队列等待相应的消费者特殊处理或人工处理

在这里插入图片描述


当前只解决了消息的可靠性消费,要保证消息全部投递成功,就涉及到RabbitMQ的可靠性投递。
RabbitMQ消息的可靠性投递与消费的完整解决方案- -可参考另一篇博客 点这里