posix消息队列

消息队列可看成一个消息链表,每条消息都包括优先级、消息长度和数据本身三个属性,可用于同一主机上不同进程之间通信,它具有随内核持续性。

消息队列主要有posix和system V两种实现版本,其区别在于:

编程接口

posix消息队列的维护比较简单:

相关头文件为mqueue.h,编译时需链接-lrt库。

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
int mq_close(mqd_t mq);
int mq_unlink(const char *name);

说明:

int mq_getattr(mqd_t mq, struct mq_attr *attr);
int mq_setattr(mqd_t mq, const struct mq_attr *attr, struct mq_attr *oldattr);

消息队列的属性定义如下:

struct mq_attr {
    long mq_flags;   // mq flag: 0, O_NONBLOCK
    long mq_maxmsg;  // max number of msg allowd on queue
    long mq_msgsize; // max size of a msg in bytes
    long mq_curmsgs; // number of msgs currently on queue
};

注意,最大消息数和消息的最大字节数只能在创建队列时设置。

int mq_send(mqd_t mq, const char *ptr, size_t len, unsigned priority);
ssize_t mq_receive(mqd_t mq, char *ptr, size_t len, unsigned *priority);

说明:

异步事件通知

posix消息队列允许异步事件通知,以告知何时有一个消息放到了空的队列中,通知有两种方式:

int mq_notify(mqd_t mq, const struct sigevent *notification);

相关参数的结构定义如下:

union sigval {
    int    sigval_int;  // integer value
    void  *sigval_ptr;  // pointer value
};

struct sigevent {
    int  sigev_notify;  // SIGEV_NONE, SIGEV_SIGNAL, SIGEV_THREAD
    int  sigev_signo;   // signal number if SIGEV_SIGNAL
    union sigval sigev_value; // passed to signal handler or thread
    void (*sigev_notify_function)(union sigval);
    pthread_attr_t *sigev_notify_attributes;
};

说明:

信号处理示例

发送端

#include <stdlib.h>
#include <mqueue.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>

int main() {
    mqd_t mq = mq_open("/testmq", O_WRONLY);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    int i;
    char buf[65] = {0};
    for (i = 0; i < 26; i++) {
        memset(buf, 'A'+i, 64);
        printf("[%04d] send %04d bytes: [%s]\n", i+1, 64, buf);
        if (mq_send(mq, buf, 64, 0) < 0) {
            perror("mq_send");
            break;
        }
    }
    mq_close(mq);
    return 0;
}

接收端

#include <stdio.h>
#include <mqueue.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>

int main() {
    mq_unlink("/testmq");
    mqd_t mq = mq_open("/testmq", O_RDONLY | O_NONBLOCK | O_CREAT, 0644, NULL);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    struct mq_attr attr;
    mq_getattr(mq, &attr);

    int  size = attr.mq_msgsize;
    char *buf = calloc(1, size);

    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGUSR1);
    sigprocmask(SIG_BLOCK, &mask, NULL);

    struct sigevent sigev;
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;

    mq_notify(mq, &sigev);

    int n, signo, idx = 0;
    for (;;) {
        sigwait(&mask, &signo);
        if (signo == SIGUSR1) {
            mq_notify(mq, &sigev);
            while ((n = mq_receive(mq, buf, size, NULL)) >= 0)
                printf("[%04d] recv %04d bytes: [%s]\n", ++idx, n, buf);
        }
    }

    return 0;
}

线程处理示例

异步事件通知的另一种方式是将sigev_notify设置为SIGEV_THREAD,启动一个线程来响应消息处理。该线程调用的函数由sigev_notify_function指定,所用的参数由sigev_value指定,新线程的线程属性由sigev_notify_attr指定,空指针表示使用默认属性。

发送端代码相同,接收端代码如下:

#include <stdio.h>
#include <mqueue.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include <pthread.h>
#include <unistd.h>

mqd_t mq;
struct mq_attr attr;
struct sigevent sigev;

static void notify_function(union sigval arg) {
    int idx = 0, n;
    char *buf = calloc(1, attr.mq_msgsize);
    mq_notify(mq, &sigev);
    while ((n = mq_receive(mq, buf, attr.mq_msgsize, NULL)) >= 0)
        printf("[%04d] recv %04d bytes: [%s]\n", ++idx, n, buf);
    free(buf);
    pthread_exit(NULL);
}

int main() {
    mq_unlink("/testmq");
    mq = mq_open("/testmq", O_RDONLY | O_NONBLOCK | O_CREAT, 0644, NULL);
    if (mq == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    mq_getattr(mq, &attr);

    sigev.sigev_notify = SIGEV_THREAD;
    sigev.sigev_value.sival_ptr = NULL;
    sigev.sigev_notify_function = notify_function;
    sigev.sigev_notify_attributes = NULL;

    mq_notify(mq, &sigev);

    for (;;) pause();

    return 0;
}
Table of Contents