1. 消息中间件简介
  1. 消息即是信息的载体,为了方便消息的生产者和消费者都能明白,它在传递时需要一定的数据格式(即消息协议),根据消息送达的实时性,它分为即时消息和延迟消息两类。
  2. 即时消息,关注消息送达的实时性,如HTTP、RPC请求等
  3. 延迟消息,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列
  4. 实现消息队列的组件称之为消息中间件。对于消息中间件,关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统,也通常用于服务与服务解耦。常见的角色大致也就有Producer(生产者发送消息)、Consumer(消费者订阅消息)在这里插入图片描述
2. RabbitMQ简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

详见RabbitMQ官网:点这里
3. RabbitMQ应用场景
  1. 异步处理
  2. 应用解耦
  3. 流量削锋
  4. 日志处理
  5. 消息通讯
详细应用场景介绍:点这里
4. RabbitMQ消息类型
  • simple,简单模式
    一条消息对应一个消费者
    在这里插入图片描述
  • work,工作模式
    一条消息可以被多个消费者尝试接,但是最终只能有一个消费者能获取
    在这里插入图片描述
  • Pub/Sub,订阅模式
    一条消息可以被多个消费者同时获取,
    生产者将消息发送到交换机,消费者将自己对应的队列注册到交换机,
    当发送消息后,所有注册的队列的消费者都可以收到消息
    在这里插入图片描述
  • routing,路由模式
    生产者将消息发送到了 type 为 direct 模式的交换机
    消费者的队列再将自己绑定到路由的时候会给自己绑定一个 key
    只有消费者发送对应 key 格式的消息时候 队列才会收到消息
    在这里插入图片描述
  • topic,通配符模式
    将路由键和某模式进行匹配。
    此时队列需要绑定到一个模式上。
    符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
    因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
    在这里插入图片描述
  • RPC模式
    客户端启动后,创建一个独有的回调队列,某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息。服务器端等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中,客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用
    在这里插入图片描述
5. RabbitMQ的两种应答模式

当我们发送消息后,服务端如何知道消息已经被消费,在RabbitMQ里有两种模式:

  • 自动模式。不管消费者获取到消息后是否是成功处理消息,服务端都认为是成功的

  • 手动模式。消费者获取到消息后,服务器会将消息标记为不可用,等待消费者反馈,如果不反馈,则一直标记为不可用

开启手动应答,默认为自动应答

1
2
3
# 开启ACK手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
6. rabbitmq channel参数详解
  • channel.exchangeDeclare()
1
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException;

type:有direct、fanout、topic三种
durable:true、false true:服务器重启会保留下来Exchange,不保证重启后消息还在
autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange

  • channel.basicPublish()
1
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

在topic exchange做消息转发用

  • channel.basicAck()
1
void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:该消息的index
multiple:是否批量,true:将一次性ack所有小于deliveryTag的消息。

  • channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true)
1
2
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;

deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列

  • channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false)
1
void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息


7. 简单应用(topic模式)
  • 配置交换器Exchange(topic模式)
1
2
3
4
5
6
7
8
9
10
/**
* 声明交换机,支持持久化.
* rabbitmq常用几种exchange,比如direct, fanout, topic,可根据具体业务需求配置
* 命名规范参考 scm3.services,scm3.services.retry,scm3.services.failed
* @return the exchange
*/
@Bean
public TopicExchange emailExchange() {
return (TopicExchange)ExchangeBuilder.topicExchange("emailExchange").durable(true).build();
}
  • 配置队列Queue
1
2
3
4
5
6
7
/**
* 声明队列,默认持久化
*/
@Bean
public Queue emailQueue() {
return new Queue("emailQueue");
}
  • 绑定交换器与队列
1
2
3
4
5
6
7
8
/**
* 绑定交换器与队列
*/
@Bean
public Binding topicQueueBinding(@Qualifier("emailQueue") Queue queue,
@Qualifier("emailExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("email.topic.*");
}
  • 生产者发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.bhy702.website;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
* @author: Hongyuan Bai
* @create: 2019-06-27 13:45:43
* @description:
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {

@Autowired
private AmqpTemplate rabbitTemplate;

public static final String EXCHANGE = "emailExchange";
public static final String ROUTING_KEY = "email.topic.queue";

@Test
public void test() {

for (int i = 0; i < 5; i++){
rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,"message:"+(i+1));
}
}

}
  • 消费者处理消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.bhy702.website.common.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import java.io.IOException;


/**
* @author: Hongyuan Bai
* @create: 2019-06-27 13:38:46
* @description:
*/
@Slf4j
@Component
public class RabbitConsumer2 {

@Autowired
private JavaMailSender javaMailSender;

@Autowired
private RabbitTemplate rabbitTemplate;

@RabbitListener(queues = "emailQueue")
public void sendMail(Message message, Channel channel) throws IOException {
try {
//消费者处理信息
System.out.println(message.getBody());
//处理完成手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
//拒绝信息,重新加入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝信息,丢弃,视场景而定
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
e.printStackTrace();
}
}

}


8. RabbitMQ生产者消息确认

rabbitMQ有重试机制,配置properties文件可开启

1
2
# 开启rabbitmq的生产端重试机制,默认是false,默认重试3次
spring.rabbitmq.template.retry.enabled=true

当生产者发送消息给rabbitmq服务器时,消息是否真正的到达了服务器?为了保证生产者发送的消息能够可靠的发送到服务器(即消息落地),rabbitmq提供了两种方式:

  • 通过事务实现
  • 通过发送方确认机制(publisher confirm)实现

由于事务方式非常影响rabbitmq的性能,所以这里介绍第二种实现

ConfirmCallback接口

ConfirmCallback是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。

ReturnCallback接口

通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。

先配置properties配置文件,开启回调

1
2
3
4
# 开启生产者发送确认(ConfirmCallback接口)
spring.rabbitmq.publisher-confirms=true
# 开启生产者发送失败退回(ReturnCallback接口)
spring.rabbitmq.publisher-returns=true

在RabbitMQConfig中配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Bean
public AmqpTemplate amqpTemplate(){

// 必须开启回调才会生效
rabbitTemplate.setMandatory(true);

// 消息确认,需要配置 spring.rabbitmq.publisher-confirms = true
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

//根据返回的状态,生产者可以处理失败与成功的相应信息,比如发送失败,可重发,转发或者存入日志等
//if(ack){
// correlationData.getId()为message唯一标识,需要生产者发送message时传入自定义的correlationData才能获取到,否则为null
// //do something
//}else{
// correlationData.getId()
// //do something
//}

//此处只做打印,不对生产者发送失败的信息处理
System.out.println("------------------------------------------------");
System.out.println("ConfirmCallBackListener:correlationData=" + correlationData + ",ack=" + ack + ",cause=" + cause);
System.out.println("------------------------------------------------");

});

// 消息发送失败返回到队列中,需要配置spring.rabbitmq.publisher-returns = true
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

System.out.println("------------------------------------------------");
System.out.println("ReturnCallBackListener:message=" + new String(message.getBody()) + ",replyCode=" + replyCode + ",replyText=" + replyText + ",exchange=" + exchange + ",routingKey=" + routingKey);
System.out.println("------------------------------------------------");

});

return rabbitTemplate;
}

9. Spring Boot连接RabbitMQ集群
1
spring.rabbitmq.host = ip1:port1,ip2:port2,ip3:port3

要保证消息全部被消费者消费,就涉及到RabbitMQ的消息重试,死信队列,延迟重试队列等,可参考另一篇博客点这里