rabbitmq 消息队列消息数量变化示例

Bliss ·
更新时间:2024-09-21
· 861 次阅读

rabbitmq 消息队列消息数量变化示例rabbitmq 消息队列消息数量变化说明(后附go代码)服务器和消费者之间的网络缓冲为1服务器和消费者之间的网络缓冲为2生产者代码消费者代码 rabbitmq 消息队列消息数量变化说明(后附go代码)

预先给消息队列发布5条信息
在这里插入图片描述

服务器和消费者之间的网络缓冲为1

依次读取五条消息后,消息队列消息数量变化
在这里插入图片描述

服务器和消费者之间的网络缓冲为2

在这里插入图片描述

生产者代码 package main import ( "fmt" "log" "test1/amqp" "strconv" ) func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } } func f6(){ conn, err := amqp.Dial("amqp://fm:1@192.168.7.29:5672/fm") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.ExchangeDeclare( "logs_top", // name "topic", // type false, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a exchange") err = ch.QueueBind( q.Name, // queue name "color.*", // routing key "logs_top", // exchange false, nil ) for i:=0 ; i<5; i=i+1 { body := "Hello color.black! " + strconv.Itoa(i) err = ch.Publish( "logs_top", // exchange "color.black", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) log.Printf("send [x] Sent %s", body) failOnError(err, "Failed to publish a message") } } func main(){ type pfun func()() var apf pfun apf = f6 apf() fmt.Println("hello world") } 消费者代码 package main import ( "fmt" "log" "test1/amqp" "time" ) func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } } func f6(){ conn, err := amqp.Dial("amqp://fm:1@192.168.7.29:5672/fm") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") ch.Qos(2,0,false) msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" receive[x] %s", d.Body) d.Ack(false) time.Sleep(time.Second ) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever } func main(){ f6() fmt.Println("hello world") }
作者:zzmisok



队列 示例 rabbitmq 消息队列

需要 登录 后方可回复, 如果你还没有账号请 注册新账号