RT-Thread Studio 使用笔记(八)| 使用MQTT对接EMQ-X服务器(使用 pahomqtt 包)

Jillian ·
· 977 次阅读

1. 添加pahomqtt软件包

Paho MQTT 是 Eclipse 实现的基于 MQTT 协议的客户端,本软件包是在 Eclipse paho-mqtt 源码包的基础上设计的一套 MQTT 客户端程序。

2. 使用mqtt.fx连接到服务器并订阅测试主题

3. 编写对接 EMQ-X mqtt服务器的代码

3.1. 编写代码

```c
/*
 * Copyright (c) 2006-2020, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author        Notes
 * 2020-03-05     Mculover666   the first version
 */
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <rtthread.h>

#define DBG_ENABLE
#define DBG_SECTION_NAME    "mqtt.sample"
#define DBG_LEVEL           DBG_LOG
#define DBG_COLOR
#include <rtdbg.h>

#include "paho_mqtt.h"

/* 设置代理信息 */
#define MQTT_URI        "tcp://www.mculover666.cn:1883"
#define MQTT_USERNAME   "mculover666"
#define MQTT_PASSWORD   "12345678"
#define MQTT_SUBTOPIC   "sub_test"
#define MQTT_PUBTOPIC   "pub_test"
#define MQTT_WILLMSG    "Goodbye!"

static MQTTClient client;
static int is_started = 0;

static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{
    *((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
    LOG_D("mqtt sub callback: %.*s %.*s",
          msg_data->topicName->lenstring.len,
          msg_data->topicName->lenstring.data,
          msg_data->message->payloadlen,
          (char *)msg_data->message->payload);
}

void mqtt_emqx_entry(void *parameter)
{
    MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
    static char cid[20] = { 0 };

    /* config MQTT context param*/
    client.isconnected = 0;
    client.uri = MQTT_URI;

    /* generate the random client ID */
    rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());

    /* config connect param */
    memcpy(&client.condata, &condata, sizeof(condata));
    client.condata.clientID.cstring = cid;
    client.condata.keepAliveInterval = 30;
    client.condata.cleansession = 1;
    client.condata.username.cstring = MQTT_USERNAME;
    client.condata.password.cstring = MQTT_PASSWORD;

    /* config MQTT will param. */
    client.condata.willFlag = 1;
    client.condata.will.qos = 1;
    client.condata.will.retained = 0;
    client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
    client.condata.will.message.cstring = MQTT_WILLMSG;

    /* malloc buffer. */
    client.buf_size = client.readbuf_size = 1024;
    client.buf = rt_calloc(1, client.buf_size);
    client.readbuf = rt_calloc(1, client.readbuf_size);
    if (!(client.buf && client.readbuf))
    {
        LOG_E("no memory for MQTT client buffer!");
        return;
    }

    /* set subscribe table and event callback */
    client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
    client.messageHandlers[0].callback = mqtt_sub_callback;
    client.messageHandlers[0].qos = QOS1;

    paho_mqtt_start(&client);
    is_started = 1;

    while(1)
    {
        if (is_started == 0)
        {
            LOG_E("mqtt client is not connected.");
            return;
        }
        else
        {
            paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, "hello emqx!");
        }
        rt_thread_mdelay(2000);
    }
}

/* 创建线程 */
int mqtt_emqx(void)
{
    rt_thread_t tid;    //线程句柄

    tid = rt_thread_create("mqtt_emqx", mqtt_emqx_entry, RT_NULL, 512, 9, 10);
    if(tid != RT_NULL)
    {
        //线程创建成功,启动线程
        rt_thread_startup(tid);
    }

    return 0;
}
MSH_CMD_EXPORT(mqtt_emqx, mqtt emqx test);
```

3.2. 测试发布


3.3. 测试订阅



studio rt-thread mqtt

需要 登录 后方可回复, 如果你还没有账号请 注册新账号