消息队列

为什么我们已经拥有共享内存时需要消息队列?这可能有多种原因,让我们尝试将其分解为多个点以进行简化 :

  • 据了解,一次消息由进程接收,它将不再可用于任何其他进程.而在共享内存中,数据可供多个进程访问.

  • 如果我们想要与小消息格式进行通信.

  • 当多个进程同时进行通信时,共享内存数据需要通过同步进行保护.

  • 写作频率并且使用共享内存读取很高,那么实现该功能将非常复杂.在这种情况下使用不值得.

  • 如果所有进程不需要访问共享内存但很少有进程只需要它,该怎么办? ,最好使用消息队列实现.

  • 如果我们想要与不同的数据包通信,请说进程A正在向进程B发送消息类型1 ,消息类型10到进程C,消息类型20到进程D.在这种情况下,使用消息队列实现更简单.为了简化给定的消息类型为1,10,20,它可以是0或+ ve或-ve,如下所述.

  • 当然,订单消息队列是FIFO(先进先出).插入队列的第一条消息是第一个要检索的消息.

使用共享内存或消息队列取决于需要应用程序以及如何有效地利用它.

使用消息队列的通信可以通过以下方式进行 :

  • 由一个进程写入共享内存,另一个进程从共享内存中读取.我们知道,阅读也可以通过多个流程完成.

消息队列

  • 通过一个进程使用不同的数据包写入共享内存并通过多个数据包读取过程,即根据消息类型.

Multiple消息队列

已经看到有关消息队列的某些信息,现在是时候检查支持消息队列的系统调用(系统V)了.

要使用消息队列执行通信,请执行以下步骤 :

步骤1 : 创建消息队列或连接到现有的消息队列(msgget())

步骤2 : 写入消息队列(msgsnd())

步骤3 : 从消息队列中读取(msgrcv())

步骤4 : 对消息队列执行控制操作(msgctl())

现在,让我们检查上述调用的语法和某些信息.

 
 #include< sys/types.h> 
 #include< sys/ipc.h> 
 #include< sys/msg.h> 
 int msgget(key_t key,int msgflg)

此系统调用创建或分配System V消息队列.以下参数需要传递 :

  • 第一个参数key识别消息队列.键可以是任意值,也可以是从库函数ftok()派生的值.

  • 第二个参数shmflg指定所需的消息队列标志/如IPC_CREAT(如果不存在则创建消息队列)或IPC_EXCL(与IPC_CREAT一起使用以创建消息队列,如果消息队列已存在则调用失败).还需要传递权限.

注意 : 有关权限的详细信息,请参阅前面的部分.

此调用将在成功时返回有效的消息队列标识符(用于进一步调用消息队列),如果失败则返回-1.要知道失败的原因,请使用errno variable或perror()函数进行检查.

有关此调用的各种错误是EACCESS(权限被拒绝),EEXIST(队列已经存在不能)创建),ENOENT(队列不存在),ENOMEM(没有足够的内存来创建队列)等等.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

此系统调用发送/附加一个消息进入消息队列(系统V).以下参数需要传递 :

  • 第一个参数msgid识别消息队列,即消息队列标识符. msgget()成功时收到标识符值

  • 第二个参数msgp是指向消息的指针,发送给调用者,定义于以下形式的结构 :

 
 struct msgbuf {
 long mtype ; 
 char mtext [1]; 
};

变量mtype用于与不同的消息类型进行通信,详见msgrcv()调用.变量mtext是一个数组或其他结构,其大小由msgsz(正值)指定.如果未提及mtext字段,则将其视为零大小消息,这是允许的.

  • 第三个参数,msgsz,是消息的大小(消息应以空字符结尾)

  • 第四个参数msgflg表示某些标志,如IPC_NOWAIT(返回当队列中没有消息或MSG_NOERROR时立即消息(截断消息文本,如果超过msgsz字节)

此调用将返回0成功,失败的情况为-1.要知道失败的原因,请使用errno变量或perror()函数进行检查.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

此系统调用从消息队列(系统V)检索消息.以下参数需要通过 :

  • 第一个参数msgid识别消息队列,即消息队列标识符. msgget()成功时收到标识符值

  • 第二个参数msgp是从调用者接收的消息的指针.它在以下形式的结构中定义 :

 
 struct msgbuf {
 long mtype; 
 char mtext [1]; 
};

变量mtype用于与不同的消息类型进行通信.变量mtext是一个数组或其他结构,其大小由msgsz(正值)指定.如果未提及mtext字段,则将其视为零大小消息,这是允许的.

  • 第三个参数,msgsz,是收到的消息的大小(消息应以空字符结尾)

  • fouth参数msgtype表示消息的类型 :

    • 如果msgtype为0 : 读取队列中第一个收到的消息

    • 如果msgtype是+ ve : 读取msgtype类型队列中的第一条消息(如果msgtype为10,则只读取类型10的第一条消息,即使其他类型可能在开头的队列中)

    • 如果msgtype是-ve : 读取小于或等于消息类型绝对值的最小类型的第一条消息(例如,如果msgtype为-5,则它会读取类型小于5的第一条消息,即消息类型从1到5)

  • 第五个参数msgflg表示某些标志,例如IPC_NOWAIT(当没有找到消息时立即返回在队列中或MSG_NOERROR(如果超过msgsz字节,则截断消息文本)

此调用将返回实际接收的字节数成功时的mtext数组和失败时的-1.要知道失败的原因,请使用errno变量或perror()函数进行检查.

 
 #include< sys/types.h> 
 #include< sys/ipc.h> 
 #include< sys/msg.h> 
 int msgctl(int msgstr,int cmd,struct msqid_ds * buf)

此系统调用执行消息队列(系统V)的控制操作.以下参数需要是传递 :

  • 第一个参数msgid识别消息队列,即消息队列标识符. msgget()成功时收到标识符值

  • 第二个参数cmd是对消息队列执行所需控制操作的命令. cmd的有效值为 :

IPC_STAT : 将struct msqid_ds的每个成员的当前值的信息复制到buf指向的传递结构.此命令需要对消息队列具有读取权限.

IPC_SET : 设置结构buf指向的用户ID,所有者的组ID,权限等.

IPC_RMID : 立即删除邮件队列.

IPC_INFO : 返回有关buf指向的结构中的消息队列限制和参数的信息,其类型为struct msginfo

MSG_INFO : 返回一个msginfo结构,其中包含消息队列消耗的系统资源的信息.

  • 第三个参数buf是一个指向名为struct msqid_ds的消息队列结构的指针.此结构的值将用于set或按cmd获取.

此调用将返回值,具体取决于传递的值命令. IPC_INFO和MSG_INFO或MSG_STAT的成功返回消息队列的索引或标识符,或者对于其他操作返回0,如果失败则返回-1.要知道失败的原因,请使用errno variable或perror()函数进行检查.

在看过有关消息队列的基本信息和系统调用之后,现在是时候检查一下程序.

在查看程序之前让我们看一下说明 :

第1步 : 创建两个进程,一个用于发送到消息队列(msgq_send.c),另一个用于从消息队列中检索(msgq_recv.c)

步骤2  : 去;使用ftok()函数创建密钥.为此,最初创建文件msgq.txt以获取唯一密钥.

步骤3 : 发送过程执行以下操作.

  • 读取用户输入的字符串

  • 删除新行(如果存在)

  • 发送到邮件队列

  • 重复此过程直到输入结束(CTRL + D)

  • 收到输入结束后,发送消息"结束"表示流程结束

第4步 : 在接收过程中,执行以下操作.

  • 从队列中读取消息

  • 显示输出

  • 如果收到的消息是"结束",则完成流程并退出

为简化起见,我们没有使用此示例的消息类型.此外,一个进程正在写入队列,另一个进程正在从队列中读取.这可以根据需要进行扩展,即理想情况下,一个进程会写入队列,多个进程从队列中读取.

现在,让我们检查进程(消息发送到队列中) - 文件:msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */
   
   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");
   
   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

编译和执行步骤

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

以下是来自消息接收过程的代码(从队列中检索消息) - 文件:msgq_recv.c

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");
   
   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: %s\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

编译和执行步骤

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.