1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| @Bean public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("++++++ 进入回调setConfirmCallback()"); if(correlationData != null){
String messageKey = "CORRELATION_DATA_ID:"+correlationData.getId();
if(redisTemplate.hasKey(messageKey)){
String messageMap = redisTemplate.opsForValue().get(messageKey); Message message = JSONObject.parseObject(messageMap,Message.class); Map<String, Object> headers = message.getMessageProperties().getHeaders();
if( !headers.containsKey("is_success_send_to_exchange") || !(Boolean) headers.get("is_success_send_to_exchange")){ if(ack){ headers.put("is_success_send_to_exchange",true); redisTemplate.opsForValue().set(messageKey,JSONObject.toJSONString(message));
FailedMessage failedMessage = new FailedMessage(); failedMessage.setMessageId(Integer.parseInt(message.getMessageProperties().getMessageId())); failedMessage.setStatus("failed"); FailedMessage failedMessage1 = failedMessageMapper.selectOne(failedMessage); if(failedMessage1 != null){ failedMessage1.setStatus("success"); failedMessage1.setStatusTime(new Date()); failedMessageMapper.updateByPrimaryKey(failedMessage1); }
log.info("++++++ 消息投递成功,更新redis,更新失败记录数据库:{}",message);
}else{
log.info("++++++ 消息投递失败:{}",message);
if(!headers.containsKey("is_success_send_to_exchange")){ headers.put("is_success_send_to_exchange",false); }
log.info("ConfirmCallback消息发送失败,id:{},原因:{}",correlationData.getId(),cause); int count ; if(headers.containsKey("failed_count_for_send_to_exchange")){ count = (int) headers.get("failed_count_for_send_to_exchange"); }else{ count = 0; }
if(count >= 3){
redisTemplate.delete("CORRELATION_DATA_ID:"+correlationData.getId());
log.info("ConfirmCallback消息发送失败三次,id:{},停止重试",correlationData.getId(),cause); rabbitTemplate.convertAndSend("failedExchange","email.failed.deliver", message); return; }
count++;
headers.put("failed_count_for_send_to_exchange",count);
redisTemplate.opsForValue().set(messageKey,JSONObject.toJSONString(message)); log.info("++++++ 生产者投递消息重试中:{}次,{}",count,message); rabbitTemplate.convertAndSend("emailExchange", "email.topic.retry",message, correlationData); } } }
}
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("ReturnCallback消息发送队列不可达,message:{},exchange:{},routingKey:{},原因:{}",message,exchange,routingKey,replyText); });
return rabbitTemplate; }
|