消息的消费确认实现原理:
当消费者的消息消费异常时,消息进入延迟重试队列,待超时后重新发送到重试队列指定的死信队列,死信队列重新消费信息,如果又出现死信情况,继续进入延时重试队列,依次循环,当重试超过3次后,消息进入失败队列等待相应的消费者特殊处理或人工处理。
死信队列
死信队列中(dead letter)死信的消息来源:
消息被拒绝(basic.reject或basic.nack)并且requeue=false
消息TTL过期(x-message-ttl参数值即为超时时间)
.队列达到最大长度(队列满了,无法再添加数据到mq中)
普通队列指定死信队列的相关参数:
x-dead-letter-exchange: 用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey
x-message-ttl: 单位毫秒,消息、队列的生命周期。超时后消息会变成dead message
延迟队列
延迟队列存储的对象为对应的延时消息,等待指定时间后,消费者才拿到这个消息进行消费。RabbitMQ本身没有直接支持的延迟队列功能,我们可以设置 x-message-ttl,来控制消息的生存时间,如果超时,则消息成为dead letter(死信),进入死信队列,将死信队列指定为普通的消费队列即可实现延迟消费。
声明交换器
邮件交换器 emailExchange
1 2 3 4
| @Bean public TopicExchange emailExchange() { return (TopicExchange)ExchangeBuilder.topicExchange("emailExchange").durable(true).build(); }
|
重试交换器 retryExchange
1 2 3 4
| @Bean public TopicExchange retryExchange() { return (TopicExchange)ExchangeBuilder.topicExchange("retryExchange").durable(true).build(); }
|
失败交换器 faildExchange
1 2 3 4
| @Bean public TopicExchange faildExchange() { return (TopicExchange)ExchangeBuilder.topicExchange("faildExchange").durable(true).build(); }
|
声明队列
1 2 3 4 5 6 7 8 9 10 11 12
|
private static final String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
private static final String X_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
private static final String X_MESSAGE_TTL = "x-message-ttl";
|
邮件队列 emailQueue
1 2 3 4
| @Bean public Queue emailQueue() { return new Queue("emailQueue"); }
|
失败队列 faildQueue
1 2 3 4
| @Bean public Queue faildQueue() { return QueueBuilder.durable("faildQueue").build(); }
|
绑定响应的交换器与队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Bean public Binding topicQueueBinding(@Qualifier("emailQueue") Queue queue, @Qualifier("emailExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("email.topic.*"); } @Bean public Binding retryDirectBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("email.retry.*"); } @Bean public Binding failDirectBinding(@Qualifier("faildQueue") Queue queue, @Qualifier("faildExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("email.faild.*"); }
|
配置生产者消息确认回调(具体处理逻辑需按业务情况,此处只输出了信息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Bean public AmqpTemplate amqpTemplate(){ rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("------------------------------------------------"); System.out.println("ConfirmCallBackListener:correlationData=" + correlationData + ",ack=" + ack + ",cause=" + cause); System.out.println("------------------------------------------------");
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("------------------------------------------------"); System.out.println("ReturnCallBackListener:message=" + new String(message.getBody()) + ",replyCode=" + replyCode + ",replyText=" + replyText + ",exchange=" + exchange + ",routingKey=" + routingKey); System.out.println("------------------------------------------------");
});
return rabbitTemplate; }
|
生产者发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.bhy702.website;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitMQTest {
@Autowired private AmqpTemplate rabbitTemplate;
public static final String EXCHANGE = "emailExchange"; public static final String ROUTING_KEY = "email.topic.queue";
@Test public void test() {
for (int i = 100; i < 105; i++){ rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,"message:"+(i+1)); } }
}
|
消费者消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| package com.bhy702.website.common.rabbitmq;
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.stereotype.Component; import java.io.IOException;
@Slf4j @Component public class RabbitConsumer2 {
@Autowired private JavaMailSender javaMailSender;
@Autowired private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "emailQueue") public void sendMail(Message message, Channel channel) throws IOException {
try { int i = 1 / 0;
} catch (Exception e) {
long retryCount = getRetryCount(message.getMessageProperties());
if (retryCount >= 3) {
try { rabbitTemplate.convertAndSend("faildExchange","email.faild.myfaild", message); log.info("消费者消费消息在重试3次后依然失败,将消息发送到faild队列,发送消息:" + new String(message.getBody())); } catch (Exception e1) { log.error("消息在发送到faild队列的时候报错:" + e1.getMessage() + ",原始消息:"+ new String(message.getBody())); }
} else {
try { rabbitTemplate.convertAndSend("retryExchange", "email.retry.myRetry", message); log.info("消费者消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第" + (retryCount+1) + "次重试"); } catch (Exception e1) { log.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息"); } } } finally {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
public long getRetryCount(MessageProperties messageProperties) { Long retryCount = 0L; if (null != messageProperties) { List> deaths = messageProperties.getXDeathHeader(); if(deaths != null && deaths.size()>0){ Map death = (Map)deaths.get(0); retryCount = (Long) death.get("count"); }
} return retryCount; }
}
|
properties配置文件中RabbitMQ相关配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| spring.application.name=rabbitMQ-test spring.rabbitmq.host=RabbitMQ服务器的IP地址 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
# 开启发布者发送确认 spring.rabbitmq.publisher-confirms=true # 开启发布者发送失败退回 spring.rabbitmq.publisher-returns=true
# 开启ACK手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.direct.acknowledge-mode=manual
# 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列, # 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者 spring.rabbitmq.template.mandatory=true
# 开启rabbitmq的生产端重试机制,默认是false,默认重试3次 spring.rabbitmq.template.retry.enabled=true
# 开启rabbitmq的消费端重试机制,默认是false,默认重试3次 # spring.rabbitmq.listener.simple.retry.enabled=true # 设置重试的次数 # spring.rabbitmq.listener.simple.retry.max-attempts=5
|
测试效果展示:
消息进入emailQueue邮件队列后,出现消费异常会进入retryQueue重试队列等待延迟重试
消费异常的消息全部进入重试队列等待延迟重试,30秒后消息超时会转发到指定的死信队列,也就是回到emailQueue邮件队列
经过3次重试后,消息仍然未被消费,消息进入faildQueue失败队列等待相应的消费者特殊处理或人工处理
当前只解决了消息的可靠性消费,要保证消息全部投递成功,就涉及到RabbitMQ的可靠性投递。
RabbitMQ消息的可靠性投递与消费的完整解决方案- -可参考另一篇博客 点这里