Looyao's Blog

记录一些点滴

使用POSIX 有名信号量、共享内存实现多进程生产者消费者模型

| Comments

工作后多进程程序写的不多, 大部分都是使用多线程, 多线程和多进程各有利弊, 比如多线程可以更好的利用多核, 但是数据同步时候的锁的设计也是问题, 并且锁多了也会有增加额外的开销. 多进程彼此相对独立, 不像多线程, 一个线程崩溃整个程序就over了, 比较麻烦的是进程间通信问题, 细节就不在讨论了. 下边主要写一下多进程使用POSIX有名信号量和共享内存实现生产者消费者模型.

首先设计一个结构, 保存数据

product.h
1
2
3
4
5
6
7
8
#define MAX_PRODUCT 32

typedef struct product_s {
        int   p_idx;             /*生产者的index*/
        int   c_idx;             /*消费者的index*/
        short init;              /*初始化标志*/
        char  data[MAX_PRODUCT]; /*用于保存产品*/
} product;

先实现生产者

producer.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include "product.h"

#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <unistd.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <time.h>

int main(void)
{
    //用于通知消费者进程, 可以消费了
    sem_t *sem = sem_open("/sem_test", O_CREAT, 0644, 0);
    if (sem == SEM_FAILED) {
        fprintf(stderr, "sem_open error\n");
        exit(1);
    }

    //用于等待消费者通知
    sem_t *sem1 = sem_open("/sem_test1", O_CREAT, 0644, 0);
    if (sem1 == SEM_FAILED) {
        fprintf(stderr, "sem_open error\n");
        exit(1);
    }


    //创建共享内存
    int fd = shm_open("/shm_product", O_CREAT | O_RDWR, 0644);
    if (fd < 0) {
        fprintf(stderr, "shm_open error\n");
        exit(1);
    }

    //刚创建的共享内存是空的, 需要预创建一些空间
    if (ftruncate(fd, sizeof(product)) == -1) {
        fprintf(stderr, "ftruncate error\n");
        exit(1);
    }

    //把创建的共享内存文件映射到内存
    product *p = mmap(NULL, sizeof(product), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    if (p == MAP_FAILED) {
        fprintf(stderr, "mmap error\n");
        exit(1);
    }

    //判断是否已经初始化过
    if (p->init != 1) {
        memset(p, 0, sizeof(product));
        p->init = 1;
    }

    for ( ;; ) {
        srand(time(NULL));
        char c = rand() % 26 + 65; //随机产生一个字符
        if (p->p_idx > 31) { //超过31, 重新从0开始覆盖
            p->p_idx = p->p_idx % MAX_PRODUCT;
        }
        p->data[p->p_idx] = c;
        printf("create %c, idx %d\n", c, p->p_idx);
        p->p_idx++;
        //通知消费者进程
        sem_post(sem);
        //等待消费者通知
        sem_wait(sem1);
    }

    munmap(p, sizeof(product));
    close(fd);
    sem_close(sem);
    sem_close(sem1);

    return 0;
}

实现消费者

consumer.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include "product.h"

#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/mman.h>

int main(void)
{
    sem_t *sem = sem_open("/sem_test", O_CREAT, 0644, 0);
    if (sem == SEM_FAILED) {
        fprintf(stderr, "sem_open error\n");
        exit(1);
    }

    sem_t *sem1 = sem_open("/sem_test1", O_CREAT, 0644, 0);
    if (sem1 == SEM_FAILED) {
        fprintf(stderr, "sem_open error\n");
        exit(1);
    }

    int fd = shm_open("/shm_product", O_CREAT | O_RDWR, 0666);
    if (fd < 0) {
        fprintf(stderr, "shm_open error\n");
        exit(1);
    }

    if (ftruncate(fd, sizeof(product)) == -1) {
        fprintf(stderr, "ftruncate error\n");
        exit(1);
    }

    product *p = mmap(NULL, sizeof(product), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

    if (p == MAP_FAILED) {
        fprintf(stderr, "mmap error\n");
        exit(1);
    }

    if (p->init != 1) {
        memset(p, 0, sizeof(product));
        p->init = 1;
    }

    for ( ;; ) {
        //等待生产者通知
        sem_wait(sem);
        //可以消费了
        if (p->c_idx > 31) {
            p->c_idx = p->c_idx % MAX_PRODUCT;
        }
        printf("get %c, idx %d\n", p->data[p->c_idx], p->c_idx);
        p->c_idx++;

        sleep(1);
        //通知生产者消费完成
        sem_post(sem1);
    }

    munmap(p, sizeof(product));
    close(fd);
    sem_close(sem);
    sem_close(sem1);

    return 0;
}

编译

gcc producer.c -o producer -lrt
gcc consumer.c -o consumer -lrt

运行测试, 打开两个终端, 一个运行生产者进程, 一个运行消费者进程. 生产者

bash$ ./producer 
create P, idx 0
create F, idx 1
create I, idx 2
create P, idx 3
create J, idx 4
create Y, idx 5
create I, idx 6
create P, idx 7
create O, idx 8
create C, idx 9
create R, idx 10
create A, idx 11

消费者

bash$ ./consumer 
get P, idx 0
get F, idx 1
get I, idx 2
get P, idx 3
get J, idx 4
get Y, idx 5
get I, idx 6
get P, idx 7
get O, idx 8
get C, idx 9
get R, idx 10
get A, idx 11

当然也可以一个生产者进程, 多个消费者进程.

shm_open创建的共享内存文件和sem_open创建的有名信号量都在/dev/shm下, /dev/shm下的文件在RAM中, 而非本地磁盘, 读取也相当于在内存中操作, 当然也比在本地磁盘上读写效率高很多.

Comments