线程同步之条件变量

互斥锁用于上锁,以保证任何时刻只有一个线程在临界区内执行。有时当线程获取到互斥锁后,还需要满足特定的条件才能继续执行,那么该线程可以等待在某个条件变量上。条件变量总是有一个互斥锁与之关联。

编程接口

条件变量相关的常用编程接口如下:

#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);
int pthread_cond_signal(pthread_cond_t *cptr);
int pthread_cond_broadcast(pthread_cond_t *cptr);

int pthread_mutex_init(pthread_mutex_t *mptr, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mptr);

int pthread_cond_init(pthread_cond_t *cptr, const pthread_condattr_t *attr);
int pthread_cond_destroy(pthread_cond_t *cptr);

给条件变量发送信号的代码大致如下:

struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    其他变量
} var = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};

pthread_mutex_lock(&var.mutex);
设置条件为真
pthread_cond_signal(&var.cond);
pthread_mutex_unlock(&var.mutex);

测试条件并进入睡眠以等待该条件为真的代码大致如下:

pthread_mutex_lock(&var.mutex);
while (条件为假)
    pthread_cond_wait(&var.cond, &var.mutex);
pthread_mutex_unlock(&var.mutex);

生产者消费者问题

为简化问题,假设有一个大小为N的数组,开M个线程当作生产者,把第i个元素的值设为i,另外再开1个线程作为消费者,验证第i个元素的值是否为i。要求生产者和消费者同时运行。

首先,多个生产者同时访问公共资源,生产者之间需要同步,这里用互斥锁就行了。另一方面,只有当元素被生产者设置后消费者才能进行验证,因此消费者和生产者之间也需要同步,这种等待同步与生产者之间的竞争同步有所区别,用条件变量来实现。

#include <stdio.h>
#include <pthread.h>

#define N 1000000
#define M 5

int buf[N], cnt[M];
struct {
    pthread_mutex_t mutex;
    int idx;
} put = {PTHREAD_MUTEX_INITIALIZER};

struct {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int nready;
} ready = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};

void* produce(void *arg) {
    for (;;) {
        pthread_mutex_lock(&put.mutex);
        if (put.idx >= N) {
            pthread_mutex_unlock(&put.mutex);
            return NULL;
        }
        buf[put.idx] = put.idx;
        put.idx += 1;
        pthread_mutex_unlock(&put.mutex);

        pthread_mutex_lock(&ready.mutex);
        if (ready.nready == 0)
            pthread_cond_signal(&ready.cond);
        ready.nready += 1;
        pthread_mutex_unlock(&ready.mutex);

        *((int*)arg) += 1;
    }
}

void* consume(void *arg) {
    int i;
    for (i = 0; i < N; i++) {
        pthread_mutex_lock(&ready.mutex);
        while (ready.nready == 0)
            pthread_cond_wait(&ready.cond, &ready.mutex);
        ready.nready -= 1;
        pthread_mutex_unlock(&ready.mutex);

        if (buf[i] != i)
            printf("buf[%d]=%d\n", i, buf[i]);
    }
    return NULL;
}

int main() {
    int i;
    pthread_t tid_produce[M], tid_consume;

    pthread_setconcurrency(M+1);
    for (i = 0; i < M; i++)
        pthread_create(&tid_produce[i], NULL, produce, &cnt[i]);
    pthread_create(&tid_consume, NULL, consume, NULL);

    for (i = 0; i < M; i++) {
        pthread_join(tid_produce[i], NULL);
        printf("cnt[%d]=%d\n", i, cnt[i]);
    }
    pthread_join(tid_consume, NULL);

    return 0;
}
Table of Contents