本文共 11770 字,大约阅读时间需要 39 分钟。
1、概述
消息队列可以认为是一个消息链表,System V 消息队列使用消息队列标识符标识。具有足够特权的任何进程都可以往一个队列放置一个消息,具有足够特权的任何进程都可以从一个给定队列读出一个消息。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。System V 消息队列是随内核持续的,只有在内核重起或者显示删除一个消息队列时,该消息队列才会真正被删除。可以将内核中的某个特定的消息队列画为一个消息链表,如下图所示:
对于系统中没个消息队列,内核维护一个msqid_ds的信息结构:struct msqid_ds
{ struct msqid_ds { struct ipc_perm msg_perm; struct msg *msg_first; /* first message on queue,unused */ struct msg *msg_last; /* last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* current number of bytes on queue */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* max number of bytes on queue */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */};2、System V 消息队列操作函数
系统V消息队列API共有四个,使用时需要包括几个头文件:
#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> |
1)int msgget(key_t key, int msgflg)
参数key是一个键值,由ftok获得;msgflg参数是一些标志位。该调用返回与健值key相对应的消息队列描述字。
在以下两种情况下,该调用将创建一个新的消息队列:
参数msgflg可以为以下:IPC_CREAT、IPC_EXCL、IPC_NOWAIT或三者的或结果。
调用返回:成功返回消息队列描述字,否则返回-1。
注:参数key设置成常数IPC_PRIVATE并不意味着其他进程不能访问该消息队列,只意味着即将创建新的消息队列。
2)int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long msgtyp, int msgflg);
该系统调用从msgid代表的消息队列中读取一个消息,并把消息存储在msgp指向的msgbuf结构中。msqid为消息队列描述字;消息返回后存储在msgp指向的地址,msgsz指定msgbuf的mtext成员的长度(即消息内容的长度),msgtyp为请求读取的消息类型;读消息标志msgflg可以为以下几个常值的或:
msgrcv手册中详细给出了消息类型取不同值时(>0; <0; =0),调用将返回消息队列中的哪个消息。
msgrcv()解除阻塞的条件有三个:
调用返回:成功返回读出消息的实际字节数,否则返回-1。
3)int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg);
向msgid代表的消息队列发送一个消息,即将发送的消息存储在msgp指向的msgbuf结构中,消息的大小由msgze指定。对发送消息来说,有意义的msgflg标志为IPC_NOWAIT,指明在消息队列没有足够空间容纳要发送的消息时,msgsnd是否等待。造成msgsnd()等待的条件有两种:
msgsnd()解除阻塞的条件有三个:
调用返回:成功返回0,否则返回-1。
4)int msgctl(int msqid, int cmd, struct msqid_ds *buf);
该系统调用对由msqid标识的消息队列执行cmd操作,共有三种cmd操作:IPC_STAT、IPC_SET 、IPC_RMID。调用返回:成功返回0,否则返回-1。
写个程序创建一个消息队列,然后网消息队列中放置一个含有1个字节的消息,发出msgctl的IPC_STAT命令,使用system函数执行ipcs命令,最后使用IPC_RMID命令删除该队列。程序如下:
1 #include2 #include 3 #include 4 #include 5 #include 6 #include 7 8 #define MSG_R 0400 /* read permission */ 9 #define MSG_W 0200 /* write permission */10 11 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6)12 13 typedef unsigned long ulong_t;14 struct msgbuf15 {16 long mtype;17 char mtext[1];18 };19 20 int main()21 {22 int msqid;23 struct msqid_ds info;24 struct msgbuf buf;25 if((msqid = msgget(IPC_PRIVATE,SVMSG_MODE|IPC_CREAT)) == -1)26 {27 perror("msgget() error");28 exit(-1);29 }30 buf.mtype = 1;31 buf.mtext[0] = 1;32 if(msgsnd(msqid,&buf,1,0) == -1)33 {34 perror("msgsnd() error");35 exit(-1);36 }37 if(msgctl(msqid,IPC_STAT,&info) == -1)38 {39 perror("msgctl() error");40 exit(-1);41 }42 printf("read-write: %03o,cbytes = %lu,qnum =%lu,qbytes = %lu\n",43 info.msg_perm.mode & 0777,(ulong_t) info.msg_cbytes,44 (ulong_t) info.msg_qnum,(ulong_t) info.msg_qbytes);45 system("ipcs -q");46 if(msgctl(msqid,IPC_RMID,NULL) == -1)47 {48 perror("msgctl() error");49 exit(-1);50 }51 exit(0);52 }
程序执行结果如下:
System V消息队列是随内核持续的,我们可以编写一组小程序来操作消息队列。程序如下:
创建消息队列msgcreate程序:
1 #include2 #include 3 #include 4 #include 5 #include 6 #include 7 8 #define MSG_R 0400 /* read permission */ 9 #define MSG_W 0200 /* write permission */10 11 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6)12 13 int main(int argc,char *argv[])14 {15 int c,oflag,mqid;16 key_t key;17 oflag = SVMSG_MODE | IPC_CREAT;18 //通过-e命令指定IPC_EXCL标志19 while((c= getopt(argc,argv,"e")) != -1) 20 {21 switch(c)22 {23 case 'e':24 oflag |= IPC_EXCL;25 break;26 }27 }28 if(optind != argc -1)29 {30 printf("usage: msgcreate [-e] ");31 exit(0);32 }33 //创建key34 if((key = ftok(argv[optind],0)) == -1)35 {36 perror("ftok() error");37 exit(-1);38 }39 //创建或者打开一个消息队列40 if((mqid = msgget(key,oflag)) == -1)41 {42 perror("msgget() error");43 exit(-1);44 }45 exit(0);46 }
向消息队列放置信息msgsnd程序:
1 #include2 #include 3 #include 4 #include 5 #include 6 #include 7 8 #define MSG_W 0200 /* write permission */ 9 //自定义消息结构10 struct msgbuf11 {12 long mtype;13 char mtext[100];14 };15 16 int main(int argc,char *argv[])17 {18 int c,oflag,mqid;19 size_t len;20 long type;21 struct msgbuf *ptr;22 key_t key;23 if(argc != 4)24 {25 printf("usage: msgsnd <#bytes> ");26 exit(0);27 }28 len = atoi(argv[2]);29 type = atoi(argv[3]);30 if((key = ftok(argv[1],0)) == -1)31 {32 perror("ftok() error");33 exit(-1);34 }35 if((mqid = msgget(key,MSG_W)) == -1)36 {37 perror("msgget() error");38 exit(-1);39 }40 ptr = calloc(sizeof(long)+len,sizeof(char));41 ptr->mtype = type;42 if(msgsnd(mqid,ptr,len,0) == -1)43 {44 perror("msgsnd() error");45 exit(-1);46 }47 exit(0);48 }
从消息队列中读出一个消息msgrcv程序:
1 #include2 #include 3 #include 4 #include 5 #include 6 #include 7 8 #define MAXMSG (8192+sizeof(long)) 9 #define MSG_R 0400 /* read permission */10 11 struct msgbuf12 {13 long mtype;14 char mtext[100];15 };16 17 int main(int argc,char *argv[])18 {19 int c,flag,mqid;20 long type;21 ssize_t n;22 struct msgbuf *buff;23 key_t key;24 type = flag = 0;25 while((c = getopt(argc,argv,"nt:")) != -1)26 {27 switch(c)28 {29 case 'n':30 flag |= IPC_NOWAIT;31 break;32 case 't':33 type = atol(optarg);34 break;35 }36 }37 if(optind != argc -1)38 {39 printf("usage: msgrcv[-n] [-t type] ");40 exit(0);41 }42 if((key = ftok(argv[optind],0)) == -1)43 {44 perror("ftok() error");45 exit(-1);46 }47 if((mqid = msgget(key,MSG_R)) == -1)48 {49 perror("msgget() error");50 exit(-1);51 }52 buff = malloc(MAXMSG) ;53 if(buff == NULL)54 {55 perror("malloc() error");56 exit(-1);57 }58 if((n=msgrcv(mqid,buff,MAXMSG,type,flag)) == -1)59 {60 perror("msgsnd() error");61 exit(-1);62 }63 printf("read %d bytes,type =%ld\n",n,buff->mtype);64 exit(0);65 }
删除一个消息队列msgrmid程序:
1 #include2 #include 3 #include 4 #include 5 #include 6 #include 7 8 int main(int argc,char *argv[]) 9 {10 int mqid;11 key_t key;12 if(argc != 2)13 {14 printf("usage: msgrmid ");15 exit(0);16 }17 if((key = ftok(argv[1],0)) == -1)18 {19 perror("ftok() error");20 exit(-1);21 }22 if((mqid = msgget(key,0)) == -1)23 {24 perror("msgget() error");25 exit(-1);26 }27 msgctl(mqid,IPC_RMID,NULL);28 exit(0);29 }
程序测试结果如下:
写个客户-服务器例子,创建两个消息队列,一个队列用来从客户到服务器的消息,一个队列用于从服务器到客户的消息。主要功能是:客户向服务器发现一条消息,服务接收到并输出接收到的客户消息,然后服务器向客户发送一条消息,客户显示服务器发送的消息。程序如下:
公共头文件svmsg.h:
1 #ifndef SVMSG_H 2 #define SVMSG_H 3 #include4 #include 5 #include 6 #include 7 #include 8 #include 9 #include 10 #define MQ_KEY1 1234L //定义两个KEY11 #define MQ_KEY2 2345L12 #define MAXMESGDATA 102413 #define MAXMSG sizeof(long)*2+102414 //消息结构信息15 struct msgbuf16 {17 long mesg_type; //消息类型18 char mesg_data[1]; //消息内容19 };20 #endif
客户端程序svmsg_client.c
1 #include2 #include 3 #include 4 #include "svmsg.h" 5 6 void client(int ,int); 7 8 int main(int argc,char *argv[]) 9 {10 int readid,writeid;11 //打开消息队列12 //MO_KEY2用于客户从服务器读取13 if((readid = msgget(MQ_KEY2,0)) == -1) 14 {15 perror("msgget() error");16 exit(-1);17 }18 //MO_KEY用于客户向服务器写19 if((writeid = msgget(MQ_KEY1,0))==-1)20 {21 perror("msgget() error");22 exit(-1);23 }24 client(readid,writeid);25 msgctl(readid,IPC_RMID,NULL); //删除消息队列26 msgctl(writeid,IPC_RMID,NULL);27 exit(0);28 }29 30 void client(int readid ,int writeid)31 {32 ssize_t n,len;33 struct msgbuf *buff,*ptr;34 char content[MAXMESGDATA];35 long type;36 printf("Enter the msg_type: ");37 scanf("%ld",&type);38 printf("Enter the msg_data: ");39 scanf("%s",content);40 len = strlen(content);41 ptr = calloc(sizeof(long)*2+len,sizeof(char));42 ptr->mesg_type = type;43 strcpy(ptr->mesg_data,content);44 printf("Sent to server.\n");45 if(msgsnd(writeid,ptr,len+sizeof(long),0) == -1)46 {47 perror("msgsnd() error");48 exit(-1);49 }50 printf("Sent to server successfully.\n");51 free(ptr);52 printf("*********************************\n");53 buff = malloc(MAXMSG );54 printf("waiting for server......\n");55 //以阻塞形式读取56 if((n=msgrcv(readid,buff,MAXMSG,0,0)) == -1)57 {58 perror("msgrcv() error");59 exit(-1);60 }61 printf("Received message from server is:\nmsg_type = %ld\nmsg_data = %s\n",62 buff->mesg_type,buff->mesg_data);63 free(buff);64 }
服务器程序svmsg_server.c
1 #include "svmsg.h" 2 3 #define MSG_R 0400 /* read permission */ 4 #define MSG_W 0200 /* write permission */ 5 #define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6) 6 7 void server(int ,int); 8 9 int main(int argc,char *argv[])10 {11 int readid,writeid;12 if((readid = msgget(MQ_KEY1,SVMSG_MODE | IPC_CREAT)) == -1)13 {14 perror("msgget() error");15 exit(-1);16 }17 if((writeid = msgget(MQ_KEY2,SVMSG_MODE | IPC_CREAT)) == -1)18 {19 perror("msgget() error");20 exit(-1);21 }22 server(readid,writeid);23 exit(0);24 }25 26 void server(int readid,int writeid)27 {28 ssize_t n,len;29 struct msgbuf *buff,*ptr;30 char content[MAXMESGDATA];31 long type;32 buff = malloc(MAXMSG );33 printf("waiting for client......\n");34 if((n=msgrcv(readid,buff,MAXMSG,0,0)) == -1)35 {36 perror("msgrcv() error");37 exit(-1);38 }39 printf("Received message from client is:\nmsg_type = %ld\nmsg_data = %s\n",40 buff->mesg_type,buff->mesg_data);41 free(buff);42 printf("*********************************\n");43 printf("Enter the msg_type: ");44 scanf("%ld",&type);45 printf("Enter the msg_data: ");46 scanf("%s",content);47 len = strlen(content);48 ptr = calloc(sizeof(long)*2+len,sizeof(char));49 ptr->mesg_type = type;50 strcpy(ptr->mesg_data,content);51 printf("Sent to client.\n");52 if(msgsnd(writeid,ptr,len,0) == -1)53 {54 perror("msgsnd() error");55 exit(-1);56 }57 printf("Sent to client successfully.\n");58 free(buff);59 }
程序测试结果如下:
3、复用消息
与一个队列中的每个消息相关联的类型字段提供了两个特性:
(1)类型字段可以用于标识消息,从而允许多个进程在单个队列上复用消息。
(2)类型字段可以用做优先级,允许接收者以不同于先进先出的某个顺序读出各个消息。
转载地址:http://yiyil.baihongyu.com/