300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > 【STM32 x ESP8266】连接 MQTT 服务器(报文 附部分源码解析)

【STM32 x ESP8266】连接 MQTT 服务器(报文 附部分源码解析)

时间:2019-04-18 21:05:43

相关推荐

【STM32 x ESP8266】连接 MQTT 服务器(报文 附部分源码解析)

MQTT 协议作为物联网非常重要的传输协议,如何使用它十分重要,如果有不理解的同学可以点击这里学习,这里只是简单介绍一下。同时这里附上MQTT 3.1.1协议中文版 pdf 的链接,对协议底层感兴趣的同学可以下载学习一下,同时下面的实现函数就是基于该报文来实现的

项目整体在这里下载(赚点积分),查看报文调试不易,多多支持一下。该项目是基于野火的 3-向电脑网络助手上传 DHT11 温湿度 的基础上添加了 MQTT 部分,主要使用了里面的TCP 连接 + 透传的设置。

实例功能:通过 stm32 控制 esp8266连接阿里云 MQTT 服务器 / 自己服务器上搭建的 MQTT 服

务器(比如说 EMQ)/ 其他公用 MQTT 服务器。用户只需在mqtt_config.h文件中修改MQTT服务器的相关信息即可。

有关如何连接阿里云 MQTT 服务器的内容在这篇文章中,如有需要可以去看看怎么使用。

开发版:野火指南者 + 自带的 ESP8266

ESP8266要求:使用原生的固件即可,无需烧写专门连接MQTT服务器的固件

一、MQTT协议报文标识

1、CONNECT 连接报文

1.1 客户端ID

ClientId是MQTT客户端的标识。MQTT服务端用该标识来识别客户端。因此ClientId必须是独立的。如果两个MQTT客户端使用相同ClientId标识,服务端会把它们当成同一个客户端来处理。

1.2 清除会话

①如果没有设置该标识(cleanSession = “false” && QoS > 0),服务器端会将尚未被客户端确认的报文保存起来,并再次尝试向客户端发送报文,并且再次等待客户端发来确认信息。

②如果设置该标识(cleanSession = “true”),服务端不需要客户端确认收到报文,也不会保存任何报文。在这种情况下,即使客户端错过了服务端发来的报文,也没办法让服务端再次发送报文。

1.3 心跳时间间隔

让客户端在没有向服务端发送信息时,定时向服务端发送一条消息。心跳请求的作用正是用于告知服务端,当前客户端依然在线。

举例来说,如果心跳时间间隔是 60 秒。那么服务端在 90 秒内没有收到客户端发布的消息也没有收到心跳请求(PINGREQ)请求,那么它就会认为客户端已经掉线。

2、PUBLISH 发布消息报文

1.1 保留标志

在默认情况下,当客户端订阅了某一主题后,并不会马上接收到该主题的信息。只有在客户端订阅该主题后,服务端接收到该主题的新信息时,服务端才会将最新接收到的该主题信息推送给客户端。

但是在有些情况下,我们需要客户端在订阅了某一主题后马上接收到一条该主题的信息。这时候就需要用到保留标志这一信息。

每一个主题只能有一个 “保留消息”,如果客户端想要更新 “保留消息”,就需要向该主题发送一条新的 “保留消息”,这样服务端会将新的 “保留消息” 覆盖旧的 “保留消息”。

如果要删除主题的“保留消息”,可以通过向该主题**发布一条空的“保留消息”**即可。

1.2 QoS – 服务质量等级

QoS = 0 –> 最多发一次

QoS = 1 –> 最少发一次

QoS = 2 –> 保证收一次

① QoS = 0 –> 最多发一次

当 QoS = 0 时,MQTT 协议并不保证所有信息都能得以传输。也就是说,MQTT服务端和客户端不会对消息传输是否成功进行确认和检查。消息能否成功传输全看网络环境是否稳定。发送端不会检查发出的消息能否被正确接收到

② QoS = 1 –> 最少发一次

发送端将消息发送给接收端后,会等待接收端的确认。接收端成功接收消息后,会发送一条确认报文 PUBACK 给发送端。如果发送端收到了这条 PUBACK 确认报文,那么它就知道消息已经成功接收。

假如过了一段时间后,发送端没有收到PUBACK 报文,那么发送端会再次发送消息,然后再次等待接收端的 PUBACK 确认报文。因此,当 QoS = 1 时,发送端在没有收到接收端的PUBACK确认报文以前,会重复发送同一条消息

③ QoS = 2 –> 保证收一次

QoS = 2 的收发相对更加复杂。发送端需要接收端进行两次消息确认。因此,2级MQTT服务质量是最安全的服务级别,也是最慢的服务级别。

接收端收到 QoS = 2 的消息后,会返回 PUBREC 报文作为应答。发送端收到 PUBREC 报文后,会把此报文进行存储,并且返回 PUBREL 报文作为应答。当接收端收到 PUBREL 报文后,会应答发送端一条 PUBCOMP 报文。至此,一次 QoS2 的 MQTT 消息传输就结束了。

3、遗嘱

一旦客户端意外断线,服务端就可以将客户端的遗嘱公之于众。

遗嘱中包含遗嘱主题、遗嘱消息、遗嘱QoS、遗嘱保留。

3.1 遗嘱操作建议

假设我们现在有一台MQTT客户端。它的 client id 是 client-1。它的遗嘱主题是“client-1-will”

当 client-1 连接服务端时,CONNECT 报文中的遗嘱消息是 “offline”。并且它的遗嘱保留设置为 true

当 client-1成功连接服务端后,立即向遗嘱主题 “client-1-will” 发布一条消息 “online”。同时在发布此消息时,保留标志设置为 true。这样,只要 client-1 在线,那么任何设备一订阅 “client-1-will” 就能收到设备在线的消息 “online”

如果 client-1发生意外离线那么任何设备一订阅 “client-1-will” 就会收到设备离线的消息 ”offline”

如果 client-1恢复连接,那么它会将遗嘱主题 “client-1-will” 的保留消息更改为 “online”,这样任何设备一订阅 “client-1-will” 就能收到设备在线的消息 “online”。

二、项目解析

注:该项目的实践对象是自己搭建的 MQTT 服务器 EMQ,有关如何连接阿里云 MQTT 服务器的内容在这篇文章中,如有需要可以去看看怎么使用。

1、ESP8266 连接 WIFI + 连接 MQTT 服务器

bsp_esp8266_test.h

/********************************** 用户需要设置的参数**********************************/#definemacUser_ESP8266_ApSsid ""//要连接的热点的名称#definemacUser_ESP8266_ApPwd ""//要连接的热点的密钥#definemacUser_ESP8266_TcpServer_IP IP//要连接的服务器的 IP(在 mqtt_config.h 中定义)#definemacUser_ESP8266_TcpServer_PortPORT //要连接的服务器的端口(在 mqtt_config.h 中定义)

2、mqtt_config.h

2.1 切换连接其他 MQTT 服务器

#defineUSE_Aliyun_MQTT0// 是否使用阿里云的免费MQTT服务器

2.2 debug 开关

#define DEBUG_11// 为 0 屏蔽内部所有串口输出信息#define DEBUG_21// 查看接收/发送的报文

2.3 修改缓冲区大小

#define MAX_BUF_SIZE2048// 接收/发送的数据缓冲区#define MAX_THEME_NUM10// 最大存储订阅主题数目

2.4 移植需要提供的内容

#define Rx_Finish_FlagstrEsp8266_Fram_Record.InfBit.FramFinishFlag// 串口接收完成标志#defineRx_BufferstrEsp8266_Fram_Record.Data_RX_BUF// 串口接收缓冲区#defineRx_Buffer_LenstrEsp8266_Fram_Record.InfBit.FramLength// 串口接收数据长度#defineESP8266_USARTmacESP8266_USARTx// ESP8266的串口/*提供一个能够准确计算一长串16进制数据的长度的函数(strlen函数有些情况统计的长度要比实际长度小)函数格式:int UpdateStrlen_uint8_t(const void* source),函数返回值是实际长度值(该函数在my_string.c中)*/#defineCount_Hex_Num(source)UpdateStrlen_uint8_t(source)/*提供发送报文函数,该函数要一个一个字节的接收*/#defineSend_Message(usart, ch)Usart_SendByte(usart, ch)

2.5 用户详细配置 MQTT 连接参数

​ 包括设置心跳周期是否使用遗嘱以及遗嘱的相关信息(阿里云 MQTT 服务器好像不能使用遗嘱,因为我一旦使用遗嘱了,它就不给我连接返回报文了)、设置 ClientID决定是否匿名登录(阿里云 MQTT 服务器只能使用账号密码登录)

#define Set_KeepAlive 60// 设置心跳周期#if(USE_Aliyun_MQTT == 1)/* 使用阿里云MQTT服务器 */#define Enable_Will_Topic0// 阿里云MQTT服务器应该是不能使用遗嘱的,置 0#define ClientID"zyt"// 自定义#defineEnable_Username_And_Password1// 必须置 1,不能置零!!!#else/* USE_Aliyun_MQTT == 0使用其他的MQTT服务器 */#define Enable_Will_Topic1// 是否使用遗嘱主题#define MQTTClientID"zyt"// 自定义#defineEnable_Username_And_Password1// 是否使用用户名密码,有些MQTT服务器支持匿名登录#endif/* USE_Aliyun_MQTT */#if (Enable_Will_Topic == 1)/* 使用遗嘱主题 */#defineWill_Topic_QosQos1// 遗嘱主题 Qos 等级#defineWill_Topic_Name"/user/will"// 遗嘱主题名字#defineWill_Topic_Message"off_line"// 遗嘱主题的消息内容#endif/* Enable_Will_Topic */

3、mqtt.h

这个定义使用在void MQTT_ReceiveMsg(u8 mqtt_msg_type, u8 *mqtt_rxbuf)函数中的,主要是为了区分不同发送报文的返回报文的长度,具体请看后面。

该定义不要修改!!

/* ========================== MQTT报文类型 ========================== */#define MQTT_TypeCONNECT 1//请求连接#define MQTT_TypeCONNACK 2//请求应答#define MQTT_TypePUBLISH 3//发布消息#define MQTT_TypePUBACK 4//发布应答#define MQTT_TypePUBREC 5//发布已接收,保证传递1#define MQTT_TypePUBREL 6//发布释放,保证传递2#define MQTT_TypePUBCOMP 7//发布完成,保证传递3#define MQTT_TypeSUBSCRIBE 8//订阅请求#define MQTT_TypeSUBACK 9//订阅应答#define MQTT_TypeUNSUBSCRIBE 10//取消订阅#define MQTT_TypeUNSUBACK 11//取消订阅应答#define MQTT_TypePINGREQ 12//ping请求#define MQTT_TypePINGRESP 13//ping响应#define MQTT_TypeDISCONNECT 14//断开连接#define MQTT_WriteMsg15//等待接收订阅的消息(自定义的)

这些宏定义是设置有关 CONNECT 连接报文的标志位,以及遗嘱主题的设置。如果无需使用遗嘱的话,只需将 MQTT_StaWillFlag 置 0 即可,后面的遗嘱相关内容就失效了。无需使用用户名密码也是同样操作,将 MQTT_StaUserNameFlag 和 MQTT_StaPasswordFlag 置 0 即可。

CONNECT 连接报文的内容可以根据自己实际要求更改,CONNACK 报文返回码不能更改。

/* ========================== CONNECT报文设置 ========================== */#define MQTT_StaCleanSession 1 //清理会话#define MQTT_StaWillFlag 1//遗嘱标志#define MQTT_StaWillQoS 0 //遗嘱QoS连接标志的第4和第3位。#define MQTT_StaWillRetain 0 //遗嘱保留#define MQTT_StaUserNameFlag 1 //用户名标志 User Name Flag#define MQTT_StaPasswordFlag 1 //密码标志 Password Flag#define MQTT_KeepAlive 120//心跳周期#define MQTT_ClientIdentifier "111" //客户端标识符 Client Identifier#define MQTT_WillTopic "yizhu" //遗嘱主题 Will Topic#define MQTT_WillMessage "zheshiyizhu" //遗嘱消息 Will Message#define MQTT_UserName "zyt" //用户名 User Name#define MQTT_Password "010823"//密码 Password/* ========================== CONNACK报文返回码 ========================== */#define Connect_Accept0x00// 连接已接受#defineConnect_Refuse_Version0x01// 连接已拒绝,不支持的协议版本#defineConnect_Refuse_ClientId0x02// 连接已拒绝,不合格的客户端标识符#defineConnect_Refuse_Sever_Unavailable 0x03// 连接已拒绝,服务端不可用#defineConnect_Refuse_Acc_Or_Pass0x04// 连接已拒绝,无效的用户名或密码

4、mqtt.c

4.1 生成MQTT报文的固定报头函数:GetDataFixedHead()(用户无需调用)

此函数是生成报文的时候使用的,用户层无需调用此函数。

剩下的剩余长度的值由于跟每个报文的长度有关,于是该部分通常放在最后生成,由static int AddRemainingLength(void* mqtt_txbuf, uint8_t cps_len)函数负责。

/*** @brief 生成固定报头** @param MesType: mqtt报文类型(详见mqtt.h)* @paramDupFlag: 重发标志*@arg0: 客户端或服务端第一次请求发送这个报文*@arg1: 可能是一个早前报文请求的重发* @paramQosLevel: Qos等级* @paramRetain: 保留标志(设置后在订阅了该主题后马上接收到一条该主题的信息)*@arg0: 不发布保留消息*@arg1: 发布保留消息** @retval 返回固定报头(8bit/1位)*/uint8_t GetDataFixedHead(unsigned char MesType, unsigned char DupFlag, unsigned char QosLevel, unsigned char Retain){unsigned char dat = 0;dat = (MesType & 0x0f) << 4;dat |= (DupFlag & 0x01) << 3;dat |= (QosLevel & 0x03) << 1;dat |= (Retain & 0x01);return dat;}

4.2 生成 MQTT 报文固定报头中剩余长度部分函数:AddRemainingLength()(用户无需调用)

/*** @brief 往发送的报文中插入剩余长度** @param mqtt_txbuf: 要插入剩余长度的报文缓存区* @param cps_len: 补偿长度(使用这个位通常是报文最后面是0,要额外加上去)** @retval 返回报文总长度*/static int AddRemainingLength(void* mqtt_txbuf, uint8_t cps_len){uint8_t* txbuf = mqtt_txbuf;uint8_t last = 1, cur = 0, next_1 = 0, next_2 = 0;int src_len = -1;int i;int remain_len, remain_num;// 获取该数组的长度,当检测到连续三个元素都为0的时候退出循环for (i = 0; ; i++){if (i)last = cur;cur = *(txbuf + i);next_1 = *(txbuf + i + 1);next_2 = *(txbuf + i + 2);if (last == 0 && cur == 0 && next_1 == 0 && next_2 == 0)break;elsesrc_len++;}// 得到剩余长度值remain_len = src_len + cps_len - 1;// 将剩余长度值插入进发送数组中if(remain_len <= 127)remain_num = 1;else if(remain_len <= 128 && remain_len >= 16383)remain_num = 2;for(i = src_len; i > 0; i--)txbuf[i + remain_num] = txbuf[i];switch(remain_num){case 1:txbuf[1] = remain_len;break;case 2:txbuf[1] = 0x80 | (remain_len / 128);txbuf[2] = remain_len % 128;break;}return (remain_len + remain_num + 1);}

4.3 连接 MQTT 服务器函数:MQTT_Connect()

该函数在发送 CONNECT 连接报文的同时,还会接收服务器发送回来的 CONNACK 确认连接报文。

而 CONNACK 确认连接报文中最主要的部分就是第四字节连接返回码了。

#define MQTT_Connect()SendCONNECT()/*** @brief 获取连接的数据包** @param None** @retval 返回连接返回码*@argConnect_Accept: 连接已接受*@argConnect_Refuse_Version:连接已拒绝,不支持的协议版本*@argConnect_Refuse_ClientId:连接已拒绝,不合格的客户端标识符*@argConnect_Refuse_Sever_Unavailable:连接已拒绝,服务端不可用*@argConnect_Refuse_Acc_Or_Pass:连接已拒绝,无效的用户名或密码*/uint8_t SendCONNECT(void){unsigned int i,len,lennum = 0;unsigned char *msg;memset(strEsp8266_Fram_Record.Data_RX_BUF, 0, RX_BUF_MAX_LEN);/* 1、固定报头 */mqtt_txbuf[0] = SendFixedHead(MQTT_TypeCONNECT, 0, 0, 0);// 最后再添加剩余长度的值/* 2、可变报头 */// 2.1 协议名mqtt_txbuf[1] = 0x00;mqtt_txbuf[2] = 0x04;mqtt_txbuf[3] = 'M';mqtt_txbuf[4] = 'Q';mqtt_txbuf[5] = 'T';mqtt_txbuf[6] = 'T';// 2.2 协议级别0x04:MQTT 3.1.1mqtt_txbuf[7] = 0x04;// 2.3 连接标志mqtt_txbuf[8] = 0 | (MQTT_StaCleanSession << 1) | (MQTT_StaWillFlag << 2) | (MQTT_StaWillQoS << 3) | (MQTT_StaWillRetain << 5) | (MQTT_StaPasswordFlag << 6) |(MQTT_StaUserNameFlag << 7);// 2.4 保持连接时间(心跳周期)mqtt_txbuf[9] = MQTT_KeepAlive >> 8;mqtt_txbuf[10] = MQTT_KeepAlive;/* 3、有效载荷 */// 3.1 客户端标识符(ClientId)// 客户端标识符 = 0,必须同时将清理会话标志 MQTT_StaCleanSession 设置为 1len = strlen(MQTT_ClientIdentifier);mqtt_txbuf[11] = len >> 8;mqtt_txbuf[12] = len;msg = (u8 *)MQTT_ClientIdentifier;for(i = 0; i < len; i++){mqtt_txbuf[13 + i] = msg[i];}lennum += len;#if (MQTT_StaWillFlag == 1)// 3.2 遗嘱主题len = strlen(MQTT_WillTopic);mqtt_txbuf[12 + lennum + 1] = len >> 8;mqtt_txbuf[12 + lennum + 2] = len;lennum += 2;msg = (u8 *)MQTT_WillTopic;for(i = 0;i<len;i++){mqtt_txbuf[13 + lennum + i] = msg[i];}lennum += len;// 3.3 遗嘱消息len = strlen(MQTT_WillMessage);mqtt_txbuf[12 + lennum + 1] = len >> 8;mqtt_txbuf[12 + lennum + 2] = len;lennum += 2;msg = (u8 *)MQTT_WillMessage;for(i = 0; i < len; i++){mqtt_txbuf[13 + lennum + i] = msg[i];}lennum += len;#endif /* (MQTT_StaWillFlag == 1) */#if (MQTT_StaUserNameFlag == 1)// 3.4 用户名len = strlen(MQTT_UserName);mqtt_txbuf[12 + lennum + 1] = len >> 8;mqtt_txbuf[12 + lennum + 2] = len;lennum += 2;msg = (u8 *)MQTT_UserName;for(i = 0; i< len; i++){mqtt_txbuf[13 + lennum + i] = msg[i];}lennum += len;#endif/* (MQTT_StaUserNameFlag == 1) */#if (MQTT_StaPasswordFlag == 1)// 3.5 密码len = strlen(MQTT_Password);mqtt_txbuf[12 + lennum + 1] = len >> 8;mqtt_txbuf[12 + lennum + 2] = len;lennum += 2;msg = (u8 *)MQTT_Password;for(i = 0; i < len; i++){mqtt_txbuf[13 + lennum + i] = msg[i];}lennum += len;#endif/* (MQTT_StaPasswordFlag == 1) *//* 1、固定报头(补剩余长度值) */len = AddRemainingLength(mqtt_txbuf, 0);/* 将 CONNECT 报文发送 */MQTT_SendMsg(mqtt_txbuf, len);/* 接收发送回来的报文,判断是否连接成功 */MQTT_ReceiveMsg(MQTT_TypeCONNECT, mqtt_rxbuf);// 处理连接返回码return mqtt_rxbuf[3];}

4.4 订阅主题函数:MQTT_Subscribe_Topic()

订阅主题成功后,服务器会发生回来一个 SUBACK 订阅确认报文,通过确认返回的报文类型判断是否订阅成功。成功后会将该主题名字保持在一个 themes[] 结构体数组中。这部分非常重要,因为在后面接收服务器发送过来的 PUBLISH 报文时,要在中间将主题发送过来的消息提取出来,而在报文中的主题名字和消息内容是紧挨在一起的,如果没有存储订阅的主题名,就无法判断出主题名字和消息内容的边界到底在哪

#defineMQTT_Subscribe_Topic(topic_name, Qos)SendSUBSCRIBE(topic_name, Qos)/*** @brief 发送订阅消息的 SUBSCRIBE 报文** @param topic: 想要订阅的主题* @param RequestedQoS: 服务质量等级 Qos** @retval >=0: 订阅成功,具体返回订阅的 Qos值 -1:订阅失败*/int SendSUBSCRIBE(const char *topic,unsigned char RequestedQoS){unsigned int i,len = 0,lennum = 0;uint8_t cps_len = 0;/* 1、固定报头 */// 第3,2,1,0位是保留位,必须分别设置为0,0,1,0mqtt_txbuf[0] = 0x82;/* 2、可变报头 */// 2.1 报文标识符(如果一个客户端要重发这个特殊的控制报文,在随后重发那个报文时,它必须使用相同的标识符)// 校验输入的报文标识符是否符合要求if(RequestedQoS == 0)cps_len = 1;else if(RequestedQoS > 0)new_pid++;mqtt_txbuf[1] = new_pid >> 8;mqtt_txbuf[2] = new_pid;/* 3、有效载荷 */// 3.1 想要订阅的主题(符合主题过滤器的要求,即主题中含有'/'的分级符)len = strlen(topic);mqtt_txbuf[3] = len >> 8;mqtt_txbuf[4] = len;for(i = 0; i < len; i++)mqtt_txbuf[5 + i] = topic[i];lennum = len;// 3.2 服务质量要求(Requested QoS)mqtt_txbuf[5 + lennum] = RequestedQoS;/* 1、固定报头(补剩余长度值) */len = AddRemainingLength(mqtt_txbuf, cps_len);/* 发送 SUBSCRIBE 报文 */MQTT_SendMsg(mqtt_txbuf, len);/* 接收发送回来的报文,判断是否订阅成功 */MQTT_ReceiveMsg(MQTT_TypeSUBSCRIBE, mqtt_rxbuf);if(Get_Fixed_Header_Type(mqtt_rxbuf) == MQTT_TypeSUBACK && mqtt_rxbuf[4] == RequestedQoS){themes[ThemeNum].ThemeName = (char *)topic;themes[ThemeNum].ThemeMsg = NULL;ThemeNum++;#if (DEBUG_1 == 1)printf("\"%s\" 主题订阅成功!\n", themes[ThemeNum - 1].ThemeName);#endifreturn RequestedQoS;}elsereturn -1;}

4.5 发布消息函数:MQTT_Publish_Topic()

在发布消息的时候,涉及到了 Qos 不同级别的处理问题,Qos0 级别的是“我发出去了就不关我的事了”,所以只需将 PUBLISH 报文发送给服务器就可以了,服务器不会返回确认报文。而 Qos1 级别是“一发一回”,发送 PUBLISH 报文后服务器还会返回一个 PUBACK 确认报文。Qos2 级别是“四次握手”,发送 PUBLISH 报文首先接收 PUBREC 报文,接着自身还要发送 一个 PUBREL 报文,等待接收到 PUBCOMP 报文才算结束。

在发送PUBREL报文的时候要特别注意第一个字节的保留位要设置成0,0,1,0

#define Qos00#defineQos11#define Qos22#define Save_Msg1#define No_Save_Msg0#define MQTT_Publish_Topic(topic_name, msg, Qos, retain)SendPUBLISH(0, Qos, retain, topic_name, msg)/*** @brief 获取发布消息的数据包** @paramdup: 重发标志*@arg0: 客户端或服务端第一次请求发送这个报文*@arg1: 可能是一个早前报文请求的重发* @paramqos: Qos等级* @paramretain: 保留标志(设置后在订阅了该主题后马上接收到一条该主题的信息)*@arg0: 不发布保留消息*@arg1: 发布保留消息* @param topic: 主题名* @param msg: 待发布的消息** @retval 0: 订阅成功-1:订阅失败*/int SendPUBLISH(unsigned char dup, unsigned char qos, unsigned char retain, const char *topic, const char *msg){unsigned int i,len = 0,lennum = 0;uint8_t cps_len = 0;/* 1、固定报头 */mqtt_txbuf[0] = SendFixedHead(MQTT_TypePUBLISH, dup, qos, retain);/* 2、可变报头 */// 2.1 主题名len = strlen(topic);mqtt_txbuf[1] = len >> 8;mqtt_txbuf[2] = len;for(i = 0; i < len; i++)mqtt_txbuf[3 + i] = topic[i];lennum = len;// 2.2 报文标识符// 校验输入的报文标识符是否符合要求if(qos == 0)cps_len = 1;else if(qos > 0)new_pid++;mqtt_txbuf[2 + lennum + 1] = new_pid >> 8;mqtt_txbuf[2 + lennum + 2] = new_pid;lennum += 2;/* 3、有效载荷 */// 3.1 消息len = strlen(msg);for(i = 0; i < len; i++)mqtt_txbuf[3 + i + lennum] = msg[i];lennum += len;/* 1、固定报头(补剩余长度值) */len = AddRemainingLength(mqtt_txbuf, cps_len);/* 发送 PUBLISH 报文 */MQTT_SendMsg(mqtt_txbuf, len);if(qos == 0)return 0;else{/* 接收发送回来的 PUBACK/PUBREC 报文,判断是否发布成功(Qos > 0 才会有返回报文) */MQTT_ReceiveMsg(MQTT_TypePUBLISH, mqtt_rxbuf);if(mqtt_rxbuf[2] == 0 && mqtt_rxbuf[3] == new_pid)// 处理报文标识符{// 接收到的是 PUBREC 报文if(Get_Fixed_Header_Type(mqtt_rxbuf) == MQTT_TypePUBREC){if(SendPUBREL(new_pid) < 0)return -1;else return 0;}// 接收到的是 PUBACK 报文else if(Get_Fixed_Header_Type(mqtt_rxbuf) == MQTT_TypePUBACK)return 0;elsereturn -1;}else return -1;}}/*** @brief 接收到 Qos = 2 的 PUBREC 报文,需要回复 PUBREL 报文,* 并且要接收对方发送的 PUBCOMP 报文** @param pid:报文标识符,要与 Qos = 2 的 PUBLISH 报文标识符一致** @retval 0:对方成功接收-1:对方接收失败*/static int SendPUBREL(uint16_t pid){/* 1、固定报头 */mqtt_txbuf[0] = SendFixedHead(MQTT_TypePUBREL, 0, 1, 0);mqtt_txbuf[1] = 2;/* 2、可变报头 */// 2.1 报文标识符mqtt_txbuf[2] = pid >> 8;mqtt_txbuf[3] = pid;/* 发送 PUBREL 报文 */MQTT_SendMsg(mqtt_txbuf, 2 + mqtt_txbuf[1]);/* 接收发送回来的报文,判断是否发布成功 */MQTT_ReceiveMsg(MQTT_TypePUBREL, mqtt_rxbuf);if((mqtt_rxbuf[0] >> 4) == MQTT_TypePUBCOMP)// 处理报文标识符return 0;else return -1;}

4.6 发送心跳函数:MQTT_alive()

能够在心跳周期之内发送一次心跳函数是非常重要的,因为这可以让客户端和服务器端相互之间都能知道对方是否因某种原因非正常断开连接(不是发送 DISCONNECT 报文断开连接的),如果在连接 MQTT 服务器的时候设立了遗嘱,等到由于客户端未在规定时间内发送心跳报文给服务器端,服务器端判定对方非正常断开连接,就会将遗嘱发送给订阅了该主题的客户端。

同时客户端也能知晓与服务器端断开连接了,这时候可以采取重连的方式恢复连接

#define MQTT_alive()SendPINGREQ()/*** @brief 发送心跳请求** @param None** @retval 0:与MQTT服务器通讯正常 -1:通讯可能断开,可以多发几次确认一下*/int SendPINGREQ(void){mqtt_txbuf[0] = 0xc0;mqtt_txbuf[1] = 0x00;/* 发送 PINGREQ 报文 */MQTT_SendMsg(mqtt_txbuf, 2);/* 接收心跳响应报文 PINGRESP */MQTT_ReceiveMsg(MQTT_TypePINGREQ, mqtt_rxbuf);if(Get_Fixed_Header_Type(mqtt_rxbuf) == MQTT_TypePINGRESP)return 0;elsereturn -1;}

4.7 半覆盖遗嘱函数:MQTT_Modify_Will()

该函数的原理就是向遗嘱主题再次发送一个新的消息。

为啥叫作“半覆盖”呢?具体原因可以查看3.1 遗嘱操作建议,这里不再过多阐述。

#if (MQTT_StaWillFlag == 1)#defineMQTT_Modify_Will(msg)MQTT_Publish_Topic(MQTT_WillTopic, msg, MQTT_StaWillQoS, Save_Msg)#endif/*MQTT_alive()*/

4.8 监听订阅主题发送消息函数:MQTT_Listen_Topic()

该函数的功能是接收服务器发送过来的 PUBLISH 报文,根据提取出来的 Qos 等级判断是否需要发送回复报文,发送 PUBREC、PUBCOMP 、PUBACK 报文的函数就不贴出来了,跟前面的 PUBREL 大同小异。

接收完成后默认使用串口将接收到的消息显示到串口调试助手上(支持显示中文)。

#define MQTT_Listen_Topic()\{\MQTT_ReceiveMsg(MQTT_WriteMsg, mqtt_rxbuf); \PrintRecvMsg();\}/*** @brief 从esp8266获取到mqtt服务器返回的报文** @param mqtt_msg_type: 在MQTT_SendMsg()函数中发送报文的类型(类型详见mqtt.h)* @param mqtt_rxbuf: 存储从与esp8266连接的串口中暂存的报文** @retval None*/void MQTT_ReceiveMsg(u8 mqtt_msg_type, u8 *mqtt_rxbuf){u8 len = 0;uint8_t qos;uint16_t PBULISH_pid;delay_nms(500);/* 如果接收到了ESP8266的数据 */if(strEsp8266_Fram_Record.InfBit.FramFinishFlag){// 根据发送的报文类型得到接收到的报文长度switch(mqtt_msg_type){case MQTT_TypeCONNECT:// 返回报文类型是 CONNACKcase MQTT_TypePUBLISH:// 返回报文类型是 PUBACK/PUBRECcase MQTT_TypePUBREC:// 返回报文类型是 PUBRELcase MQTT_TypePUBREL:// 返回报文类型是 PUBCOMPcase MQTT_TypeUNSUBSCRIBE:// 返回报文类型是 UNSUBACKlen = 4;break;case MQTT_TypeSUBSCRIBE:// 返回报文类型是 SUBACKlen = 5;break;case MQTT_TypePINGREQ:// 返回报文类型是 PINGRESPlen = 2;break;case MQTT_WriteMsg:// 等待接收订阅的消息len = UpdateStrlen_uint8_t(strEsp8266_Fram_Record.Data_RX_BUF);break;default:return;}memset(mqtt_rxbuf, 0, MAX_BUF_SIZE);memset(buf, 0, MAX_BUF_SIZE);#if ((DEBUG_1 == 1) && (DEBUG_2 == 1))// 将接收到的报文显示在串口调试助手上HexToAscii(strEsp8266_Fram_Record.Data_RX_BUF, buf, len, ADD_SPACE_AND_0X);printf("接收报文: %s\n", buf);#endif// 将指定长度的报文保存至mqtt_rxbuf中memcpy(mqtt_rxbuf, strEsp8266_Fram_Record.Data_RX_BUF, len);// 函数接收到了订阅的主题发送过来的消息报文if(mqtt_msg_type == MQTT_WriteMsg){// 判断接收到的报文是什么类型的switch(Get_Fixed_Header_Type(mqtt_rxbuf)){// 接收到 PUBLISH 报文case MQTT_TypePUBLISH:// 先保存发送过来的消息内容PBULISH_pid = Get_Packet_Identifier(mqtt_rxbuf, SaveReceiveBuf(len)); // 提取固定报头中的 qos 等级qos = Get_Fixed_Header_Qos(mqtt_rxbuf);// qos = 1:PUBLISH -> PUBACKif(qos == 1)SendPUBACK(PBULISH_pid);// qos = 2:PUBLISH -> PUBREC PUBREL -> PUBCOMPelse if(qos == 2){SendPUBREC(PBULISH_pid);delay_nms(100);SendPUBREC(PBULISH_pid);delay_nms(100);SendPUBCOMP(PBULISH_pid);}break;default:break;}}strEsp8266_Fram_Record.InfBit.FramLength = 0; //接收数据长度置零strEsp8266_Fram_Record.InfBit.FramFinishFlag = 0; //接收标志置零memset(strEsp8266_Fram_Record.Data_RX_BUF, 0, RX_BUF_MAX_LEN);// 清空数据缓冲区}}#if (DEBUG_1 == 1)void PrintMsg(void){// 打印接收到的消息for(int i = 0; i < ThemeNum; i++){if(themes[i].ThemeMsg != NULL){printf("\"%s\" 主题发送:", themes[i].ThemeName);for(int j = 0; j < UpdateStrlen_uint8_t(themes[i].ThemeMsg); ){if( themes[i].ThemeMsg[j] >= 0xA1 && themes[i].ThemeMsg[j + 1] >= 0xA1 ){printf("%c%c", themes[i].ThemeMsg[j], themes[i].ThemeMsg[j + 1]);j += 2;}else{printf("%c", themes[i].ThemeMsg[j]);j++;}}printf("\n");free(themes[i].ThemeMsg);themes[i].ThemeMsg = NULL;}}}#endif

4.9 主动断开连接函数: MQTT_Disconnect()

#defineMQTT_Disconnect()SendDISCONNECT()/*** @brief 发送断开连接的数据包** @param mqtt_txbuf: 存储待发送报文的数组** @retval None*/void SendDISCONNECT(void){mqtt_txbuf[0] = 0xe0;mqtt_txbuf[1] = 0x00;/* 发送 DISCONNECT 报文 */MQTT_SendMsg(mqtt_txbuf, 2);}

三、实例演示

1、代码配置

这里发布了两个主题,一个是遗嘱主题,一个是“/user/123”主题。订阅了两个主题,一个是“/user/abc”主题,一个是“/user/test”主题。

2、运行效果

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。