OpenEdv-开源电子网

 找回密码
 立即注册
正点原子全套STM32/Linux/FPGA开发资料,上千讲STM32视频教程免费下载...
查看: 25268|回复: 24

物联网核心之MQTT移植

[复制链接]

58

主题

499

帖子

4

精华

金牌会员

Rank: 6Rank: 6

积分
1920
金钱
1920
注册时间
2013-11-18
在线时间
268 小时
发表于 2017-9-11 22:57:41 | 显示全部楼层 |阅读模式
本帖最后由 mzwhhwj 于 2017-9-11 22:57 编辑

   在上一篇文章中,只是讲了MQTT的主要内容,至于怎么移植到STM32上,怎么使用才是最重要的关键。这里使用的平台是RT8711WIFI SOC,使用的LWIPFreeRTOS,移植使用跟STM32+LWIP是没什么区别的。
   先在Github上找到Eclipse的开源MQTT客户端程序https://github.com/eclipse/paho.mqtt.embedded-c.git,并把源码下载起来
1.jpg
   解压源码,再进入MQTTPacket文件夹,里面有三个文件夹
1.jpg
   把src里面的所有文件和samples下的transport.ctransport.h两个文件复制到工程目录下。这里我们主要的移植工作就在transport里面。打开transport.c文件,这个是MQTT连接,发送,接收的接口,源码是LinuxWindows平台,用的标准的Socket接口函数,我们这里的移植工作量很小,因为LWIP也是支持标准的Socket接口函数,只不过里面有些函数接口是LWIP不支持的,主要就是transport_open这个连接函数有区别。把原来的transport_open函数注释掉,重新写一个。
[mw_shl_code=c,true]int transport_open(char* addr, int port)
{
        int* sock = &mysock;
        struct hostent *server;
    struct sockaddr_in serv_addr;
        static struct timeval tv;
        int timeout = 1000;
        fd_set readset;
        fd_set writeset;

    *sock = socket(AF_INET, SOCK_STREAM, 0);
    if(*sock < 0)
        DiagPrintf("[ERROR] Create socket failed\n");
   
    server = gethostbyname(addr);
    if(server == NULL)
        DiagPrintf("[ERROR] Get host ip failed\n");
   
    memset(&serv_addr,0,sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(port);
    memcpy(&serv_addr.sin_addr.s_addr,server->h_addr,server->h_length);
   
    if (connect(*sock,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0){
        DiagPrintf("[ERROR] connect failed\n");
        return -1;
        }
        tv.tv_sec = 10;  /* 1 second Timeout */
        tv.tv_usec = 0;  
        setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(timeout));
        return mysock;
}[/mw_shl_code]
   到这里其实移植工作就已经完成了,就是这么的简单,剩下就是怎么使用MQTT。直接上代码,这里用FreeRTOS新建一个MQTT的任务。[mw_shl_code=c,true]#define HOST_NAME "m2m.eclipse.org"
#define HOST_PORT 1883

void mqtt_thread( void *arg)
{
        MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
        MQTTString receivedTopic;
        int rc = 0;
        char buf[200];
        int buflen = sizeof(buf);
        int mysock = 0;
        MQTTString topicString = MQTTString_initializer;
        int payloadlen_in;
        unsigned char* payload_in;
        unsigned short msgid = 1;
        int subcount;
        int granted_qos =0;
        unsigned char sessionPresent, connack_rc;
        unsigned short submsgid;
        int len = 0;
        int req_qos = 1;
        unsigned char dup;
        int qos;
        unsigned char retained;

        char *host = "m2m.eclipse.org";
        int port = 1883;
        uint8_t  msgtypes = CONNECT;
        uint32_t curtick = xTaskGetTickCount();
        log_info("socket connect to server");
        mysock = transport_open(host,port);
        if(mysock < 0)
                return mysock;
        log_notice("Sending to hostname %s port %d\n", host, port);
        data.clientID.cstring = "me";
        data.keepAliveInterval = 50;
        data.cleansession = 1;
        data.username.cstring = "";
        data.password.cstring = "";
        data.MQTTVersion = 4;
        while(1)
        {
                if((xTaskGetTickCount() - curtick) >(data.keepAliveInterval/2*1000))
                {
                        if(msgtypes == 0)
                        {
                                curtick = xTaskGetTickCount();
                                msgtypes = PINGREQ;
                        }
                }

                switch(msgtypes)
                {
                        case CONNECT:        len = MQTTSerialize_connect(buf, buflen, &data);        
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if (rc == len)
                                                                log_info("send CONNECT Successfully");
                                                        else
                                                                log_debug("send CONNECT failed");               
                                                        log_info("MQTT concet to server!");
                                                        msgtypes = 0;
                                                        break;
                        case CONNACK:         if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
                                                        {
                                                                log_info("Unable to connect, return code %d\n", connack_rc);
                                                        }
                                                        else log_info("MQTT is concet OK!");
                                                        msgtypes = SUBSCRIBE;
                                                        break;

                        case SUBSCRIBE:        topicString.cstring = "ledtest";
                                                        len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if (rc == len)
                                                                log_info("send SUBSCRIBE Successfully\n");
                                                        else
                                                                log_debug("send SUBSCRIBE failed\n");        
                                                        log_info("client subscribe:[%s]",topicString.cstring);
                                                        msgtypes = 0;
                                                        break;
                        case SUBACK:        rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);                                                        
                                                        log_info("granted qos is %d\n", granted_qos);                                                               
                                                        msgtypes = 0;
                                                        break;
                        case PUBLISH:        rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,        &payload_in, &payloadlen_in, buf, buflen);
                                                        log_info("message arrived : %s\n", payload_in);
                                                        if(strstr(payload_in,"on"))
                                                        {
                                                                log_notice("LED on!!");
                                                        }
                                                        else if(strstr(payload_in,"off"))
                                                        {
                                                                log_notice("LED off!!");
                                                        }
                                                        if(qos == 1)
                                                        {
                                                                log_info("publish qos is 1,send publish ack.");
                                                                memset(buf,0,buflen);
                                                                len = MQTTSerialize_ack(buf,buflen,PUBACK,dup,msgid);   //publish ack                        
                                                                rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                                if (rc == len)
                                                                        log_info("send PUBACK Successfully");
                                                                else
                                                                        log_debug("send PUBACK failed");                                       
                                                        }
                                                        msgtypes = 0;
                                                        break;

                        case PUBACK:        log_info("PUBACK!");
                                                        msgtypes = 0;
                                                        break;
                        case PUBREC:        log_info("PUBREC!");     //just for qos2
                                                        break;
                        case PUBREL:        log_info("PUBREL!");        //just for qos2
                                                        break;
                        case PUBCOMP:        log_info("PUBCOMP!");        //just for qos2
                                                        break;
                        case PINGREQ:        len = MQTTSerialize_pingreq(buf, buflen);
                                                        rc = transport_sendPacketBuffer(mysock, (unsigned char*)buf, len);
                                                        if (rc == len)
                                                                log_info("send PINGREQ Successfully\n");
                                                        else
                                                                log_debug("send PINGREQ failed\n");        
                                                        log_info("time to ping mqtt server to take alive!");
                                                        msgtypes = 0;
                                                        break;
                        case PINGRESP:        log_info("mqtt server Pong");                                                        
                                                        msgtypes = 0;
                                                        break;
                }
                                memset(buf,0,buflen);
                                rc=MQTTPacket_read(buf, buflen, transport_getdata);        
                                if(rc >0)
                                {
                                        msgtypes = rc;
                                        log_info("MQTT is get recv:");
                                }
                                gpio_write(&gpio_led, !gpio_read(&gpio_led));
        }

exit:
        transport_close(mysock);
    log_info("mqtt thread exit.");
    vTaskDelete(NULL);
}[/mw_shl_code]
       MQTT服务器仍然是上篇文章用的m2m.eclipse.org,一开始就是调用 前面移植的transport_open(host,port)去连接MQTT服务器,并返回套接字。然后就是前面说的登录、订阅、发布、心跳等操作,这里在While里用状态来实现整个连接。
首先就是CONNECT登录,MQTTPacket_connectData data = MQTTPacket_connectData_initializer初始化登录数据结构体,然后再对data进行初始化,data.clientID.cstring = "me";data.keepAliveInterval = 50;data.cleansession = 1;
data.username.cstring = "";data.password.cstring = "";data.MQTTVersion = 4;这里表示cilentIDme,心跳时间为50,用户名跟密码都为空,结构初始化后,就是要对数据进去打包,调用MQTTSerialize_connect函数。打包后就是调用transport_sendPacketBuffer发送数据包。数据发送完后是调用MQTTPacket_read去接收服务器返回来的数据,并得到返回的数据包类型,根据数据包类型进行不同的逻辑处理。发送CONNECT后服务器对相应的返回CONNACK数据包表示登录成功。
登录成功后,开始订阅我们想要Topics,这里订阅一个”ledtest”Tipics。数据包的初始化、打包、发送跟登录是相似的,就不详述,直接看代码就可以了,服务器会相应的返回SUBACK
    接下来是心跳,通过获取系统的Tick来判断是否要发PINGREQ,如果的话,让状态机状态为PINGREQ后去发送心跳包,相应服务器会返回PINGRESP
    最后是PUBLISH,这是MQTT的最主要的通信协议,这里我只实现的客户端接收PUBLISH,其它的使用,其实都可以在源码的MQTTPacket里面sample可以找到例程。PUBLISH实现也很简单,定阅了Topics之后,只要其它客户端向这个Topics推送数据,服务器就会转发到订阅者上。MQTTPacket_read接收到服务器的推送,解析成PUBLISH数据包,然后状态机改变状态到PUBLISH再调用MQTTDeserialize_publish进行解包,得到推送的内容,最后根据推送的内容执行相应的动作,就实现远程控制。
     整个代码的效果如下:
2.jpg
    可以看到MQTT的登录,订阅,还有在PC上通过MQTT.fx推送信息给ledtestRT8711上收到推送的内容,并执行打开LED的动作。
    也许有人会问,如果STM32或者其它单片机是用WIFI模块或者GPRS模块,没有LWIP的怎么办。其实只要理解的MQTT的源码,就不难用GPRS或者WiFi模块去实现。MQTT的源码里都是对协议包进行打包解包,数据传输都是在tranport.c里面,我们完全不用transport,可以自己写通信接口,然后把打包的数据包通过模块发出去,写接收接口,把模块接收到服务器数据调用MQTT解包接口解析就可以了。

已经放下多年的FPGA,要重新再拾起来,却是如此的陌生
正点原子逻辑分析仪DL16劲爆上市
回复

使用道具 举报

3

主题

178

帖子

0

精华

高级会员

Rank: 4

积分
524
金钱
524
注册时间
2016-12-31
在线时间
195 小时
发表于 2017-9-11 23:31:05 来自手机 | 显示全部楼层
回复 支持 反对

使用道具 举报

1

主题

5

帖子

0

精华

新手上路

积分
39
金钱
39
注册时间
2017-5-10
在线时间
8 小时
发表于 2017-10-27 15:59:48 | 显示全部楼层
写得很好
回复 支持 反对

使用道具 举报

0

主题

7

帖子

0

精华

新手上路

积分
25
金钱
25
注册时间
2017-10-23
在线时间
1 小时
发表于 2017-10-27 17:26:55 | 显示全部楼层
加一下196143972这个群有问题有大神解答,有利于物联网的学习
回复 支持 反对

使用道具 举报

1

主题

5

帖子

0

精华

新手上路

积分
39
金钱
39
注册时间
2017-5-10
在线时间
8 小时
发表于 2017-10-31 18:35:34 | 显示全部楼层
按照这移植方法,我使用的是stm32f4+freeRTOS+lwip,调用MQTTPacket_read会引起阻塞,看了mqqt的资料后使用MQTTPacket_readnb依然还是不能正常接收心跳数据等,请问是什么原因?
回复 支持 反对

使用道具 举报

58

主题

499

帖子

4

精华

金牌会员

Rank: 6Rank: 6

积分
1920
金钱
1920
注册时间
2013-11-18
在线时间
268 小时
 楼主| 发表于 2017-12-27 00:14:47 | 显示全部楼层
Hren 发表于 2017-10-31 18:35
按照这移植方法,我使用的是stm32f4+freeRTOS+lwip,调用MQTTPacket_read会引起阻塞,看了mqqt的资料后使用 ...

要在opt.h打开超时
/**
* LWIP_SO_RCVTIMEO==1: Enable receive timeout for sockets/netconns and
* SO_RCVTIMEO processing.
*/
#if !defined LWIP_SO_RCVTIMEO || defined __DOXYGEN__
#define LWIP_SO_RCVTIMEO                1
#endif
已经放下多年的FPGA,要重新再拾起来,却是如此的陌生
回复 支持 反对

使用道具 举报

1

主题

11

帖子

0

精华

初级会员

Rank: 2

积分
178
金钱
178
注册时间
2016-11-22
在线时间
24 小时
发表于 2018-1-17 11:45:33 | 显示全部楼层
为什么没了?!!!
回复 支持 反对

使用道具 举报

33

主题

60

帖子

0

精华

中级会员

Rank: 3Rank: 3

积分
420
金钱
420
注册时间
2017-11-16
在线时间
120 小时
发表于 2018-1-17 16:41:16 | 显示全部楼层
看不见啊
回复 支持 反对

使用道具 举报

0

主题

1

帖子

0

精华

新手入门

积分
12
金钱
12
注册时间
2018-3-28
在线时间
0 小时
发表于 2018-3-28 14:32:59 | 显示全部楼层
看不到啊
回复 支持 反对

使用道具 举报

1

主题

3

帖子

0

精华

新手上路

积分
27
金钱
27
注册时间
2017-11-16
在线时间
4 小时
发表于 2018-4-4 09:51:57 | 显示全部楼层
Hren 发表于 2017-10-31 18:35
按照这移植方法,我使用的是stm32f4+freeRTOS+lwip,调用MQTTPacket_read会引起阻塞,看了mqqt的资料后使用 ...

您好,可以把您的代码给我发一下吗?  我学习学习,谢谢       973061070@qq.com
回复 支持 反对

使用道具 举报

1

主题

3

帖子

0

精华

新手上路

积分
27
金钱
27
注册时间
2017-11-16
在线时间
4 小时
发表于 2018-4-8 08:56:11 | 显示全部楼层
Hren 发表于 2017-10-31 18:35
按照这移植方法,我使用的是stm32f4+freeRTOS+lwip,调用MQTTPacket_read会引起阻塞,看了mqqt的资料后使用 ...

你好,请问有这个的程序吗?  我可以参考参考吗?
回复 支持 反对

使用道具 举报

0

主题

12

帖子

0

精华

初级会员

Rank: 2

积分
97
金钱
97
注册时间
2017-10-24
在线时间
18 小时
发表于 2018-4-28 07:14:58 | 显示全部楼层
谢谢楼主 学习一下
回复 支持 反对

使用道具 举报

1

主题

27

帖子

0

精华

中级会员

Rank: 3Rank: 3

积分
314
金钱
314
注册时间
2013-3-3
在线时间
101 小时
发表于 2018-5-14 12:00:34 | 显示全部楼层
学习学习,感谢分享。
回复 支持 反对

使用道具 举报

5

主题

29

帖子

0

精华

中级会员

Rank: 3Rank: 3

积分
275
金钱
275
注册时间
2017-6-20
在线时间
74 小时
发表于 2018-5-23 15:40:44 | 显示全部楼层
6666666666666666
回复 支持 反对

使用道具 举报

23

主题

323

帖子

0

精华

金牌会员

Rank: 6Rank: 6

积分
1010
金钱
1010
注册时间
2016-11-8
在线时间
233 小时
发表于 2018-5-23 15:51:50 | 显示全部楼层

你能看见?我为啥看不见
亦余心之所善,
虽九死其犹未悔。
回复 支持 反对

使用道具 举报

0

主题

2

帖子

0

精华

新手入门

积分
15
金钱
15
注册时间
2018-5-29
在线时间
1 小时
发表于 2018-5-29 15:48:39 | 显示全部楼层
怎么看不了?
回复 支持 反对

使用道具 举报

4

主题

18

帖子

0

精华

初级会员

Rank: 2

积分
95
金钱
95
注册时间
2016-9-26
在线时间
17 小时
发表于 2018-8-19 15:04:48 | 显示全部楼层
学习了
回复 支持 反对

使用道具 举报

0

主题

3

帖子

0

精华

新手入门

积分
4
金钱
4
注册时间
2018-9-7
在线时间
0 小时
发表于 2018-9-7 16:52:57 | 显示全部楼层
楼主大大你好,请问能发一份你上一帖的链接吗,我也是卡在MQTT这里了 谢谢你了
回复 支持 反对

使用道具 举报

0

主题

3

帖子

0

精华

新手入门

积分
4
金钱
4
注册时间
2018-9-7
在线时间
0 小时
发表于 2018-9-7 16:57:19 | 显示全部楼层
我在你个人的帖子里找了好久没找找。
回复 支持 反对

使用道具 举报

0

主题

3

帖子

0

精华

新手入门

积分
4
金钱
4
注册时间
2018-9-7
在线时间
0 小时
发表于 2018-9-7 19:43:03 | 显示全部楼层
楼主大大 能分享这个的源码吗?425676101@qq.com
回复 支持 反对

使用道具 举报

0

主题

1

帖子

0

精华

新手入门

积分
4
金钱
4
注册时间
2018-9-25
在线时间
1 小时
发表于 2018-9-27 11:15:55 | 显示全部楼层
Hren 发表于 2017-10-31 18:35
按照这移植方法,我使用的是stm32f4+freeRTOS+lwip,调用MQTTPacket_read会引起阻塞,看了mqqt的资料后使用 ...

在SUBSCRIBE  分支后面不用break 用continue,就解决了
回复 支持 反对

使用道具 举报

181

主题

311

帖子

0

精华

金牌会员

Rank: 6Rank: 6

积分
1055
金钱
1055
注册时间
2012-8-26
在线时间
52 小时
发表于 2018-9-29 21:34:07 | 显示全部楼层
这几天想试试百度天工物接入服务,准备用STM32和A9G模块结合,百度提供的物接入服务是MQTT协议,然后看了一下该协议的说明,老命差点没了。东西太多,自己敲代码不是个事,然后上论坛来找移植的代码了
为了雅典娜?为了爱与正义
回复 支持 反对

使用道具 举报

10

主题

81

帖子

0

精华

初级会员

Rank: 2

积分
149
金钱
149
注册时间
2018-5-8
在线时间
27 小时
发表于 2019-5-1 22:55:31 | 显示全部楼层
你好,请问有这个的程序吗?  我可以参考参考吗?1725509430@qq.com
回复 支持 反对

使用道具 举报

0

主题

2

帖子

0

精华

新手入门

积分
12
金钱
12
注册时间
2019-7-22
在线时间
3 小时
发表于 2019-9-7 15:17:56 | 显示全部楼层
这个可以一铺哟哟
回复 支持 反对

使用道具 举报

3

主题

11

帖子

0

精华

初级会员

Rank: 2

积分
117
金钱
117
注册时间
2020-1-7
在线时间
26 小时
发表于 2020-7-6 09:57:00 | 显示全部楼层
您好,我想问下您这边MQTT有没用到SSL/TLS协议,我们移植的MQTT库里面有自带SSL/TLS功能吗,还是要我们移植对应的加密库
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则



关闭

原子哥极力推荐上一条 /2 下一条

正点原子公众号

QQ|手机版|OpenEdv-开源电子网 ( 粤ICP备12000418号-1 )

GMT+8, 2025-6-29 15:28

Powered by OpenEdv-开源电子网

© 2001-2030 OpenEdv-开源电子网

快速回复 返回顶部 返回列表