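Network blips or Broker-side failures can cause the ack that is sent back to confirmCallback to be lost or to fail. The producer then cannot tell whether the data actually arrived, and the message may be lost.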

Approach

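  • In confirmCallback(), check whether the send succeeded. On success, set the custom flag is_success_send_to_exchange to true in message.getMessageProperties().getHeaders() of the message cached in redis under the correlationDataId, and update the failure-record table. On failure, check whether the custom delivery retry counter failed_count_for_send_to_exchange is greater than or equal to 3. If it is less than 3, set the flag to false, increment the counter, update the cached message, and re-publish the message for retry; otherwise clear the cache and send the message straight to the failed queue.
  • Use a scheduled task to scan the cached data periodically and resend every message whose is_success_send_to_exchange is not true.
  • Before consuming, the consumer reads the cache to check whether the message exists. If it does not, the consumer returns immediately; if it does, the consumer processes the message and deletes the cache entry on success, which makes consumption idempotent.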
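The confirm-handling rule in the first bullet can be sketched as a small pure function. This is only a minimal sketch with no Spring or Redis involved: the class name ConfirmDecision, the Action enum, and the onConfirm method are hypothetical names introduced for illustration; only the two header names and the limit of 3 come from the text above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: decides what to do with a cached message when a confirm arrives. */
final class ConfirmDecision {
    enum Action { MARK_DELIVERED, RETRY, SEND_TO_FAILED_QUEUE }

    static final String SUCCESS_FLAG = "is_success_send_to_exchange";
    static final String FAILED_COUNT = "failed_count_for_send_to_exchange";
    static final int MAX_RETRIES = 3;

    /** Updates the headers in place and returns the action the caller should take. */
    static Action onConfirm(Map<String, Object> headers, boolean ack) {
        if (ack) {
            headers.put(SUCCESS_FLAG, true);
            return Action.MARK_DELIVERED;
        }
        // A missing counter means no failure has been recorded yet
        Object raw = headers.get(FAILED_COUNT);
        int count = raw instanceof Number ? ((Number) raw).intValue() : 0;
        if (count >= MAX_RETRIES) {
            // The caller clears the cache and publishes to the failed exchange
            return Action.SEND_TO_FAILED_QUEUE;
        }
        headers.put(SUCCESS_FLAG, false);
        headers.put(FAILED_COUNT, count + 1);
        return Action.RETRY;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // Four nacks in a row: three RETRY results, then SEND_TO_FAILED_QUEUE
        for (int i = 0; i < 4; i++) {
            System.out.println(onConfirm(headers, false));
        }
    }
}
```

Keeping the decision separate from the Redis and RabbitMQ calls makes the retry limit easy to test on its own; the callback then only has to act on the returned value.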

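Tracing the correlationData argument of convertAndSend shows that RabbitTemplate calls setupConfirm, which copies the id of correlationData into the headers map of the Message's MessageProperties. The correlationData id can therefore be read from the message itself, not only inside confirmCallback().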

The setupConfirm method:
private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
    if ((this.publisherConfirms || this.confirmCallback != null) && channel instanceof PublisherCallbackChannel) {
        PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
        CorrelationData correlationData = this.correlationDataPostProcessor != null ? this.correlationDataPostProcessor.postProcess(message, correlationDataArg) : correlationDataArg;
        long nextPublishSeqNo = channel.getNextPublishSeqNo();
        message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
        publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo, new PendingConfirm(correlationData, System.currentTimeMillis()));
        // The correlationData id is copied into the headers map of the Message's MessageProperties here
        if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
            message.getMessageProperties().setHeader("spring_returned_message_correlation", correlationData.getId());
        }
    } else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) {
        long nextPublishSeqNo = channel.getNextPublishSeqNo();
        message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
    }

}
setHeader(String key, Object value)
public void setHeader(String key, Object value) {
    this.headers.put(key, value);
}
Reading the correlationData id stored in the Message:
String correlationDataId = (String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");

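Note: message.getMessageProperties().getCorrelationId() does not return the correlationData id. It returns the separate correlationId property that MessageProperties carries on its own.

So, to attach a global flag or record to a message, store it as a key-value pair in the headers map of Message.getMessageProperties().
For example, after a successful delivery, write the delivery flag "is_success_send_to_exchange": message.getMessageProperties().getHeaders().put("is_success_send_to_exchange",true);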

The number of consumption retries can be read from the Message's MessageProperties:
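public static long getRetryCount(MessageProperties messageProperties) {
    long retryCount = 0L;
    if (messageProperties != null) {
        // The x-death header is added by the broker each time the message is dead-lettered
        List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
        if (deaths != null && !deaths.isEmpty()) {
            Object count = deaths.get(0).get("count");
            // Read the value as a Number rather than casting to Long, so a missing or differently typed value does not throw
            if (count instanceof Number) {
                retryCount = ((Number) count).longValue();
            }
        }
    }
    return retryCount;
}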
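To make the shape of the x-death header concrete, here is a minimal sketch that builds one entry by hand and reads its count the same way. It uses plain collections instead of MessageProperties; the class name XDeathExample and the sample values are assumptions for illustration, while the field names (count, reason, queue, exchange) are the ones RabbitMQ writes into each x-death entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone illustration of reading "count" from an x-death list. */
final class XDeathExample {

    /** Returns the count of the first x-death entry, or 0 when there is none. */
    static long retryCount(List<Map<String, ?>> deaths) {
        if (deaths == null || deaths.isEmpty()) {
            return 0L;
        }
        Object count = deaths.get(0).get("count");
        return count instanceof Number ? ((Number) count).longValue() : 0L;
    }

    /** Builds a sample entry like the one produced when a message expires in retryQueue. */
    static List<Map<String, ?>> sampleDeaths(long count) {
        Map<String, Object> death = new HashMap<>();
        death.put("count", count);               // times dead-lettered for this queue and reason
        death.put("reason", "expired");          // the TTL ran out
        death.put("queue", "retryQueue");        // queue the message was dead-lettered from
        death.put("exchange", "retryExchange");  // exchange it was originally published to
        List<Map<String, ?>> deaths = new ArrayList<>();
        deaths.add(death);
        return deaths;
    }

    public static void main(String[] args) {
        System.out.println(retryCount(sampleDeaths(2L))); // prints 2
        System.out.println(retryCount(null));             // prints 0
    }
}
```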

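Custom variables stored in the headers map
  • is_success_send_to_exchange: boolean, the delivery-success flag. Absent by default; written in the callback.
  • failed_count_for_send_to_exchange: int, the number of failed deliveries. Absent by default; written on each failure to record the count.

For example: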

message.getMessageProperties().getHeaders().put("is_success_send_to_exchange",true);

Declaring and binding the exchanges and queues

/**
 * Declare the exchanges as durable.
 * RabbitMQ offers several common exchange types, such as direct, fanout, and topic; configure them according to business needs.
 * Naming convention for reference: scm3.services, scm3.services.retry, scm3.services.failed
 * @return the exchange
 */
@Bean
public TopicExchange emailExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange("emailExchange").durable(true).build();
}
@Bean
public TopicExchange retryExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange("retryExchange").durable(true).build();
}
@Bean
public TopicExchange failedExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange("failedExchange").durable(true).build();
}

@Bean
public Queue emailQueue() {
    return new Queue("emailQueue");
}
@Bean
public Queue retryQueue() {
    Map<String, Object> args = new ConcurrentHashMap<>(3);
    // Re-publish expired messages to the main exchange
    args.put("x-dead-letter-exchange", "emailExchange");
    args.put("x-dead-letter-routing-key", "email.topic.retry");
    // After 30s in this queue, the message is re-published to the x-dead-letter-exchange using the routing key configured above
    args.put("x-message-ttl", 30 * 1000);
    return QueueBuilder.durable("retryQueue").withArguments(args).build();
}
/**
 * The failed queue.
 *
 * @return the failed queue
 */
@Bean
public Queue failedQueue() {
    return QueueBuilder.durable("failedQueue").build();
}



@Bean
public Binding topicQueueBinding(@Qualifier("emailQueue") Queue queue,
                                 @Qualifier("emailExchange") TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("email.topic.*");
}
@Bean
public Binding retryDirectBinding(@Qualifier("retryQueue") Queue queue,
                                  @Qualifier("retryExchange") TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("email.retry.*");
}
@Bean
public Binding failDirectBinding(@Qualifier("failedQueue") Queue queue,
                                 @Qualifier("failedExchange") TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("email.failed.*");
}
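The retry loop depends on how the topic bindings match routing keys: a message that expires in retryQueue is dead-lettered to emailExchange with the key email.topic.retry, which the email.topic.* binding routes back to emailQueue. The sketch below is a simplified matcher written only to illustrate that rule ("*" matches exactly one word, "#" matches zero or more words). TopicMatch is a hypothetical class, not part of RabbitMQ or Spring AMQP, and the broker's own implementation differs.

```java
/** Hypothetical, simplified topic-pattern matcher for illustration only. */
final class TopicMatch {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // The pattern is exhausted: match only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words of the key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        // '*' consumes exactly one word; any other token must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("email.topic.*", "email.topic.leave"));   // true
        System.out.println(matches("email.topic.*", "email.topic.retry"));   // true
        System.out.println(matches("email.topic.*", "email.retry.myRetry")); // false
        System.out.println(matches("email.retry.*", "email.retry.myRetry")); // true
    }
}
```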

Sending the email

Map<String, String> messageMap = new HashMap<>();
messageMap.put("email", "1956969397@qq.com");
messageMap.put("subject", "[Personal site notification]: Boss, there is a new comment notification~~");
messageMap.put("content", "[Personal site notification] content...");

MessageConverter converter = rabbitTemplate.getMessageConverter();
MessageProperties messageProperties = new MessageProperties();
// Store the id of the matching row in the database message table, so that the failed-queue handler can link failed_message records to it.
// `message` here is the entity already saved to the message table.
messageProperties.setMessageId(message.getId().toString());
// Named amqpMessage so that it does not clash with the `message` entity above
org.springframework.amqp.core.Message amqpMessage = converter
        .toMessage(messageMap, messageProperties);
// Generate a unique identifier for the message
String correlationDataId = UUID.randomUUID().toString().replaceAll("-", "");
log.info("++++++ Storing message in the redis cache! {}", correlationDataId);
redisTemplate.opsForValue().set("CORRELATION_DATA_ID:" + correlationDataId, JSONObject.toJSONString(amqpMessage));
log.info("++++++ Producer sending message! {}", amqpMessage);
rabbitTemplate.convertAndSend("emailExchange", "email.topic.leave", amqpMessage, new CorrelationData(correlationDataId));

Reliable delivery

@Bean
public RabbitTemplate rabbitTemplate() {

    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

    // Must be enabled for the return callback to take effect
    // rabbitTemplate.setMandatory(true);

    // Publisher confirms; requires spring.rabbitmq.publisher-confirms = true
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

        log.info("++++++ Entering the setConfirmCallback() callback");
        if (correlationData != null) {

            // Cache key of the message
            String messageKey = "CORRELATION_DATA_ID:" + correlationData.getId();

            if (Boolean.TRUE.equals(redisTemplate.hasKey(messageKey))) {

                String messageMap = redisTemplate.opsForValue().get(messageKey);
                Message message = JSONObject.parseObject(messageMap, Message.class);
                Map<String, Object> headers = message.getMessageProperties().getHeaders();

                if (!headers.containsKey("is_success_send_to_exchange") || !(Boolean) headers.get("is_success_send_to_exchange")) {
                    if (ack) {
                        // Mark the delivery as successful
                        headers.put("is_success_send_to_exchange", true);
                        // Update the message in the redis cache
                        redisTemplate.opsForValue().set(messageKey, JSONObject.toJSONString(message));

                        // Update the delivery-failure record in the failed_message table
                        FailedMessage failedMessage = new FailedMessage();
                        failedMessage.setMessageId(Integer.parseInt(message.getMessageProperties().getMessageId()));
                        failedMessage.setStatus("failed");
                        // Look up the failure record in the database
                        FailedMessage failedMessage1 = failedMessageMapper.selectOne(failedMessage);
                        if (failedMessage1 != null) {
                            failedMessage1.setStatus("success");
                            failedMessage1.setStatusTime(new Date());
                            failedMessageMapper.updateByPrimaryKey(failedMessage1);
                        }

                        log.info("++++++ Message delivered; redis and the failure record updated: {}", message);

                    } else {

                        log.info("++++++ Message delivery failed: {}", message);

                        if (!headers.containsKey("is_success_send_to_exchange")) {
                            headers.put("is_success_send_to_exchange", false);
                        }

                        log.info("ConfirmCallback: message send failed, id: {}, cause: {}", correlationData.getId(), cause);
                        // Read the counter as a Number: its concrete type may change after the JSON round trip
                        Object rawCount = headers.get("failed_count_for_send_to_exchange");
                        int count = rawCount instanceof Number ? ((Number) rawCount).intValue() : 0;

                        if (count >= 3) {

                            // Clear the message cache
                            redisTemplate.delete(messageKey);

                            log.info("ConfirmCallback: message send failed three times, id: {}, cause: {}, giving up", correlationData.getId(), cause);
                            rabbitTemplate.convertAndSend("failedExchange", "email.failed.deliver", message);

                            return;
                        }

                        count++;

                        headers.put("failed_count_for_send_to_exchange", count);

                        redisTemplate.opsForValue().set(messageKey, JSONObject.toJSONString(message));
                        log.info("++++++ Producer retrying delivery: attempt {}, {}", count, message);
                        // The resend uses the copy taken from redis; once the message is consumed successfully, its redis entry is deleted
                        rabbitTemplate.convertAndSend("emailExchange", "email.topic.retry", message, correlationData);
                    }
                }
            }

        }

    });

    // Called when a message cannot be routed to a queue; requires spring.rabbitmq.publisher-returns = true
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        log.info("ReturnCallback: message could not be routed to a queue, message: {}, exchange: {}, routingKey: {}, reason: {}", message, exchange, routingKey, replyText);
    });

    return rabbitTemplate;
}

Reliable, idempotent consumption

@RabbitListener(queues = "emailQueue")
public void sendMail(Message message, Channel channel) throws IOException {

    // Get the correlationDataId
    String correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    // Cache key of the message
    String messageKey = "CORRELATION_DATA_ID:" + correlationDataId;
    // Consume the message
    try {
        // Idempotency check: skip messages that are no longer in the cache
        if (!Boolean.TRUE.equals(redisTemplate.hasKey(messageKey))) {
            return;
        }
        // Get the message content
        Map<String, String> map = (Map<String, String>) SerializationUtils.deserialize(message.getBody());
        // Consume the message by calling the mail-sending interface
        SendToMail.sendHtmlMail(javaMailSender, map.get("email"), map.get("subject"), map.get("content"));

        log.info("++++++ Message consumed successfully: {}", correlationDataId);

        log.info("++++++ Clearing the message cache: {}", correlationDataId);
        // Clear the message cache
        redisTemplate.delete(messageKey);

        // Update the consumption-failure record in failed_message
        FailedMessage failedMessage = new FailedMessage();
        failedMessage.setMessageId(Integer.parseInt(message.getMessageProperties().getMessageId()));
        failedMessage.setStatus("failed");
        // Look up the failure record in the database
        FailedMessage failedMessage1 = failedMessageMapper.selectOne(failedMessage);
        if (failedMessage1 != null) {
            failedMessage1.setStatus("success");
            failedMessage1.setStatusTime(new Date());
            failedMessageMapper.updateByPrimaryKey(failedMessage1);
        }
        log.info("++++++ Updated the consumption-failure record in failed_message: {}", correlationDataId);
    } catch (Exception e) {
        // Get the retry count
        long retryCount = RabbitMqUtil.getRetryCount(message.getMessageProperties());
        Message newMessage = message;

        if (retryCount >= 3) {

            // After 3 or more retries, send the message to the failed queue to await manual handling
            try {
                // Clear the message cache
                redisTemplate.delete(messageKey);

                log.info("++++++ Retry limit reached, sending to the failed queue: attempt {}, message {}", retryCount + 1, correlationDataId);
                rabbitTemplate.convertAndSend("failedExchange",
                        "email.failed.resume", newMessage);
                log.info("Consumer still failed after 3 retries; message sent to the failed queue: " + new String(newMessage.getBody()));
            } catch (Exception e1) {
                log.error("Error while sending the message to the failed queue: " + e1.getMessage() + ", original message: "
                        + new String(newMessage.getBody()));
            }

        } else {

            try {
                log.info("++++++ Retrying consumption: attempt {}, {}", retryCount + 1, correlationDataId);
                log.info("++++++ Retrying consumption: attempt {}, updating cache {}", retryCount + 1, correlationDataId);
                // Update the cache
                redisTemplate.opsForValue().set(messageKey, JSONObject.toJSONString(newMessage));
                log.info("++++++ Retrying consumption: attempt {}, resending message {}", retryCount + 1, correlationDataId);
                // Fewer than 3 retries so far: send the message to the retry queue, where it waits to be consumed again (delayed consumption)
                rabbitTemplate.convertAndSend("retryExchange",
                        "email.retry.myRetry", newMessage, new CorrelationData(correlationDataId));
                log.info("Consumer failed; message sent to the retry queue; original message: " + new String(newMessage.getBody()) + "; retry #"
                        + (retryCount + 1));
            } catch (Exception e1) {
                // If resending fails, nack is an option. In practice, however, a requeued message does not go to the
                // tail of the queue but stays at its head, so the consumer receives it again immediately, throws again,
                // and requeues again, over and over. This blocks the queue, messages pile up, and normal messages stall too.
                // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                // Resending is used instead: once the retry count reaches 3, this branch is skipped and the message
                // goes straight to the failed queue to await manual handling.
                log.error("Exception while sending the message to the retry queue: " + e1.getMessage() + ", resending the message");
            }
        }
    } finally {
        /**
         * RabbitMQ auto-ack is turned off in favour of manual ack.
         * 1. With auto-ack, the broker treats the message as acknowledged as soon as it is delivered,
         *    whether or not it was consumed successfully.
         * 2. The message is acked manually on both success and failure, because a failed message has already
         *    been re-published to the retry queue or the failed queue. Without the ack the message would stay
         *    unacknowledged and be redelivered later; if the consumer still could not process it, this would loop forever.
         */
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
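One caveat with the idempotency check above: hasKey followed later by delete is two separate steps, so two deliveries of the same message handled at the same moment could both pass the check. A possible variation, sketched here with an in-memory ConcurrentHashMap standing in for redis, is to claim the entry with a single atomic remove and process only when the claim succeeds. IdempotentConsumer and its methods are hypothetical names; in the real listener the claimed entry would also need to be written back if processing fails, so that the retry path still finds it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: claim-then-process idempotency with an in-memory cache. */
final class IdempotentConsumer {
    private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
    private final AtomicInteger handled = new AtomicInteger();

    /** Producer side: cache the payload under the correlation id before sending. */
    void register(String correlationDataId, String payload) {
        cache.put("CORRELATION_DATA_ID:" + correlationDataId, payload);
    }

    /** Consumer side: returns true only for the first delivery of a given id. */
    boolean consume(String correlationDataId) {
        // remove() is atomic, so only one caller can obtain the payload
        String payload = cache.remove("CORRELATION_DATA_ID:" + correlationDataId);
        if (payload == null) {
            return false; // already consumed, or never registered
        }
        handled.incrementAndGet(); // stands in for sending the mail
        return true;
    }

    int handledCount() {
        return handled.get();
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        consumer.register("abc", "hello");
        System.out.println(consumer.consume("abc")); // true
        System.out.println(consumer.consume("abc")); // false, duplicate delivery
        System.out.println(consumer.handledCount()); // 1
    }
}
```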

Handling messages in the failed queue

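@RabbitListener(queues = "failedQueue")
public void dealMail(Message message, Channel channel) throws IOException {
    try {
        // Id of the related row in the message table, stored in the failed_message record
        Integer messageId = Integer.parseInt(message.getMessageProperties().getMessageId());

        // Idempotency check. The senders clear the redis cache before publishing to the failed queue,
        // so a cache lookup would always skip the message; check for an existing failure record instead.
        FailedMessage probe = new FailedMessage();
        probe.setMessageId(messageId);
        probe.setStatus("failed");
        if (failedMessageMapper.selectOne(probe) != null) {
            return;
        }

        FailedMessage failedMessage = new FailedMessage();
        failedMessage.setMessageId(messageId);
        failedMessage.setStatus("failed");
        failedMessage.setStatusTime(new Date());

        failedMessageMapper.insert(failedMessage);
    } finally {
        // Manual ack, as in the emailQueue listener
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}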

Scheduled cache scan to handle unconsumed messages

@Component
public class StaticScheduleTask {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // The scheduled task (scheduling must be enabled, e.g. with @EnableScheduling)
    // Set the interval according to business needs; this cron expression runs once an hour
    @Scheduled(cron = "0 0 0/1 * * ?")
    public void configureTasks() {

        // Fetch all cached message keys
        Set<String> keys = redisTemplate.keys("CORRELATION_DATA_ID:" + "*");
        if (keys == null) {
            return;
        }
        keys.forEach(key -> {

            String correlationDataId = key.replaceFirst("CORRELATION_DATA_ID:", "");

            String messageMap = redisTemplate.opsForValue().get(key);
            if (messageMap == null) {
                // The entry was removed between the scan and the read
                return;
            }
            Message message = JSONObject.parseObject(messageMap, Message.class);
            Map<String, Object> headers = message.getMessageProperties().getHeaders();

            if (!headers.containsKey("is_success_send_to_exchange") || !(Boolean) headers.get("is_success_send_to_exchange")) {

                if (!headers.containsKey("is_success_send_to_exchange")) {
                    headers.put("is_success_send_to_exchange", false);
                }

                // Read the counter as a Number: its concrete type may change after the JSON round trip
                Object rawCount = headers.get("failed_count_for_send_to_exchange");
                int count = rawCount instanceof Number ? ((Number) rawCount).intValue() : 0;

                if (count >= 3) {
                    // Clear the message cache
                    redisTemplate.delete(key);
                    rabbitTemplate.convertAndSend("failedExchange", "email.failed.deliver", message);
                    // Inside the lambda this only skips to the next key
                    return;
                }

                count++;

                headers.put("failed_count_for_send_to_exchange", count);

                redisTemplate.opsForValue().set(key, JSONObject.toJSONString(message));
                // The resend uses the copy taken from redis; once the message is consumed successfully, its redis entry is deleted
                rabbitTemplate.convertAndSend("emailExchange", "email.topic.retry", message, new CorrelationData(correlationDataId));

            }
        });
    }
}