从头开始搭建一个Spring boot+RabbitMQ环境
消息队列在目前分布式系统下具备非常重要的地位,如下的场景是比较适合消息队列的:
- 跨系统的调用,异步性质的调用最佳。
- 高并发问题,利用队列串行特点。
- 订阅模式,数据被未知数量的消费者订阅,比如某种数据的变更会影响多个系统的数据,订单数据就是比较好理解的。
之前有一个场景是商品数据在修改后需要推送到 elasticsearch 中,由于修改产品的并发量以及数据量均不大,所以对于消息未做持久化,而且为了快速上线简化系统,生产者与消费者更是部署在一个应用中,自生产自消费。这篇将从头搭建 RabbitMQ 环境,并且将之集成在 Spring boot 中。
搭建 RabbitMQ 环境
erlang
由于 RabbitMQ 是基于 erlang 开发的,所以要安装 RabbitMQ 先必须安装 erlang。
更换软件源
使用 apt-get 时默认的软件源是 us.archive.ubuntu.com,这会经常发生安装问题,比如速度特别慢或者由于下载不了造成不能安装。
可以更换成国内的数据源 cn.archive.ubuntu.com,速度那是不用说的了(这里感谢我的同事的提醒)。找到下面这个文件然后进行替换。
/etc/apt/sources.list
:%s/us.archive/cn.archive/g
在没有更新软件源时,我采取的是源码编译安装方法,参考这篇文章。我安装的是最新 19.2 版本,安装过程中还遇到各种问题就不一一记录了。
测试 erlang 安装是否正确,输入 erl,如果看到如下图所示就说明安装成功了。
安装 RabbitMQ
在未更换软件源之前我也是选择了源码编译安装方法,安装的最新的 3.6.6, 但手动启动时总是不成功,错误信息如下:
版本问题
RabbitMQ 3.6.6+ erlang 19.2 启动失败的问题暂时未解决,有谁知道的可以告诉我。
由于启动不成功,最后在更新成国内软件源之后,再次通过 apt-get 安装 RabbitMQ,默认的版本是 3.5.7,好像也可以选版本,以后再尝试。可喜的是通过 apt-get 安装的 RabbitMQ 成功的启动起来了。可以通过如下命令查看 RabbitMQ 状态。
./rabbitmqctl stauts
RabbitMQ 管理工具
这是自带的一个 web 插件,可以用来管理消息队列,启动它的方法比较简单:
rabbitmq-plugins enable rabbitmq_management
然后重启 RabbitMQ 即可生效。默认生成了 guest 用户,但这个 guest 用户只能在 RabbitMQ 所在主机才能访问,所以要想远程访问就需要重新分配一个用户,有两种办法:
- 通过网页,以 guest 登录然后在页面上完成操作。
- 通过命令,创建用户,授权也可以。
创建用户,指定用户名以及密码
./rabbitmqctl add_user root root // 用户名密码都是 root
分配角色,administrator 是可以操作和 guest 本地用户一样的功能,当登录上 rabbitmq_management 之后,里面的所有功能都可以使用。
rabbitmqctl set_user_tags root administrator
授权,队列的操作管理权限。如果不配置,那么客户端在连接消息队列时会出问题。
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
上面语句我没有执行成功,后续再研究下是不是写法问题
Spring boot 集成 RabbitMQ
我们在 rabbitmq_management 上面可以正常访问操作后,就可以放心的写 demo 了,这里采用 spring boot。先看简单看下RabbitMQ 的简易架构图,容易理解下面提到的一些组件。
-
生产者,消息,消费者
-
消息内部:Exchange,Binding,Queues
引用 amqp 的 starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
增加配置信息
这里没有采用自动配置
mq.rabbit.host=192.168.21.128
mq.rabbit.port=5672
mq.rabbit.virtualHost=/
mq.rabbit.username=root
mq.rabbit.password=root
创建 RabbitMQConfig
- ConnectionFactory,类似于数据库连接等。
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort);
connectionFactory<span class="pl-k">.</span>setUsername(<span class="pl-c1">this</span><span class="pl-k">.</span>mqRabbitUserName);
connectionFactory<span class="pl-k">.</span>setPassword(<span class="pl-c1">this</span><span class="pl-k">.</span>mqRabbitPassword);
connectionFactory<span class="pl-k">.</span>setVirtualHost(<span class="pl-c1">this</span><span class="pl-k">.</span>mqRabbitVirtualHost);
connectionFactory<span class="pl-k">.</span>setPublisherConfirms(<span class="pl-c1">true</span>);
<span class="pl-k">return</span> connectionFactory;
}
- RabbitTemplate,用来发送消息。
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
- DirectExchange
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE_NAME);}
- Queue,构建队列,名称,是否持久化之类
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true);}
- Binding,将 DirectExchange 与 Queue 进行绑定
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY);}
- SimpleMessageListenerContainer,消费者
需要将 ACK 修改为手动确认,避免消息在处理过程中发生异常造成被误认为已经成功消费的假象。
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
<span class="pl-k">public</span> <span class="pl-k">void</span> <span class="pl-en">onMessage</span>(<span class="pl-smi">Message</span> <span class="pl-v">message</span>, <span class="pl-smi">com.rabbitmq.client<span class="pl-k">.</span>Channel</span> <span class="pl-v">channel</span>) <span class="pl-k">throws</span> <span class="pl-smi">Exception</span> {
<span class="pl-k">byte</span>[] body <span class="pl-k">=</span> message<span class="pl-k">.</span>getBody();
logger<span class="pl-k">.</span>info(<span class="pl-s"><span class="pl-pds">"</span>消费端接收到消息 : <span class="pl-pds">"</span></span> <span class="pl-k">+</span> <span class="pl-k">new</span> <span class="pl-smi">String</span>(body));
channel<span class="pl-k">.</span>basicAck(message<span class="pl-k">.</span>getMessageProperties()<span class="pl-k">.</span>getDeliveryTag(), <span class="pl-c1">false</span>);
}
});
<span class="pl-k">return</span> container;
}
服务端,业务逻辑,调用消息队列。
为了让客户端知道消息是否已经成功,消息队列提供了回调机制 (需要实现 ConfirmCallback),当消息服务器接收到消息之后会给客户端一个通知,此时客户端根据消息应答来决定后续的流程。
@Service
public class ProductServiceImpl extends BaseService implements ProductService, RabbitTemplate.ConfirmCallback {
<span class="pl-k">@Autowired</span>
<span class="pl-k">private</span> <span class="pl-smi">ProductMapper</span> productMapper;
<span class="pl-k">private</span> <span class="pl-smi">RabbitTemplate</span> rabbitTemplate;
<span class="pl-k">public</span> <span class="pl-en">ProductServiceImpl</span>(<span class="pl-smi">RabbitTemplate</span> <span class="pl-v">rabbitTemplate</span>){
<span class="pl-c1">this</span><span class="pl-k">.</span>rabbitTemplate<span class="pl-k">=</span>rabbitTemplate;
<span class="pl-c1">this</span><span class="pl-k">.</span>rabbitTemplate<span class="pl-k">.</span>setConfirmCallback(<span class="pl-c1">this</span>);
}
<span class="pl-k">public</span> <span class="pl-k">void</span> <span class="pl-en">confirm</span>(<span class="pl-smi">CorrelationData</span> <span class="pl-v">correlationData</span>, <span class="pl-k">boolean</span> <span class="pl-v">ack</span>, <span class="pl-smi">String</span> <span class="pl-v">cause</span>) {
<span class="pl-c1">this</span><span class="pl-k">.</span>logger<span class="pl-k">.</span>info(<span class="pl-s"><span class="pl-pds">"</span> 消息id:<span class="pl-pds">"</span></span> <span class="pl-k">+</span> correlationData);
<span class="pl-k">if</span> (ack) {
<span class="pl-c1">this</span><span class="pl-k">.</span>logger<span class="pl-k">.</span>info(<span class="pl-s"><span class="pl-pds">"</span>消息发送确认成功<span class="pl-pds">"</span></span>);
} <span class="pl-k">else</span> {
<span class="pl-c1">this</span><span class="pl-k">.</span>logger<span class="pl-k">.</span>info(<span class="pl-s"><span class="pl-pds">"</span>消息发送确认失败:<span class="pl-pds">"</span></span> <span class="pl-k">+</span> cause);
}
}
<span class="pl-k">@Override</span>
<span class="pl-k">public</span> <span class="pl-k">void</span> <span class="pl-en">save</span>(<span class="pl-smi">Product</span> <span class="pl-v">product</span>) {
<span class="pl-c"><span class="pl-c">//</span>执行保存</span>
<span class="pl-smi">String</span> uuid <span class="pl-k">=</span> <span class="pl-c1">UUID</span><span class="pl-k">.</span>randomUUID()<span class="pl-k">.</span>toString();
<span class="pl-smi">CorrelationData</span> correlationId <span class="pl-k">=</span> <span class="pl-k">new</span> <span class="pl-smi">CorrelationData</span>(uuid);
rabbitTemplate<span class="pl-k">.</span>convertAndSend(<span class="pl-smi">RabbitMQConfig</span><span class="pl-c1"><span class="pl-k">.</span>EXCHANGE_NAME</span>, <span class="pl-smi">RabbitMQConfig</span><span class="pl-c1"><span class="pl-k">.</span>ROUTING_KEY</span>, product<span class="pl-k">.</span>getName(),correlationId);
}
}
执行结果
可以清晰的看到 RabbitMQ 发给生产者的信息收到的确认信息,也能看到消息被消费端消费后的信息。
RabbitMQ 的其它方面
高可用方案
与常见的数据库类似,都是主从模式来保证高可用,可以利用 HAProxy 来实现主从备份方案。
水平扩展方案
主要是为了解决垂直优化的瓶颈问题,主要有这三种:
- clustering, 这是默认内置的一种集群模式,与下面两种不同的是 clustering 一般应用于同一局域网。
- federation,有待后续学习
- shovel,有待后续学习
不丢消息特性
这个不是 RabbitMQ 的专利,将消息持久化可以确保 RabbitMQ 重启或者死机过程中不至于丢掉没有消费的消息。
消息不被重复消费
这点要靠消费端来完成,尽管消费端可以通过 ACK 来通知消息队列消息已经被消费,但如果消费端消费了消息,此时 ACK 过程中的通知出现异常,消息队列会认为消息未被消费会继续发给消费端。
总结
初次安装可能会出现一堆问题,特别是需要安装所依赖的众多包。RabbitMQ 与 Erlang 可能存在版本依赖问题待后续确认。spring boot 下集成 RabbitMQ 异常简单,可以根据需求部署集群来实现可扩展高可用的消息系统。