posix消息队列
消息队列可看成一个消息链表,每条消息都包括优先级、消息长度和数据本身三个属性,可用于同一主机上不同进程之间通信,它具有随内核持续性。
消息队列主要有posix和system V两种实现版本,其区别在于:
- posix消息队列的读总是返回优先级最高的最早消息,而system V消息队列的读则可以返回任意指定优先级的消息。
- 当往一个空队列放置一个消息时,posix消息队列允许产生一个信号或者启动一个线程,而system V消息队列则不提供类似机制。
编程接口
posix消息队列的维护比较简单:
- mq_open创建一个新队列或者打开一个已存在的队列
- mq_close关闭队列
- mq_unlink删除队列
- mq_send往队列中放置消息
- mq_receive从队列中读取消息
- mq_getattr/mq_setattr用于获取和设置队列属性
相关头文件为mqueue.h,编译时需链接-lrt库。
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
int mq_close(mqd_t mq);
int mq_unlink(const char *name);
说明:
- oflag参数是O_RDONLY, O_WRONLY或O_RDWR之一,可按位或上O_CREAT, O_EXCL以及O_NONBLOCK。
- mq_close只关闭队列,并不从系统删除。进程终止时,它的所有打开着的消息队列都将自动关闭,就像调用了mq_close一样。
- 要从系统删除队列,必须用mq_unlink。
int mq_getattr(mqd_t mq, struct mq_attr *attr);
int mq_setattr(mqd_t mq, const struct mq_attr *attr, struct mq_attr *oldattr);
消息队列的属性定义如下:
struct mq_attr {
long mq_flags; // mq flag: 0, O_NONBLOCK
long mq_maxmsg; // max number of msg allowd on queue
long mq_msgsize; // max size of a msg in bytes
long mq_curmsgs; // number of msgs currently on queue
};
注意,最大消息数和消息的最大字节数只能在创建队列时设置。
int mq_send(mqd_t mq, const char *ptr, size_t len, unsigned priority);
ssize_t mq_receive(mqd_t mq, char *ptr, size_t len, unsigned *priority);
说明:
- mq_receive的len参数不能小于队列的mq_msgsize,否则将立即返回EMSGSIZE错误。
- mq_send的priority参数必须小于MQ_PRIO_MAX。
异步事件通知
posix消息队列允许异步事件通知,以告知何时有一个消息放到了空的队列中,通知有两种方式:
- 产生一个信号
- 创建一个线程来执行一个指定的函数
int mq_notify(mqd_t mq, const struct sigevent *notification);
相关参数的结构定义如下:
union sigval {
int sigval_int; // integer value
void *sigval_ptr; // pointer value
};
struct sigevent {
int sigev_notify; // SIGEV_NONE, SIGEV_SIGNAL, SIGEV_THREAD
int sigev_signo; // signal number if SIGEV_SIGNAL
union sigval sigev_value; // passed to signal handler or thread
void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
};
说明:
- notification参数非空表示注册,空表示注销。
- 任何时刻只有一个进程可以被注册为接收某个给定队列的通知。
- 当有消息达到空队列时,假设已有进程被注册为接收该队列通知,只有在没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出,即mq_receive调用中的阻塞比通知优先级要高。
- 当通知被发送给它的注册进程时,其注册即被撤销,该进程必须再次调用mq_notify以重新注册。
信号处理示例
发送端
#include <stdlib.h>
#include <mqueue.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
int main() {
mqd_t mq = mq_open("/testmq", O_WRONLY);
if (mq == (mqd_t)-1) {
perror("mq_open");
return 1;
}
int i;
char buf[65] = {0};
for (i = 0; i < 26; i++) {
memset(buf, 'A'+i, 64);
printf("[%04d] send %04d bytes: [%s]\n", i+1, 64, buf);
if (mq_send(mq, buf, 64, 0) < 0) {
perror("mq_send");
break;
}
}
mq_close(mq);
return 0;
}
接收端
#include <stdio.h>
#include <mqueue.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
int main() {
mq_unlink("/testmq");
mqd_t mq = mq_open("/testmq", O_RDONLY | O_NONBLOCK | O_CREAT, 0644, NULL);
if (mq == (mqd_t)-1) {
perror("mq_open");
return 1;
}
struct mq_attr attr;
mq_getattr(mq, &attr);
int size = attr.mq_msgsize;
char *buf = calloc(1, size);
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGUSR1);
sigprocmask(SIG_BLOCK, &mask, NULL);
struct sigevent sigev;
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mq, &sigev);
int n, signo, idx = 0;
for (;;) {
sigwait(&mask, &signo);
if (signo == SIGUSR1) {
mq_notify(mq, &sigev);
while ((n = mq_receive(mq, buf, size, NULL)) >= 0)
printf("[%04d] recv %04d bytes: [%s]\n", ++idx, n, buf);
}
}
return 0;
}
线程处理示例
异步事件通知的另一种方式是将sigev_notify设置为SIGEV_THREAD,启动一个线程来响应消息处理。该线程调用的函数由sigev_notify_function指定,所用的参数由sigev_value指定,新线程的线程属性由sigev_notify_attr指定,空指针表示使用默认属性。
发送端代码相同,接收端代码如下:
#include <stdio.h>
#include <mqueue.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include <pthread.h>
#include <unistd.h>
mqd_t mq;
struct mq_attr attr;
struct sigevent sigev;
static void notify_function(union sigval arg) {
int idx = 0, n;
char *buf = calloc(1, attr.mq_msgsize);
mq_notify(mq, &sigev);
while ((n = mq_receive(mq, buf, attr.mq_msgsize, NULL)) >= 0)
printf("[%04d] recv %04d bytes: [%s]\n", ++idx, n, buf);
free(buf);
pthread_exit(NULL);
}
int main() {
mq_unlink("/testmq");
mq = mq_open("/testmq", O_RDONLY | O_NONBLOCK | O_CREAT, 0644, NULL);
if (mq == (mqd_t)-1) {
perror("mq_open");
return 1;
}
mq_getattr(mq, &attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_function = notify_function;
sigev.sigev_notify_attributes = NULL;
mq_notify(mq, &sigev);
for (;;) pause();
return 0;
}