我有很多把锁,却没有一把钥匙。

仔细想了想,还是将锁单独列出作为一篇博客而不是放在进程与线程中,因为这个问题有些繁琐,需要仔细进行一番解释。

简介

锁的功能

解决资源占用的问题;保证同一时间一个对象只有一个线程在访问;有些业务逻辑在执行过程中要求对数据进行排他性的访问,于是需要通过一些机制保证在此过程中数据被锁住不会被外界修改,这就是所谓的锁机制。锁带来了安全,但也降低了效率。因为多个线程试图获取同一把锁,只能有一个线程获得,其他线程只能等待释放锁。

锁的功能如下:

  • 避免更新丢失
  • 能够将多个步骤原子化,隐藏中间过程
  • 保持数据不变性,假设一个数据在中间过程被改变,但是结束后又改了回来,那么使用锁可以隐藏中间过程,使数据对外表现为没变化

锁的基本模型

1
2
3
4
lock l;
acquire(l)
x = x + 1 //被保护的临界区
release(l)

锁实际上是保证了一段区域的原子性

锁的种类2

按照性质划分

公平锁/非公平锁

公平锁即尽量以请求锁的顺序来获取锁。比如同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该锁,这种就是公平锁。有可能会造成优先级反转。

非公平锁按照优先级高低的顺序获取锁,可能会造成线程饥饿

乐观锁/悲观锁

乐观锁假设数据一定不会被修改;而悲观锁假设数据一定会被修改。

  • 乐观锁适用场景

比较适合读取操作比较频繁的场景,如果出现大量的写入操作,数据发生冲突的可能性就会增大,为了保证数据的一致性,应用层需要不断的重新获取数据,这样会增加大量的查询操作,降低了系统的吞吐量。

  • 悲观锁适用场景

独享锁/共享锁

独享锁被一个线程单独占有,而共享锁则由多个线程共享

按照设计方法划分

自旋锁

是指尝试获取锁的线程不会立即被阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线上下文切换的消耗,缺点是循环会消耗CPU。

A占有锁,如果B想要申请锁,那么不阻塞,而是反复询问A,你的锁用完了么?你的锁用完了么?直到A释放了资源的锁。

  • 应用场景

如果A线程占用锁的时间比较短,这个时候用自旋锁比较好,可以节省CPU在不同线程间切换花费的时间开销;如果A线程占用锁的时间比较长,那么使用自旋锁的话,B线程就会长时间浪费CPU的时间而得不到执行(要执行一个线程需要CPU,并且需要获得锁),这个时候不建议使用自旋锁;还有递归的时候尽量不要使用自旋锁,可能会造成死锁。

  • C++实现自旋锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class spin_mutex {
std::atomic<bool> flag = ATOMIC_VAR_INIT(false);
public:
spin_mutex() = default;
spin_mutex(const spin_mutex&) = delete;
spin_mutex& operator= (const spin_mutex&) = delete;

void lock() {
bool expected = false;
while (!flag.compare_exchange_strong(expected, true)) //flag == expected,flag的值被改为true,并返回true
// flag == false; expected == false flag = true
// return true, while break, get the lock

// flag == true; expected == false flag != expected
// return false, while continue, cannot get the lock
expected = false;
}

void unlock() {
flag.store(false);
}
};

互斥锁(mutex)

采用互斥锁可以防止竞争的发生,对于一个临界区域,进入时上锁,退出时解锁,可以保证同一时刻只能有一个线程对其进行访问。

  • 问题:多个线程请求锁

不考虑优先级,当多个线程申请一个待释放的锁时,那么哪个线程会得到锁呢?答案是不确定,只要在锁被释放前进行了申请的线程,都有可能获得这把锁。操作系统并不一定保证最早申请的会第一个得到锁。

原子锁

有些情况下在多线程中会涉及到一个经常用到而又非常简单的计算操作,例如int ++,这个时候使用自旋锁或者互斥锁代价太高,不划算,我们可以定义原子锁,利用CPU的原子操作特性,进行简单的计算操作。

锁的实现

1
2
3
4
5
6
7
8
9
struct lock { int locked; };
acquire(l){
while(1){
if(l->locked == 0){ // A
l->locked = 1; // B
return;
}
}
}

上面的方法采用了while循环来实现锁,该实现存在一个问题,例如

锁的使用

何时使用锁

当我们写代码时,问自己如下问题:

  1. 是否有两个或多个线程访问同一个内存空间
  2. 是否有至少一个线程写内存空间

如何寻找加锁位置

这里有一个简单的原则,先写出单个CPU下单线程的正确的顺序代码,然后在这个代码两端进行加锁保护,使代码段强制保持顺序执行

锁的使用原则

  • 如果能不共享,就不共享
  • 先用粗粒锁,如果有必要,逐步细化
  • 当需要更高的并发性,再使用细颗粒锁

锁的问题及解决方案

饥饿(不公平锁机制)

考虑如下场景,当线程调度允许插队时,可能会有某个线程,永远也得不到锁,这种情况下会发生线程饥饿的问题。如果线程优先级不均衡,那么在CPU繁忙的情况下,低优先级的线程得到执行的机会很小,可能发生线程饥饿;或者一个持有锁的线程长期霸占资源,也会造成饥饿问题。

如何避免

  1. 保证资源充足
  2. 采用公平机制
  3. 避免持有锁的线程长时间执行

实际使用中方案2适用范围更广一些。

死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。尽管死锁很少发生,但一旦发生就会造成应用的停止响应。

死锁的必要条件

  1. 互斥性:一个资源每次只能被一个线程所访问
  2. 占有性:当一个线程因请求资源而阻塞时,不释放已获得的资源
  3. 不可抢占性:线程已经获得的资源,在主动释放前,不能强行剥夺
  4. 循环等待:若干线程对资源的等待构成了一种循环关系(充分必要)

如何避免

死锁预防

通过设置某些限制条件,去破坏死锁的四个条件中的一个或几个条件,来预防发生死锁。但由于所施加的限制条件往往太严格,因而导致系统资源利用率和系统吞吐量降低。

死锁避免

允许前三个必要条件,但通过明智的选择,确保永远不会到达死锁点,因此死锁避免比死锁预防允许更多的并发。

死锁检测

不须实现采取任何限制性措施,而是允许系统在运行过程发生死锁,但可通过系统设置的检测机构及时检测出死锁的发生,并精确地确定于死锁相关的进程和资源,然后采取适当的措施,从系统中将已发生的死锁清除掉。

死锁解除

与死锁检测相配套的一种措施。当检测到系统中已发生死锁,需将进程从死锁状态中解脱出来。常用方法:撤销或挂起一些进程,以便回收一些资源,再将这些资源分配给已处于阻塞状态的进程。死锁检测与解除有可能使系统获得较好的资源利用率和吞吐量,但在实现上难度也最大。

四种方法从上到下对死锁的防御程度逐渐减弱,但资源利用率和并发性得以提高。

在使用线程锁时,我们可以采用如下方法避免死锁:

  1. 尽量避免一个线程对多个锁进行上锁
  2. 设置相同的加锁顺序,避免产生循环嵌套
  3. 使用定时锁,超时自动释放锁
  4. 死锁检测,使用某种算法检测是否成环
  5. 啥也不做(缩起头当鸵鸟),死锁了,就重启

活锁

死锁很好理解,但是活锁是什么意思呢,假设两个线程互相谦让,都让对方先使用锁,那么最后两个线程也都得不到锁。

如何避免

  1. 采用先到先服务的策略

锁使用原则

在这里我们总结一下锁的使用原则:

  • 共享内存总应当由一个锁来进行保护
  • 锁应当由所有使用者可见
  • 对于所有的线程,应当有序使用锁
  • 记得解锁
  • 如果要将一个模块进行封装,那么该模块内部不要有锁

锁描述符

对于锁来说,我们同样需要一些数据结构来对其进行描述,常见的锁的关键数据如下:

  • 锁状态:上锁?解锁?
  • 锁的拥有者:持有锁的线程
  • 阻塞的线程列表:被锁阻塞的线程们

C++中关于各种锁的实现以及性能比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149

#include <iostream>
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>

class spin_mutex {
std::atomic<bool> flag = ATOMIC_VAR_INIT(false);
public:
spin_mutex() = default;
spin_mutex(const spin_mutex&) = delete;
spin_mutex& operator= (const spin_mutex&) = delete;
void lock() {
bool expected = false;
while (!flag.compare_exchange_strong(expected, true)) //一个线程可以跳出while,而其他线程都处在while循环当中,直到某个线程释放了锁
expected = false;
}
void unlock() {
flag.store(false);
}
};

long size = 1000000;
long total = 0;
std::atomic_long total2(0);
std::mutex m;
spin_mutex lock;

void thread_click()
{
for (int i = 0; i < size; ++i)
{
++total;
}
}

void mutex_click()
{
for (int i = 0; i < size; ++i)
{
m.lock(); //加锁
++total; //被加锁保护的区域叫临界区
m.unlock(); //释放锁
}
}

void atomic_click()
{
for (int i = 0; i < size; ++i)
{
++total2;
}
}


void spinlock_click()
{
for (int i = 0; i < size; ++i)
{
lock.lock();
++total;
lock.unlock();
}
}

int main()
{
int thnum = 100;
std::vector<std::thread> threads(thnum);
clock_t start, end;

//----------------------- 单线程无锁 -----------------------
total = 0;
start = clock();
for (int i = 0; i < size * thnum; i++) {
++total;
}
end = clock();
std::cout << "single thread result: " << total << std::endl;
std::cout << "single thread time: " << end - start << std::endl;

//----------------------- 多线程无锁(结果不对,因为会出现脏读后写覆盖的问题) -----------------------
total = 0;
start = clock();
for (int i = 0; i < thnum; ++i) {
threads[i] = std::thread(thread_click);
}
for (int i = 0; i < thnum; ++i) {
threads[i].join();
}
end = clock();
std::cout << "multi thread no mutex result: " << total << std::endl;
std::cout << "multi thread no mutex time: " << end - start << std::endl;

//----------------------- 多线程原子锁 -----------------------
total = 0;
start = clock();
for (int i = 0; i < thnum; ++i) {
threads[i] = std::thread(atomic_click);
}
for (int i = 0; i < thnum; ++i) {
threads[i].join();
}
end = clock();
std::cout << "multi thread atomic result: " << total2 << std::endl;
std::cout << "multi thread atomic time: " << end - start << std::endl;

//----------------------- 多线程互斥锁 -----------------------
total = 0;
start = clock();
for (int i = 0; i < thnum; ++i) {
threads[i] = std::thread(mutex_click);
}
for (int i = 0; i < thnum; ++i) {
threads[i].join();
}
end = clock();
std::cout << "multi thread mutex result: " << total << std::endl;
std::cout << "multi thread mutex time: " << end - start << std::endl;

//----------------------- 多线程自旋锁 -----------------------
total = 0;
start = clock();
for (int i = 0; i < thnum; ++i) {
threads[i] = std::thread(spinlock_click);
}
for (int i = 0; i < thnum; ++i) {
threads[i].join();
}
end = clock();
std::cout << "spin lock result: " << total << std::endl;
std::cout << "spin lock time: " << end - start << std::endl;
getchar();
return 0;
}

/*
single thread result: 100000000
single thread time: 231
multi thread no mutex result: 11501106
multi thread no mutex time: 261
multi thread atomic result: 100000000
multi thread atomic time: 1882
multi thread mutex result: 100000000
multi thread mutex time: 16882
spin lock result: 100000000
spin lock time: 45063 //其实这个场景不建议使用自旋锁
*/

参考文献

0%