【OS】操作系统-实验三:进程通信
操作环境:
- 主机:MacOS 10.13.3
- 虚拟机软件:Parallels Desktop
- Linux:Ubuntu 16.04 Kernal 4.13.0-36-generic
实验要求:
- 实现一个模拟的 shell:
编写三个不同的程序 cmd1.c, cmd2.c, cmd3.c,每个程序的功能自定,分别编译成可执行文件 cmd1, cmd2, cmd3。 然后再编写一个程序,模拟 shell 程序的功能:能根据用户输入的字符串(表示相应的命令名),为相应的命令创建子进程并让它去执行相应的程序,而父进程则等待子进程结束,然后再等待接收下一条命令。如果接收到的命令为 exit,则父进程结束:如果接收到的命令是无效命令,则显示“Command not found“,继续等待。
- 实现一个管道通信程序:
由父进程创建一个管道,然后再创建 3 个子进程,并由这三个子进程利用管道与父进程之间进行通信:子进程发送信息,父进程等三个子进程全部发完消息后再接收信息。通信的具体内容可根据自己的需要随意设计,要求能试验阻塞型读写过程中的各种情况,测试管道的默认大小,并且要求利用 Posix 信号量机制实现进程间对管道的互斥访问。运行程序,观察各种情况下,进程实际读写的字节数以及进程阻塞唤醒的情况。
- 利用 linux 的消息队列通信机制实现两个线程间的通信:
编写程序创建两个线程:sender 线程和 receive 线程,其中 sender 线程运行函数 sender(),它创建一个消息队列,然后,循环等待用户通过终端输入串字符,将这串字符通过消息队列发送给 receiver 线程,直到用户输入“exit“”为止;最后,它向 recever 线程发送消息“end“,并且等待 recever 的应答,等到应答消息后,将接收到的应答信息显示在终端屏幕上,删除相关消息队列,结束程序的运行。Recever 线程运行 receive(),它通过消息队列接收来自 sender 的消息,将消息显示在终端屏幕上,直至收到内容为“end”的消息为止,此时,它向 sender 发送一个应答消息“over“,结束程序的运行。使用无名信号量实现两个线程之间的同步与互斥。
- 利用 linux 的共享内存通信机制实现两个进程间的通信:
编写程序 sender,它创建一个共享内存,然后等待用户通过终端输入一串字符,并将这串字符通过共享内存发送给 receiver;最后,它等待 receiver 的应答,收到应答消息后,将接收到的应答信息显示在终端屏幕上,删除共享内存,结束程序的运行。编写 receiver 程序,它通过共享内存接收来自 sender 的消息,将消息显示在终端屏幕上,然后再通过该共享内存向 sender 发送一个应答消息“over“,结束程序的运行。使用有名信号量或 SystemV 信号量实现两个进程对共享内存的互斥及同步使用。
操作说明:
实现一个模拟的 shell:
- 程序构成:
- 命令读取
- 分割参数
- 判断是否退出
- 调用子程序执行命令
- 关键函数:
- 分割字符串
char **split()
(自己写) - 子程序
pid_t fork()
( unistd.h ) - 执行 shell 命令
int execvp()
( unistd.h ) - 等待子程序
pid_t waitpid()
( sys/wait.h )
- 分割字符串
- 效果:
- 代码:见附录
- 程序构成:
实现一个管道通信程序:
- 程序构成:
- 建立 Pipe 管道:管道有两条,分别进行读写操作。
- 由 fork() 函数生成3个子进程,子进程同时独立地向 Pipe 发送单一字符'A','B'或'C',等到 Pipe 已满,无法继续写入时退出并显示自己写入的字符数。
- 主程序等待子程序结束,从 Pipe 中读取前 1000 个字节并显示。
- 关键函数:
- 创建管道函数
pipe()
- 设置函数
fcntl()
( fcntl.h ) - 信号量函数
sem_****()
( semaphore.h )
- 创建管道函数
- 效果:
- 编译并运行:
gcc -pthread pipe.c -o pipe && ./pipe
- 得到 Pipe 的默认大小为
65536B
- 代码:见附录
- 程序构成:
利用 linux 的消息队列通信机制实现两个线程间的通信:
- 程序构成:
- 新建两个函数,分别为:
void * sender(void * a)
void * receiver(void * a)
- 将消息放入一个结构体中,进行发送与接收
- 使用两个信号量来保证其互斥及同步
- 在主程序中使用
pthread_create
调用上面的两个函数,使他们运行 - 使用
pthread_join
等待期运行完毕
- 关键函数:
- 打开/新建一个消息队列
msgget()
( sys/msg.h ) - 发送一个消息
msgsnd()
( sys/msg.h ) - 接受一个消息
msgrcv()
( sys/msg.h ) - 控制消息队列(这里用作删除)
msgctl()
( sys/msg.h ) - 产生子线程
pthread_create
( pthread.h ) - 等待子线程
pthread_join
( pthread.h ) - 信号量函数
sem_****()
( semaphore.h )
- 打开/新建一个消息队列
- 效果:
- 编译并运行:
gcc -pthread queue.c -o queue && ./queue
- 代码:见附录
- 程序构成:
利用 inux 的共享内存通信机制实现两个进程间的通信:
- 程序构成:
- 新建/打开共享内存
- 映射到本程序
- 使用两个信号量实现互斥访问
- 提高可用性:
- 可以使用一个消耗量进行互斥访问,这样可以多次读写
- 通过在共享内存头部加入计数器,可知道现有内存的余量(可直接将共享内存格式化为一个很大的结构体,在头部放入一些信息,然后后边是仍然一块连续可用的内存,结构体的好处是可以通过域名称访问,简化代码)
- 再用第二个信号量控制先读后写(在while之前)
- 关键函数:
- 打开/新建一个共享内存
shmget()
( sys/shm.h ) - 挂载(映射)一块共享内存
shmat()
( sys/shm.h ) - 卸载一块共享内存
shmdt()
( sys/shm.h ) - 控制一块共享内存(这里用作删除)
shctl()
( sys/shm.h ) - 打开信号量函数
sem_****()
( semaphore.h )
- 打开/新建一个共享内存
- 效果:
- 编译并运行:
gcc -pthread 4-sender.c -o 4s && ./4s
gcc -pthread 4-receiver.c -o 4r && ./4r
- 代码:见附录
- 程序构成:
附录:
- 模拟 shell 代码
shell.c
:
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/wait.h>
char **split(char * input, const char div, int * amount) {
int count = 0;
char c_last = div;
int len = 0;
for(int i = 0; *(input+i) != '\0'; i++) {
if(*(input+i) == div && c_last != div){ count++;}
c_last = *(input+i);
len ++;
}
*(amount) = count+1; // para amount
char ** array = (char **)malloc((count+2)*sizeof(char*));
int cu_para = 0;
c_last = div;
for(int i = 0; i < len; i++) {
if(*(input+i) != div && c_last == div){ //para[0]
array[cu_para] = input+i;
} else if(*(input+i) == div && c_last != div){
*(input+i) = '\0';
cu_para ++;
c_last = div;
continue;
}
c_last = *(input+i);
}
return array;
}
int main(int argc, char *argv[]) {
char cmd[20];
char **sep_cmd;
int a; //amount of arguments
for(int i = 1; i < argc; i++) {
printf("%s\n",argv[i]);
}
while(1) {
// say hello
printf("\033[33mstupidUser_(┐「ε:)_\033[0m $ ");
// Read command
cmd[0] = '\0';
scanf("%[^\n]", cmd);
char c;
while ((c = getchar()) != EOF && c != '\n');
if(cmd[0] == '\0') continue;
// modify string
sep_cmd = split(cmd, ' ', &a);
// exit?
if(!strcmp(sep_cmd[0], "exit")) {
printf("\nBye~(´・ω・`(´・ω・`(´・ω・`(´・ω・`\n\n");
break;
}
// exec command
pid_t id = fork();
if(id < 0){
perror("fork");
}else if(id == 0){//child run cmd
execvp(sep_cmd[0], sep_cmd);
exit(1);
}else{
int status = 0;
pid_t ret = waitpid(id,&status,0);
if(ret > 0 && WIFEXITED(status)){
if(status != 0)
printf("Command not found\n");
}else{
perror("waitpid");
}
}
}
return 0;
}
- 管道通信代码 代码
pipe.c
:
#include<stdio.h>
#include<sys/types.h>
#include<sys/wait.h>
#include<unistd.h>
#include<fcntl.h>
#include<errno.h>
#include<stdlib.h>
#include<string.h>
#include <semaphore.h> // POSIX
int main()
{
// Create pipe
int _pipe[2];
if(pipe(_pipe)==-1)
{
printf("pipe error\n");
return 1;
}
// Config pipe
int ret;
int count=0;
int flag=fcntl(_pipe[1],F_GETFL);
fcntl(_pipe[1],F_SETFL,flag | O_NONBLOCK);
flag=fcntl(_pipe[0],F_GETFL);
fcntl(_pipe[0],F_SETFL,flag | O_NONBLOCK);
// Simplify coding
int *tx = &_pipe[1];
int *rx = &_pipe[0];
// Create sem
sem_t * key = sem_open("aeonni_sem", O_CREAT, 0644, 1);
// Generate Children procs
pid_t pid1 = fork();
if(pid1 < 0){
perror("fork1");
}else if(pid1 == 0){ // child 1 run
// ******************** child 1 code start *****************
printf("Child 1 start\n");
while(1)
{
int sem = sem_wait(key);
ret=write(*tx,"A",1);
if(ret==-1)
{
printf("Child 1 error %s\n",strerror(errno));
sem_post(key);
break;
}
count++;
sem_post(key);
}
printf("count=%d\n",count);
exit(0);
// ******************** child 1 code end *******************
}else{
pid_t pid2 = fork();
if(pid2 < 0){
perror("fork2");
}else if(pid2 == 0){ // child 2 run
// ******************** child 2 code start *****************
printf("Child 2 start\n");
while(1)
{
int sem = sem_wait(key);
ret=write(*tx,"B",1);
if(ret==-1)
{
printf("Child 2 error %s\n",strerror(errno));
sem_post(key);
break;
}
count++;
sem_post(key);
}
printf("count=%d\n",count);
exit(0);
// ******************** child 2 code end *******************
}else{
pid_t pid3 = fork();
if(pid3 < 0){
perror("fork2");
}else if(pid3 == 0){ // child 3 run
// ******************** child 3 code start *****************
printf("Child 3 start\n");
while(1)
{
int sem = sem_wait(key);
ret=write(*tx,"C",1);
if(ret==-1)
{
printf("Child 3 error %s\n",strerror(errno));
sem_post(key);
break;
}
count++;
sem_post(key);
}
printf("count=%d\n",count);
exit(0);
// ******************** child 3 code end *******************
}else{ // main
int status = 0;
pid_t ret1 = waitpid(pid1,&status,0);
pid_t ret2 = waitpid(pid2,&status,0);
pid_t ret3 = waitpid(pid3,&status,0);
char buffer[1000];
memset(buffer, '\0', sizeof(buffer));
ret = read(*rx, buffer, 1000);
if(ret==-1)
{
printf("error %s\n",strerror(errno));
}
printf("%s\n",buffer);
}
}
}
sem_close(key);
sem_unlink("aeonni_sem");
return 0;
}
- 消息队列 代码
queue.c
:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <semaphore.h> // POSIX
#include <pthread.h>
#define BUFFSIZE 128
typedef struct msg_type {
long int my_msg_type;
char text[BUFFSIZE];
}msg_t;
sem_t full;
sem_t empty;
void * sender(void * a) {
msg_t data;
int msgid;
char buffer[BUFFSIZE];
msgid = msgget((key_t)6666, 0666|IPC_CREAT);
if(msgid==-1) {
printf("err\n");
exit(1);
}
while(1) {
sem_wait(&empty);
printf("\033[32mEnter Message: \033[0m");
fgets(buffer, BUFFSIZE, stdin);
data.my_msg_type = 1;
strcpy(data.text, buffer);
if(msgsnd(msgid, (void *)&data, BUFFSIZE, 0)==-1)
{
printf("err\n");
exit(1);
}
sem_post(&full);
if(!strcmp(buffer, "end\n"))
{
//printf("Waiting empty\n");
sem_wait(&empty);
//printf("receiving...\n");
if(msgrcv(msgid, (void *)&data, BUFFSIZE, 0, 0) == -1)
{
printf("err\n");
exit(1);
}
printf("\033[32mReceived: %s\033[0m\n", data.text);
sem_post(&full);
break;
}
}
if(msgctl(msgid, IPC_RMID, 0)==-1)
{
printf("err\n");
exit(1);
}
exit(1);
}
void * receiver(void * a) {
int msgid;
msg_t data;
long int msg_to_receive = 0;
// 设置消息队列:
msgid = msgget((key_t)6666,0666|IPC_CREAT);
if(msgid == -1)
{
printf("msgget error !\n");
exit(1);
}
//3 然后,接收消息队列中的消息直到遇到一个end消息。最后,消息队列被删除:
while(1)
{
sem_wait(&full);
if(msgrcv(msgid, (void *)&data, BUFFSIZE, msg_to_receive, 0) == -1)
{
printf("err\n");
exit(1);
}
printf("\033[33mReceived: \033[0m%s", data.text);
if(!strcmp(data.text, "end\n"))
{
data.my_msg_type = 1;
strcpy(data.text, "over");
if(msgsnd(msgid, (void *)&data, BUFFSIZE, 0)==-1)
{
printf("err\n");
exit(1);
}
printf("\033[33mSend: over\033[0m\n");
sem_post(&empty);
sem_wait(&full);
break;
}
sem_post(&empty);
}
exit(1);
}
int main() {
pthread_t senderID;
pthread_t receiverID;
sem_init(&full,0,0);
sem_init(&empty,0,1);
pthread_create(&senderID,NULL,sender,NULL);
pthread_create(&receiverID,NULL,receiver,NULL);
pthread_join(senderID,NULL);
pthread_join(receiverID,NULL);
sem_destroy(&full);
sem_destroy(&empty);
return 0;
}
- 共享内存 代码
4-receiver.c
、4-sender.c
:4-receiver.c
#include <sys/sem.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> #include <string.h> #include <sys/types.h> #include <fcntl.h> #include <sys/ipc.h> #include <semaphore.h> // POSIX #include <sys/shm.h> #define BUFFSIZE 1024 int main() { sem_t * do_read = sem_open("aeonni_sem_r", O_CREAT, 0644, 0); sem_t * do_write = sem_open("aeonni_sem_s", O_CREAT, 0644, 1); int shm = shmget((key_t)5555, BUFFSIZE, IPC_CREAT | 0666); char * mem = (char*)shmat(shm, NULL, 0); while(1) { sem_wait(do_read); puts(mem); if(!strcmp(mem,"end\n")) { memset(mem,0,BUFFSIZE); strcpy(mem,"over"); sem_post(do_write); break; }else { memset(mem,0,BUFFSIZE); } sem_post(do_write); } shmdt((void *)mem); sem_close(do_read); sem_close(do_write); sem_unlink("aeonni_sem_r"); sem_unlink("aeonni_sem_s"); return 0; }
4-sender.c
#include <sys/sem.h> #include <stdlib.h> #include <unistd.h> #include <stdio.h> #include <string.h> #include <sys/types.h> #include <fcntl.h> #include <sys/ipc.h> #include <semaphore.h> // POSIX #include <sys/shm.h> #define BUFFSIZE 1024 int main() { sem_t * do_read = sem_open("aeonni_sem_r", O_CREAT, 0644, 0); sem_t * do_write = sem_open("aeonni_sem_s", O_CREAT, 0644, 1); int shm = shmget((key_t)5555, BUFFSIZE, IPC_CREAT | 0666); char * mem = (char*)shmat(shm, NULL, 0); char buffer[BUFFSIZE]; while(1) { sem_wait(do_write); printf("\033[32mEnter Message: \033[0m"); fgets(buffer, BUFFSIZE, stdin); strcpy(mem, buffer); sem_post(do_read); if(!strcmp(buffer,"end\n")) { sem_wait(do_write); puts(mem); break; } } shmdt((void *)mem); if(shmctl(shm,IPC_RMID,NULL)==-1) { printf("Delete the shared memory error!\n" ); exit(1); } sem_close(do_read); sem_close(do_write); sem_unlink("aeonni_sem_r"); sem_unlink("aeonni_sem_s"); return 0; }