初识消息队列

常见的架构特性

高可用性

假设系统一直能够提供服务,我们说系统的可用性是100%。
如果系统每运行100个时间单位,会有1个时间单位无法提供服务,我们说系统的可用性是99%。
方法论上,高可用保证的原则是“集群化”

事务和一致性

强一致性:两个系统的结果任何时间都保持一致,都成功,或者都失败。
解决方案:分布式事务
最终一致性:两个消息系统的结果进过一段时间后最终一致,都成功,或者都失败。
解决方案:本地事务处理业务落地和消息落地。
消息落地失败整个事务回滚。
消息落地之后就可以做相应的补偿。

后者的优劣:
优:成本降低,复杂度降低。
劣:1.必须要建立一个存放消息的空间。2.对于消息延迟高敏感的业务不适用。

“补偿”如果采用自动重复调用的方式,就会产生非幂等性的操作。
如果最后消息依旧存在,可以手动补偿。

幂等性

对同一个系统,使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的。
幂等性的提出是为了解决由网络原因带来的不一致的问题。
并且解决重复提交带来的问题
解决方案有:多版本并发控制,TOKEN机制,利用去重表,select+update

多服务使用一个数据库的操作是幂等的吗?

消息队列介绍

消息队列工作流程

消息格式

参数 类型 描述
appid string 应用的ID,SKMQ会根据应用ID将消息投递给不同的消费者
msgid string 消息ID,必须保证ID的唯一性,SKMQ会过滤重复的消息
type string 消息类型,生产者发送消息、消息队列投递消息、消费者反馈都会附带相应的消息类型
content byte 消息内容,SKMQ会将它转发给相应的消费者
  • topic 生产者要发送的消息类型 - 应用内广播
  • queue 生产中要发送的消息类型 - 点对点单播
  • push MQ推送消息时的默认类型
  • resp MQ所有的返回信息类型均为 resp ,此时的返回内容为json text,状态包含在返回内容中
  • register 注册收件人(消费者),此时消息的content应该为合法的json text,否则无法完成注册。
{
  "Status": "arrived | ack | reject | error",
  "Content": "response msg"
}
  • arrived MQ投递消息时,消费者在接收到消息时需先发送arrived类型的消息,在确保消息落地时才发送ack消息
  • ack 对消息队列的正常响应信号
  • reject 当消费者无法消费时,发送一个reject消息给消息队列,消息队列收到reject时,会将消息投入到待重传队列重新排队
  • error 消息处理失败的标志,消息同样会进入待重传队列

ack

消息到达消费者节点时,消费者先发送一个 arrived 类型的消息告知MQ信件已经到达,MQ会等待一段时间,
在这段时间内,如果消费者很快完成任务处理,可以立即发送ack响应告知MQ消息已经处理完成;若未完成,
MQ会关闭连接,消费者可以随后主动发送ack消息告知MQ消息已处理完成

心跳检测

为了消息投递的稳定性,MQ会定期监测消费者节点,通过心跳包检查节点是否失联并及时将其标记,不参与下次消息接收

设计一个消息队列

高可用性

集群环境下使用同一个存储是幂等的,如果不使用同一个,就要做好主从复制。

服务端承载消息堆积的能力

存储无非是持久化和非持久化两种。

存储子系统的选择

持久化的前提下:
速度:文件系统>分布式KV(持久化)>分布式文件系统>数据库。可靠性相反。

消息队列的作用+设计思路

  1. 解耦:立刻返回请求结果,请求过程放到队列中异步处理。
  2. 最终一致性:将调用过程放到消息队列中,调用成功就删除消息,最终一致性不是消息队列的必备特性,使用消息队列实现消息的落地不能保证100%不丢失消息(譬如停电)。
  3. 广播:谁关心谁订阅
  4. 错峰与流控

对于消息的后续自动“补偿”处理,消息服务端(broker)“确认”消息(ack)正确后,才会删除本地。
消息队列应该把消息确认和消息处理分开,通过ack可以返回消息确认的结果。
默认Auto Ack不能确定消息处理有没有失败。
消息处理失败时可以主动的ack error
如果失败(包括超时)或者误以为失败,则定时重发。
误以为失败的情况下,会出现消息重复的问题。

重复消息不可能100%避免,除非允许丢失。
不丢消息和消息不重复,两者不可兼得。
一般情况下,消息可以通过持久化的id来鉴别重复,但是消息不能永远存在,但是最终这些id总会消失,这时产生了异常情况。
现在对于重复消息有两个问题:

  1. 如何鉴别重复消息,并且处理重复消息?
  2. 一个消息队列如何减少重复消息。

版本号方式解决问题1

每个消息带有一个版本号
下游(服务器或者消费者)维护一个版本号’,只处理版本号>版本号’的消息。
这样做会出现的问题:如果版本号2的消息先到,版本号1的厚道,就gg了。
解决方案,只处理版本号=版本号’+1的消息,对于其他的消息,下游要保存起来。
优:解决了重复问题
劣:1.发送方的消息必须要带有版本号。2.对于顺序严格的,下游要保存消息的版本号。

状态机解决问题1

定义消息状态的流转关系

减少重复消息的方案

  1. broker不处理重复的id消息
  2. 心跳定期监测消费者节点

上面解决消息重复的一些方案对于消息队列而言并没有严格定义,由各种消息队列去各自实现。

消息队列对于非事务性业务的支持

并不是每个业务都要求事务。下单和发送短信就不需要,不能因为发送短信失败就导致下单回滚。一个完整的消息队列应该定义清楚自己可以投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。

消息队列的同步和异步

对于服务端是同步还是异步,主要看返回的是result还是Future。

客户端同步服务端异步

Future<Result> future = request(server);
//server立刻返回future
ynchronized(future)
{    while(!future.isDone())
        { future.wait();
          //server处理结束后会notify这个future,并修改isdone标志
        }
}
...
return future.get();

客户端同步服务端同步

Result result = request(server);

客户端异步服务端同步

Future<Result> future = executor.submit(new Callable(){
                              public void call<Result>()
                                {
                                  result = request(server);
                                }
                              })
return future;

客户端异步服务端异步。

Future<Result> future = request(server);
//server立刻返回futurereturn future

消息队列服务端66666·异步的好处是可以先返回一个Future
把消息堆积起来最后batch处理,提高效率,解放了同步处理需要创建的很多个线程。

最后一个客户端异步并没有创建线程,不影响后续主流程,但是因为获取数据时需要正确的结果,而在future.get()就会等待正确结果的生成。
这和我们创建一个线程,并在线程中加同步是一致的。
https://www.jianshu.com/p/fee513982f8e
https://blog.csdn.net/laoyang360/article/details/52244917

本文标题:初识消息队列

文章作者:Sun

发布时间:2018年10月28日 - 15:10

最后更新:2020年06月23日 - 15:06

原始链接:https://sunyi720.github.io/2018/10/28/Java/再识消息队列/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。