进程间通信 - 信号量

首先想到的问题是,为什么我们需要信号量?一个简单的答案,保护多个进程之间共享的关键/公共区域.

让我们假设,多个进程使用相同的代码区域,如果所有进程都需要并行访问,那么结果是重叠的.比如说,例如,多个用户只使用一台打印机(公共/关键部分),比如3个用户,同时给出3个作业,如果所有作业并行启动,则一个用户输出与另一个重叠.因此,我们需要使用信号量来保护它,即在一个进程运行时锁定关键部分并在完成时解锁.这将针对每个用户/进程重复进行,以便一个作业不与另一个作业重叠.

基本上信号量分为两种类型 :

二进制信号量 : 只有两个状态0& 1,即锁定/解锁或可用/不可用,Mutex实现.

计算信号量 : 允许任意资源计数的信号量称为计数信号量.

假设我们有5台打印机(要理解假设1台打印机只接受1份工作),我们有3个工作要打印.现在将为3台打印机(每台1台)提供3个工作.在此过程中再次有4个工作岗位.现在,在2台可用的打印机中,已经安排了2个作业,我们剩下2个作业,只有在其中一个资源/打印机可用后才能完成.根据资源可用性的这种调度可以被视为计数信号量.

要使用信号量执行同步,以下是步骤 :

第1步 : 创建信号量或连接到已存在的信号量(semget())

步骤2 : 对信号量执行操作,即分配或释放或等待资源(semop())

步骤3 : 对消息队列执行控制操作(semctl())

现在,让我们用我们的系统调用来检查它.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semget(key_t key, int nsems, int semflg)

此系统调用创建或分配System V信号量集.需要传递以下参数 :

  • 第一个参数key识别消息队列.键可以是任意值,也可以是从库函数ftok()派生的值.

  • 第二个参数nsems指定的数量信号灯.如果是二进制,那么它是1,意味着需要1个信号量集,否则根据所需的信号量集数量.

  • 第三个参数semflg,指定所需的信号量标志,如IPC_CREAT(如果不存在,则创建信号量)或IPC_EXCL(与IPC_CREAT一起使用以创建信号量,如果信号量已存在则调用失败).还需要传递权限.

注意 : 有关权限的详细信息,请参阅前面的部分.

此调用将在成功时返回有效的信号量标识符(用于进一步调用信号量),如果失败则返回-1.要知道失败的原因,请使用errno variable或perror()函数进行检查.

有关此调用的各种错误是EACCESS(权限被拒绝),EEXIST(队列已经存在不能)创建),ENOENT(队列不存在),ENOMEM(没有足够的内存来创建队列),ENOSPC(超出最大集限制)等等.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semop(int semid, struct sembuf *semops, size_t nsemops)

此系统调用在System V上执行操作信号量设置即,分配资源,等待资源或释放资源.以下参数需要传递 :

  • 第一个参数semid表示由semget()创建的信号量集标识符.

  • 第二个参数semops是指向要在信号量集上执行的操作数组的指针.结构如下<

struct sembuf {
   unsigned short sem_num; /* Semaphore set num */
   short sem_op; /* Semaphore operation */
   short sem_flg; /* Operation flags, IPC_NOWAIT, SEM_UNDO */
};

上述结构中的元素sem_op表示需要执行的操作 :

  • 如果sem_op是-ve,则分配或获取资源.阻止调用进程,直到其他进程释放了足够的资源,以便该进程可以分配.

  • 如果sem_op为零,则调用进程等待或休眠直到信号量值达到0.

  • 如果sem_op是+ ve,则释放资源.

例如 :

struct sembuf sem_lock = {0,-1,SEM_UNDO};

struct sembuf sem_unlock = {0,1,SEM_UNDO};

  • 第三个参数nsemops是该数组中的操作数.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semctl(int semid, int semnum, int cmd, …)

此系统调用执行System V的控制操作信号.需要传递以下参数 :

  • 第一个参数semid是信号量的标识符.此id是信号量标识符,它是semget()系统调用的返回值.

  • 第二个参数semnum是信号量的数量.信号量从0开始编号.

  • 第三个参数cmd是对信号量执行所需控制操作的命令.

  • 类型为union semun的第四个参数取决于cmd.在少数情况下,第四个参数不适用.

让我们检查联合semun :

union semun {
   int val; /* val for SETVAL */
   struct semid_ds *buf; /* Buffer for IPC_STAT and IPC_SET */
   unsigned short *array; /* Buffer for GETALL and SETALL */
   struct seminfo *__buf; /* Buffer for IPC_INFO and SEM_INFO*/
};

在sys/sem.h中定义的semid_ds数据结构如下 :

struct semid_ds {
   struct ipc_perm sem_perm; /* Permissions */
   time_t sem_otime; /* Last semop time */
   time_t sem_ctime; /* Last change time */
   unsigned long sem_nsems; /* Number of semaphores in the set */
};

注意 : 有关其他数据结构,请参阅手册页.

union semun arg; cmd的有效值为 :

  • IPC_STAT : 将struct semid_ds的每个成员的当前值的信息复制到arg.buf指向的传递结构.此命令需要对信号量的读取权限.

  • IPC_SET : 设置结构semid_ds指向的用户ID,所有者的组ID,权限等.

  • IPC_RMID : 删除信号集.

  • IPC_INFO : 返回有关arg .__ buf指向的结构semid_ds中的信号量限制和参数的信息.

  • SEM_INFO : 返回一个seminfo结构,其中包含有关信号量消耗的系统资源的信息.

此调用将返回值(非负值),具体取决于通过命令.成功后,IPC_INFO和SEM_INFO或SEM_STAT返回根据信号量使用的最高条目的索引或标识符或GETNCNT的semncnt值或GETPID的sempid值或GETVAL 0的semval值用于其他成功操作和 - 1如果失败.要知道失败的原因,请使用errno变量或perror()函数进行检查.

在查看代码之前,让我们了解它的实现 :

  • 创建两个进程,例如,child和parent.

  • 创建共享内存主要需要存储计数器和其他标志以指示读/写过程结束到共享存储器中.

  • 计数器由父进程和子进程递增计数.计数作为命令行参数传递或作为默认值传递(如果未作为命令行参数传递或值小于10000).调用一定的睡眠时间以确保父和子同时访问共享内存,即并行.

  • 因为,计数器会逐步递增父母和孩子都是1,最终值应该是柜台的两倍.由于父进程和子进程同时执行操作,因此计数器不会根据需要递增.因此,我们需要确保一个流程完成的完整性,然后是其他流程.

  • 所有上述实现都在文件shm_write_cntr.c中执行

  • 检查计数器值是否在文件shm_read_cntr.c中实现

  • 为确保完成,信号量程序在文件shm_write_cntr_with_sem.c中实现.完成整个过程后删除信号量(从其他程序读取完毕后)

  • 因为,我们有单独的文件来读取计数器中的值共享内存并且写入没有任何影响,读取程序保持不变(shm_read_cntr.c)

  • 执行编写程序总是更好在一个终端和从另一个终端读取程序.由于程序仅在写入和读取过程完成后才完成执行,因此可以在完全执行写入程序后运行程序.写程序将一直等到读程序运行,并且只在完成后才结束.

没有信号量的程序.

/* Filename: shm_write_cntr.c */
#include<stdio.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/types.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>

#define SHM_KEY 0x12345
struct shmseg {
   int cntr;
   int write_complete;
   int read_complete;
};
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count);

int main(int argc, char *argv[]) {
   int shmid;
   struct shmseg *shmp;
   char *bufptr;
   int total_count;
   int sleep_time;
   pid_t pid;
   if (argc != 2)
   total_count = 10000;
   else {
      total_count = atoi(argv[1]);
      if (total_count < 10000)
      total_count = 10000;
   }
   printf("Total Count is %d\n", total_count);
   shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);

   if (shmid == -1) {
      perror("Shared memory");
      return 1;
   }

   // Attach to the segment to get a pointer to it.
   shmp = shmat(shmid, NULL, 0);
   if (shmp == (void *) -1) {
      perror("Shared memory attach");
      return 1;
   }
   shmp->cntr = 0;
   pid = fork();

   /* Parent Process - Writing Once */
   if (pid > 0) {
      shared_memory_cntr_increment(pid, shmp, total_count);
   } else if (pid == 0) {
      shared_memory_cntr_increment(pid, shmp, total_count);
      return 0;
   } else {
      perror("Fork Failure\n");
      return 1;
   }
   while (shmp->read_complete != 1)
   sleep(1);

   if (shmdt(shmp) == -1) {
      perror("shmdt");
      return 1;
   }

   if (shmctl(shmid, IPC_RMID, 0) == -1) {
      perror("shmctl");
      return 1;
   }
   printf("Writing Process: Complete\n");
   return 0;
}

/* Increment the counter of shared memory by total_count in steps of 1 */
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) {
   int cntr;
   int numtimes;
   int sleep_time;
   cntr = shmp->cntr;
   shmp->write_complete = 0;
   if (pid == 0)
   printf("SHM_WRITE: CHILD: Now writing\n");
   else if (pid > 0)
   printf("SHM_WRITE: PARENT: Now writing\n");
   //printf("SHM_CNTR is %d\n", shmp->cntr);
   
   /* Increment the counter in shared memory by total_count in steps of 1 */
   for (numtimes = 0; numtimes < total_count; numtimes++) {
      cntr += 1;
      shmp->cntr = cntr;
      
      /* Sleeping for a second for every thousand */
      sleep_time = cntr % 1000;
      if (sleep_time == 0)
      sleep(1);
   }
   
   shmp->write_complete = 1;
   if (pid == 0)
   printf("SHM_WRITE: CHILD: Writing Done\n");
   else if (pid > 0)
   printf("SHM_WRITE: PARENT: Writing Done\n");
   return;
}

编制和执行步骤

Total Count is 10000
SHM_WRITE: PARENT: Now writing
SHM_WRITE: CHILD: Now writing
SHM_WRITE: PARENT: Writing Done
SHM_WRITE: CHILD: Writing Done
Writing Process: Complete

现在,让我们检查共享内存读取程序.

/* Filename: shm_read_cntr.c */
#include<stdio.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/types.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>

#define SHM_KEY 0x12345
struct shmseg {
   int cntr;
   int write_complete;
   int read_complete;
};

int main(int argc, char *argv[]) {
   int shmid, numtimes;
   struct shmseg *shmp;
   int total_count;
   int cntr;
   int sleep_time;
   if (argc != 2)
   total_count = 10000;
   
   else {
      total_count = atoi(argv[1]);
      if (total_count < 10000)
      total_count = 10000;
   }
   shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
   
   if (shmid == -1) {
      perror("Shared memory");
      return 1;
   }
   // Attach to the segment to get a pointer to it.
   shmp = shmat(shmid, NULL, 0);
   
   if (shmp == (void *) -1) {
      perror("Shared memory attach");
      return 1;
   }
   
   /* Read the shared memory cntr and print it on standard output */
   while (shmp->write_complete != 1) {
      if (shmp->cntr == -1) {
         perror("read");
         return 1;
      }
      sleep(3);
   }
   printf("Reading Process: Shared Memory: Counter is %d\n", shmp->cntr);
   printf("Reading Process: Reading Done, Detaching Shared Memory\n");
   shmp->read_complete = 1;
   
   if (shmdt(shmp) == -1) {
      perror("shmdt");
      return 1;
   }
   printf("Reading Process: Complete\n");
   return 0;
}

编制和执行步骤

Reading Process: Shared Memory: Counter is 11000
Reading Process: Reading Done, Detaching Shared Memory
Reading Process: Complete

如果你观察到在输出上面,计数器应该是20000,但是,由于在完成一个过程任务之前其他过程也是并行处理,因此计数器值不是预期的.输出因系统而异,并且每次执行都会有所不同.为了确保两个进程在完成一个任务后执行任务,应该使用同步机制来实现.

现在,让我们使用信号量检查相同的应用程序.

注意 : 阅读程序保持不变.

/* Filename: shm_write_cntr_with_sem.c */
#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<sys/sem.h>
#include<string.h>
#include<errno.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>

#define SHM_KEY 0x12345
#define SEM_KEY 0x54321
#define MAX_TRIES 20

struct shmseg {
   int cntr;
   int write_complete;
   int read_complete;
};
void shared_memory_cntr_increment(int, struct shmseg*, int);
void remove_semaphore();

int main(int argc, char *argv[]) {
   int shmid;
   struct shmseg *shmp;
   char *bufptr;
   int total_count;
   int sleep_time;
   pid_t pid;
   if (argc != 2)
   total_count = 10000;
   else {
      total_count = atoi(argv[1]);
      if (total_count < 10000)
      total_count = 10000;
   }
   printf("Total Count is %d\n", total_count);
   shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT);
   
   if (shmid == -1) {
      perror("Shared memory");
      return 1;
   }
   // Attach to the segment to get a pointer to it.
   shmp = shmat(shmid, NULL, 0);
   
   if (shmp == (void *) -1) {
      perror("Shared memory attach: ");
      return 1;
   }
   shmp->cntr = 0;
   pid = fork();
   
   /* Parent Process - Writing Once */
   if (pid > 0) {
      shared_memory_cntr_increment(pid, shmp, total_count);
   } else if (pid == 0) {
      shared_memory_cntr_increment(pid, shmp, total_count);
      return 0;
   } else {
      perror("Fork Failure\n");
      return 1;
   }
   while (shmp->read_complete != 1)
   sleep(1);
   
   if (shmdt(shmp) == -1) {
      perror("shmdt");
      return 1;
   }
   
   if (shmctl(shmid, IPC_RMID, 0) == -1) {
      perror("shmctl");
      return 1;
   }
   printf("Writing Process: Complete\n");
   remove_semaphore();
   return 0;
}

/* Increment the counter of shared memory by total_count in steps of 1 */
void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) {
   int cntr;
   int numtimes;
   int sleep_time;
   int semid;
   struct sembuf sem_buf;
   struct semid_ds buf;
   int tries;
   int retval;
   semid = semget(SEM_KEY, 1, IPC_CREAT | IPC_EXCL | 0666);
   //printf("errno is %d and semid is %d\n", errno, semid);
   
   /* Got the semaphore */
   if (semid >= 0) {
      printf("First Process\n");
      sem_buf.sem_op = 1;
      sem_buf.sem_flg = 0;
      sem_buf.sem_num = 0;
      retval = semop(semid, &sem_buf, 1);
      if (retval == -1) {
         perror("Semaphore Operation: ");
         return;
      }
   } else if (errno == EEXIST) { // Already other process got it
      int ready = 0;
      printf("Second Process\n");
      semid = semget(SEM_KEY, 1, 0);
      if (semid < 0) {
         perror("Semaphore GET: ");
         return;
      }
      
      /* Waiting for the resource */
      sem_buf.sem_num = 0;
      sem_buf.sem_op = 0;
      sem_buf.sem_flg = SEM_UNDO;
      retval = semop(semid, &sem_buf, 1);
      if (retval == -1) {
         perror("Semaphore Locked: ");
         return;
      }
   }
   sem_buf.sem_num = 0;
   sem_buf.sem_op = -1; /* Allocating the resources */
   sem_buf.sem_flg = SEM_UNDO;
   retval = semop(semid, &sem_buf, 1);
   
   if (retval == -1) {
      perror("Semaphore Locked: ");
      return;
   }
   cntr = shmp->cntr;
   shmp->write_complete = 0;
   if (pid == 0)
   printf("SHM_WRITE: CHILD: Now writing\n");
   else if (pid > 0)
   printf("SHM_WRITE: PARENT: Now writing\n");
   //printf("SHM_CNTR is %d\n", shmp->cntr);
   
   /* Increment the counter in shared memory by total_count in steps of 1 */
   for (numtimes = 0; numtimes < total_count; numtimes++) {
      cntr += 1;
      shmp->cntr = cntr;
      /* Sleeping for a second for every thousand */
      sleep_time = cntr % 1000;
      if (sleep_time == 0)
      sleep(1);
   }
   shmp->write_complete = 1;
   sem_buf.sem_op = 1; /* Releasing the resource */
   retval = semop(semid, &sem_buf, 1);
   
   if (retval == -1) {
      perror("Semaphore Locked\n");
      return;
   }
   
   if (pid == 0)
      printf("SHM_WRITE: CHILD: Writing Done\n");
      else if (pid > 0)
      printf("SHM_WRITE: PARENT: Writing Done\n");
      return;
}
   
void remove_semaphore() {
   int semid;
   int retval;
   semid = semget(SEM_KEY, 1, 0);
      if (semid < 0) {
         perror("Remove Semaphore: Semaphore GET: ");
         return;
      }
   retval = semctl(semid, 0, IPC_RMID);
   if (retval == -1) {
      perror("Remove Semaphore: Semaphore CTL: ");
      return;
   }
   return;
}

编制和执行步骤

Total Count is 10000
First Process
SHM_WRITE: PARENT: Now writing
Second Process
SHM_WRITE: PARENT: Writing Done
SHM_WRITE: CHILD: Now writing
SHM_WRITE: CHILD: Writing Done
Writing Process: Complete

现在,我们将通过阅读过程检查计数器值.

执行步骤

Reading Process: Shared Memory: Counter is 20000
Reading Process: Reading Done, Detaching Shared Memory
Reading Process: Complete