消息队列应用于线程间通信的简单例子

大家好,我是linuxzn。
在应用开发中,生产者,消费者的模型非常常见,一方产生数据并把数据放入队列中,而另一方从队列中取数据,先进先出。
应用:线程间通信/进程间通信。hello系列 | 多线程编程基础!
linux系统中提供了两种不同接口的消息队列:
posix消息队列。posix为可移植的操作系统接口。
system v消息队列。system v 是 at&t 的第一个商业unix版本(unix system iii)的加强。
其中,posix消息队列可移植性较强,使用较广。
linux系统中提供的消息队列一般应用于进行间通信,但也可以用于线程间通信。
本文介绍posix消息队列应用于线程间通信。
头文件:
#include            /* for o_* constants */#include         /* for mode constants */#include   
编译链接需要加上 -lr 链接。
linux内核提供了一系列函数来使用消息队列:
/** * @brief 创建消息队列实例 * * detailed function description * * @param[in] name: 消息队列名称 * @param[in] oflag:根据传入标识来创建或者打开一个已创建的消息队列                    - o_creat: 创建一个消息队列                    - o_excl: 检查消息队列是否存在,一般与o_creat一起使用                    - o_creat|o_excl: 消息队列不存在则创建,已存在返回null                    - o_nonblock: 非阻塞模式打开,消息队列不存在返回null                    - o_rdonly: 只读模式打开                    - o_wronly: 只写模式打开                    - o_rdwr: 读写模式打开 * @param[in] mode:访问权限 * @param[in] attr:消息队列属性地址 * * @return 成功返回消息队列描述符,失败返回-1,错误码存于error中 */mqd_t mq_open(const char *name, int oflag,  mode_t mode, struct mq_attr *attr);/** * @brief 无限阻塞方式接收消息 * * detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:消息体缓冲区地址 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值 * @param[in] msg_prio:消息优先级 * * @return 成功返回消息长度,失败返回-1,错误码存于error中 */mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);/** * @brief 指定超时时间阻塞方式接收消息 * * detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:消息体缓冲区地址 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值 * @param[in] msg_prio:消息优先级 * @param[in] abs_timeout:超时时间 * * @return 成功返回消息长度,失败返回-1,错误码存于error中 */mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout);/** * @brief 无限阻塞方式发送消息 * * detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:待发送消息体缓冲区地址 * @param[in] msg_len:消息体长度 * @param[in] msg_prio:消息优先级 * * @return 成功返回0,失败返回-1 */mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);/** * @brief 指定超时时间阻塞方式发送消息 * * detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:待发送消息体缓冲区地址 * @param[in] msg_len:消息体长度 * @param[in] msg_prio:消息优先级 * @param[in] abs_timeout:超时时间 * * @return 成功返回0,失败返回-1 */mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout);/** * @brief 关闭消息队列 * * detailed function description * * @param[in] mqdes: 消息队列描述符 * * @return 成功返回0,失败返回-1 */mqd_t mq_close(mqd_t mqdes);/** * @brief 分离消息队列 * * detailed function description * * @param[in] name: 消息队列名称 * * @return 成功返回0,失败返回-1 */mqd_t mq_unlink(const char *name);  
例子:线程1不断给线程2发送字符串数据。
#include #include #include #include #include #include            /* for o_* constants */#include         /* for mode constants */#include #define mq_msg_max_size    512  ///< 最大消息长度 #define mq_msg_max_item    5  ///< 最大消息数目static pthread_t s_thread1_id;static pthread_t s_thread2_id;static unsigned char s_thread1_running = 0;static unsigned char s_thread2_running = 0;static mqd_t s_mq;static char send_msg[10] = hello;void *thread1_fun(void *arg){    int ret = 0;    s_thread1_running = 1;    while (s_thread1_running)      {  ret = mq_send(s_mq, send_msg, sizeof(send_msg), 0);  if (ret < 0)  {         perror(mq_send error);  }        printf(send msg = %s, send_msg);        usleep(100 * 1000);    }        pthread_exit(null);}void *thread2_fun(void *arg){ char  buf[mq_msg_max_size]; int recv_size = 0;    s_thread2_running = 1;    while (s_thread2_running)    {  recv_size = mq_receive(s_mq, &buf[0], sizeof(buf), null);  if (-1 != recv_size)  {   printf(receive msg = %s, buf);  }  else  {   perror(mq_receive error);   break;  }        usleep(100 * 1000);    }        pthread_exit(null);}int main(void){    int ret = 0;    struct mq_attr attr;    ///< 创建消息队列    memset(&attr, 0, sizeof(attr));    attr.mq_maxmsg = mq_msg_max_item;    attr.mq_msgsize = mq_msg_max_size;    attr.mq_flags = 0;    s_mq = mq_open(/mq, o_creat|o_rdwr, 0777, &attr); if(-1 == s_mq)    {        perror(mq_open error);        return -1;    }    ///< 创建线程1    ret = pthread_create(&s_thread1_id, null, thread1_fun, null);    if (ret != 0)    {        printf(thread1_create error!);        exit(exit_failure);    }    ret = pthread_detach(s_thread1_id);    if (ret != 0)    {        printf(s_thread1_id error!);        exit(exit_failure);    }    ///< 创建线程2    ret = pthread_create(&s_thread2_id, null, thread2_fun, null);    if (ret != 0)    {        printf(thread2_create error!);        exit(exit_failure);    }    ret = pthread_detach(s_thread2_id);    if (ret != 0)    {        printf(s_thread2_id error!);        exit(exit_failure);    }    while (1)    {        sleep(1);    }    return 0;}  
编译、运行:
以上就是本次的分享,如果文章有帮助,麻烦帮忙转发,谢谢!


信合光电荣获“十大景观亮化企业”
三菱重工收购庞巴迪CRJ项目
浅析一起有载调压开关轻瓦斯报警处理案例
Realme的Narzo 20将在亚洲市场上市
可编程逻辑在微控制器中起什么关键作用
消息队列应用于线程间通信的简单例子
深康佳首款存储主控芯片已实现量产
小米MIUI9还有隐藏功能?这些特性MIUI9发布会上都没讲
小米6最新消息:小米6备货困难,上百万人预约仅售千台!三星S8笑了
疫情之下,如何打造一场别开生面的云直播年会
泰克波形监测仪及同步信号发生器在广州亚运会中的应用
LG官宣48寸曲面OLED显示器
根本没什么NOKIA 8 不过却意外有一部“双摄新机”
功率放大器的输入阻抗和输出阻抗的关系
分析电子透雾与光学透雾监控摄像机的不同之处
人工智能绝不是洪水猛兽,而且可以体现人文关怀
硕贝德发布了投资者调研活动相关信息
如何降低电力变压器短路事故的发生?
iOS10.3怎么样?iOS10.3评测:iOS10.3.2又更新,iPhone耗电问题解决,稳定、流畅性大提升
爱德万R3131大优惠啦R3131频谱分析仪R3131