rabbitmq 7种队列实现java版

Zarah ·
更新时间:2024-09-21
· 807 次阅读

文章目录rabbitmq7种实现方式搭建maven项目引入依赖创建连接简单队列消息生产者消息消费者work queues 工作队列生产者消费者能者多劳(公平分发):消费能力强则消费更多消息Publish/Subscribe 发布订阅模式生产者代码消费者控制台观察该交换器交换机Routing 路由模式 rabbitmq7种实现方式

在rabbitmq的官网 https://www.rabbitmq.com/getstarted.html 有给出7中实现方式:
在这里插入图片描述
在这里插入图片描述
这里依次学习。

搭建maven项目 引入依赖 com.rabbitmq amqp-client 5.8.0 创建连接 public static Connection getConnection() throws Exception { //定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/admin_vhost"); factory.setUsername("admin"); factory.setPassword("123456"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } 简单队列

在这里插入图片描述
顾名思义,最简单的队列,有生产者和消费者两个角色,生产者生产消息,将消息发送到队列汇总,消费者从队列中消费消息。

消息生产者

下面定义一个队列名称为q_test_01的队列,并且向其发送一条消息。

private final static String QUEUE_NAME = "q_test_01"; publicstatic void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //关闭通道和连接 channel.close(); connection.close(); }

执行上面main方法,发送一条消息。
在控制台可以看到有一条消息发送到了rabbitmq队列中:
在这里插入图片描述
在queue标签中可以看到新建的队列
在这里插入图片描述
点击上面的q_test_01链接进入队列详情页面,里面有Get messages功能,可以获取队列中的消息,可以看到消息内容Hello World! :
在这里插入图片描述

消息消费者

消费者消费队列中的数据:

private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //回调函数。队列中有数据后会回调该方法 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message); } }; //channel监听队列。 channel.basicConsume(QUEUE_NAME, true, consumer); }

上面采用了监听回调机制来消费数据。当消费者监听到有数据以后会回调Consumer 的handleDelivery方法。

注意channel.basicConsume的第二个参数我这里传的true,表示自动提交ack响应。在一些业务场景中可能会要求关闭自动提交,而是确定消费成功后才手动提交ack。

启动消费者方法,可以看到打印输出消息:
在这里插入图片描述
消费者不会停止服务,会一直监听。此时生产者再发送一条消息,消费者也可以马上收到消息并打印:
在这里插入图片描述

work queues 工作队列

多消费这模式
在这里插入图片描述

生产者

针对上面生产者做一点修改,连续发送多条数据。主要是加了一个for循环,连续发送50条消息。

private final static String QUEUE_NAME = "q_test_workqueue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容 for(int i = 0 ; i < 50 ; i ++){ String message = "Hello rabbitmq! " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); TimeUnit.MILLISECONDS.sleep(100); } //关闭通道和连接 channel.close(); connection.close(); } 消费者

这里定义两个消费者,一个消费者每条消息耗时1秒,另一个性能好,每条消息耗时2秒。使用sleep来模拟耗时。

消费者1:

public class Recv1 { private final static String QUEUE_NAME = "q_test_workqueue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //回调函数。队列中有数据后会回调该方法 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [1] Received : " + message); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(" [1] done : " + message); } }; //channel监听队列。 channel.basicConsume(QUEUE_NAME, true, consumer); } }

消费者2:

public class Recv2 { private final static String QUEUE_NAME = "q_test_workqueue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //回调函数。队列中有数据后会回调该方法 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [2] Received : " + message); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(" [2] done : " + message); } }; //channel监听队列。 channel.basicConsume(QUEUE_NAME, true, consumer); } }

首先启动两个消费者,使其准备好消费消息。然后启动生产者连续发送50条消息。从控制台可以看到两个消费者消费情况
在这里插入图片描述
在这里插入图片描述
通过上述实际结果,可以看到,两个消费者消费的消息总数量是相同的,消息不会被不同消费者重复消费。

由于我们设置了两个消费者消费能力(sleep时间)不同,按理来说应该消费者1消费的多。

之所以两个消费者消费消息数量相同是因为rabbitmq默认采用轮训策略(round-robin)进行分配消息。

能者多劳(公平分发):消费能力强则消费更多消息

要想实现公平分发,必须关闭自动提交ack,改为手动ack。就是channel.basicConsume(QUEUE_NAME, true, consumer);的第二个参数。
官方解释:

autoAck true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements

另外,很重要的 一点是,生产者和消费者都要配置一下basicQos,标明收到消费者相应ack前可以发送多少条消息。默认是0,不限制。

/** * Request a specific prefetchCount "quality of service" settings * for this channel. * * @see #basicQos(int, int, boolean) * @param prefetchCount maximum number of messages that the server * will deliver, 0 if unlimited * @throws java.io.IOException if an error is encountered */ void basicQos(int prefetchCount) throws IOException;

我们这里设置为channel.basicQos(1);保证消费一条发一条给消费者。

注意这里不是只生产者发送到队列,而是说的从队列中发送到消费者的消息数量。

接下来,由于关闭了自动ack ,还需要在消费者Consumer的finally中手动提交ack。
完整的生产者代码:

public class Send { private final static String QUEUE_NAME = "q_test_workqueue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //在收到消费者响应之前,最多发送多少条消息给消费者 //用于能者多劳模式 channel.basicQos(1); // 消息内容 for(int i = 0 ; i < 50 ; i ++) { String message = "Hello rabbitmq! " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); TimeUnit.MILLISECONDS.sleep(100); } //关闭通道和连接 channel.close(); connection.close(); } }

完整的消费者1:

public class Recv1 { private final static String QUEUE_NAME = "q_test_workqueue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //回调函数。队列中有数据后会回调该方法 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = null; try { message = new String(body, "UTF-8"); System.out.println(" [1] Received : " + message); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [1] done : " + message); //手动提交ack回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //channel监听队列。 channel.basicConsume(QUEUE_NAME, false, consumer); } }

完整消费者2代码

public class Recv2 { private final static String QUEUE_NAME = "q_test_workqueue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //回调函数。队列中有数据后会回调该方法 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = null; try { message = new String(body, "UTF-8"); System.out.println(" [2] Received : " + message); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [2] done : " + message); //手动提交ack回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //channel监听队列。 channel.basicConsume(QUEUE_NAME, false, consumer); } }

首先运行消费者1和2,然后启动生产者。控制台输出:
在这里插入图片描述
在这里插入图片描述
可以看到,消费者1消费的消息数量是消费者2的两倍,因为消费者1休眠时间是消费者2的一半。

Publish/Subscribe 发布订阅模式

在这里插入图片描述

上图中X指交换机exchange。
发布订阅模式有以下特点:

一个生产者多个消费者 每一个消费者都有自己的一个队列 生产者没有将消息直接发送到队列,而是发送到了交换机 每个队列都要绑定到交换机 生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费

以上特性类似于Kafka的topic和partition,一个topic的消息发送到不同的partition上,一个partition可以被一个消费者组内的一个消费者消费。更多Kafka知识请参考Kafka官网。

下面模拟场景为:一个生产者,通过交换机发送10条消息。交换机采用的是BuiltinExchangeType.FANOUT类型进行交换转发。生产者绑定两个队列。定义三个消费者,消费者1消费队列1,消费者2和3消费队列2

生产者代码

生产者跟之前相比,多了以下几步:

定义交换机 将队列绑定到交换机上

完整代码:

public class Send { private final static String QUEUE_NAME1 = "q_test_publish_subscribe1"; private final static String QUEUE_NAME2 = "q_test_publish_subscribe2"; private final static String EXCHANGE_NAME = "q_test_publish_subscribe_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); //定义交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明(创建)队列. durable=true表示该队列需要持久化 channel.queueDeclare(QUEUE_NAME1, true, false, false, null); channel.queueDeclare(QUEUE_NAME2, true, false, false, null); //将队列绑定到交换机上 channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, ""); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, ""); //在收到消费者响应之前,最多发送多少条消息给消费者 //用于能者多劳模式 channel.basicQos(1); // 消息内容 for (int i = 0; i < 10; i++) { String message = "Hello rabbitmq! " + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); TimeUnit.MILLISECONDS.sleep(100); } //关闭通道和连接 channel.close(); connection.close(); } } 消费者

消费者有3个,唯一不同之处就是消费的topic不同。

消费者1:

public class Recv1 { private final static String QUEUE_NAME = "q_test_publish_subscribe1"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.basicQos(1); //回调函数。队列中有数据后会回调该方法 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = null; try { message = new String(body, "UTF-8"); System.out.println(" [1] Received : " + message); } finally { System.out.println(" [1] done : " + message); //手动提交ack回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; //channel监听队列。 channel.basicConsume(QUEUE_NAME, false, consumer); } }

消费者2和消费者3消费的topic是q_test_publish_subscribe2,其他与消费者1完全相同。

首先启动消费者1,2,3;然后启动生产者发送消息。
控制台输出:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

可以看到,两个队列的数据是相同的,也就是说BuiltinExchangeType.FANOUT类型的交换机将消息复制到了每个队列中。消费者1单独消费q_test_publish_subscribe1队列中的消息,所以有10条记录。q_test_publish_subscribe2队列由Recv2和Recv3共同消费,由于他俩这里消费速度相同,所以各消费了5条数据。

控制台观察该交换器

在这里插入图片描述
点击进入交换机详情页:
在这里插入图片描述
可以看到有两个队列绑定在这里。而且控制台支持手动解绑。
在队列页面也可以看到这两个队列:
在这里插入图片描述

交换机

交换机的作用是接收生产者的消息,并将其根据一定规则发送到队列中。注意交换机本身不保存消息,rabbitmq中消息都是保存在队列中。

交换机支持以下4中模式:
在这里插入图片描述

fanout:fanout类似于广播,会将受到的消息发送给每一个绑定的队列中。 direct:路由模式 topicheadersRouting 路由模式

与发布订阅相比,路由模式最主要的修改点就是交换机类型采用BuiltinExchangeType.DIRECT

根据官网实例,模拟一个生产者,一个BuiltinExchangeType.DIRECT类型的交换机,两个队列,两个消费者。
error类型的消息同时发送到两个队列。info和warning类型的消息只发送到一个队列。
在这里插入图片描述
生产者完整代码:

public class Send { private final static String QUEUE_NAME1 = "q_test_routing1"; private final static String QUEUE_NAME2 = "q_test_routing2"; private final static String EXCHANGE_NAME = "q_test_routing_exchange"; private final static String ERROR = "error"; private final static String INFO = "info"; private final static String WARNING = "warning"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); //定义交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 声明(创建)队列. durable=true表示该队列需要持久化 channel.queueDeclare(QUEUE_NAME1, true, false, false, null); channel.queueDeclare(QUEUE_NAME2, true, false, false, null); //将队列绑定到交换机上 channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, ERROR); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, ERROR); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, INFO); channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, WARNING); //在收到消费者响应之前,最多发送多少条消息给消费者 //用于能者多劳模式 channel.basicQos(1); // 消息内容. 这里error,info,warning的消息各发送2个 channel.basicPublish(EXCHANGE_NAME, ERROR, null, "Hello rabbitmq! error1".getBytes()); channel.basicPublish(EXCHANGE_NAME, ERROR, null, "Hello rabbitmq! error2".getBytes()); channel.basicPublish(EXCHANGE_NAME, INFO, null, "Hello rabbitmq! info1".getBytes()); channel.basicPublish(EXCHANGE_NAME, INFO, null, "Hello rabbitmq! info2".getBytes()); channel.basicPublish(EXCHANGE_NAME, WARNING, null, "Hello rabbitmq! warning1".getBytes()); channel.basicPublish(EXCHANGE_NAME, WARNING, null, "Hello rabbitmq! warning2".getBytes()); //关闭通道和连接 channel.close(); connection.close(); } }

消费者1和消费者2的代码拷贝自发布订阅模式的消费者代码,唯一不同之处就是队列名称,这里就不贴完整代码了。

首先运行两个消费者,然后运行生产者。控制台输出如下:
在这里插入图片描述
在这里插入图片描述
与预期一致,error类型的消息同时发送到两个队列,两个消费者都消费到了error的消息。info和warning类型的消息只发送到一个队列,只有消费者2消费到。


作者:快乐崇拜234



java版 JAVA 队列 rabbitmq

需要 登录 后方可回复, 如果你还没有账号请 注册新账号