英创Linux主板支持MQTT应用

 2023-9-20     作者:黄志超         

MQTT(Message Queuing Telemetry Transport) 是一种基于客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、 简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。


MQTT 最大的优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。而作为一种低开销、低带宽占用的即时通讯协议,也使其在物联网、小型设备、移动应用等方面有较广泛的应用。在MQTT中分为客户端和服务器端,客户端(MQTT Client)可以发布和订阅消息,而服务器端(MQTT Broker)则对多个客户端发布或订阅的各种消息做相应的处理转发,如下图所示:

  1.png


在MQTT官网就有详细的中文文档说明,因此在这里我们就不过多介绍MQTT协议本身了,有兴趣的客户可以参考资料:https://blog.mcxiaoke.com/mqtt/protocol/MQTT-3.1.1-CN.pdf


英创公司在Linux主板上移植了Mosquitto开源软件,以支持基于MQTT的各种应用。Mosquitto是一款轻量级的开源软件,适用于低功耗设备使用,实现了MQTT 协议versions 5.0,3.1.1和3.1,因此非常适合在嵌入式设备上使用。通过Mosquitto软件,Linux主板可以作为MQTT的客户端,对消息进行发布和订阅,也可以作为MQTT的服务器端,来处理客户端发布和订阅的消息。


当英创Linux主板作为服务器端时,可以通过命令mosquitto -c /etc/mosquitto/mosquitto.conf来启动服务,此时客户端的设备就都可以与服务器建立连接,进行对消息的发布和订阅。在/etc/mosquitto/mosquitto.conf文件中有MQTT服务器的一些配置选项,用户可以根据需求修改,具体的说明可以参考https://mosquitto.org/man/mosquitto-conf-5.html。在英创Linux主板中,采用了systemd来管理服务,在配置好后,systemd会在系统启动完成后,自动启动mosquitto服务,不需要用户在手动去执行命令了。如果对启动服务有特殊的需求,可以通过systemd的标准命令来控制服务的启动,常用的命令如下表:

命令

作用

systemctl disable mosquitto

关闭开机自启动服务

systemctl enable mosquitto

开启开机自启动服务

systemctl stop mosquitto

停止服务

systemctl start mosquitto

启动服务

systemctl restart mosquitto

重启服务


当英创Linux主板作为客户端时,需要先连接MQTT服务器,然后就可以发布或者订阅消息了。对应的功能可以通过Mosquitto软件提供的现成工具来测试,mosquitto_pub工具用来发布消息而mosquitto_sub工具用于订阅消息。下面我们在本地测试通过MQTT协议来发布和订阅消息,主板中mosquitto服务是自动启动的,所以连接到自身启动的MQTT服务器(Broker)就可以做本地测试,首先是订阅主题为test的消息,通过mosquitto_sub工具实现就能实现这个功能,在默认情况下mosquitto_sub工具会连接本地的MQTT服务器(Broker),也可以通过参数指定服务器地址,具体的用法介绍可以参考https://mosquitto.org/man/mosquitto_sub-1.html,测试命令如下图:

image.png


然后我们通过mosquitto_pub工具来向test主题发布一个消息,内容为”Hello World”。同样默认情况下mosquitto_pub工具会连接本地的MQTT服务器(Broker),也可以通过参数指定服务器地址,具体的用法介绍可以参考https://mosquitto.org/man/mosquitto_pub-1.html,测试命令如下图:

image.png


此时,通过mosquitto_sub工具订阅消息的终端就会收到刚刚发布的消息:

image.png


另外也可以通过C程序调用Mosquitto软件库中提供的API来进行消息的发布和订阅。关于API的介绍,可以参考Mosquitto软件的官方文档,里面有详细全面的介绍:https://mosquitto.org/api/files/mosquitto-h.html。英创公司基于这些API实现了两个简单的基本例程,一个用于发布消息,一个用于订阅消息,可以供用户参考。


订阅消息例程的主要代码如下:

// 初始化mosquitto库  
ret = mosquitto_lib_init();  
if(ret){  
   printf("Init lib error!\n");  
   return -1;  
}  
 
// 创建一个订阅端实例,名称为sub_test  
mosq =  mosquitto_new("sub_test", true, NULL);  
if(mosq == NULL){  
   printf("New test error!\n");  
   mosquitto_lib_cleanup();  
   return -1;  
}  
 
// 设置回调函数,在connect的回调函数中设置订阅的主题(test)  
mosquitto_connect_callback_set(mosq, my_connect_callback);  
// 设置回调函数,这里都是一些简单的打印信息  
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);  
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);  
mosquitto_message_callback_set(mosq, my_message_callback);  
 
// 连接至本地MQTT服务器  
ret = mosquitto_connect(mosq, "localhost", 1883, KEEP_ALIVE);  
if(ret){  
   printf("Connect server error!\n");  
   mosquitto_destroy(mosq);  
   mosquitto_lib_cleanup();  
   return -1;  
}  
 
// 开始通信:循环执行、直到运行标志running被改变  
while(running)  
{  
   mosquitto_loop(mosq, -1, 1);  
}  
 
// 结束后的清理工作  
mosquitto_destroy(mosq);  
mosquitto_lib_cleanup();


发布消息例程的主要代码如下:

//初始化libmosquitto库
ret = mosquitto_lib_init();
if(ret){
   printf("Init lib error!\n");
   return -1;
}
//创建一个发布端实例,名称为pub_test
mosq =  mosquitto_new("pub_test", true, NULL);
if(mosq == NULL){
   printf("New pub_test error!\n");
   mosquitto_lib_cleanup();
   return -1;
}

//设置回调函数,这里都是简单的打印信息
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_publish_callback_set(mosq, my_publish_callback);

// 连接至本地MQTT服务器
ret = mosquitto_connect(mosq, "localhost", 1883, KEEP_ALIVE);
if(ret){
   printf("Connect server error!\n");
   mosquitto_destroy(mosq);
   mosquitto_lib_cleanup();
   return -1;
}

//mosquitto_loop_start作用是开启一个线程,在线程里不停的调用 mosquitto_loop() 来处理网络信息
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS)
{
   printf("mosquitto loop error\n");
   return 1;
}

while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL)
{
   /* 发布消息,这里是将fget获取到的内容,作为test主题的消息发布出去 */
   mosquitto_publish(mosq,NULL,"test",strlen(buff)+1,buff,0,0);
   memset(buff,0,sizeof(buff));
}

mosquitto_destroy(mosq);
mosquitto_lib_cleanup();


我们运行测试例程,发送主题为test的消息,消息内容为emtronix,如下图:

image.png


同时在另一个终端中运行订阅消息的例程,订阅主题为test的消息,当发布了消息后,例程就会收到消息并打印出来:

image.png


感兴趣的客户可以和英创公司的工程师联系,索取相关的资料和例程。