消息队列(mq)是什么?

论坛 期权论坛 期权     
匿名用户1024   2022-2-12 16:50   13939   5
如果两个在不同网络的应用(暂且称为应用A和应用B)间进行通信,在中间放一台服务器(暂且称为服务器C),应用A只能向服务器C发送消息和接受消息,应用B同理,应用A和应用B不需要知道相互的网络地址,只需要知道服务器C的位置,然后在服务区C上面设置两个文件夹(请求文件夹Req和响应文件夹Res),约定好应用A的请求数据(xml格式)通过webservice发送到服务器C的Req文件夹下,并轮询Res文件夹获取相应数据,应用B轮询Req文件夹并通过webservice返回数据(xml格式)到Res文件夹。这种通信机制算mq吗?
分享到 :
0 人收藏

5 个回复

倒序浏览
2#
有关回应  16级独孤 | 2022-2-12 16:50:13
我想直击 MQ 的本质,带你来一场更深层次的思考力训练:不仅仅搞清楚 What,同时理解 Why 和 How。
认真看完,你会感叹:原来还能以这样的视角来理解MQ!不仅能帮助你掌握 MQ 最内核的东西,同时还能弄明白:如果让你来设计一个 MQ,该如何下手?又有哪些技术挑战?
不啰嗦了,下面进入正文。
对于 MQ 来说,其实不管是 RocketMQ、Kafka 还是其他消息队列,它们的本质都是:一发一存一消费。下面我们以这个本质作为根,一起由浅入深地聊聊 MQ。
[h1]01 从 MQ 的本质说起[/h1]将 MQ 掰开了揉碎了来看,都是「一发一存一消费」,再直白点就是一个「转发器」。
生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者,仅此而已。

上面这个图便是消息队列最原始的模型,它包含了两个关键词:消息和队列。
1、消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。
2、队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。
[h1]02 原始模型的进化[/h1]再看今天我们最常用的消息队列产品(RocketMQ、Kafka 等等),你会发现:它们都在最原始的消息模型上做了扩展,同时提出了一些新名词,比如:主题(topic)、分区(partition)、队列(queue)等等。
要彻底理解这些五花八门的新概念,我们化繁为简,先从消息模型的演进说起(道理好比:架构从来不是设计出来的,而是演进而来的
2.1 队列模型
最初的消息队列就是上一节讲的原始模型,它是一个严格意义上的队列(Queue)。消息按照什么顺序写进去,就按照什么顺序读出来。不过,队列没有 “读” 这个操作,读就是出队,从队头中 “删除” 这个消息。

这便是队列模型:它允许多个生产者往同一个队列发送消息。但是,如果有多个消费者,实际上是竞争的关系,也就是一条消息只能被其中一个消费者接收到,读完即被删除。
2.2 发布-订阅模型
如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。
一个可行的方案是:为每个消费者创建一个单独的队列,让生产者发送多份。这种做法比较笨,而且同一份数据会被复制多份,也很浪费空间。
为了解决这个问题,就演化出了另外一种消息模型:发布-订阅模型。

在发布-订阅模型中,存放消息的容器变成了 “主题”,订阅者在接收消息之前需要先 “订阅主题”。最终,每个订阅者都可以收到同一个主题的全量消息。
仔细对比下它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费。
2.3 小结
最后做个小结,上面两种模型说白了就是:单播和广播的区别。而且,当发布-订阅模型中只有 1 个订阅者时,它和队列模型就一样了,因此在功能上是完全兼容队列模型的。
这也解释了为什么现代主流的 RocketMQ、Kafka 都是直接基于发布-订阅模型实现的?此外,RabbitMQ 中之所以有一个 Exchange 模块?其实也是为了解决消息的投递问题,可以变相实现发布-订阅模型。
包括大家接触到的 “消费组”、“集群消费”、“广播消费” 这些概念,都和上面这两种模型相关,以及在应用层面大家最常见的情形:组间广播、组内单播,也属于此范畴。
所以,先掌握一些共性的理论,对于大家再去学习各个消息中间件的具体实现原理时,其实能更好地抓住本质,分清概念。
[h1]03 透过模型看 MQ 的应用场景[/h1]目前,MQ 的应用场景非常多,大家能倒背如流的是:系统解耦、异步通信和流量削峰。除此之外,还有延迟通知、最终一致性保证、顺序消息、流式处理等等。
那到底是先有消息模型,还是先有应用场景呢?答案肯定是:先有应用场景(也就是先有问题),再有消息模型,因为消息模型只是解决方案的抽象而已。
MQ 经过 30 多年的发展,能从最原始的队列模型发展到今天百花齐放的各种消息中间件(平台级的解决方案),我觉得万变不离其宗,还是得益于:消息模型的适配性很广。
我们试着重新理解下消息队列的模型。它其实解决的是:生产者和消费者的通信问题。那它对比 RPC 有什么联系和区别呢?

通过对比,能很明显地看出两点差异:
1、引入 MQ 后,由之前的一次 RPC 变成了现在的两次 RPC,而且生产者只跟队列耦合,它根本无需知道消费者的存在。
2、多了一个中间节点「队列」进行消息转储,相当于将同步变成了异步。
再返过来思考 MQ 的所有应用场景,就不难理解 MQ 为什么适用了?因为这些应用场景无外乎都利用了上面两个特性。
举一个实际例子,比如说电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。

引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦。
改造前订单系统依赖 3 个外部系统,改造后仅仅依赖 MQ,而且后续业务再扩展(比如:营销系统打算针对支付用户奖励优惠券),也不涉及订单系统的修改,从而保证了核心流程的稳定性,降低了维护成本。
这个改造还带来了另外一个好处:因为 MQ 的引入,更新用户积分、通知商家、更新用户画像这些步骤全部变成了异步执行,能减少订单支付的整体耗时,提升订单系统的吞吐量。这便是 MQ 的另一个典型应用场景:异步通信。
除此以外,由于队列能转储消息,对于超出系统承载能力的场景,可以用 MQ 作为 “漏斗” 进行限流保护,即所谓的流量削峰。
我们还可以利用队列本身的顺序性,来满足消息必须按顺序投递的场景;利用队列 + 定时任务来实现消息的延时消费 ……
MQ 其他的应用场景基本类似,都能回归到消息模型的特性上,找到它适用的原因,这里就不一一分析了。
总之,就是建议大家多从复杂多变的实践场景再回归到理论层面进行思考和抽象,这样能吃得更透。
[h1]04 如何设计一个 MQ?[/h1]了解了上面这些理论知识以及应用场景后,下面我们再一起看下:到底如何设计一个 MQ?
4.1 MQ 的雏形
我们还是先从简单版的 MQ 入手,如果只是实现一个很粗糙的 MQ,完全不考虑生产环境的要求,该如何设计呢?
文章开头说过,任何 MQ 无外乎:一发一存一消费,这是 MQ 最核心的功能需求。另外,从技术维度来看 MQ 的通信模型,可以理解成:两次 RPC + 消息转储。
有了这些理解,我相信只要有一定的编程基础,不用 1 个小时就能写出一个 MQ 雏形:
1、直接利用成熟的 RPC 框架(Dubbo 或者 Thrift),实现两个接口:发消息和读消息。
2、消息放在本地内存中即可,数据结构可以用 JDK 自带的 ArrayBlockingQueue 。
4.2 写一个适用于生产环境的 MQ
当然,我们的目标绝不止于一个 MQ 雏形,而是希望实现一个可用于生产环境的消息中间件,那难度肯定就不是一个量级了,具体我们该如何下手呢?
1、先把握这个问题的关键点
假如我们还是只考虑最基础的功能:发消息、存消息、消费消息(支持发布-订阅模式)。
那在生产环境中,这些基础功能将面临哪些挑战呢?我们能很快想到下面这些:
1、高并发场景下,如何保证收发消息的性能?
2、如何保证消息服务的高可用和高可靠?
3、如何保证服务是可以水平任意扩展的?
4、如何保证消息存储也是水平可扩展的?
5、各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性?
可见,高并发场景下的三高问题在你设计一个 MQ 时都会遇到,「如何满足高性能、高可靠等非功能性需求」才是这个问题的关键所在。
2、整体设计思路
先来看下整体架构,会涉及三类角色:

另外,将「一发一存一消费」这个核心流程进一步细化后,比较完整的数据流如下:

基于上面两个图,我们可以很快明确出 3 类角色的作用,分别如下:
1、Broker(服务端):MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护等。
2、Producer(生产者):MQ 的客户端之一,调用 Broker 提供的 RPC 接口发送消息。
3、Consumer(消费者):MQ 的另外一个客户端,调用 Broker 提供的 RPC 接口接收消息,同时完成消费确认。
3、详细设计
下面,再展开讨论下一些具体的技术难点和可行的解决方案。
难点1:RPC 通信
解决的是 Broker 与 Producer 以及 Consumer 之间的通信问题。如果不重复造轮子,直接利用成熟的 RPC 框架 Dubbo 或者 Thrift 实现即可,这样不需要考虑服务注册与发现、负载均衡、通信协议、序列化方式等一系列问题了。
当然,你也可以基于 Netty 来做底层通信,用 Zookeeper、Euraka 等来做注册中心,然后自定义一套新的通信协议(类似 Kafka),也可以基于 AMQP 这种标准化的 MQ 协议来做实现(类似 RabbitMQ)。对比直接用 RPC 框架,这种方案的定制化能力和优化空间更大。
难点2:高可用设计
高可用主要涉及两方面:Broker 服务的高可用、存储方案的高可用。可以拆开讨论。
Broker 服务的高可用,只需要保证 Broker 可水平扩展进行集群部署即可,进一步通过服务自动注册与发现、负载均衡、超时重试机制、发送和消费消息时的 ack 机制来保证。
存储方案的高可用有两个思路:1)参考 Kafka 的分区 + 多副本模式,但是需要考虑分布式场景下数据复制和一致性方案(类似 Zab、Raft等协议),并实现自动故障转移;2)还可以用主流的 DB、分布式文件系统、带持久化能力的 KV 系统,它们都有自己的高可用方案。
难点3:存储设计
消息的存储方案是 MQ 的核心部分,可靠性保证已经在高可用设计中谈过了,可靠性要求不高的话直接用内存或者分布式缓存也可以。这里重点说一下存储的高性能如何保证?这个问题的决定因素在于存储结构的设计。
目前主流的方案是:追加写日志文件(数据部分) + 索引文件的方式(很多主流的开源 MQ 都是这种方式),索引设计上可以考虑稠密索引或者稀疏索引,查找消息可以利用跳转表、二份查找等,还可以通过操作系统的页缓存、零拷贝等技术来提升磁盘文件的读写性能。
如果不追求很高的性能,也可以考虑现成的分布式文件系统、KV 存储或者数据库方案。
难点4:消费关系管理
为了支持发布-订阅的广播模式,Broker 需要知道每个主题都有哪些 Consumer 订阅了,基于这个关系进行消息投递。
由于 Broker 是集群部署的,所以消费关系通常维护在公共存储上,可以基于 Zookeeper、Apollo 等配置中心来管理以及进行变更通知。
难点5:高性能设计
存储的高性能前面已经谈过了,当然还可以从其他方面进一步优化性能。
比如 Reactor 网络 IO 模型、业务线程池的设计、生产端的批量发送、Broker 端的异步刷盘、消费端的批量拉取等等。
4.3 小结
再总结下,要回答好:如何设计一个 MQ?
1、需要从功能性需求(收发消息)和非功能性需求(高性能、高可用、高扩展等)两方面入手。
2、功能性需求不是重点,能覆盖 MQ 最基础的功能即可,至于延时消息、事务消息、重试队列等高级特性只是锦上添花的东西。
3、最核心的是:能结合功能性需求,理清楚整体的数据流,然后顺着这个思路去考虑非功能性的诉求如何满足,这才是技术难点所在。
[h1]05 写在最后[/h1]上面这些内容从 MQ  一发一存一消费这个本质出发,讲解了消息模型的演进过程,这是 MQ 最核心的理论基础。基于此,大家也能更容易理解 MQ 的各种新名词以及应用场景。
最后通过回答:如何设计一个 MQ?目的是让大家对 MQ 的核心组件和技术难点有一个清晰的认识。另外,带着这个问题的答案再去学习 Kafka、RocketMQ 等具体的消息中间件时,也会更有侧重点。
要是觉得不错,那就帮我
@Lowry
点个赞,关注一下我呗,硬核码字不易
3#
有关回应  16级独孤 | 2022-2-12 16:50:14

我想用一场别开生面的面试,去回答您的这个问题。
[h1]面试开始[/h1]
一个风度翩翩,穿着格子衬衣的中年男子,拿着一个满是划痕的mac向你走来,看着铮亮的头,心想着肯定是尼玛顶级架构师吧!但是我们看过暖男敖丙的系列,腹有诗书气自华,虚都不虚。
[h2]你为啥用消息队列? [/h2]噗此,这也叫问题?别人用了我能不用么?别人用了我就用了呗,我就是为了用而用。
你心里嘀咕就好了,千万别说出来哈,说出来了没拿到Offer别到时候就在那说,敖丙那个渣男教我说的!

面试官你好:我们公司本身的业务体量很小,所以直接单机一把梭啥都能搞定了,但是后面业务体量不断扩大,采用微服务的设计思想分布式的部署方式,所以拆分了很多的服务,随着体量的增加以及业务场景越来越复杂了,很多场景单机的技术栈和中间件以及不够用了,而且对系统的友好性也下降了,最后做了很多技术选型的工作,我们决定引入消息队列中间件
[h2]哦?你说到业务场景越来越复杂,你那说一下你都在什么场景用到了消息队列? [/h2]嗯,我从三个方面去说一下我使用的场景吧。
Tip:这三个场景也是消息队列的经典场景,大家基本上要烂熟于心那种,就是一说到消息队列你脑子就要想到异步、削峰、解耦,条件反射那种。
https://github.com/JavaFamily 已经收录,有一线大厂面试点思维导图,也整理了很多我的文档,欢迎Star和完善,大家面试可以参照考点复习,希望我们一起有点东西。

4#
有关回应  16级独孤 | 2022-2-12 16:50:15
5#
有关回应  16级独孤 | 2022-2-12 16:50:16
[h1]一、什么是消息队列[/h1]消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列。
消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。





Producer:消息生产者,负责产生和发送消息到 Broker;
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;


[h1]二、为什么需要消息队列[/h1]1、屏蔽异构平台的细节:发送方、接收方系统之间不需要了解双方,只需认识消息。
2、异步:消息堆积能力;发送方接收方不需同时在线,发送方接收方不需同时扩容(削峰)。
3、解耦:防止引入过多的API给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。
4、复用:一次发送多次消费。
5、可靠:一次保证消息的传递。如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它。
6、提供路由:发送者无需与接收者建立连接,双方通过消息队列保证消息能够从发送者路由到接收者,甚至对于本来网络不易互通的两个服务,也可以提供消息路由。


[h1]三、消息队列有什么优点和缺点[/h1]1、核心优点
   解耦、异步、削峰
2、缺点
1)系统可用性降低:系统引入的外部依赖越多,越容易挂掉。
   2)系统复杂度提高了
   3)一致性问题:消息传递给多个系统,部分执行成功,部分执行失败,容易导致数据不一致


[h1]四、消息队列对比[/h1]




各种对比之后,有如下建议:
     ActiveMQ,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐;
     RabbitMQ,虽然erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是毕竟是开源的,比较稳定的支持,活跃度也高,推荐中小型公司使用;推荐
    RocketMQ,阿里出品,Java语言编写,经过了阿里多年双十一大促的考验,性能和稳定性得到了充分的严重。目前在业界被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景;强烈推荐
kafka,如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。


[h1]五、RabbitMQ介绍[/h1][h1]1、RabbitMQ基础概念[/h1]通常我们谈到消息队列服务, 会有三个概念: 发消息者、消息队列、收消息者。
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和队列之间, 加入了交换器 (Exchange)。这样发消息者和消息队列就没有直接联系,转而变成发消息者把消息发给交换器,交换器根据调度策略再把消息转发给消息队列。
消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange。Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。





1.Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。
2.Producer(消息的生产者):向消息队列发布消息的客户端应用程序。
3.Consumer(消息的消费者):从消息队列取得消息的客户端应用程序。
4.Message(消息):消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。
5.Routing Key(路由键):消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节。
6.Queue(消息队列):存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。需要注意,当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收。
7.Exchange(交换器|路由器):提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略(下面会介绍),分别是fanout, direct, topic, headers。
8.Binding(绑定):用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。
6.Binding Key(绑定键):Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节。
7.Broker:RabbitMQ Server,服务器实体。
[h1]2、Exchange消息调度策略[/h1]调度策略是指Exchange在收到生产者发送的消息后依据什么规则把消息转发到一个或多个队列中保存。调度策略与三个因素相关:Exchange Type(Exchange的类型),Binding Key(Exchange和Queue的绑定关系),消息的标记信息(Routing Key和headers)。


Exchange根据消息的Routing Key和Exchange绑定Queue的Binding Key分配消息。生产者在将消息发送给Exchange的时候,一般会指定一个Routing Key,来指定这个消息的路由规则,而这个Routing Key需要与Exchange Type及Binding Key联合使用才能最终生效。


在Exchange Type与Binding Key固定的情况下(一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定Routing Key来决定消息流向哪里。


  • Fanout (订阅模式|广播模式)


交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。订阅模式
与Binding Key和Routing Key无关,交换器将接受到的消息分发给有绑定关系的所有消息队列队列(不论Binding Key和Routing Key是什么)。类似于子网广播,子网内的每台主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。


2、Direct(路由模式)


精确匹配:当消息的Routing Key与 Exchange和Queue 之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式。
RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange。


3、Topic (通配符模式)


按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。
Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ ”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。


4、Headers(键值对模式)


Headers不依赖于Routing Key与Binding Key的匹配规则来转发消息,交换器的路由规则是通过消息头的Headers属性来进行匹配转发的,类似HTTP请求的Headers。
在绑定Queue与Exchange时指定一组键值对,键值对的Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或all,代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)。
当消息发送到Exchange时,交换器会取到该消息的headers,对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。Headers交换机的优势是匹配的规则不被限定为字符串(String),而是Object类型。


如下图所示,RabbitMQdirect模式通过RoutingKey来精准匹配,RoutingKey为red的投递到Queue1,RoutingKey为black和white的投递到Queue2。





我们可以假设一个场景,我们要做一个日志模块来收集处理不同的日志,日志区分包含三个维度的标准:模块、日志紧急程度、日志重要程度。模块分为:red、black、white;紧急程度分为:critical、normal;把重要程度分为:medium、low、high在RoutingKey字段中我们把这三个维度通过两个“.“连接起来。
现在我们需要对black模块,紧急程度为critical,重要程度为high的日志分配到队列1打印到屏幕;对所以模块重要程度为high的日志和white紧急程度为critical的日志发送到队列2持久化到硬盘。如下示例:





RoutingKey为“black.critical.high”的日志会投递到queue1和queue2,。
RoutingKey为“red.critical.high”的日志会只投递到queue2。
RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2投递一份。


[h1]3、RabbitMQ其他概念[/h1]1、rpc
生产者根据消费者返回的值,针对不同的消息(有唯一标识 或 类型标识)执行相应的逻辑
2、消息应答
消费者拿到消息并处理完相应逻辑后,必须应答,否则队列中的消息会不断堆积
3、消息持久化
队列可以设置将消息持久化,防止服务器突然宕机,导致消息丢失,当然这样会损耗性能,占用磁盘空间
4、分发机制
①轮询分发:队列给每一个消费者发送数量一样的数据
②公平分发:消费者设置每次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据。
5、事务
对信道(Channel)设置,保证了生产者确实将消息推送到了服务器中,但背离了消息队列解耦的本质
6、Confirm机制
对信道(Channel)设置,保证了生产者确实将消息推送到了服务器中,是异步的,性能比事务好很多
7、Alternate Exchange(代替交换器)
是Rabbitmq自己扩展的功能,不是AMQP协议定义的,可以将没有正确分发的消息交由该 交换机 再次分发,再次分发失败,则消息丢失
8、TTL(生存时间)
9、Queue Length Limit(队列长度限制)
10、Dead Letter Exchange(死信交换器)
11、priority queue(优先级队列)
12、延迟队列
①延迟消费。比如:
用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
②延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。
通过设置TTL(生存时间) 和 Dead Letter Exchange(死信交换器),将消息重新分发。


[h1]六、RabbitMQ使用[/h1]1、添加maven依赖

  org.springframework.boot
  spring-boot-starter-amqp

2、添加rabbitmq java config 配置类















3、发送消息





4、消费消息





5、禁用spring boot 自动配置(约定大于配置)





以上就是基于spring boot 的 maven项目 集成rabbitmq 的所有配置。
剩下的就是交换机的灵活运用了。


6#
有关回应  16级独孤 | 2022-2-12 16:50:17
著作权归作者所有。
商业转载请联系作者获得授权,非商业转载请注明出处。
作者:艾小仙
链接:《我想进大厂》之MQ夺命连环11问
来源:微信公众号
如果你能回答出下面这些问题,你应该就掌握了消息队列到底是什么了。
继之前的mysql夺命连环之后,我发现我这个标题被好多套用的,什么夺命zookeeper,夺命多线程一大堆,这一次,开始面试题系列MQ专题,消息队列作为日常常见的使用中间件,面试也是必问的点之一,一起来看看MQ的面试题。
[h1]你们为什么使用mq?具体的使用场景是什么?[/h1]mq的作用很简单,削峰填谷。以电商交易下单的场景来说,正向交易的过程可能涉及到创建订单、扣减库存、扣减活动预算、扣减积分等等。每个接口的耗时如果是100ms,那么理论上整个下单的链路就需要耗费400ms,这个时间显然是太长了。

如果这些操作全部同步处理的话,首先调用链路太长影响接口性能,其次分布式事务的问题很难处理,这时候像扣减预算和积分这种对实时一致性要求没有那么高的请求,完全就可以通过mq异步的方式去处理了。同时,考虑到异步带来的不一致的问题,我们可以通过job去重试保证接口调用成功,而且一般公司都会有核对的平台,比如下单成功但是未扣减积分的这种问题可以通过核对作为兜底的处理方案。

使用mq之后我们的链路变简单了,同时异步发送消息我们的整个系统的抗压能力也上升了。
[h1]那你们使用什么mq?基于什么做的选型?[/h1]我们主要调研了几个主流的mq,kafka、rabbitmq、rocketmq、activemq,选型我们主要基于以下几个点去考虑:
  • 由于我们系统的qps压力比较大,所以性能是首要考虑的要素。
  • 开发语言,由于我们的开发语言是java,主要是为了方便二次开发。
  • 对于高并发的业务场景是必须的,所以需要支持分布式架构的设计。
  • 功能全面,由于不同的业务场景,可能会用到顺序消息、事务消息等。
基于以上几个考虑,我们最终选择了RocketMQ。

[h1]你上面提到异步发送,那消息可靠性怎么保证?[/h1]消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。
[h1]生产者丢失 [/h1]生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。
由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。
异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。
  • 下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚。
  • 下单成功,直接返回客户端成功,异步发送MQ消息
  • MQ回调通知消息发送结果,对应更新数据库MQ发送状态
  • JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试
  • 在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入。



一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。
[h1]MQ丢失 [/h1]如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。
比如RocketMQ:
RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。
比如Kafka也可以通过配置做到:
  1. acks=all 只有参与复制的所有节点全部收到消息,才返回生产者成功。这样的话除非所有的节点都挂了,消息才会丢失。replication.factor=N,设置大于1的数,这会要求每个partion至少有2个副本min.insync.replicas=N,设置大于1的数,这会要求leader至少感知到一个follower还保持着连接retries=N,设置一个非常大的值,让生产者发送失败一直重试
复制代码
虽然我们可以通过配置的方式来达到MQ本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。
[h1]消费者丢失 [/h1]消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。
RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。
消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。(Kafka没有这些)

[h1]你说到消费者消费失败的问题,那么如果一直消费失败导致消息积压怎么处理?[/h1]因为考虑到时消费者消费一直出错的问题,那么我们可以从以下几个角度来考虑:
  • 消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费
  • 如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息
  • 处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状

[h1]那如果消息积压达到磁盘上限,消息被删除了怎么办?[/h1]这。。。他妈都删除了我有啥办法啊。。。冷静,再想想。。有了。



最初,我们发送的消息记录是落库保存了的,而转发发送的数据也保存了,那么我们就可以通过这部分数据来找到丢失的那部分数据,再单独跑个脚本重发就可以了。如果转发的程序没有落库,那就和消费方的记录去做对比,只是过程会更艰难一点。
[h1]说了这么多,那你说说RocketMQ实现原理吧?[/h1]RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成,它的架构原理是这样的:
  • Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳
  • Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
  • Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费

[h1]为什么RocketMQ不使用Zookeeper作为注册中心呢?[/h1]我认为有以下几个点是不使用zookeeper的原因:
  • 根据CAP理论,同时最多只能满足两个点,而zookeeper满足的是CP,也就是说zookeeper并不能保证服务的可用性,zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  • 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而zookeeper的写是不可扩展的,而zookeeper要解决这个问题只能通过划分领域,划分多个zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  • 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  • 消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。
[h1]那Broker是怎么保存数据的呢?[/h1]RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。
Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。



CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G,写满后自动生成一个新的文件。



由于同一个topic的消息并不是连续的存储在commitlog中,消费者如果直接从commitlog获取消息效率非常低,所以通过consumequeue保存commitlog中消息的偏移量的物理地址,这样消费者在消费的时候先从consumequeue中根据偏移量定位到具体的commitlog物理文件,然后根据一定的规则(offset和文件大小取模)在commitlog中快速定位。

[h1]Master和Slave之间是怎么同步数据的呢?[/h1]而消息在master和slave之间的同步是根据raft协议来进行的:
  • 在broker收到消息后,会被标记为uncommitted状态
  • 然后会把消息发送给所有的slave
  • slave在收到消息之后返回ack响应给master
  • master在收到超过半数的ack之后,把消息标记为committed
  • 发送committed消息给所有slave,slave也修改状态为committed
[h1]你知道RocketMQ为什么速度快吗?[/h1]是因为使用了顺序存储、Page Cache和异步刷盘。
  • 我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多
  • 写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache
  • 最后由操作系统异步将缓存中的数据刷到磁盘
[h1]什么是事务、半事务消息?怎么实现的?[/h1]事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。
半事务消息就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。
实现原理如下:
  • 生产者先发送一条半事务消息到MQ
  • MQ收到消息后返回ack确认
  • 生产者开始执行本地事务
  • 如果事务执行成功发送commit到MQ,失败发送rollback
  • 如果MQ长时间未收到生产者的二次确认commit或者rollback,MQ对生产者发起消息回查
  • 生产者查询事务执行最终状态
  • 根据查询事务状态再次提交二次确认
最终,如果MQ收到二次确认commit,就可以把消息投递给消费者,反之如果是rollback,消息会保存下来并且在3天后被删除。

- END -
原文地址
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:136515
帖子:27303
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP