【OS】操作系统-实验三:进程通信

操作环境:

  • 主机:MacOS 10.13.3
  • 虚拟机软件:Parallels Desktop
  • Linux:Ubuntu 16.04 Kernal 4.13.0-36-generic

实验要求:

  1. 实现一个模拟的 shell:

编写三个不同的程序 cmd1.c, cmd2.c, cmd3.c,每个程序的功能自定,分别编译成可执行文件 cmd1, cmd2, cmd3。 然后再编写一个程序,模拟 shell 程序的功能:能根据用户输入的字符串(表示相应的命令名),为相应的命令创建子进程并让它去执行相应的程序,而父进程则等待子进程结束,然后再等待接收下一条命令。如果接收到的命令为 exit,则父进程结束:如果接收到的命令是无效命令,则显示“Command not found“,继续等待。

  1. 实现一个管道通信程序:

由父进程创建一个管道,然后再创建 3 个子进程,并由这三个子进程利用管道与父进程之间进行通信:子进程发送信息,父进程等三个子进程全部发完消息后再接收信息。通信的具体内容可根据自己的需要随意设计,要求能试验阻塞型读写过程中的各种情况,测试管道的默认大小,并且要求利用 Posix 信号量机制实现进程间对管道的互斥访问。运行程序,观察各种情况下,进程实际读写的字节数以及进程阻塞唤醒的情况。

  1. 利用 linux 的消息队列通信机制实现两个线程间的通信:

编写程序创建两个线程:sender 线程和 receive 线程,其中 sender 线程运行函数 sender(),它创建一个消息队列,然后,循环等待用户通过终端输入串字符,将这串字符通过消息队列发送给 receiver 线程,直到用户输入“exit“”为止;最后,它向 recever 线程发送消息“end“,并且等待 recever 的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,删除相关消息队列,结束程序的运行。Recever 线程运行 receive(),它通过消息队列接收来自 sender 的消息,将消息显示在终端屏幕上,直至收到内容为“end”的消息为止,此时,它向 sender 发送一个应答消息“over“,结束程序的运行。使用无名信号量实现两个线程之间的同步与互斥。

  1. 利用 linux 的共享内存通信机制实现两个进程间的通信:

编写程序 sender,它创建一个共享内存,然后等待用户通过终端输入一串字符,并将这串字符通过共享内存发送给 receiver;最后,它等待 receiver 的应答,收到应答消息后,将接收到的应答信息显示在终端屏幕上,删除共享内存,结束程序的运行。编写 receiver 程序,它通过共享内存接收来自 sender 的消息,将消息显示在终端屏幕上,然后再通过该共享内存向 sender 发送一个应答消息“over“,结束程序的运行。使用有名信号量或 SystemV 信号量实现两个进程对共享内存的互斥及同步使用。

操作说明:

  1. 实现一个模拟的 shell:

    1. 程序构成:
      1. 命令读取
      2. 分割参数
      3. 判断是否退出
      4. 调用子程序执行命令
    2. 关键函数:
      1. 分割字符串 char **split() (自己写)
      2. 子程序 pid_t fork() ( unistd.h )
      3. 执行 shell 命令 int execvp() ( unistd.h )
      4. 等待子程序 pid_t waitpid() ( sys/wait.h )
    3. 效果:

    1.png

    1. 代码:见附录
  2. 实现一个管道通信程序:

    1. 程序构成:
      1. 建立 Pipe 管道:管道有两条,分别进行读写操作。
      2. 由 fork() 函数生成3个子进程,子进程同时独立地向 Pipe 发送单一字符'A','B'或'C',等到 Pipe 已满,无法继续写入时退出并显示自己写入的字符数。
      3. 主程序等待子程序结束,从 Pipe 中读取前 1000 个字节并显示。
    2. 关键函数:
      1. 创建管道函数 pipe()
      2. 设置函数 fcntl() ( fcntl.h )
      3. 信号量函数 sem_****() ( semaphore.h )
    3. 效果:
    • 编译并运行:gcc -pthread pipe.c -o pipe && ./pipe

    2.png

    • 得到 Pipe 的默认大小为 65536B
    1. 代码:见附录
  3. 利用 linux 的消息队列通信机制实现两个线程间的通信:

    1. 程序构成:
      1. 新建两个函数,分别为:
      • void * sender(void * a)
      • void * receiver(void * a)
      1. 将消息放入一个结构体中,进行发送与接收
      2. 使用两个信号量来保证其互斥及同步
      3. 在主程序中使用 pthread_create 调用上面的两个函数,使他们运行
      4. 使用 pthread_join 等待期运行完毕
    2. 关键函数:
      1. 打开/新建一个消息队列 msgget() ( sys/msg.h )
      2. 发送一个消息 msgsnd() ( sys/msg.h )
      3. 接受一个消息 msgrcv() ( sys/msg.h )
      4. 控制消息队列(这里用作删除)msgctl() ( sys/msg.h )
      5. 产生子线程 pthread_create ( pthread.h )
      6. 等待子线程 pthread_join ( pthread.h )
      7. 信号量函数 sem_****() ( semaphore.h )
    3. 效果:
    • 编译并运行: gcc -pthread queue.c -o queue && ./queue

    3.png

    1. 代码:见附录
  4. 利用 inux 的共享内存通信机制实现两个进程间的通信:

    1. 程序构成:
      1. 新建/打开共享内存
      2. 映射到本程序
      3. 使用两个信号量实现互斥访问
      4. 提高可用性:
        1. 可以使用一个消耗量进行互斥访问,这样可以多次读写
        2. 通过在共享内存头部加入计数器,可知道现有内存的余量(可直接将共享内存格式化为一个很大的结构体,在头部放入一些信息,然后后边是仍然一块连续可用的内存,结构体的好处是可以通过域名称访问,简化代码)
        3. 再用第二个信号量控制先读后写(在while之前)
    2. 关键函数:
      1. 打开/新建一个共享内存 shmget() ( sys/shm.h )
      2. 挂载(映射)一块共享内存 shmat() ( sys/shm.h )
      3. 卸载一块共享内存 shmdt() ( sys/shm.h )
      4. 控制一块共享内存(这里用作删除) shctl() ( sys/shm.h )
      5. 打开信号量函数 sem_****() ( semaphore.h )
    3. 效果:
    • 编译并运行:
      • gcc -pthread 4-sender.c -o 4s && ./4s
      • gcc -pthread 4-receiver.c -o 4r && ./4r

    4.png

    1. 代码:见附录

附录:

  1. 模拟 shell 代码 shell.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/wait.h>

char **split(char * input, const char div, int * amount) {
	int count = 0;
	char c_last = div;
	int len = 0;

	for(int i  = 0; *(input+i) != '\0'; i++) {
		if(*(input+i) == div && c_last != div){ count++;}
		c_last = *(input+i);
		len ++;
	}
	*(amount) = count+1; // para amount
	char ** array = (char **)malloc((count+2)*sizeof(char*));

	int cu_para = 0;

	c_last = div;
	for(int i  = 0; i < len; i++) {
		if(*(input+i) != div && c_last == div){ //para[0]
			array[cu_para] = input+i;
		} else if(*(input+i) == div && c_last != div){ 
			*(input+i) = '\0';
			cu_para ++;
			c_last = div;
			continue;
		}
		c_last = *(input+i);
	}
	return array;
}


int main(int argc, char *argv[]) {

char cmd[20];

char **sep_cmd;

int a; //amount of arguments

for(int i = 1; i < argc; i++) {
	printf("%s\n",argv[i]);
}

while(1) {
	// say hello
	printf("\033[33mstupidUser_(┐「ε:)_\033[0m $ ");

	// Read command
	cmd[0] = '\0';
	scanf("%[^\n]", cmd);
	char c;
	while ((c = getchar()) != EOF && c != '\n');
	if(cmd[0] == '\0') continue;

	// modify string
	sep_cmd = split(cmd, ' ', &a);

	// exit?
	if(!strcmp(sep_cmd[0], "exit")) {
		printf("\nBye~(´・ω・`(´・ω・`(´・ω・`(´・ω・`\n\n");
		break;
	}

	// exec command
	pid_t id = fork();
    if(id < 0){
        perror("fork");
    }else if(id == 0){//child run cmd
        execvp(sep_cmd[0], sep_cmd);
        exit(1);
    }else{
        int status = 0;
        pid_t ret = waitpid(id,&status,0);
        if(ret > 0 && WIFEXITED(status)){
			if(status != 0)
			printf("Command not found\n");
        }else{
            perror("waitpid");
        }
    }

}

return 0;

}
  1. 管道通信代码 代码 pipe.c
#include<stdio.h>  
#include<sys/types.h>  
#include<sys/wait.h>  
#include<unistd.h>  
#include<fcntl.h>  
#include<errno.h>  
#include<stdlib.h>  
#include<string.h>  
#include <semaphore.h> // POSIX

int main()  
{  
    // Create pipe
    int _pipe[2];  
    if(pipe(_pipe)==-1)  
    {  
        printf("pipe error\n");  
        return 1;  
    }  

    // Config pipe
    int ret;  
    int count=0;  
    int flag=fcntl(_pipe[1],F_GETFL);  
    fcntl(_pipe[1],F_SETFL,flag | O_NONBLOCK);  
    flag=fcntl(_pipe[0],F_GETFL);  
    fcntl(_pipe[0],F_SETFL,flag | O_NONBLOCK);  

    // Simplify coding
    int *tx = &_pipe[1];
    int *rx = &_pipe[0];

    // Create sem
    sem_t * key = sem_open("aeonni_sem", O_CREAT, 0644, 1);



    // Generate Children procs
    pid_t pid1 = fork();
    if(pid1 < 0){
        perror("fork1");
    }else if(pid1 == 0){ // child 1 run
// ******************** child 1 code start *****************
        
        printf("Child 1 start\n");
        while(1)  
        {  
            int sem = sem_wait(key);
            ret=write(*tx,"A",1);  
            if(ret==-1)  
            {  
                printf("Child 1 error %s\n",strerror(errno));
                sem_post(key);
                break;  
            }  
            count++;  
            sem_post(key);
        }  
        printf("count=%d\n",count); 
        
        exit(0);
// ******************** child 1 code end *******************
    }else{
        pid_t pid2 = fork();
        if(pid2 < 0){
            perror("fork2");
        }else if(pid2 == 0){ // child 2 run
// ******************** child 2 code start *****************
        printf("Child 2 start\n");
        while(1)  
        {  
            int sem = sem_wait(key);
            ret=write(*tx,"B",1);  
            if(ret==-1)  
            {  
                printf("Child 2 error %s\n",strerror(errno));
                sem_post(key);
                break;  
            }  
            count++;  
            sem_post(key);
        }  
        printf("count=%d\n",count); 
        
        exit(0);
// ******************** child 2 code end *******************
        }else{ 
            pid_t pid3 = fork();
            if(pid3 < 0){
                perror("fork2");
            }else if(pid3 == 0){ // child 3 run
// ******************** child 3 code start *****************
        printf("Child 3 start\n");
        while(1)  
        {  
            int sem = sem_wait(key);
            ret=write(*tx,"C",1);  
            if(ret==-1)  
            {  
                printf("Child 3 error %s\n",strerror(errno));
                sem_post(key);
                break;  
            }  
            count++;  
            sem_post(key);
        }  
        printf("count=%d\n",count); 
        
        exit(0);
// ******************** child 3 code end *******************
            }else{ // main


                int status = 0;
                pid_t ret1 = waitpid(pid1,&status,0);
                pid_t ret2 = waitpid(pid2,&status,0);
                pid_t ret3 = waitpid(pid3,&status,0);

                char buffer[1000];
                memset(buffer, '\0', sizeof(buffer)); 
                ret = read(*rx, buffer, 1000);  
                if(ret==-1)  
                {  
                    printf("error %s\n",strerror(errno));
                }
                printf("%s\n",buffer);


            }
        }
    }
        
    sem_close(key);
    sem_unlink("aeonni_sem");


    return 0;  
}  
  1. 消息队列 代码 queue.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <semaphore.h> // POSIX
#include <pthread.h>
#define BUFFSIZE 128

typedef struct msg_type {
    long int my_msg_type;
    char text[BUFFSIZE];
}msg_t;

sem_t full;
sem_t empty;

void * sender(void * a) {

    msg_t data;
    int msgid;
    char buffer[BUFFSIZE];
    msgid = msgget((key_t)6666, 0666|IPC_CREAT);

    if(msgid==-1) {
        printf("err\n");
        exit(1);
    }
 
    while(1) {
        
        sem_wait(&empty);
        printf("\033[32mEnter Message: \033[0m");
        fgets(buffer, BUFFSIZE, stdin);
        data.my_msg_type = 1;
        strcpy(data.text, buffer);

        
        if(msgsnd(msgid, (void *)&data, BUFFSIZE, 0)==-1)
        {
            printf("err\n");
            exit(1);
        }
        sem_post(&full);

        if(!strcmp(buffer, "end\n"))
        {
            //printf("Waiting empty\n");
            sem_wait(&empty);
            //printf("receiving...\n");
            if(msgrcv(msgid, (void *)&data, BUFFSIZE, 0, 0) == -1)
            {
                printf("err\n");
                exit(1);
            }

            printf("\033[32mReceived: %s\033[0m\n", data.text);
            sem_post(&full);
            break;
        }
    }
    if(msgctl(msgid, IPC_RMID, 0)==-1)
    {
        printf("err\n");
        exit(1);
    }
    exit(1);
}

void * receiver(void * a) {

    int msgid;
    msg_t data;
    long int msg_to_receive = 0;

    // 设置消息队列:

    msgid = msgget((key_t)6666,0666|IPC_CREAT);

    if(msgid == -1)
    {
        printf("msgget error !\n");
        exit(1);
    }

    //3 然后,接收消息队列中的消息直到遇到一个end消息。最后,消息队列被删除:

    while(1)
    {
        sem_wait(&full);
        if(msgrcv(msgid, (void *)&data, BUFFSIZE, msg_to_receive, 0) == -1)
        {
            printf("err\n");
            exit(1);
        }
        
        printf("\033[33mReceived: \033[0m%s", data.text);
        if(!strcmp(data.text, "end\n"))
        {
            
            data.my_msg_type = 1;
            strcpy(data.text, "over");
            
            if(msgsnd(msgid, (void *)&data, BUFFSIZE, 0)==-1)
            {
                printf("err\n");
                exit(1);
            }
            
            printf("\033[33mSend: over\033[0m\n");
            sem_post(&empty);
            sem_wait(&full);
            break;
        }
        sem_post(&empty);
        
    }
    exit(1);
}

int main() {

    pthread_t senderID; 
    pthread_t receiverID;

    sem_init(&full,0,0);
    sem_init(&empty,0,1);


    pthread_create(&senderID,NULL,sender,NULL);
    pthread_create(&receiverID,NULL,receiver,NULL);

    pthread_join(senderID,NULL);
    pthread_join(receiverID,NULL);

    sem_destroy(&full);
    sem_destroy(&empty);

    return 0;
}
  1. 共享内存 代码 4-receiver.c4-sender.c
    • 4-receiver.c
    #include <sys/sem.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/types.h>
    #include <fcntl.h>  
    #include <sys/ipc.h>
    #include <semaphore.h> // POSIX
    #include <sys/shm.h>
    #define BUFFSIZE 1024
    
    
    
    int main() {
    
        sem_t * do_read = sem_open("aeonni_sem_r", O_CREAT, 0644, 0);
        sem_t * do_write = sem_open("aeonni_sem_s", O_CREAT, 0644, 1);
    
        int shm = shmget((key_t)5555, BUFFSIZE, IPC_CREAT | 0666);
        char * mem = (char*)shmat(shm, NULL, 0);
    
        while(1) {
            sem_wait(do_read);
            puts(mem);
            if(!strcmp(mem,"end\n")) {
                memset(mem,0,BUFFSIZE);
                strcpy(mem,"over");
                sem_post(do_write);
                break;
            }else {
                memset(mem,0,BUFFSIZE);
            }
            sem_post(do_write);
        }
    
    
        shmdt((void *)mem);
        sem_close(do_read);
        sem_close(do_write);
        sem_unlink("aeonni_sem_r");
        sem_unlink("aeonni_sem_s");
        return 0;
    }
    
    • 4-sender.c
    #include <sys/sem.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/types.h>
    #include <fcntl.h>  
    #include <sys/ipc.h>
    #include <semaphore.h> // POSIX
    #include <sys/shm.h>
    #define BUFFSIZE 1024
    
    
    
    int main() {
    
        sem_t * do_read = sem_open("aeonni_sem_r", O_CREAT, 0644, 0);
        sem_t * do_write = sem_open("aeonni_sem_s", O_CREAT, 0644, 1);
    
        int shm = shmget((key_t)5555, BUFFSIZE, IPC_CREAT | 0666);
        char * mem = (char*)shmat(shm, NULL, 0);
    
        char buffer[BUFFSIZE];
    
        while(1) {
            sem_wait(do_write);
            printf("\033[32mEnter Message: \033[0m");
            fgets(buffer, BUFFSIZE, stdin);
    
            strcpy(mem, buffer);
            sem_post(do_read);
    
            if(!strcmp(buffer,"end\n")) {
                sem_wait(do_write);
                puts(mem);
                break;
            }
            
        }
        
    
    
        shmdt((void *)mem);
        if(shmctl(shm,IPC_RMID,NULL)==-1)
        {
            printf("Delete the shared memory error!\n" );
            exit(1);
        }
        sem_close(do_read);
        sem_close(do_write);
        sem_unlink("aeonni_sem_r");
        sem_unlink("aeonni_sem_s");
        return 0;
    }