POSIX 消息队列

POSIX消息队列与System V消息队列的主要区别:
1.对POSIX队列的读总数返回最高优先级到最早消息,对SV队列到读则可以返回任意指定优先级的消息
2.当往一个空队列放置一个消息时,POSIX允许产生一个信号或启动一个线程,System V不提供此机制

消息的属性:
1.一个无符号整数的优先级(POSIX)或一个长整数的类型(SV)
2.消息的数据部分长度(可以为0)
3.数据本身(如果长度大于0)

POSIX消息队列总结:
mq_open创建一个新队列或者打开一个已经存在的队列
mq_close关闭队列
mq_unlink删除队列名,删除队列
mq_send往队列放置消息
mq_receive从一个队列中读出消息
mq_setattr和mq_getattr查询和设置队列的属性
mq_notify允许注册一个信号或者线程,在有一个消息被放置到空队列时,发送信号或者激活线程
每个消息被赋予一个小整数优先级,mq_receive总是返回最高优先级的最早消息

限制:
/proc/sys/fs/mqueue/msg_max 10
/proc/sys/fs/mqueue/msgsize_max 8192
/proc/sys/fs/mqueue/queues_max 256

创建一个新的消息队列或者打开一个已经存在的消息队列
<mqueue.h> 注意:编译加-lrt
<fcntl.h>
<sys/stat.h>
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,  struct mq_attr *attr);
成功返回描述字,失败返回-1并设置errno
name: 必须为/开头!!!
oflag: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK

关闭消息队列,但不能删除它
mqd_t mq_close(mqd_t mqdes);
成功返回0,失败返回-1

删除消息队列,不一定马上删除消息队列,但队列名会立即删除
mqd_t mq_unlink(const char *name);
成功返回0,失败返回-1
当某个进程还没有关闭此消息队列时,调用mq_unlink时,不会马上删除队列,当最后一个进程关闭队列时,该队列被删除
int flags;
mqd_t mqd;
flags = O_RDWR | O_CREAT | O_EXCL;
mqd = mq_open("/tmp.111", flags, 0644, NULL);
if (mqd == (mqd_t)-1) {
perror("mq_open");
return 1;
}

消息队列的属性
mq_getattr mq_setattr
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
成功返回0,失败返回-1
struct mq_attr {
long mq_flags;       /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg;      /* Max. # of messages on queue */
long mq_msgsize;     /* Max. message size (bytes) */
long mq_curmsgs;     /* # of messages currently in queue */
};
mq_setattr只能修改mq_flags属性,maxmsg和msgsize在mq_open时设置
mqd_t mqd;
struct mq_attr attr;
mqd = mq_open(argv[1], O_RDONLY);
mq_getattr(mqd, &attr);
printf("maxmsg=%ld, msgsize=%ld, curmsgs=%ld\n",
attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
mq_close(mqd);

收发消息
mq_send mq_receive
mq_receive返回队列中最高优先级的最早消息,而且该优先级能随该消息的内容及其长度一起返回

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
成功返回消息的长度,消息的实际长度,不包括消息头;失败返回-1
msg_len指示msg_ptr的长度,必须大于等于mq_msgsize
如果msg_prio不为NULL,函数返回消息的优先级
如果队列为空,调用将阻塞,如果队列设置0_NONBLOCK,调用立即返回EAGAIN

// 向队列加入一条消息
mqd_t mqd;
char *msg;
size_t len;
unsigned int prio;
len = 100;
prio = 5;
mqd = mq_open("/abc.123", O_WRONLY);
msg = (char *)malloc(len);
memset(msg, 0, len);
mq_send(mqd, msg, len, prio);

// 从队列读入一条消息
mqd_t mqd;
char *msg;
size_t len;
int n;
unsigned int prio;
struct mq_attr attr;
mqd = mq_open("/abc.123", O_RDONLY);
mq_getattr(mqd, &attr);
len = attr.mq_msgsize;
msg = (char *)malloc(len);
memset(msg, 0, len);
n = mq_receive(mqd, msg, len, &prio);
printf("read %ld bytes, priority=%u\n", (long)n, prio);

队列限制
long int open_max = sysconf(_SC_MQ_OPEN_MAX);  // -1
long int prio_max = sysconf(_SC_MQ_PRIO_MAX);  // 32768

消息通告
当往空队列放置了一个消息时,通知进程
通告方式有2种:
1. 产生一个信号
2. 创建一个线程执行一个指定的函数
mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
成功返回0;失败返回-1
给队列建立或者删除异步事件通知
1.如果notification非空,那么当前进程希望在有一个消息到达而且队列先前为空时得到通知,该进程被注册为接收该队列的通知
2.如果notification为空,而且当前进程目前被注册为接收该队列的通知,那么现有注册将被撤销
3.任意时刻只有一个进程可以被注册为接收队列的通知
4.当有一个消息到达一个空队列,而且已经有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发送。即在mq_receive调用中的阻塞比任何通知的注册都优先
5.当该通知已经发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册
6.当调用mq_notify但是队列不为空时,通知不会发送;当队列变为空,并且有一个消息入队时,才发送通知

union sigval {                /* Data passed with notification */
int     sival_int;        /* Integer value */
void   *sival_ptr;        /* Pointer value */
};

struct sigevent {
int    sigev_notify;      /* Notification method */
int    sigev_signo;       /* Notification signal */
union sigval sigev_value; /* Data passed with notification */
void (*sigev_notify_function) (union sigval);
/* Function for thread notification */
void  *sigev_notify_attributes;
/* Thread function attributes */
};
sigev_notify:SIGEV_NONE,SIGEV_SIGNAL,SIGEV_THREAD

// 使用非阻塞mq_receive的信号通知
volatile sig_atomic_t mqflag;
static void sig_usr1(int);
int main(int argc, char *argv[])
{
mqd_t mqd;
void *buf;
ssize_t n;
sigset_t zeromask, newmask, oldmask;
struct mq_attr attr;
struct sigevent sigev;

assert(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
buf = malloc(attr.mq_msgsize);
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigemptyset(&oldmask);
sigaddset(&newmask, SIGUSR1);
signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);

for ( ; ; ) {
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
while (mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0;
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
sigprocmask(SIG_UNBLOCK, &newmask, NULL);
}

return 0;
}
static void sig_usr1(int signo)
{
mqflag = 1;
return;
}

// 使用sigwait代替信号处理程序的信号通知
#include <signal.h>
int sigwait(const sigset_t *set, int *sig);
成功返回0,并设置sig为收到的信号;失败返回错误码

int main(...)
{
...
sigemptyset(&newmask);
sigaddset(&newmask, SIGUSR1);
sigprocmask(SIGBLOCK, &newmask, NULL);

sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);
for ( ; ; ) {
sigwait(&newmask, &signo);
if (signo == SIGUSR1) {
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >=0) {
printf("read %ld bytes\n", n);
}
if (errno != EAGAIN)
die("mq_receive");
}
}
...
}

// 使用select的POSIX消息队列
int pfds[2];
static void sig_usr1(int);
int main(int argc, char *arg[])
{
int fds;
char c;
fd_set rfds;
mqd_t mqd;
void *buf;
ssize_t n;
size_t len;
struct mq_attr attr;
struct sigevent sigev;

asset(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
len = attr.mq_msgsize;
buf = malloc(len);
pipe(pfds);
// 设置信号处理程序,建立通知
signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);
FD_ZERO(&rfds);
for ( ; ; ) {
FD_SET(pfds[0], &rfds);
nfds = select(pfds[0]+1, &rfds, NULL, NULL, NULL);
if (FD_ISSET(pfds[0], &rfds)) { // 管道可读
read(pfds[0], &c, 1);
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
printf("read %ld bytes\n", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
}
}
return 0;
}

static void sig_usr1(int signo)
{
write(pfds[1], "", 1); // 异步信号处理安全的函数
return;
}

// 收到通知后,启动一个线程,接收消息,然后结束进程
#include <pthread.h>
#include <mqueue.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define die(msg) { perror(msg); exit(EXIT_FAILURE); }
static void tfunc(union sigval sv)   /* Thread start function */
{
struct mq_attr attr;
ssize_t nr;
void *buf;
mqd_t mqdes = *((mqd_t *) sv.sival_ptr);
/* Determine max. msg size; allocate buffer to receive msg */
if (mq_getattr(mqdes, &attr) == -1) die("mq_getattr");
buf = malloc(attr.mq_msgsize);
if (buf == NULL) die("malloc");
nr = mq_receive(mqdes, buf, attr.mq_msgsize, NULL);
if (nr == -1) die("mq_receive");
printf("Read %ld bytes from MQ0\n", (long) nr);
free(buf);
exit(EXIT_SUCCESS);         /* Terminate the process */
}

int main(int argc, char *argv[])
{
mqd_t mqdes;
struct sigevent not;
assert(argc == 2);
mqdes = mq_open(argv[1], O_RDONLY);
if (mqdes == (mqd_t) -1) die("mq_open");
not.sigev_notify = SIGEV_THREAD;
not.sigev_notify_function = tfunc;
not.sigev_notify_attributes = NULL;
not.sigev_value.sival_ptr = &mqdes;   /* Arg. to thread func. */
if (mq_notify(mqdes, &not) == -1) die("mq_notify");
pause();    /* Process will be terminated by thread function */
return 0;
}         

// 启动一个新线程
mqd_t mqd;
struct mq_attr attr;
struct sigevent sigev;
static void notify_thread(union sigval);
int main(int argc, char *argv[])
{
assert(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_notify_attributes = NULL;
mq_notify(mqd, &sigev);
for ( ; ; )
pause();
return 0;
}
static void notify_thread(union sigval arg)
{
ssize_t n;
size_t len;
void *buf;
len = attr.mq_msgsize;
printf("notify_thread started\n");
buf = malloc(len);
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
printf("read %ld bytes\n", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
free(buf);
pthread_exit(NULL);
}

POSIX实时信号
unix信号分为两大组:
实时信号:SIGRTMIN--SIGRTMAX
其他信号:SIGINT, SIGQUIT, SIGKILL, ...

信号的实时行为取决于SA_SIGINFO
实时行为包含以下特征:
1.信号是排队的,即如果一个信号产生了3次,它就递交3次。以FIFO的顺序排队
2.当有多种SIGRTMIN到SIGRTMAX范围内的解阻塞信号排队时,值较小的信号先于值较大的信号递交(注意:linux与此相反)
3.当某个非实时信号递交时,传递给它的信号处理的唯一参数是该信号的值,实时信号比其他信号传递更多的信息
4.有些新函数使用实时信号工作,如sigqueue用来代替kill

// 查看实时信号的递交顺序
static void sig_rt(int, siginfo_t *, void *);
int main(void)
{
int i, j;
pid_t pid;
sigset_t newset;
union sigval val;
printf("SIGRTMIN=%d, SIGRTMAX=%d\n", (int)SIGRTMIN, (int)SIGRTMAX);
pid = fork();
if (pid < 0) die("fork");
else if (pid == 0) {
/* 阻塞3个实时信号 */
sigemptyset(&newset);
sigaddset(&newset, SIGRTMIN);
sigaddset(&newset, SIGRTMIN+1);
sigaddset(&newset, SIGRTMIN+2);
sigprocmask(SIG_BLOCK, &newset, NULL);
signal_rt(SIGRTMIN, sig_rt);
signal_rt(SIGRTMIN+1, sig_rt);
signal_rt(SIGRTMIN+2, sig_rt);
sleep(6);
sigprocmask(SIG_UNBLOCK, &newset, NULL);
sleep(3);
exit(0);
}
else {
sleep(3);
for (i=SIGRTMIN; i<=SIGRTMIN+2; i++) {
for (j=0; j<=2; j++) {
val.sival_int = j;
sigqueue(pid, i, val);
printf("send signal signo=%d, val=%d\n", i, j);
}
}
exit(0);
}
}

static void sig_rt(int signo, siginfo_t *info, void *context)
{
printf("receive signal signo=%d, code=%d, ival=%d\n",
signo, info->si_code, info->si_value.sival_int);
}
typedef void sigfunc_rt(int, siginfo_t *, void *);
sigfunc_rt *signal_rt(int signo, sigfunc_rt *func)
{
struct sigaction act, oact;
act.sa_sigaction = func;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_SIGINFO; /* 实时信号必须指定 */
if (signo == SIGALRM) {
#ifdef    SA_INTERRUPT
act.sa_flags |= SA_INTERRUPT;
#endif       
}
else {
#ifdef    SA_RESTART
act.sa_flags |= SA_RESTART;
#endif
}
if (sigaction(signo, &act, &oact) < 0)
return (sigfunc_rt *)SIG_ERR;
else
return oact.sa_sigaction;
}
输出如下:
[root@jiangkun unp]# ./rtsig
SIGRTMIN=34, SIGRTMAX=64
send signal signo=34, val=0
send signal signo=34, val=1
send signal signo=34, val=2
send signal signo=35, val=0
send signal signo=35, val=1
send signal signo=35, val=2
send signal signo=36, val=0
send signal signo=36, val=1
send signal signo=36, val=2
receive signal signo=36, code=-1, ival=0
receive signal signo=36, code=-1, ival=1
receive signal signo=36, code=-1, ival=2
receive signal signo=35, code=-1, ival=0
receive signal signo=35, code=-1, ival=1
receive signal signo=35, code=-1, ival=2
receive signal signo=34, code=-1, ival=0
receive signal signo=34, code=-1, ival=1
receive signal signo=34, code=-1, ival=2

struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void); /* 被遗弃了! */
};

实时信号之所以是可靠的,因为在进程阻塞该信号的时间内,发给该进程的所有实时信号会排队,而非实时信号则会合并为一个信号。早期的kill函数只能向特 定的进程发送一个特定的信号,并且早期的信号处理函数也不能接受附加数据。siqueue和sigaction解决了这个问题。
下面这个例子中,进程先屏蔽SIGINT和SIGRTMIN两个信号,其中SIGINT是非实时信号,而SIGRTMIN为实时信号,接着进程睡眠,睡眠完成之后再接触对这两个信号的屏蔽,此时可以比较对两种信号的处理方式是否一样。
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
void sig_handler(int, siginfo_t*, void*);
int main(int argc,char *argv[])
{
struct sigaction act;
sigset_t newmask, oldmask;
int rc;   
sigemptyset(&newmask);
/* 往信号集中添加一个非实时信号 */
sigaddset(&newmask, SIGINT);
/* 往信号集中添加一个实时信号 */
sigaddset(&newmask, SIGRTMIN);
/* 屏蔽实时信号SIGRTMIN */
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
act.sa_sigaction = sig_handler;
act.sa_flags = SA_SIGINFO;
if(sigaction(SIGINT, &act, NULL) < 0) {
printf("install signal error\n");
}
if(sigaction(SIGRTMIN, &act, NULL) < 0) {
printf("install signal error\n");
}
printf("pid = %d\n", getpid());
/* 进程睡眠,在此时间内的发给该进程的所有实时信号 将排队,不会有信号丢失 */
sleep(20);   
/* 解除对SIGRTMIN信号的屏蔽,信号处理函数将会被调用 */
sigprocmask(SIG_SETMASK, &oldmask, NULL);
return 0;
}
void sig_handler(int signo, siginfo_t *info, void *context)
{
if(signo == SIGINT)
printf("Got a common signal\n");
else
printf("Got a real time signal\n");
}

将程序编译好之后,再开一个终端用于发送实时信号。
# ./sigqueue_receive
pid = 8871
进程开始睡眠……
在新的终端输入:
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
连续发送四个SIGRTMIN,接着回到之前的终端,连续四次按下"ctrl+c"。
^C^C^C^C
最后进程终于醒来,整个输出如下:
pid = 8871
^C^C^C^CGot a real time signal
Got a real time signal
Got a real time signal
Got a real time signal
Got a common signal
果然接受到四个实时信号,并且四次调用了信号处理函数,而对于SIGINT,虽然也按下了四次"ctrl+c",但是进程对其只做一次处理。这个例子中是先发实时信号后发非实时信号,所以信号处理函数先处理实
时信号,如果只是按照顺序注册信号的话,这很好理解,但是换一下,先按下了四次"ctrl+c"然后使用kill发四次实时信号,结果发现输出的结果仍然 一样,这说明实时信号的优先级比非实时信号要高,内核每个进程的信号组成一个双向链表,实时信号插入的时候就不是随便插在尾部了。
信号的优先级:信号实质上是软中断,中断有优先级,信号也有优先级。如果一个进程有多个未决信号,则对于同一个未决的实时信号,内核将按照发送的顺序来递 交信号。如果存在多个未决的实时信号,则值(或者说编号)越大的越先被递送。如果既存在不可靠信号,又存在可靠信号(实时信号),虽然POSIX对这一情 况没有明确规定,但Linux系统和大多数遵循POSIX标准的操作系统一样,将优先递交可靠信号。一个进程如果处理 SIGQUIT(3),SIGINT(2),SIGHUP(1)(通过"kill -l" 可以查看信号的编号),那么先后给该进程发送SIGINT,SIGHUP,SIGQUIT,处理的顺序会是SIGQUIT,SIGINT,SIGHUP, 不论改变这个三个信号的发送顺序,处理的顺序都是一样的。

posted on 2012-03-21 21:49 Marv 阅读(4742) 评论(0)  编辑 收藏 引用


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   知识库   博问   管理


<2024年4月>
31123456
78910111213
14151617181920
21222324252627
2829301234
567891011

导航

统计

常用链接

留言簿

随笔档案

文章分类

搜索

最新评论

阅读排行榜

评论排行榜