在rabbitmq的官网 https://www.rabbitmq.com/getstarted.html 有给出7中实现方式:
这里依次学习。
com.rabbitmq
amqp-client
5.8.0
创建连接
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/admin_vhost");
factory.setUsername("admin");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
简单队列
顾名思义,最简单的队列,有生产者和消费者两个角色,生产者生产消息,将消息发送到队列汇总,消费者从队列中消费消息。
下面定义一个队列名称为q_test_01
的队列,并且向其发送一条消息。
private final static String QUEUE_NAME = "q_test_01";
publicstatic void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
执行上面main方法,发送一条消息。
在控制台可以看到有一条消息发送到了rabbitmq队列中:
在queue标签中可以看到新建的队列
点击上面的q_test_01
链接进入队列详情页面,里面有Get messages
功能,可以获取队列中的消息,可以看到消息内容Hello World!
:
消费者消费队列中的数据:
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//回调函数。队列中有数据后会回调该方法
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message);
}
};
//channel监听队列。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
上面采用了监听回调机制来消费数据。当消费者监听到有数据以后会回调Consumer 的handleDelivery方法。
注意channel.basicConsume的第二个参数我这里传的true,表示自动提交ack响应。在一些业务场景中可能会要求关闭自动提交,而是确定消费成功后才手动提交ack。
启动消费者方法,可以看到打印输出消息:
消费者不会停止服务,会一直监听。此时生产者再发送一条消息,消费者也可以马上收到消息并打印:
多消费这模式
针对上面生产者做一点修改,连续发送多条数据。主要是加了一个for循环,连续发送50条消息。
private final static String QUEUE_NAME = "q_test_workqueue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
for(int i = 0 ; i < 50 ; i ++){
String message = "Hello rabbitmq! " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
TimeUnit.MILLISECONDS.sleep(100);
}
//关闭通道和连接
channel.close();
connection.close();
}
消费者
这里定义两个消费者,一个消费者每条消息耗时1秒,另一个性能好,每条消息耗时2秒。使用sleep来模拟耗时。
消费者1:
public class Recv1 {
private final static String QUEUE_NAME = "q_test_workqueue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//回调函数。队列中有数据后会回调该方法
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [1] Received : " + message);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" [1] done : " + message);
}
};
//channel监听队列。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者2:
public class Recv2 {
private final static String QUEUE_NAME = "q_test_workqueue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//回调函数。队列中有数据后会回调该方法
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [2] Received : " + message);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" [2] done : " + message);
}
};
//channel监听队列。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
首先启动两个消费者,使其准备好消费消息。然后启动生产者连续发送50条消息。从控制台可以看到两个消费者消费情况
通过上述实际结果,可以看到,两个消费者消费的消息总数量是相同的,消息不会被不同消费者重复消费。
由于我们设置了两个消费者消费能力(sleep时间)不同,按理来说应该消费者1消费的多。
之所以两个消费者消费消息数量相同是因为rabbitmq默认采用轮训策略
(round-robin)进行分配消息。
要想实现公平分发,必须关闭自动提交ack,改为手动ack。就是channel.basicConsume(QUEUE_NAME, true, consumer);
的第二个参数。
官方解释:
autoAck true if the server should consider messages acknowledged once delivered;
false if the server should expect explicit acknowledgements
另外,很重要的 一点是,生产者和消费者都要配置一下basicQos
,标明收到消费者相应ack前可以发送多少条消息。默认是0,不限制。
/**
* Request a specific prefetchCount "quality of service" settings
* for this channel.
*
* @see #basicQos(int, int, boolean)
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @throws java.io.IOException if an error is encountered
*/
void basicQos(int prefetchCount) throws IOException;
我们这里设置为channel.basicQos(1);
保证消费一条发一条给消费者。
注意这里不是只生产者发送到队列,而是说的从队列中发送到消费者的消息数量。
接下来,由于关闭了自动ack ,还需要在消费者Consumer的finally中手动提交ack。
完整的生产者代码:
public class Send {
private final static String QUEUE_NAME = "q_test_workqueue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//在收到消费者响应之前,最多发送多少条消息给消费者
//用于能者多劳模式
channel.basicQos(1);
// 消息内容
for(int i = 0 ; i < 50 ; i ++) {
String message = "Hello rabbitmq! " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
TimeUnit.MILLISECONDS.sleep(100);
}
//关闭通道和连接
channel.close();
connection.close();
}
}
完整的消费者1:
public class Recv1 {
private final static String QUEUE_NAME = "q_test_workqueue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
//回调函数。队列中有数据后会回调该方法
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = null;
try {
message = new String(body, "UTF-8");
System.out.println(" [1] Received : " + message);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [1] done : " + message);
//手动提交ack回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//channel监听队列。
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
完整消费者2代码
public class Recv2 {
private final static String QUEUE_NAME = "q_test_workqueue";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
//回调函数。队列中有数据后会回调该方法
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = null;
try {
message = new String(body, "UTF-8");
System.out.println(" [2] Received : " + message);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [2] done : " + message);
//手动提交ack回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//channel监听队列。
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
首先运行消费者1和2,然后启动生产者。控制台输出:
可以看到,消费者1消费的消息数量是消费者2的两倍,因为消费者1休眠时间是消费者2的一半。
上图中X指交换机exchange。
发布订阅模式有以下特点:
以上特性类似于Kafka的topic和partition,一个topic的消息发送到不同的partition上,一个partition可以被一个消费者组内的一个消费者消费。更多Kafka知识请参考Kafka官网。
下面模拟场景为:一个生产者,通过交换机发送10条消息。交换机采用的是BuiltinExchangeType.FANOUT
类型进行交换转发。生产者绑定两个队列。定义三个消费者,消费者1消费队列1,消费者2和3消费队列2
生产者跟之前相比,多了以下几步:
定义交换机 将队列绑定到交换机上完整代码:
public class Send {
private final static String QUEUE_NAME1 = "q_test_publish_subscribe1";
private final static String QUEUE_NAME2 = "q_test_publish_subscribe2";
private final static String EXCHANGE_NAME = "q_test_publish_subscribe_exchange";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
//定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 声明(创建)队列. durable=true表示该队列需要持久化
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
//将队列绑定到交换机上
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");
//在收到消费者响应之前,最多发送多少条消息给消费者
//用于能者多劳模式
channel.basicQos(1);
// 消息内容
for (int i = 0; i < 10; i++) {
String message = "Hello rabbitmq! " + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
TimeUnit.MILLISECONDS.sleep(100);
}
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者
消费者有3个,唯一不同之处就是消费的topic不同。
消费者1:
public class Recv1 {
private final static String QUEUE_NAME = "q_test_publish_subscribe1";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
//回调函数。队列中有数据后会回调该方法
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = null;
try {
message = new String(body, "UTF-8");
System.out.println(" [1] Received : " + message);
} finally {
System.out.println(" [1] done : " + message);
//手动提交ack回执
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//channel监听队列。
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
消费者2和消费者3消费的topic是q_test_publish_subscribe2
,其他与消费者1完全相同。
首先启动消费者1,2,3;然后启动生产者发送消息。
控制台输出:
可以看到,两个队列的数据是相同的,也就是说BuiltinExchangeType.FANOUT
类型的交换机将消息复制到了每个队列中。消费者1单独消费q_test_publish_subscribe1队列中的消息,所以有10条记录。q_test_publish_subscribe2队列由Recv2和Recv3共同消费,由于他俩这里消费速度相同,所以各消费了5条数据。
点击进入交换机详情页:
可以看到有两个队列绑定在这里。而且控制台支持手动解绑。
在队列页面也可以看到这两个队列:
交换机的作用是接收生产者的消息,并将其根据一定规则发送到队列中。注意交换机本身不保存消息,rabbitmq中消息都是保存在队列中。
交换机支持以下4中模式:
fanout
:fanout类似于广播,会将受到的消息发送给每一个绑定的队列中。
direct
:路由模式
topic
:
headers
:
Routing 路由模式
与发布订阅相比,路由模式最主要的修改点就是交换机类型采用BuiltinExchangeType.DIRECT
。
根据官网实例,模拟一个生产者,一个BuiltinExchangeType.DIRECT
类型的交换机,两个队列,两个消费者。
error类型的消息同时发送到两个队列。info和warning类型的消息只发送到一个队列。
生产者完整代码:
public class Send {
private final static String QUEUE_NAME1 = "q_test_routing1";
private final static String QUEUE_NAME2 = "q_test_routing2";
private final static String EXCHANGE_NAME = "q_test_routing_exchange";
private final static String ERROR = "error";
private final static String INFO = "info";
private final static String WARNING = "warning";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
//定义交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明(创建)队列. durable=true表示该队列需要持久化
channel.queueDeclare(QUEUE_NAME1, true, false, false, null);
channel.queueDeclare(QUEUE_NAME2, true, false, false, null);
//将队列绑定到交换机上
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, ERROR);
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, ERROR);
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, INFO);
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, WARNING);
//在收到消费者响应之前,最多发送多少条消息给消费者
//用于能者多劳模式
channel.basicQos(1);
// 消息内容. 这里error,info,warning的消息各发送2个
channel.basicPublish(EXCHANGE_NAME, ERROR, null, "Hello rabbitmq! error1".getBytes());
channel.basicPublish(EXCHANGE_NAME, ERROR, null, "Hello rabbitmq! error2".getBytes());
channel.basicPublish(EXCHANGE_NAME, INFO, null, "Hello rabbitmq! info1".getBytes());
channel.basicPublish(EXCHANGE_NAME, INFO, null, "Hello rabbitmq! info2".getBytes());
channel.basicPublish(EXCHANGE_NAME, WARNING, null, "Hello rabbitmq! warning1".getBytes());
channel.basicPublish(EXCHANGE_NAME, WARNING, null, "Hello rabbitmq! warning2".getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
消费者1和消费者2的代码拷贝自发布订阅模式的消费者代码,唯一不同之处就是队列名称,这里就不贴完整代码了。
首先运行两个消费者,然后运行生产者。控制台输出如下:
与预期一致,error类型的消息同时发送到两个队列,两个消费者都消费到了error的消息。info和warning类型的消息只发送到一个队列,只有消费者2消费到。