springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

论坛 期权论坛     
niminba   2021-5-22 14:47   314   0
<p>本文收录在个人博客:<a href="http://www.chengxy-nds.top" rel="external nofollow">www.chengxy-nds.top</a>,技术资源共享,一起进步</p>
<p>最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷<code>KPI</code>。不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的。</p>
<p>于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点<code>KPI</code>,就是想和大伙一起学习学习!</p>
<p style="text-align: center"><img alt="" loading="lazy" src="https://beijingoptbbs.oss-cn-hangzhou.aliyuncs.com/jb/2426819-8c4803c263d05ea2383f7c61ff6a3b67.png"></p>
<p>这次我分享的是 <code>springboot</code> + <code>rabbitmq</code> 如何实现消息确认机制,以及在实际开发中的一点踩坑经验,其实整体的内容比较简单,有时候事情就是这么神奇,越是简单的东西就越容易出错。</p>
<p>可以看到使用了 <code>RabbitMQ</code> 以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:</p>
<ul>
  <li>消息生产者 - &gt; rabbitmq服务器(消息发送失败)</li>
  <li>rabbitmq服务器自身故障导致消息丢失</li>
  <li>消息消费者 - &gt; rabbitmq服务(消费消息失败)</li>
</ul>
<p style="text-align: center"><img alt="" loading="lazy" src="https://beijingoptbbs.oss-cn-hangzhou.aliyuncs.com/jb/2426819-bc1f59001fb245dbd79b211c20d94ca6.png"></p>
<p>所以说能不使用中间件就尽量不要用,如果为了用而用只会徒增烦恼。开启消息确认机制以后,尽管很大程度上保证了消息的准确送达,但由于频繁的确认交互,<code>rabbitmq</code> 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议你用消息确认机制。</p>
<p>下边我们先来实现<code>springboot</code> + <code>rabbitmq</code>消息确认机制,再对遇到的问题做具体分析。</p>
<p><span style="color: #ff0000"><strong>一、准备环境</strong></span></p>
<p>1、引入 rabbitmq 依赖包</p>
<div class="blockcode">
<pre class="brush:java;">
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
&lt;artifactId&gt;spring-boot-starter-amqp&lt;/artifactId&gt;
&lt;/dependency&gt;</pre>
</div>
<p>2、修改 application.properties 配置</p>
<p>配置中需要开启 <code>发送端</code>和 <code>消费端</code> 的消息确认。</p>
<div class="blockcode">
<pre class="brush:java;">
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true</pre>
</div>
<p>3、定义 Exchange 和 Queue</p>
<p>定义交换机 <code>confirmTestExchange</code> 和队列 <code>confirm_test_queue</code> ,并将队列绑定在交换机上。</p>
<div class="blockcode">
<pre class="brush:java;">
@Configuration
public class QueueConfig {

@Bean(name = "confirmTestQueue")
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}

@Bean(name = "confirmTestExchange")
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirmTestExchange");
}

@Bean
public Binding confirmTestFanoutExchangeAndQueue(
@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
@Qualifier("confirmTestQueue") Queue confirmTestQueue) {
return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
}
}</pre>
</div>
<p><code>rabbitmq</code> 的消息确认分为两部分:发送消息确认 和 消息接收确认。</p>
<p style="text-align: center"><img alt="" loading="lazy" src="https://beijingoptbbs.oss-cn-hangzhou.aliyuncs.com/jb/2426819-abaf98917167adab239969d57697f6b9.png"></p>
<p><span style="color: #ff0000"><strong>二、消息发送确认</strong></span></p>
<p>发送消息确认:用来确认生产者 <code>producer</code> 将消息发送到 <code>broker</code> ,<code>broker</code> 上的交换机 <code>exchange</code> 再投递给队列 <code>queue</code>的过程中,消息是否成功投递。</p>
<p>消息从 <code>producer</code> 到 <code>rabbitmq broker</code>有一个 <code>confirmCallback</code> 确认模式。</p>
<p>消息从 <code>exchange</code> 到 <code>queue</code> 投递失败有一个 <code>returnCallback</code> 退回模式。</p>
<p>我们可以利用这两个<code>Callback</code>来确保消的100%送达。</p>
<p>1、 ConfirmCallback确认模式</p>
<p>消息只要被 <code>rabbitmq broker</code> 接收到就会触发 <code>confirmCallback</code> 回调 。</p>
<div class="blockcode">
<pre class="brush:java;">
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1060120
帖子:212021
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP