高级
使用信号量协调对共享资源的访问
使用同步可能会让代码执行变慢
基本思想:线程使用信号量操作来通知另一个线程某个条件已变为真
- 使用计数信号量来跟踪资源状态并通知其他线程
- 使用互斥锁来保护对资源的访问
两种典型例子:
- 生产者-消费者问题
- 读者-写者问题
生产者-消费者
常见的同步模式:
- 生产者等待空位,将项目插入缓冲区,并通知消费者。
- 消费者等待项目,从缓冲区中移除项目,并通知生产者。
例子:
- 多媒体处理:
- 生产者创建MPEG视频帧,消费者渲染它们。
- 事件驱动的图形用户界面:
- 生产者检测鼠标点击、鼠标移动和键盘输入,并将相应的事件插入缓冲区。
- 消费者从缓冲区中检索事件并更新显示。
需要一个互斥锁和两个计数信号量:
- mutex:强制缓冲区的互斥访问。
- slots:计算缓冲区中可用的插槽数。
- items:计算缓冲区中可用的项目数。
代码:
typedef struct {
int *buf; /* Buffer array */
int n; /* Maximum number of slots */
int front; /* buf[(front+1)%n] is first item */
int rear; /* buf[rear%n] is last item */
sem_t mutex; /* Protects accesses to buf */
sem_t slots; /* Counts available slots */
sem_t items; /* Counts available items */
} sbuf_t;
void sbuf_init(sbuf_t *sp, int n);
void sbuf_deinit(sbuf_t *sp);
void sbuf_insert(sbuf_t *sp, int item);
int sbuf_remove(sbuf_t *sp);
void sbuf_init(sbuf_t *sp, int n) {
sp->buf = calloc(n, sizeof(int));
sp->n = n; /* Buffer holds max of n items */
sp->front = sp->rear = 0; /* Empty buffer iff front == rear */
Sem_init(&sp->mutex, 0, 1); /* Binary semaphore for locking */
Sem_init(&sp->slots, 0, n); /* Initially, buf has n empty slots */
Sem_init(&sp->items, 0, 0); /* Initially, buf has zero data items */
}
void sbuf_insert(sbuf_t *sp, int item) {
P(&sp->slots); /* Wait for available slot */
P(&sp->mutex); /* Lock the buffer */
sp->buf[(++sp->rear)%(sp->n)] = item; /* Insert the item */
V(&sp->mutex); /* Unlock the buffer */
V(&sp->items); /* Announce available item */
}
int sbuf_remove(sbuf_t *sp) {
int item;
P(&sp->items); /* Wait for available item */
P(&sp->mutex); /* Lock the buffer */
item = sp->buf[(++sp->front)%(sp->n)]; /* Remove the item */
V(&sp->mutex); /* Unlock the buffer */
V(&sp->slots); /* Announce available slot */
return item;
}读写问题
对互斥问题的泛化
问题描述:
- 读者线程只读取对象。
- 写者线程修改对象。
- 写者必须独占访问对象。
- 无限数量的读者可以同时访问对象。
这种情况在实际系统中经常发生,例如:
- 在线航空订票系统
- 多线程缓存Web代理
第一类读者-写者问题(偏向读者)
任何读者不应被阻塞,除非一个写者已经被授予使用对象的权限。
一个在等待的写者之后到达的读者优先于写者。
代码:
思路:
- 根据读计数来判断是否为首次读和最后读
- 首次读:
- 获取写的互斥锁,释放读的互斥锁
- 等待写互斥锁释放后,获取写的互斥锁
- 后续读:
- 不断进行读行为,期间所有的写都被阻塞
- 最后读:
- 释放写的互斥锁
int readcnt; /* Initially = 0 */
sem_t mutex, w; /* Both initially = 1 */
void reader(void) {
while (1) {
// 两种情况:
// 1. 首次读, 则获取互斥锁。
// 2. 后续读, 则阻塞
P(&mutex);
readcnt++;
// 第一次读, 判断此时是否正在写,若写则等待写入完成
// 若是没有写,则获取写的互斥锁,阻塞写
if (readcnt == 1) {
P(&w);
}
V(&mutex); // 此时,其他阻塞的读行为可以继续
P(&mutex); // 此时,若是有其他读行为,则阻塞
readcnt--;
// 如果为最后一个读,则归还写的互斥锁
if (readcnt == 0)
V(&w);
V(&mutex);
}
}
void writer(void) {
while (1) {
P(&w);
V(&w);
}
}第二类读者-写者问题(偏向写者)
- 一旦写者准备写入,它应尽快执行写操作。
- 一个在写者之后到达的读者必须等待,即使写者也在等待。
在这两种情况下,都可能发生饥饿(线程无限期等待)的情况。
预线程并发服务器
#define NTHREADS 4
#define SBUFSIZE 16
void echo_cnt(int connfd);
void *thread(void *vargp);
sbuf_t sbuf;
int main(int argc, char **argv) {
int i, listenfd, connfd;
socklen_t clientlen;
struct sockaddr_storage clientaddr;
pthread_t tid;
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listenfd = Open_listenfd(argv[1]);
sbuf_init(&sbuf, SBUFSIZE);
for (i = 0; i < NTHREADS; i++)
Pthread_create(&tid, NULL, thread, NULL);
while (1) {
clientlen = sizeof(struct sockaddr_storage);
// 建立连接后, 生产者添加item, 若是没有slot,则阻塞等待
connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);
sbuf_insert(&sbuf, connfd);
}
}
void *thread(void *vargp) {
Pthread_detach(pthread_self());
while (1) {
// 获取item,若是没有item则等待
int connfd = sbuf_remove(&sbuf);
echo_cnt(connfd);
Close(connfd);
}
}static int byte_cnt;
static sem_t mutex;
static void init_echo_cnt(void) {
Sem_init(&mutex, 0, 1);
byte_cnt = 0;
}
void echo_cnt(int connfd) {
int n;
char buf[MAXLINE];
rio_t rio;
static pthread_once_t once = PTHREAD_ONCE_INIT;
Pthread_once(&once, init_echo_cnt); // 所有线程只执行一次
Rio_readinitb(&rio, connfd);
while((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
P(&mutex); // 获取互斥锁,阻止竞争问题
byte_cnt += n;
printf("server received %d (%d total) bytes on fd %d\n",
n, byte_cnt, connfd);
V(&mutex);
Rio_writen(connfd, buf, n);
}
}关键概念:线程安全
线程安全函数: 如果一个函数在被多个并发线程重复调用时总能产生正确的结果,那么它就是线程安全的
线程不安全函数:
- 不保护共享变量的函数
- 这些函数直接访问或修改共享变量,而没有使用诸如锁或互斥体之类的同步机制
- 解决办法: 使用互斥锁。
- 跨多次调用保持状态的函数
- 这些函数维护内部状态,可能会被多个线程同时访问或修改,而没有适当的同步机制
- 解决方案: 使用外部状态, 用户自行维持
- 返回指向静态变量的指针的函数
- 这些函数返回指向所有线程共享的变量的指针,如果在没有同步的情况下同时访问可能会导致数据损坏或不一致
- 调用线程不安全函数的函数
- 这些函数本身可能是线程安全的,但如果它们调用其他属于类别 1、类别 2 或类别 3 的函数,由于间接访问共享资源,它们可能会变得线程不安全
可重入函数
定义:一个函数是可重入的,如果在被多个线程调用时,它没有访问任何共享变量。
- 可重入函数是线程安全函数的一个重要子集。
- 它们不需要使用同步操作。
- 使一个属于类别 2 的函数成为线程安全的唯一方法是使其可重入(例如,rand_r 函数)
竞争问题
竞争发生在程序的正确性取决于一个线程在另一个线程到达点 y 之前到达点 x 的情况
#define N 4
void *thread(void *vargp);
int main() {
pthread_t tid[N];
int i;
for (i = 0; i < N; i++) {
Pthread_create(&tid[i], NULL, thread, &i);
}
for (i = 0; i < N; i++) {
Pthread_join(tid[i], NULL);
}
exit(0);
}
void *thread(void *vargp) {
int myid = *((int *)vargp);
printf("Hello from thread %d\n", myid);
return NULL;
}竞争消除 很像JS中的var循环问题
#define N 4
void *thread(void *vargp);
int main() {
pthread_t tid[N];
int i, *ptr;
for (i = 0; i < N; i++) {
ptr = Malloc(sizeof(int));
*ptr = i;
Pthread_create(&tid[i], NULL, thread, ptr);
}
for (i = 0; i < N; i++)
Pthread_join(tid[i], NULL);
exit(0);
}
void *thread(void *vargp) {
int myid = *((int *)vargp);
Free(vargp);
printf("Hello from thread %d\n", myid);
return NULL;
}死锁
如果一个进程在等待一个永远不会成真的条件,则称该进程处于死锁状态。
代码
// 0 1
// 1 0
// 线程2获取互斥锁2,阻塞在互斥锁1
// 线程1获取互斥锁1, 阻塞在互斥锁2
void *count(void *vargp) {
int i;
int id = (int) vargp;
for (i = 0; i < NITERS; i++) {
P(&mutex[id]);
P(&mutex[1-id]);
cnt++;
V(&mutex[id]);
V(&mutex[1-id]);
}
return NULL;
}
int main() {
pthread_t tid[2];
Sem_init(&mutex[0], 0, 1);
Sem_init(&mutex[1], 0, 1);
Pthread_create(&tid[0], NULL, count, (void *)0);
Pthread_create(&tid[1], NULL, count, (void *)1);
Pthread_join(tid[0], NULL);
Pthread_join(tid[1], NULL);
printf("cnt=%d\n", cnt);
exit(0);
}解决
// 保持互斥锁和线程ID一致性
void *count(void *vargp) {
int i;
int id = (int) vargp;
for (i = 0; i < NITERS; i++) {
P(&mutex[0]);
P(&mutex[1]);
cnt++;
V(&mutex[0]);
V(&mutex[1]);
}
return NULL;
}
int main() {
pthread_t tid[2];
Sem_init(&mutex[0], 0, 1);
Sem_init(&mutex[1], 0, 1);
Pthread_create(&tid[0], NULL, count, (void *)0);
Pthread_create(&tid[1], NULL, count, (void *)1);
Pthread_join(tid[0], NULL);
Pthread_join(tid[1], NULL);
printf("cnt=%d\n", cnt);
exit(0);
}转载请注明出处