当前位置:首页 > > 嵌入式大杂烩
[导读]本文分享了mqtt应用于进程间通信的实例。

前言

上一篇分享了:《简单认识认识mqtt及mosquitto》,但也只是分享了mqtt的一些概念及mosquitto的一些介绍。然后就有读者来催更了:

这一篇我们就来分享mqtt应用于进程间通信的实例。我们沿用往期文章《例说嵌入式实用知识之JSON数据》的综合demo来改造改造。那个综合demo的功能是这样子的:

这是以socket来作为进程间通信的方式,并且这个demo是基于Windows写的,需要包含Windows特定的头文件。

本篇笔记我们把上面这个综合demo改为:

我们用mqtt来作为进程间通信的方式,在Linux下进程测试。

先贴代码:

json_print进程源码

「json_print.c:」

左右滑动查看全部代码>>>

/*
- 程序功能: 组JSON格式数据包并发送(MQTT发布者客户端程序)
- 编译命令: gcc cJSON.c json_print.c -L ../mosquitto/build/lib -lmosquitto -o json_print
- 导出mosquitto动态库: export LD_LIBRARY_PATH=../mosquitto/build/lib:$LD_LIBRARY_PATH
- 作者:ZhengN
- 公众号:嵌入式大杂烩
*/


#include 
#include 
#include 
#include "cJSON.h"
#include "../mosquitto/lib/mosquitto.h"

#define  STU_NAME_LEN  32

/* 学生结构体 */
typedef struct _Student
{

    char name[STU_NAME_LEN];  // 名字      
    int num;                  // 学号      
    int c_score;              // C语言分数
}StudentDef, *pStudentDef;

/* 内部函数声明 */
static StudentDef StudentData_Prepare(void);
static char *StudentsData_Packet(pStudentDef _Stu);
static void StudentData_Send(const char *_data);
static void MqttClientInit(void);
static void MqttClientClean(void);

bool clean_session = true;
struct mosquitto *mosq = NULL;

/********************************************************************************************************
** 函数: main
**------------------------------------------------------------------------------------------------------
** 参数: 
** 说明: 
** 返回: 
********************************************************************************************************/

int main(void)
{
    StudentDef stu = {0};
    char *stu_data = NULL;
    int stu_count = 0;
    int i = 0;

    /* MQTT客户端初始化 */
    MqttClientInit();

    /* 需要登记的学生总人数 */
    printf("Please input number of student: ");
    scanf("%d", &stu_count);

    while (i++ < stu_count)
    {
        /* 准备数据 */
        stu = StudentData_Prepare();

        /* JSON格式数据组包 */
        stu_data = StudentsData_Packet(&stu);

        /* 发送数据 */
        StudentData_Send(stu_data);
    }

    /* 回收操作 */
    MqttClientClean();

    return 0;
}

/********************************************************************************************************
** 函数: StudentData_Prepare, 准备组包需要的数据
**------------------------------------------------------------------------------------------------------
** 参数: 
** 说明: 
** 返回: 获取得到的数据
********************************************************************************************************/

static StudentDef StudentData_Prepare(void)
{
    char name[STU_NAME_LEN] = {0};
    int num = 0;
    int c_score = 0;
    StudentDef stu;

    /* 名字 */
    printf("Please input name: ");
    scanf("%s", name);
    if (strlen(name) < STU_NAME_LEN)
    {
        strncpy((char*)&stu.name, name, strlen(name)+1);
    }
    else
    {
        printf("The name is too long\n");
    }
    
    /* 学号 */
    printf("Please input num (0~100): ");
    scanf("%d", &num);
    stu.num = num;

    /* C语言分数 */
    printf("Please input c_score (0~100): ");
    scanf("%d", &c_score);
    stu.c_score = c_score;

    return stu;
}

/********************************************************************************************************
** 函数: StudentsData_Packet, JSON格式数据组包
**------------------------------------------------------------------------------------------------------
** 参数: _Stu:组student json数据包需要的数据
** 说明: 
** 返回: JSON格式的字符串 
********************************************************************************************************/

static char *StudentsData_Packet(pStudentDef _Stu)
{
    char *res_string = NULL;    // 返回值
    cJSON *name = NULL;         // 名字
    cJSON *num = NULL;          // 学号
    cJSON *c_score = NULL;      // C语言分数

    /* 创建一个JSON对象,{}扩起来 */
    cJSON *obj = cJSON_CreateObject();
    if (obj == NULL)
    {
        goto end;
    }

    /* 创建 "name": "xxx" 键值对 */
    name = cJSON_CreateString(_Stu->name);
    if (name == NULL)
    {
        goto end;
    }
    cJSON_AddItemToObject(obj, "name", name);

    /* 创建 "num": 207 键值对 */
    num = cJSON_CreateNumber(_Stu->num);
    if (name == NULL)
    {
        goto end;
    }
    cJSON_AddItemToObject(obj, "num", num);
    
    /* 创建 "c_score": 95 键值对 */
    c_score = cJSON_CreateNumber(_Stu->c_score);
    if (name == NULL)
    {
        goto end;
    }
    cJSON_AddItemToObject(obj, "c_score", c_score); 

    res_string = cJSON_Print(obj);          // 呈现为JSON格式 
    // res_string = cJSON_PrintUnformatted(obj);   // 呈现为无格式

    if (res_string == NULL)
    {
        fprintf(stderr"Failed to print monitor.\n");
    }

/* 异常情况统一Delete(free) */
end:
    cJSON_Delete(obj);
    return res_string;
}

/********************************************************************************************************
** 函数: StudentData_Send, JSON格式字符串数据组包发送
**------------------------------------------------------------------------------------------------------
** 参数: _data:要发送的数据
** 说明: 
** 返回: JSON格式的字符串 
********************************************************************************************************/

static void StudentData_Send(const char *_data)
{
    printf("%s: %s\n\n", __FUNCTION__, _data);

    /* 发布消息 */
    mosquitto_publish(mosq, NULL"test_topic"strlen(_data)+1, (const char*)_data, 00);
}

/********************************************************************************************************
** 函数: MqttClientInit, MQTT客户端初始化
**------------------------------------------------------------------------------------------------------
** 参数: void
** 说明: 
** 返回: 
********************************************************************************************************/

static void MqttClientInit(void)
{
    /* libmosquitto 库初始化 */
    mosquitto_lib_init();

    /* 创建mosquitto客户端 */
    mosq = mosquitto_new(NULL, clean_session, NULL);
    if(NULL == mosq)
    {
        printf("Create mqtt client failed...\n");
        mosquitto_lib_cleanup();
        return;
    }

    /* 连接服务器 */
    if(mosquitto_connect(mosq, "localhost"188360))
    {
        printf("Unable to connect...\n");
        return;
    }

    /* 网络消息处理线程 */
    int loop = mosquitto_loop_start(mosq);
    if(loop != MOSQ_ERR_SUCCESS)
    {
        printf("mosquitto loop error\n");
        return;
    }
}

/********************************************************************************************************
** 函数: MqttClientClean, MQTT客户端清理操作
**------------------------------------------------------------------------------------------------------
** 参数: void
** 说明: 
** 返回: 
********************************************************************************************************/

static void MqttClientClean(void)
{
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
}

json_parse进程源码

「json_parse.c:」

左右滑动查看全部代码>>>

/*
- 程序功能: 接收JSON数据并解析(MQTT订阅者客户端程序)
- 编译命令: gcc cJSON.c json_parse.c -L ../mosquitto/build/lib -lmosquitto -o json_parse
- 导出mosquitto动态库: export LD_LIBRARY_PATH=../mosquitto/build/lib:$LD_LIBRARY_PATH
- 作者:ZhengN
- 公众号:嵌入式大杂烩
*/


#include 
#include 
#include 
#include "cJSON.h"
#include "../mosquitto/lib/mosquitto.h"

#define  STU_NAME_LEN  32

/* 学生结构体 */
typedef struct _Student
{

    char name[STU_NAME_LEN];  // 名字      
    int num;                  // 学号      
    int c_score;              // C语言分数
}StudentDef, *pStudentDef;

/* 内部函数声明 */
static void StudentsData_Parse(pStudentDef _Stu, const char *_JsonStudnetData);
static void PrintParseResult(const pStudentDef _Stu);
static void SaveParseResult(const pStudentDef _Stu);
static void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message);
static void my_connect_callback(struct mosquitto *mosq, void *userdata, int result);

/* 内部全局变量 */
static FILE *stu_fp = NULL;

/********************************************************************************************************
** 函数: main
**------------------------------------------------------------------------------------------------------
** 参数: 
** 说明: 
** 返回: 
********************************************************************************************************/

bool clean_session = true;
int main(void)
{         
    struct mosquitto *mosq = NULL;

    /* libmosquitto 库初始化 */
    mosquitto_lib_init();

    /* 创建mosquitto客户端 */
    mosq = mosquitto_new(NULL, clean_session, NULL);
    if(NULL == mosq)
    {
        printf("Create mqtt client failed...\n");
        mosquitto_lib_cleanup();
        return 1;
    }

    /* 绑定连接、消息接收回调函数 */
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);

    /* 连接服务器 */
    if(mosquitto_connect(mosq, "localhost"188360))
    {
        printf("Unable to connect...\n");
        return 1;
    }

    /* 循环处理网络消息 */
    mosquitto_loop_forever(mosq, -11);

    /* 回收操作 */
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    return 0;
}

/********************************************************************************************************
** 函数: StudentsData_Parse, JOSN格式学生期末数据解析
**------------------------------------------------------------------------------------------------------
** 参数: _JsonStudnetData:JSON数据   _Stu:保存解析出的有用数据
** 说明: 
** 返回: 
********************************************************************************************************/

static void StudentsData_Parse(pStudentDef _Stu, const char *_JsonStudnetData)
{
    cJSON *student_json = NULL;   // student_json操作对象,可代表 {} 扩起来的内容
    cJSON *name = NULL;             
    cJSON *num = NULL;
    cJSON *c_score = NULL;
    
    /* 开始解析 */
    student_json = cJSON_Parse(_JsonStudnetData);
    if (NULL == student_json)
    {
        const char *error_ptr = cJSON_GetErrorPtr();
        if (error_ptr != NULL)
        {
            fprintf(stderr"Error before: %s\n", error_ptr);
        }
        goto end;
    }

    /* 解析获取name得值 */
    name = cJSON_GetObjectItemCaseSensitive(student_json, "name");
    if (cJSON_IsString(name) && (name->valuestring != NULL))
    {
        memset(&_Stu->name, 0, STU_NAME_LEN*sizeof(char));
        memcpy(&_Stu->name, name->valuestring, strlen(name->valuestring));
    }

    /* 解析获取num的值 */
    num = cJSON_GetObjectItemCaseSensitive(student_json, "num");
    if (cJSON_IsNumber(num))
    {
        _Stu->num = num->valueint;
    }

    /* 解析获取c_score的值 */
    c_score = cJSON_GetObjectItemCaseSensitive(student_json, "c_score");
    if (cJSON_IsNumber(c_score))
    {
        _Stu->c_score = c_score->valueint;
    }

end:
    cJSON_Delete(student_json);
}

/********************************************************************************************************
** 函数: PrintParseResult, 打印输出解析结果
**------------------------------------------------------------------------------------------------------
** 参数: 
** 说明: 
** 返回: 
********************************************************************************************************/

static void PrintParseResult(const pStudentDef _Stu)
{
    printf("name: %s, num: %d, c_score: %d\n\n", _Stu->name, _Stu->num, _Stu->c_score);
}

/********************************************************************************************************
** 函数: SaveParseResult, 保存解析结果
**------------------------------------------------------------------------------------------------------
** 参数: _Stu:需要保存的数据
** 说明: 
** 返回: 
********************************************************************************************************/

static void SaveParseResult(const pStudentDef _Stu)
{
    char write_buf[512] = {0};
    static int stu_count = 0;

    /* 以可在文件末尾追加内容的方式打开文件 */
 if((stu_fp = fopen("ParseResult.txt""a+")) == NULL)
 {
  printf("Open file error!\n");
  return exit(EXIT_FAILURE);
 } 

    /* 按指定格式写入文件 */
    snprintf(write_buf, 512"name: %s, num: %d, c_score: %d\n", _Stu->name, _Stu->num, _Stu->c_score);
    size_t len = fwrite((char*)write_buf, 1strlen(write_buf), stu_fp);

    /* 文件位置指针偏移 */
    fseek(stu_fp, len * stu_count, SEEK_SET);
    stu_count++;

    /* 关闭文件 */
    fclose(stu_fp);
}

/********************************************************************************************************
** 函数: my_message_callback, 消息接收回调函数
**------------------------------------------------------------------------------------------------------
** 参数: 
** 返回: 
********************************************************************************************************/

static void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    StudentDef stu = {0};

    if(message->payloadlen)
    {
        printf("%s %s\n", message->topic, (char*)message->payload);

        /* 解析JSON数据 */
        StudentsData_Parse(&stu, (const char*)message->payload);  

        /* 打印输出解析结果 */
        PrintParseResult(&stu);  

        /* 保存数据到文件 */
        SaveParseResult(&stu); 
    }
    else
    {
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}

/********************************************************************************************************
** 函数: my_connect_callback, 连接回调函数
**------------------------------------------------------------------------------------------------------
** 参数: 
** 返回: 
********************************************************************************************************/

static void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if(!result)
    {
        /* 订阅test_topic主题的消息 */
        mosquitto_subscribe(mosq, NULL"test_topic"0);
    }
    else
    {
        fprintf(stderr"Connect failed\n");
    }
}

编译运行

1、编译生成json_parse、json_print程序:

左右滑动查看全部代码>>>

gcc cJSON.c json_parse.c -L ../mosquitto/build/lib -lmosquitto -o json_parse
gcc cJSON.c json_print.c -L ../mosquitto/build/lib -lmosquitto -o json_print    

这里用到链接动态库的方式生成可执行程序。关于动态链接与静态链接,可查看往期笔记:《静态链接与动态链接补充(Linux)》《什么是动态链接与静态链接?》

2、执行json_parse、json_print程序:

执行这两个程序会报错:

./json_parse: error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory
./json_print: error while loading shared libraries: libmosquitto.so.1: cannot open shared object file: No such file or directory

这是因为 不能找到共享库文件libmosquitto.so.1,加载失败。

因为一般情况下Linux会在/usr/lib路径中搜索需要用到的库,而libmosquitto.so.1库并不在这个路径下。

解决方法有两种:一种就是把这个文件拷贝至/usr/lib路径下,但是一般不允许这样做,一般用户也不允许往这个路径里拷贝东西。另一种就是把libmosquitto.so.1库所在路径增加为动态库的搜索路径,命令为:

左右滑动查看全部代码>>>

export LD_LIBRARY_PATH=../mosquitto/build/lib:$LD_LIBRARY_PATH

关于这方面的说明可以阅读往期笔记:《静态链接与动态链接补充(Linux)》

按照上诉方法添加动态库搜索路径之后就可以正常运行这两个程序:

ParseResult.txt文本里得到:

实验成功!

以上就是本次的分享,代码写得比较仓促,如有错误,欢迎指出,谢谢!由于准备demo花了挺多时间,包括注释也写了很多。

所以本篇文章就不做过多的说明,感兴趣的朋友可以结合本篇文章的demo及mosquitto/client/pub_client.cmosquitto/client/sub_client.c这两个源文件。

本篇文章的demo:

可在本公众号聊天界面回复关键词:json_mqtt_demo,即可获取,若无法获取可联系我进行获取。

推荐资料https://blog.csdn.net/Dancer__Sky/article/details/77855249

最近加群人有点多,两个群都以加满,现建③群,感兴趣可自行加入:


猜你喜欢

简单认识认识mqtt及mosquitto

什么是Linux内核空间与用户空间?


1024G 嵌入式资源大放送!包括但不限于C/C++、单片机、Linux等。在公众号聊天界面回复1024,即可免费获取!

免责声明:本文内容由21ic获得授权后发布,版权归原作者所有,本平台仅提供信息存储服务。文章仅代表作者个人观点,不代表本平台立场,如有问题,请联系我们,谢谢!

本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除。
换一批
延伸阅读

随着现代家庭生活方式不断升级,厨房已不再只是功能空间,而逐渐成为融合审美表达、健康管理与智能体验的重要场域。消费者在关注空间整体性的同时,也对食材储存的安全性、洁净度以及使用便利性提出了更高要求。基于对中国家庭真实使用场...

关键字: 冰箱 嵌入式

在嵌入式系统、工业物联网等各类电子设备中,UART与网口是两种应用广泛的通信接口,前者作为经典的串行通信接口,承担着简单设备互联、调试日志传输等基础任务,后者则专注于高速、远距离的数据交互,是设备接入网络、实现大数据量传...

关键字: 嵌入式 通信接口 网口通讯

在软件开发领域,设计模式被誉为“解决特定问题的最佳实践”,但在嵌入式开发中,它却常常处于“边缘地带”。许多嵌入式工程师职业生涯中可能从未刻意使用过设计模式,甚至认为这些“软件工程理论”与单片机、传感器、实时系统等硬件紧密...

关键字: 嵌入式 设计模式

在居住结构持续演进与消费需求不断升级的背景下,中国家庭厨房正经历从“功能集合”向“系统空间”的深层转变。厨房不再只是烹饪的场所,而逐渐成为融合效率、健康、美学与家庭互动的重要生活空间。基于对这一趋势的长期洞察,西门子家电...

关键字: 嵌入式 蒸玲珑

在物联网设备、工业控制系统和智能家居等嵌入式场景中,轻量级WEB服务器扮演着核心角色。它们不仅需要满足资源受限环境下的性能需求,还需兼顾安全性、可扩展性和开发效率。本文从资源占用、功能特性、适用场景三个维度,对比分析六大...

关键字: 嵌入式 WEB服务器

在嵌入式软件开发工具领域,一场悄然的变革正在发生。随着全球软件行业向订阅制转型,嵌入式软件开发工具的授权模式也迎来了重要调整。市场上的嵌入式软件开发工具基本可以分为三类:商用开发工具,开源开发工具和厂商私有开发工具,其中...

关键字: 嵌入式 MCU RISC-V

在资源受限的嵌入式系统中,传统调试工具(如JTAG)往往成本高昂且占用引脚资源。本文介绍一种基于串口的低成本调试方案,通过自定义协议实现内存数据的实时监控,硬件成本可降低80%以上,特别适用于8/16位MCU开发场景。

关键字: 嵌入式 串口 内存数据

随着国家家电以旧换新补贴政策的持续推进,绿色节能、品质升级正成为越来越多家庭的新年焕新关键词。面对消费者在居住空间、生活效率与健康体验上的多元需求,西门子家电围绕新春焕新节点,正式开启“开门红”焕新季,通过国家补贴与企业...

关键字: 嵌入式 咖啡机 嵌饮机

设计人员通过瑞萨远程板场可在新MCU发布首日免费开始编程和编码

关键字: MCU 嵌入式 电路板
关闭