上一篇文章中,简单的介绍了一下rabbitmq,以及安装和hello world。
有的小伙伴留言说看不懂其中的方法参数,这里先解释一下几个基本的方法参数。
// 声明队列方法channel.queuedeclare(queue_name, false, false, false, null);/** * param1:queue 队列的名字 * param2:durable 是否持久化;比如现在发送到队列里面的消息,如果没有持久化,重启这个队列后数 据会丢失(false) true:重启之后数据依然在 * param3:exclusive 是否排外(是否是当前连接的专属队列),排外的意思是: * 1:连接关闭之后 这个队列是否自动删除(false:不自动删除) * 2:是否允许其他通道来进行访问这个数据(false:不允许) * param4:autodelete 是否自动删除 * 就是当最后一个连接断开的时候,是否自动删除这个队列(false:不删除) * param5:arguments(map) 声明队列的时候,附带的一些参数 */// 发送数据到队列channel.basicpublish(, queue_name, messageproperties.persistent_text_plain, 第一个队列消息....getbytes());/** * param1:exchange 交换机 没有就设置为 值就可以了 * param2:routingkey 路由的key 现在没有设置key,直接使用队列的名字 * param3:basicproperties 发送数据到队列的时候,是否要带一些参数。 * messageproperties.persistent_text_plain表示没有带任何参数 * param4:body 向队列中发送的消息数据 */work模型work模型称为工作队列或者竞争消费者模式,多个消费者消费的数据之和才是原来队列中的所有数据,适用于流量的削峰。
img
演示写个简单的测试:
生产者
public class producer { private static final string queue_name = queue_work_1; public static void main(string[] args) throws ioexception, timeoutexception { connection connection = connectionutils.getconnection(); channel channel = connection.createchannel(); channel.queuedeclare(queue_name, false, false, false, null); for (int i = 0; i < 100; i++) { channel.basicpublish(, queue_name, null, (work模型: + i).getbytes()); } channel.close(); connection.close(); }}消费者
// 消费者1public class consumer { private static final string queue_name = queue_work_1; public static void main(string[] args) throws ioexception, timeoutexception { connection connection = connectionutils.getconnection(); channel channel = connection.createchannel(); channel.queuedeclare(queue_name, false, false, false, null); // channel.basicqos(0, 1, false); defaultconsumer defaultconsumer = new defaultconsumer(channel) { @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { system.out.println(system.currenttimemillis() + 消费者1接收到信息: + new string(body)); channel.basicack(envelope.getdeliverytag(), false); } }; channel.basicconsume(queue_name, false, defaultconsumer); }}// 消费者2public class consumer2 { private static final string queue_name = queue_work_1; public static void main(string[] args) throws ioexception, timeoutexception { connection connection = connectionutils.getconnection(); channel channel = connection.createchannel(); channel.queuedeclare(queue_name, false, false, false, null); // channel.basicqos(0, 1, false); defaultconsumer defaultconsumer = new defaultconsumer(channel) { @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception { system.out.println(system.currenttimemillis() + 消费者2接收到信息: + new string(body)); channel.basicack(envelope.getdeliverytag(), false); // 这里加了个延迟,表示处理业务时间 try { thread.sleep(200); } catch (interruptedexception e) { e.printstacktrace(); } } }; channel.basicconsume(queue_name, false, defaultconsumer); }}结果
image-20221229210012145
image-20221229210046184
可以看出来:100条消息,消费者之间是平分的,消费者1 几乎是瞬间完成,消费者2 则是慢慢吞吞的运行完毕,消费者1大量时间处于空闲状态,消费者2则一直忙碌。这显然是不适用于实际开发中。
我们需要遵从一个原则,就是 能者多劳 ,消费越快的人,消费的越多;
现在我们把消费者1和2的代码中 // channel.basicqos(0, 1, false); 这行代码取消注释,再次运行;
image-20221229211317632
image-20221229211335782
现在的结果就比较符合能者多劳,虽然你干的多,但是工资是一样的呀~
work模型的一个主要的方法是basicqos();这里也解释一下其参数:
// 设置限流机制channel.basicqos(0, 1, false);/** * param1: prefetchsize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制 * param2: prefetchcount,告诉rabbitmq不要一次性给消费者推送大于n个消息 * param3:global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者 */小结本文到这里就结束了,主要介绍了rabbitmq通信模型中的work模型,适用于限流、削峰等应用场景。
我国LED显示屏技术还存在哪些问题
我们应该如何设定才能使加工成品质量更加完善
MWC2015快速充电引关注 各大品牌纷纷秀技术
Wolfson电源管理IC获三星M1 PMP采用
美国陆军研发了一种高效的地面机器人学习模型
RabbitMQ通信模型中的work模型
近期最值得关注的手机创新技术大盘点
可变身摩托车机器人预计2020年商用
小米5c得到客户这样的评价,可能会打破所有人常规印象
劲胜智能:手机结构件研究报告
找方案 | 基于安森美半导体NCP1618多模式PFC 500W设计方案
matlab小波去噪函数实例
关于云数据存储的漏洞及避免漏洞方法
教你如何抵消OFDM系统的失真
新应用新技术加持 PCB下半年营运添柴火
制冷压缩机的PLC控制改造
解读Gartner 2024年十大战略技术趋势
小康集团的智能工厂蕴藏大世界 部署了总共800多台自动化机器人
MAX1434 全差分输入10位模数转换器(ADC)
通信技术的发展格外重要,但通信技术也是把双刃剑