C#利用RabbitMQ实现点对点消息传输

Cynthia ·
更新时间:2024-11-14
· 746 次阅读

目录

消息队列模型

RabbitMQ设置

RabbitMQ动态库安装

RabbitMQ.Client相关知识点

示例效果图

核心代码

消息队列模型

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

RabbitMQ设置

RabbitMQ是通过交换机将消息转发到对应队列,所以队列需要和交换机进行绑定。本例将队列绑定到默认的amq.direct交换机,并设置Routing key,如下图所示:

RabbitMQ动态库安装

通过NuGet包管理器进行安装RabbitMQ.Client,如下所示:

RabbitMQ.Client相关知识点

ConnectionFactory:构造一个实例,主要创建连接。

IConnection:表示一个基于AMQP协议的连接。

IModel:表示一个RabbitMQ通道,可用于声明一个队列,然后开始消费。

EventingBasicConsumer:基于独立事件监听的基础消费者,可以监听并接收消息。

生产者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 开始生产并发布消息

消费者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 创建消费者,5. 绑定通道和消费者,并开始消费

示例效果图

本例主要有一个生产者,一个消费者,通过消息队列进行消息转发和接收。

生产者负责消息发送,如下图所示:

消费者负责消息接收,如下图所示:

核心代码

代码结构:主要包括生产者,消费者,公共基础代码,如下所示:

RabbitMqHelper主要创建连接,如下所示:

public class RabbitMqHelper { /// <summary> /// 创建连接 /// </summary> /// <returns></returns> public IConnection GetConnection() { try { var factory = new ConnectionFactory() { HostName = "127.0.0.1", Port = 5672, UserName = "guest", Password = "guest", VirtualHost = "/ShortMsgHost" }; var conn = factory.CreateConnection(); return conn; } catch (Exception ex) { throw ex; } } }

RabbmitMqSendHelper用于发送消息,如下所示:

public class RabbmitMqSendHelper : RabbitMqHelper { /// <summary> /// 发送消息 /// </summary> /// <param name="msg"></param> /// <returns></returns> public bool SendMsg(string msg) { try { using (var conn = GetConnection()) { using (var channel = conn.CreateModel()) { channel.QueueDeclare(queue: "ShortMsgQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "amq.direct", routingKey: "ShortMsgKey", basicProperties: null, body: body); //Console.WriteLine(" [x] Sent {0}", message); }; }; return true; } catch (Exception ex) { throw ex; } } }

RabbitMqReceiveHelper主要用于接收信息,如下所示:

public class RabbitMqReceiveHelper : RabbitMqHelper { public RabbitMqReceiveEventHandler OnReceiveEvent; private IConnection conn; private IModel channel; private EventingBasicConsumer consumer; public bool StartReceiveMsg() { try { conn = GetConnection(); channel = conn.CreateModel(); channel.QueueDeclare(queue: "ShortMsgQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); //Console.WriteLine(" [x] Received {0}", message); if (OnReceiveEvent != null) { OnReceiveEvent(message); } }; channel.BasicConsume(queue: "ShortMsgQueue", autoAck: true, consumer: consumer); return true; } catch (Exception ex) { throw ex; } } }

作者:Alan.hsiang
出处:http://www.cnblogs.com/hsiang/

以上就是C#利用RabbitMQ实现点对点消息传输的实现示例的详细内容,更多关于c# 用RabbitMQ实现点对点消息传输的资料请关注软件开发网其它相关文章!



C# rabbitmq 点对点

需要 登录 后方可回复, 如果你还没有账号请 注册新账号