1. 消息中间件简介
- 消息即是信息的载体,为了方便消息的生产者和消费者都能明白,它在传递时需要一定的数据格式(即消息协议),根据消息送达的实时性,它分为即时消息和延迟消息两类。
- 即时消息,关注消息送达的实时性,如HTTP、RPC请求等
- 延迟消息,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列。
- 实现消息队列的组件称之为消息中间件。对于消息中间件,关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统,也通常用于服务与服务解耦。常见的角色大致也就有Producer(生产者发送消息)、Consumer(消费者订阅消息)
2. RabbitMQ简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。详见RabbitMQ官网:点这里
3. RabbitMQ应用场景
- 异步处理
- 应用解耦
- 流量削锋
- 日志处理
- 消息通讯
详细应用场景介绍:点这里
4. RabbitMQ消息类型
- simple,简单模式
一条消息对应一个消费者 - work,工作模式
一条消息可以被多个消费者尝试接,但是最终只能有一个消费者能获取 - Pub/Sub,订阅模式
一条消息可以被多个消费者同时获取,
生产者将消息发送到交换机,消费者将自己对应的队列注册到交换机,
当发送消息后,所有注册的队列的消费者都可以收到消息 - routing,路由模式
生产者将消息发送到了 type 为 direct 模式的交换机
消费者的队列再将自己绑定到路由的时候会给自己绑定一个 key
只有消费者发送对应 key 格式的消息时候 队列才会收到消息 - topic,通配符模式
将路由键和某模式进行匹配。
此时队列需要绑定到一个模式上。
符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs” - RPC模式
客户端启动后,创建一个独有的回调队列,某个应用将请求信息交给客户端,然后客户端发送RPC请求,在发送RPC请求到RPC请求队列时,客户端至少发送带有reply_to以及correlation_id两个属性的信息。服务器端等待接受客户端发来RPC请求,当请求出现的时候,服务器从RPC请求队列中取出请求,然后处理后,将响应发送到reply_to指定的回调队列中,客户端等待回调队列中出现响应,当响应出现时,它会根据响应中correlation_id字段的值,将其返回给对应的应用5. RabbitMQ的两种应答模式
当我们发送消息后,服务端如何知道消息已经被消费,在RabbitMQ里有两种模式:
- simple,简单模式
自动模式。不管消费者获取到消息后是否是成功处理消息,服务端都认为是成功的
手动模式。消费者获取到消息后,服务器会将消息标记为不可用,等待消费者反馈,如果不反馈,则一直标记为不可用
开启手动应答,默认为自动应答
1 | # 开启ACK手动确认 |
6. rabbitmq channel参数详解
- channel.exchangeDeclare()
1
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException;
type:有direct、fanout、topic三种
durable:true、false true:服务器重启会保留下来Exchange,不保证重启后消息还在
autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange
- channel.basicPublish()
1
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
在topic exchange做消息转发用
- channel.basicAck()
1
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:该消息的index
multiple:是否批量,true:将一次性ack所有小于deliveryTag的消息。
- channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true)
1
2void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
deliveryTag:该消息的index
multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
requeue:被拒绝的是否重新入队列
- channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false)
1
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:该消息的index
requeue:被拒绝的是否重新入队列
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
7. 简单应用(topic模式)
配置交换器Exchange(topic模式)
1
2
3
4
5
6
7
8
9
10/**
* 声明交换机,支持持久化.
* rabbitmq常用几种exchange,比如direct, fanout, topic,可根据具体业务需求配置
* 命名规范参考 scm3.services,scm3.services.retry,scm3.services.failed
* @return the exchange
*/
public TopicExchange emailExchange() {
return (TopicExchange)ExchangeBuilder.topicExchange("emailExchange").durable(true).build();
}配置队列Queue
1
2
3
4
5
6
7/**
* 声明队列,默认持久化
*/
public Queue emailQueue() {
return new Queue("emailQueue");
}绑定交换器与队列
1
2
3
4
5
6
7
8/**
* 绑定交换器与队列
*/
public Binding topicQueueBinding(@Qualifier("emailQueue") Queue queue,
@Qualifier("emailExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("email.topic.*");
}生产者发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package com.bhy702.website;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author: Hongyuan Bai
* @create: 2019-06-27 13:45:43
* @description:
*/
(SpringRunner.class)
public class RabbitMQTest {
private AmqpTemplate rabbitTemplate;
public static final String EXCHANGE = "emailExchange";
public static final String ROUTING_KEY = "email.topic.queue";
public void test() {
for (int i = 0; i < 5; i++){
rabbitTemplate.convertAndSend(EXCHANGE,ROUTING_KEY,"message:"+(i+1));
}
}
}消费者处理消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45package com.bhy702.website.common.rabbitmq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: Hongyuan Bai
* @create: 2019-06-27 13:38:46
* @description:
*/
4j
public class RabbitConsumer2 {
private JavaMailSender javaMailSender;
private RabbitTemplate rabbitTemplate;
"emailQueue") (queues =
public void sendMail(Message message, Channel channel) throws IOException {
try {
//消费者处理信息
System.out.println(message.getBody());
//处理完成手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
//拒绝信息,重新加入队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝信息,丢弃,视场景而定
//channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
e.printStackTrace();
}
}
}
8. RabbitMQ生产者消息确认
rabbitMQ有重试机制,配置properties文件可开启
1 | # 开启rabbitmq的生产端重试机制,默认是false,默认重试3次 |
当生产者发送消息给rabbitmq服务器时,消息是否真正的到达了服务器?为了保证生产者发送的消息能够可靠的发送到服务器(即消息落地),rabbitmq提供了两种方式:
- 通过事务实现
- 通过发送方确认机制(publisher confirm)实现
由于事务方式非常影响rabbitmq的性能,所以这里介绍第二种实现
ConfirmCallback接口
ConfirmCallback是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。
ReturnCallback接口
通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调。
先配置properties配置文件,开启回调
1 | # 开启生产者发送确认(ConfirmCallback接口) |
在RabbitMQConfig中配置
1 |
|
9. Spring Boot连接RabbitMQ集群
1 | spring.rabbitmq.host = ip1:port1,ip2:port2,ip3:port3 |
要保证消息全部被消费者消费,就涉及到RabbitMQ的消息重试,死信队列,延迟重试队列等,可参考另一篇博客点这里