多线程并发

You are just a part of the machine, you have nothing special, now, get back to work.

由于并发的复杂性,我们需要详细地针对并发进行设计,本文将针对多线程编程过程当中的一些并发设计及模式进行总结。

注意事项

在进行多线程编程时,我们需要注意以下几点:

  • 追踪锁保护的资源(变量、文件等)
  • 确保正确上锁、解锁配对
  • 不要对资源上多把锁
  • 确保发送正确的条件
  • 区分单播/广播的场景
  • 线程之间需要优先级吗?因为多线程可能很难控制哪一个线程先执行

生产者/消费者模式(P/C)

生产者消费者模式也成为有限缓冲模式,是多进程同步的经典模式。这个模式很简单,有若干数据的生产者和消费者,共享固定宽度的缓冲区,生产者产生数据并填入缓冲区,消费者从缓冲区取出数据进行使用。

graph LR
    node((生产者))
    node1["缓冲区"]
    node2((消费者))
    node--生产数据-->node1
    node1--消费数据-->node2

P/C模式关键问题

PC模式的一个关键问题是,如何协调生产速度和消费速度,确保生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。那么如何保证上面这一点呢,我们可以采用如下手段:

轮询方式(低效)

在轮询方式中,生产者和消费者在执行操作前,先判断是否符合操作条件,如果符合,就执行操作,否则就不执行,这个过程是很低效的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// main
for i=0..10
producers[i] = fork(insert_data, NULL)
consumer = fork(get_data, my_list)

//生产者: insert_data
lock(m)
if my_list.not_full -> insert data //空间未满,插入数据
else -> 释放锁,然后过一会儿尝试插入数据
unlock(m)

// consumer: get_data
lock(m)
if my_list.full -> print and clean my_list
else -> 释放锁,然后过一会儿等待buff满了再尝试 这个过程非常低效!!
unlock(m)

条件变量方式

在条件变量方式中,生产者和消费者通过条件变量建立联系,消费者在buffer没有满时等待,如果满了就被唤醒,处理buffer数据并清空;而生产者插入数据,如果buffer满了,就唤醒消费者。具体的条件变量的使用请参考条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
// 消费者
lock(m)
while(my_list.not_full) //使用while防止虚假唤醒
wait(m, list_full) //进入wait时自动释放锁,从wait中唤醒时自动获取锁
my_list.print and clear
unlock(m)

// 生产者
lock(m)
my_list.insert()
if my_list is full
signal(list_full) //发送唤醒信号
unlock(m)

上面的代码中,我们使用了while(my_list.not_full)而不是if(my_list.not_full),原因如下:

为了支持多个消费者,当一个消费者被唤醒后到它实际获得锁的这个时间,可能有其他消费者先获得了锁,并处理清空了buff,如果此时不再判断buff是否为满,那么可能会出现处理空buff的情况。出现这个现象的根本原因是,我们无法保证被唤醒后的线程一定会抢到锁,也无法保证被唤醒后buff没有发生改变,所以我们需要while循环。这个现象叫做虚假唤醒。

pthread实现

主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

#define BUF_SIZE 3

int buffer(BUF_SIZE); //共享空间
int add = 0; //在指定位置添加元素
int rem = 0; //在指定位置删除元素
int num = 0 //当前元素数目

pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c_cons = PTHREAD_COND_INITIALIZER; //消费者等待cv
pthread_cond_t c_prod = PTHREAD_COND_INITIALIZER; //生产者等待cv

void *producer(void *param);
void *consumer(void *param);

int main(int argc, char** argv[]){
pthread_t tid1, tid2;
int i;
if(pthread(&tid, NULL, producer, NULL) != 0){
//错误处理
}

if(pthread(&tid, NULL, consumer, NULL) != 0){
//错误处理
}

pthread_join(tid1, NULL);
pthread_join(tid2, NULL);

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void *producer(void *param){
int i;
for(int i=1;i<20;i++){
pthread_mutex_lock (&m);
if(num > BUF_SIZE){
exit(1);
}
while(num == BUF_SIZE){
pthread_cond_wait(&c_prod, &m);
}
buffer[add] = i;
add = (add+1) % BUF_SIZE;
num++;
pthread_nutex_unlock(&m);

pthread_cond_signal (&c_cons);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void consumer(void *param){
int i;
while(1){
pthread_mutex_lock (&m);
if(num < 0){
exit(1);
}
while(num == 0){
pthread_cond_wait(&c_cons, &m);
}
i = buffer(rem);
rem = (rem+1)%BUF_SIZE;
num--;
pthread_mutex_unlock (&m);
pthread_cond_signal (&c_prod);
}
}

读写模式

介绍

在读写模式中,同一时间我们可以有多个读者读取一个空间,只能有0个或者1个写者对空间进行写入。如果我们采用互斥锁,那么空间是一个0或1的状态,要么可以访问,要么不能访问,且同一时间只能由一个线程访问,这个对于多个读者的情况是非常低效的。我们希望读线程都能同时访问这个空间。在读写模式中,我们会遇到三种情况:

  1. buff既没有读者也没有写者使用,这种情况读写均可(resource_counter:0)
  2. buff只有读者使用,这种情况buff可读(resource_counter:>0)
  3. buff由写者占据,这种情况既不能读也不能由其他线程写(resource_counter:-1)

实现

这部分需要多理解一下

读者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//--------------进入临界区,下面这段代码实际上可以看作加一个大锁
Lock(counter_mutex)
while(resource_counter == -1)
Wait(counter_mutex, read_phase)
resource_counter++; //增加读者,可以视为读者一个一个有序进入图书馆
Unlock(counter_mutex)
//--------------进入临界区,实际上可以看作加一个大锁

// read data

//--------------走出临界区,释放这个大锁
Lock(counter_mutex)
resource_counter--;
if(resource_counter == 0) // 没有读者也没有写者
Signal(write_phase) // 只能通知一个写者进行写操作
Unlock(counter_mutex)
//--------------走出临界区,释放这个大锁

写者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//写前处理
Lock(counter_mutex)
while(resource_counter != 0)
Wait(counter_mutex, write_phase) //等待write_phase 到来
resource_counter = -1; //开始写
Unlock(counter_mutex)

// write data

//写完后处理
Lock(counter_mutex)
resource_counter = 0;
Broadcast(read_phase) // 通知所有读者进行读操作
Signal(write_phase) // 通知其他某一个写者进行写操作
// 此处我们无法控制到底是读者先读还是写者先写
Unlock(counter_mutex)

从上面的代码中我们可以看出,读写操作实际上不是在临界区进行的,而是依靠resource_counter这个变量进行辅助控制,同一时间只能有一个线程对resource_counter进行访问,但是有多个线程可以读取数据。

临界区结构

从读写模式中,我们可以抽象出基于代理变量的临界区保护结构,分别为进入临界区和退出临界区,代码如下:

1
2
3
4
5
6
Enter critical section              |    Exit critical section
Lock(mutex){ | Lock(mutex){
while(!predicate) | update predicate
wait(mutex, cond_var) | signal/broadcast
update predicate | }
}

老板/工人模式

在老板/工人模式中,有一个老板线程用于分配任务(指挥),多个工人线程用于执行具体的任务(执行)。在这种模式下,必须保证老板线程是高效的,如果老板的效率不高,那么有些工人就分配不到任务。在这个模式中,老板通过通信直接向工人们分发任务

不同的工作方式

老板直接通知工人指派任务

优点

工人不需要同步,因为任务是老板指派的。

缺点

老板必须跟踪每一个工人的进度

通过生产者/消费者模式指派任务

在这种方式中,老板是任务生产者,而工人是任务消费者,这样我们就将问题转换为了生产者/消费者问题。老板通过将任务放入固定空间的队列中,通知工人工作

优点

老板不需要了解工人工作的细节

缺点

工人们需要自己去同步

工人数设置

在老板/工人模式中,一个重要的问题就是我们需要多少工人,来实现良好的运作。工人不能太少也不能太多,取决于我们的需求。一个解决方案是设置一个线程池,对工人数量进行动态调整。同时,池子的规模也可以随需求增长。例如,我们可以参考vector的增长方式,按照指数型扩增vector。

优缺点

优点

简单

缺点

  • 线程池管理比较困难
  • 忽视局部性,当一个工人完成一个任务时,如果又来了一个类似的任务,那么他可以充分利用缓存来完成这个任务,但是在老板/工人模式中,他可能没得选择,只能去做别的任务。

流水线模式

在流水线中,每个工人只做特定的事情,熟练度很高。所以我们仿照流水线设计了流水线模式的程序。将一个大任务分解为若干步骤,然后每一个步骤由一些工人完成。如果我们能够令每个工人线程做特定的任务,那么就可以充分利用局部性,加快效率。当然这种改进也带来了负载均衡的问题,我们需要考虑每个任务需要多少工人,才能达到良好的均衡。流水线的效率取决于流水线中执行最慢的线程,所以我们要确保每个任务的平均时间能够达到一个微妙的平衡。

负载均衡

在将大任务分解为小任务后,我们需要对每个任务进行负载均衡,一个方法是使用线程池,动态调整工人的数目。

老板/工人模式和流水线模式效率比较

在任务不多的情况下,老板/工人模式效率比流水线模式高,但是在任务很多的情况下,使用流水线的效率会大大增加。(小作坊VS大工厂)

参考文献

0%