消息队列使用场景 为什么会需要消息队列(mq)?
解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。kafka保证一个partition内的消息的有序性。
缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。
异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
mq常用的使用场景:
1. 进程间通讯和系统间的消息通知,比如在分布式系统中。
2. 解耦,比如像我们公司有许多开发团队,每个团队负责业务的不同模块,各个开发团队可以使用mq来通信。
3. 在一些高并发场景下,使用mq的异步特性。
消息队列和rpc对比 系统架构
rpc系统结构:
+----------++----------+|consumer||provider|+----------++----------+
consumer调用的provider提供的服务。
message queue系统结构:
+--------++-------++----------+|sender||queue||receiver|+--------++-------++----------+
sender发送消息给queue;receiver从queue拿到消息来处理。
功能特点
在架构上,rpc和message queue的差异点是,message queue有一个中间结点message queue(broker),可以把消息存储。
消息队列的特点
message queue把请求的压力保存一下,逐渐释放出来,让处理者按照自己的节奏来处理。
message queue引入一下新的结点,让系统的可靠性会受message queue结点的影响。
message queue是异步单向的消息。发送消息设计成是不需要等待消息处理的完成。
所以对于有同步返回需求,用message queue则变得麻烦了。
rpc的特点
同步调用,对于要等待返回结果/处理结果的场景,rpc是可以非常自然直觉的使用方式。rpc也可以是异步调用。
由于等待结果,consumer(client)会有线程消耗。
如果以异步rpc的方式使用,consumer(client)线程消耗可以去掉。但不能做到像消息一样暂存消息/请求,压力会直接传导到服务provider。
rpc适用场合说明
希望同步得到结果的场合,rpc合适。
希望使用简单,则rpc;rpc操作基于接口,使用简单,使用方式模拟本地调用。异步的方式编程比较复杂。
不希望发送端(rpc consumer、message sender)受限于处理端(rpc provider、message receiver)的速度时,使用message queue。
随着业务增长,有的处理端处理量会成为瓶颈,会进行同步调用到异步消息的改造。这样的改造实际上有调整业务的使用方式。
比如原来一个操作页面提交后就下一个页面会看到处理结果;改造后异步消息后,下一个页面就会变成“操作已提交,完成后会得到通知”。
rpc不适用场合说明
rpc同步调用使用message queue来传输调用信息。 上面分析可以知道,这样的做法,发送端是在等待,同时占用一个中间点的资源。变得复杂了,但没有对等的收益。
对于返回值是void的调用,可以这样做,因为实际上这个调用业务上往往不需要同步得到处理结果的,只要保证会处理即可。(rpc的方式可以保证调用返回即处理完成,使用消息方式后这一点不能保证了。)
返回值是void的调用,使用消息,效果上是把消息的使用方式wrap成了服务调用(服务调用使用方式成简单,基于业务接口)。
常用的消息队列及使用场景 activemq
acitvemq是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。
activemq消息传送机制
producer客户端使用来发送消息的, consumer客户端用来消费消息;它们的协同中心就是activemq broker,broker也是让producer和consumer调用过程解耦的工具,最终实现了异步rpc/数据交换的功能。随着activemq的不断发展,支持了越来越多的特性,也解决开发者在各种场景下使用activemq的需求。比如producer支持异步调用;使用flow control机制让broker协同consumer的消费速率;consumer端可以使用prefetchack来最大化消息消费的速率;提供”重发策略”等来提高消息的安全性等。在此我们不详细介绍。
一条消息的生命周期如下:
图片中简单的描述了一条消息的生命周期,不过在不同的架构环境中,message的流动行可能更加复杂.将在稍后有关broker的架构中详解..一条消息从producer端发出之后,一旦被broker正确保存,那么它将会被consumer消费,然后ack,broker端才会删除;不过当消息过期或者存储设备溢出时,也会终结它。
activemq的安装
启动后,activemq会占用两个端口,一个是负责接收发送消息的tcp端口:61616,一个是基于web负责用户界面化管理的端口:8161。这两个端口可以在conf下面的xml中找到。http服务器使用了jettry。这里有个问题是启动mq后,很长时间管理界面才可以显示出来。可以使用netstat -an|find “61616”来测试activemq是否启动。
jms与activemq的结合
jms是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而mq则是消息队列服务,是面向消息中间件(mom)的最终实现,是真正的服务提供者;mq的实现可以基于jms,也可以基于其他规范或标准。目前选择的最多的是activemq。
jms支持两种消息传递模型:点对点(point-to-point,简称ptp)和发布/订阅(publish/subscribe,简称pub/sub)。这两种消息传递模型非常相似,但有以下区别:
ptp消息传递模型规定了一条消息之恩能够传递费一个接收方。
pub/sub消息传递模型允许一条消息传递给多个接收方
点对点模型
通过点对点的消息传递模型,一个应用程序可以向另外一个应用程序发送消息。在此传递模型中,目标类型是队列。消息首先被传送至队列目标,然后从该队列将消息传送至对此队列进行监听的某个消费者,如下图:
一个队列可以关联多个队列发送方和接收方,但一条消息仅传递给一个接收方。如果多个接收方正在监听队列上的消息,jms provider将根据“先来者优先”的原则确定由哪个价售房接受下一条消息。如果没有接收方在监听队列,消息将保留在队列中,直至接收方连接到队列为止。这种消息传递模型是传统意义上的拉模型或轮询模型。在此列模型中,消息不时自动推动给客户端的,而是要由客户端从队列中请求获得。
点对点模型的代码(springboot+jms+activemq)实现如下:
@service(queueproducer)publicclassqueueproducer{@autowired// 也可以注入jmstemplate,jmsmessagingtemplate对jmstemplate进行了封装privatejmsmessagingtemplate jmsmessagingtemplate;// 发送消息,destination是发送到的队列,message是待发送的消息@scheduled(fixeddelay=3000)//每3s执行1次publicvoidsendmessage(destination destination,finalstring message){ jmsmessagingtemplate.convertandsend(destination, message); }@jmslistener(destination=out.queue)publicvoidconsumermessage(string text){ system.out.println(从out.queue队列收到的回复报文为:+text); } }
producer的实现
@componentpublicclassqueueconsumer2{// 使用jmslistener配置消费者监听的队列,其中text是接收到的消息@jmslistener(destination =mytest.queue)//sendto 该注解的意思是将return回的值,再发送的out.queue队列中@sendto(out.queue)publicstringreceivequeue(string text) { system.out.println(queueconsumer2收到的报文为:+text);returnreturn message +text; } }
consumer的实现
@runwith(springrunner.class)@springboottestpublicclassactivemqqueuetests{@autowiredprivatequeueproducer producer;@testpublicvoidcontextloads()throwsinterruptedexception { destination destination =newactivemqqueue(mytest.queue);for(inti=0; i<10; i++){ producer.sendmessage(destination,myname is flytiger+ i); } } }
test的实现
其中queueconsumer2表明的是一个双向队列。
发布/订阅模型
通过发布/订阅消息传递模型,应用程序能够将一条消息发送到多个接收方。在此传送模型中,目标类型是主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的或送消费者。如下图:
主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与ptp消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。jms一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消费者无须通过主动请求或轮询主题的方法来获得新的消息。
上面两种消息传递模型里,我们都需要定义消息生产者和消费者,生产者把消息发送到jms provider的某个目标地址(destination),消息从该目标地址传送至消费者。消费者可以同步或异步接收消息,一般而言,异步消息消费者的执行和伸缩性都优于同步消息接收者,体现在:
1. 异步消息接收者创建的网络流量比较小。单向对东消息,并使之通过管道进入消息监听器。管道操作支持将多条消息聚合为一个网络调用。
2. 异步消息接收者使用线程比较少。异步消息接收者在不活动期间不使用线程。同步消息接收者在接收调用期间内使用线程,结果线程可能会长时间保持空闲,尤其是如果该调用中指定了阻塞超时。
3. 对于服务器上运行的应用程序代码,使用异步消息接收者几乎总是最佳选择,尤其是通过消息驱动bean。使用异步消息接收者可以防止应用程序代码在服务器上执行阻塞操作。而阻塞操作会是服务器端线程空闲,甚至会导致死锁。阻塞操作使用所有线程时则发生死锁。如果没有空余的线程可以处理阻塞操作自身解锁所需的操作,这该操作永远无法停止阻塞。
发布/订阅模型的代码(springboot+jms+activemq)实现如下:
@service(topicproducer)publicclasstopicproducer{@autowired// 也可以注入jmstemplate,jmsmessagingtemplate对jmstemplate进行了封装privatejmsmessagingtemplate jmsmessagingtemplate;// 发送消息,destination是发送到的队列,message是待发送的消息@scheduled(fixeddelay=3000)//每3s执行1次publicvoidsendmessage(destination destination,finalstring message){ jmsmessagingtemplate.convertandsend(destination, message); } }
producer的实现
@componentpublicclasstopicconsumer2 {// 使用jmslistener配置消费者监听的队列,其中text是接收到的消息@jmslistener(destination =mytest.topic)publicvoidreceivetopic(string text) { system.out.println(topicconsumer2收到的topic报文为:+text); } }
consumer的实现
@runwith(springrunner.class)@springboottestpublicclassactivemqtopictests{@autowiredprivatetopicproducer producer;@testpublicvoidcontextloads()throwsinterruptedexception { destination destination =newactivemqtopic(mytest.topic);for(inti=0; i<3; i++){ producer.sendmessage(destination,myname is topicflytiger+ i); } } }
test的实现
topic模式工作时,默认只能发送和接收queue消息,如果要发送和接收topic消息,需要加入:
spring.jms.pub-sub-domain=true queue与topic的比较
jms queue执行load balancer语义
一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它讲被保存一直到能处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转到另外一个consumer那儿。一个queue可以有很多consumer,并且在多个可用的consumer中负载均衡。
topic实现publish和subscribe语义
一条消息被publish时,他将发送给所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
分别对应两种消息模式
point-to-point(点对点),publisher/subscriber model(发布/订阅者)
其中在publicher/subscriber模式下又有nondurable subscription(非持久化订阅)和durable subscription(持久化订阅)两种消息处理方式。
activemq优缺点
优点:是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,tcp,ssl,广播,持久化,xa,和j2ee1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的java虚拟机。消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。
缺点:activemq默认的配置性能偏低,需要优化配置,但是配置文件复杂,activemq本身不提供管理工具;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,二、文档整体的专业性太强。在研究阶段可以通过查maillist、看javadoc、分析源代码来了解。
rabbitmq 简介
rabbitmq简介可以参考我的两篇文章:
openstack的rpc机制之amqp协议()
rabbitmq高可用性()
rabbitmq安装好之后的默认账号密码是(guest/guest)
需要注意的是:
多个消费者可以订阅同一个queue,这时queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。这种分发方式叫做round-robin(循环的方式)。
当publisher将消息发给queue的过程中,publisher会指明routing key。direct模式中,direct exchange 根据 routing key 进行精确匹配,只有对应的 message queue 会接受到消息。topic模式中exchange会根据routing key和bindkey进行模式匹配,决定将消息发送到哪个queue中。
有一个疑问:当有多个consumer时,rabbitmq会平均分摊给这些consumer;没办法把同一个message发给不同的consumer吗?
我之前的猜想是,当有多个consumer使用topic模式订阅消息时,所有的消息它们都会收到;但如果是direct模式,只有一个consumer会收到消息。(理解错误,topic和direct只是publisher用来选择发到不同的queue,不是consumer接收消息。一个队列一个消息只能发送给一个消费者,不然消费者的ack也会有很多,rabbitmq server也不好处理)
rabbitmq的消息确认
默认情况下,如果message 已经被某个consumer正确的接收到了,那么该message就会被从queue中移除。当然也可以让同一个message发送到很多的consumer。
如果一个queue没被任何的consumer subscribe(订阅),那么,如果这个queue有数据到达,那么这个数据会被cache,不会被丢弃。当有consumer时,这个数据会被立即发送到这个consumer,这个数据被consumer正确收到时,这个数据就被从queue中删除。
那么什么是正确收到呢?通过ack。每个message都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么:
rabbitmq server会把这个信息发送到下一个consumer。而且ack的机制可以起到限流的作用(benefitto throttling):在consumer处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的balance consumer的load。
rabbitmq功能测试
本次测试依然是rabbitmq+springboot,首先需要application.properties
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672spring.rabbitmq.username=guest spring.rabbitmq.password=guest
这里的端口是5672,,15672时管理端的端口。
pom要添加依赖:
org.springframework.boot
章莹颖事件中的「暗网」,到底是个怎样的世界?可怜的姑娘 愿你平安回家
到底如何看人工智能?是天堂还是地狱
压力传感器在矿山机械液压泵站上的应用
什么是DCS?DCS系统都有哪些,它有哪些作用?
vivo S15系列官宣 支持80W快充技术
Java常用消息队列原理介绍及性能对比
豪威集团打造高效28V 2A同步降压转换器WD1502F
Linux如何修改主机名命令
xMate 七轴柔性协作机器人的特性以及应用领域
滤波器的选型原则和分类
丰田固态电池技术或在明年推出
聚酰亚胺薄膜应用于数字隔离器
FAULHABER 福尔哈贝扩大IE3编码器兼容性
计算机基础知识之汇编语言2
电容RMS纹波额定电流的设定
开关电源直流EMI滤波器的设计及实现
如何去实现一种数字IC的设计?
2023年10个最佳Linux服务器发行版
电力巡检机器人大展身手,未来市场空间广阔
荣耀Magic2评测 YOYO才是绝对核心