高级

使用信号量协调对共享资源的访问
使用同步可能会让代码执行变慢

基本思想:线程使用信号量操作来通知另一个线程某个条件已变为真

  1. 使用计数信号量来跟踪资源状态并通知其他线程
  2. 使用互斥锁来保护对资源的访问

两种典型例子:

  1. 生产者-消费者问题
  2. 读者-写者问题

生产者-消费者

常见的同步模式:

  1. 生产者等待空位,将项目插入缓冲区,并通知消费者。
  2. 消费者等待项目,从缓冲区中移除项目,并通知生产者。

例子:

需要一个互斥锁和两个计数信号量:

代码:

typedef struct {
    int *buf; /* Buffer array */
    int n; /* Maximum number of slots */
    int front; /* buf[(front+1)%n] is first item */
    int rear; /* buf[rear%n] is last item */
    sem_t mutex; /* Protects accesses to buf */
    sem_t slots; /* Counts available slots */
    sem_t items; /* Counts available items */
} sbuf_t;

void sbuf_init(sbuf_t *sp, int n);
void sbuf_deinit(sbuf_t *sp);
void sbuf_insert(sbuf_t *sp, int item);
int sbuf_remove(sbuf_t *sp);

void sbuf_init(sbuf_t *sp, int n) {
    sp->buf = calloc(n, sizeof(int));
    sp->n = n;                       /* Buffer holds max of n items */
    sp->front = sp->rear = 0;        /* Empty buffer iff front == rear */
    Sem_init(&sp->mutex, 0, 1);      /* Binary semaphore for locking */
    Sem_init(&sp->slots, 0, n);      /* Initially, buf has n empty slots */
    Sem_init(&sp->items, 0, 0);      /* Initially, buf has zero data items */
}
void sbuf_insert(sbuf_t *sp, int item) {
    P(&sp->slots);                          /* Wait for available slot */
    P(&sp->mutex);                          /* Lock the buffer */
    sp->buf[(++sp->rear)%(sp->n)] = item;   /* Insert the item */
    V(&sp->mutex);                          /* Unlock the buffer */
    V(&sp->items);                          /* Announce available item */
}

int sbuf_remove(sbuf_t *sp) {
    int item;
    P(&sp->items);                          /* Wait for available item */
    P(&sp->mutex);                          /* Lock the buffer */
    item = sp->buf[(++sp->front)%(sp->n)];  /* Remove the item */
    V(&sp->mutex);                          /* Unlock the buffer */
    V(&sp->slots);                          /* Announce available slot */
    return item;
}

读写问题

对互斥问题的泛化

问题描述:

这种情况在实际系统中经常发生,例如:

第一类读者-写者问题(偏向读者)

任何读者不应被阻塞,除非一个写者已经被授予使用对象的权限。
一个在等待的写者之后到达的读者优先于写者。

代码:

思路:

  1. 根据读计数来判断是否为首次读和最后读
  2. 首次读:
  3. 获取写的互斥锁,释放读的互斥锁
  4. 等待写互斥锁释放后,获取写的互斥锁
  5. 后续读:
  6. 不断进行读行为,期间所有的写都被阻塞
  7. 最后读:
  8. 释放写的互斥锁
int readcnt;    /* Initially = 0 */
sem_t mutex, w; /* Both initially = 1 */

void reader(void) {
    while (1) {
	    // 两种情况:
	    // 1. 首次读, 则获取互斥锁。
	    // 2. 后续读, 则阻塞
        P(&mutex); 
        readcnt++;  
        // 第一次读, 判断此时是否正在写,若写则等待写入完成
        // 若是没有写,则获取写的互斥锁,阻塞写
        if (readcnt == 1) {  
	       P(&w); 
        }
        V(&mutex); // 此时,其他阻塞的读行为可以继续


        P(&mutex);  // 此时,若是有其他读行为,则阻塞
        
        readcnt--;          
        // 如果为最后一个读,则归还写的互斥锁
        if (readcnt == 0) 
            V(&w);
        V(&mutex);
    }
}
void writer(void) {
    while (1) {
        P(&w);
        V(&w);
    }
}

第二类读者-写者问题(偏向写者)

在这两种情况下,都可能发生饥饿(线程无限期等待)的情况。

预线程并发服务器

#define NTHREADS  4
#define SBUFSIZE  16
void echo_cnt(int connfd);
void *thread(void *vargp);
sbuf_t sbuf;
int main(int argc, char **argv) {
    int i, listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    pthread_t tid;
    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }
    listenfd = Open_listenfd(argv[1]);
    sbuf_init(&sbuf, SBUFSIZE);
    
    for (i = 0; i < NTHREADS; i++)
        Pthread_create(&tid, NULL, thread, NULL);
       
    while (1) {
        clientlen = sizeof(struct sockaddr_storage);
        // 建立连接后, 生产者添加item, 若是没有slot,则阻塞等待
        connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
        sbuf_insert(&sbuf, connfd);
    }

}

  

void *thread(void *vargp) {  
    Pthread_detach(pthread_self());
    while (1) {
	    // 获取item,若是没有item则等待
        int connfd = sbuf_remove(&sbuf); 
        echo_cnt(connfd); 
        Close(connfd);
    }
}
static int byte_cnt; 
static sem_t mutex;
static void init_echo_cnt(void) {
    Sem_init(&mutex, 0, 1);
    byte_cnt = 0;
}
void echo_cnt(int connfd) {
    int n;
    char buf[MAXLINE];
    rio_t rio;
    static pthread_once_t once = PTHREAD_ONCE_INIT;
    Pthread_once(&once, init_echo_cnt); // 所有线程只执行一次
    Rio_readinitb(&rio, connfd);    
    while((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
        P(&mutex); // 获取互斥锁,阻止竞争问题
        byte_cnt += n;
        printf("server received %d (%d total) bytes on fd %d\n",
           n, byte_cnt, connfd); 
        V(&mutex);
        Rio_writen(connfd, buf, n);
    }
}

关键概念:线程安全

线程安全函数: 如果一个函数在被多个并发线程重复调用时总能产生正确的结果,那么它就是线程安全的

线程不安全函数:

  1. 不保护共享变量的函数
    1. 这些函数直接访问或修改共享变量,而没有使用诸如锁或互斥体之类的同步机制
    2. 解决办法: 使用互斥锁。
  2. 跨多次调用保持状态的函数
    1. 这些函数维护内部状态,可能会被多个线程同时访问或修改,而没有适当的同步机制
    2. 解决方案: 使用外部状态, 用户自行维持
  3. 返回指向静态变量的指针的函数
    1. 这些函数返回指向所有线程共享的变量的指针,如果在没有同步的情况下同时访问可能会导致数据损坏或不一致
  4. 调用线程不安全函数的函数
    1. 这些函数本身可能是线程安全的,但如果它们调用其他属于类别 1、类别 2 或类别 3 的函数,由于间接访问共享资源,它们可能会变得线程不安全

可重入函数

定义:一个函数是可重入的,如果在被多个线程调用时,它没有访问任何共享变量。

竞争问题

竞争发生在程序的正确性取决于一个线程在另一个线程到达点 y 之前到达点 x 的情况

#define N 4
void *thread(void *vargp);
int main() {
    pthread_t tid[N];
    int i;
    for (i = 0; i < N; i++) {
	    Pthread_create(&tid[i], NULL, thread, &i);
    }
    for (i = 0; i < N; i++) {
	    Pthread_join(tid[i], NULL);
    }
    exit(0);
}
void *thread(void *vargp) {
    int myid = *((int *)vargp); 
    printf("Hello from thread %d\n", myid);
    return NULL;
}

竞争消除 很像JS中的var循环问题

#define N 4
void *thread(void *vargp);

int main() {
    pthread_t tid[N];
    int i, *ptr;
    for (i = 0; i < N; i++) {
        ptr = Malloc(sizeof(int));
        *ptr = i;
        Pthread_create(&tid[i], NULL, thread, ptr);
    }

    for (i = 0; i < N; i++)
        Pthread_join(tid[i], NULL);
    exit(0);
}
void *thread(void *vargp) {
    int myid = *((int *)vargp);
    Free(vargp);
    printf("Hello from thread %d\n", myid);
    return NULL;
}

死锁

如果一个进程在等待一个永远不会成真的条件,则称该进程处于死锁状态。

代码

// 0 1
// 1 0
// 线程2获取互斥锁2,阻塞在互斥锁1
// 线程1获取互斥锁1, 阻塞在互斥锁2
void *count(void *vargp) {
	int i;
	int id = (int) vargp;
	for (i = 0; i < NITERS; i++) {
		P(&mutex[id]); 
		P(&mutex[1-id]);
		cnt++;
		V(&mutex[id]); 
		V(&mutex[1-id]);
	 }
	return NULL;
}

int main() {
    pthread_t tid[2];
    Sem_init(&mutex[0], 0, 1); 
    Sem_init(&mutex[1], 0, 1);
    Pthread_create(&tid[0], NULL, count, (void *)0);
    Pthread_create(&tid[1], NULL, count, (void *)1);
    Pthread_join(tid[0], NULL);
    Pthread_join(tid[1], NULL);
    printf("cnt=%d\n", cnt);
    exit(0);
}

解决

// 保持互斥锁和线程ID一致性
void *count(void *vargp) {
	int i;
	int id = (int) vargp;
	for (i = 0; i < NITERS; i++) {
		P(&mutex[0]); 
		P(&mutex[1]);
		cnt++;
		V(&mutex[0]); 
		V(&mutex[1]);
	 }
	return NULL;
}

int main() {
    pthread_t tid[2];
    Sem_init(&mutex[0], 0, 1); 
    Sem_init(&mutex[1], 0, 1);
    Pthread_create(&tid[0], NULL, count, (void *)0);
    Pthread_create(&tid[1], NULL, count, (void *)1);
    Pthread_join(tid[0], NULL);
    Pthread_join(tid[1], NULL);
    printf("cnt=%d\n", cnt);
    exit(0);
}
转载请注明出处