Looyao's Blog

记录一些点滴

使用POSIX 信号量实现多线程生产者消费者模型

| Comments

上一篇写了使用POSIX 有名信号量、共享内存实现多进程生产者消费者模型, 这篇写一下多线程的版本. 多线程同步要比多进程同步方便得多, 因为多线程共用所在进程的地址空间. 多线程生产者消费者模型还是会经常用到的, 比如实现一个服务器处理客户端请求, 可以在主线程中处理数据包的收发, 可以使用select, poll, epoll等, 这个是很快的, 客户端读取完成后一般需要逻辑处理, 那么就可以创建一些工作线程处理逻辑, 这样主线程就相当于生产者, 工作线程相当于消费者, 每一个连接都是一个产品(每个包含读缓冲区, 写缓冲区, fd等).

下边是一个Demo, 实现生产者消费者模型.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>

#define MAX_PRODUCT 32

typedef struct product_s {
    int             p_idx;             /*生产者的index*/
    int             c_idx;             /*消费者的index*/
    short           init;              /*初始化标志*/
    char            data[MAX_PRODUCT]; /*用于保存产品*/
    pthread_mutex_t mutex;             /*互斥锁*/
    sem_t           sem;               /*信号量*/
} product;

//生产者线程入口函数
void *producer_thread(void *arg)
{
    product *p = (product *)arg;

    for ( ;; ) {
        pthread_mutex_lock(&p->mutex);

        srand(time(NULL));
        char c = rand() % 26 + 65; //随机产生一个字符
        if (p->p_idx > 31) { //超过31, 重新从0开始覆盖
            p->p_idx = p->p_idx % MAX_PRODUCT;
        }
        p->data[p->p_idx] = c;
        printf("create %c, idx %d\n", c, p->p_idx);
        p->p_idx++;

        pthread_mutex_unlock(&p->mutex);

        sem_post(&p->sem);
        sleep(1);
    }

    return (void *)0;
}

//消费者线程入口函数
void *consumer_thread(void *arg)
{
    product *p = (product *)arg;

    for ( ;; ) {
        sem_wait(&p->sem);

        pthread_mutex_lock(&p->mutex);
        if (p->c_idx > 31) {
            p->c_idx = p->c_idx % MAX_PRODUCT;
        }
        printf("get %c, idx %d\n", p->data[p->c_idx], p->c_idx);
        p->c_idx++;
        pthread_mutex_unlock(&p->mutex);
    }

    return (void *)0;
}

int main(void)
{
    product p;
    memset(&p, 0, sizeof(p));

    if (sem_init(&p.sem, 0, 0) != 0) {
        fprintf(stderr, "sem_init error\n");
        exit(1);
    }

    if (pthread_mutex_init(&p.mutex, NULL) != 0) {
        fprintf(stderr, "pthread_mutex_init error\n");
        exit(1);
    }


    pthread_attr_t attr;
    if (pthread_attr_init(&attr) != 0) {
        fprintf(stderr, "pthread_attr_init error\n");
        exit(1);
    }

    if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0) {
        fprintf(stderr, "pthread_attr_setdetachstate error\n");
        exit(1);
    }

    pthread_t tid;

    if (pthread_create(&tid, &attr, consumer_thread, (void *)&p) != 0) {
        fprintf(stderr, "pthread_create error\n");
        exit(1);
    }

    if (pthread_create(&tid, &attr, producer_thread, (void *)&p) != 0) {
        fprintf(stderr, "pthread_create error\n");
        exit(1);
    }

    pause();

    sem_destroy(&p.sem);
    pthread_attr_destroy(&attr);
    return 0;
}

Comments