MIT6828-HW6-Threads and Locking

在这个作业中,我们将使用一个Hash Table实现多线程编程,首先下载这个文件

作业内容

下载文件后进行编译并运行:

1
2
$ gcc -g -O2 ph.c -pthread
$ ./a.out 2

2表示启动的线程数,运行一段时间后,程序应该有如下输出

1
2
3
4
5
6
7
1: put time = 0.003338
0: put time = 0.003389
0: get time = 7.684335
0: 17480 keys missing
1: get time = 7.684335
1: 17480 keys missing
completion time = 7.687856

每个线程运行在两个阶段,put和get,每个阶段大约7.7秒,现在使用一个线程:

1
2
3
4
5
$ ./a.out 1
0: put time = 0.004073
0: get time = 6.929189
0: 0 keys missing
completion time = 6.933433

可以发现总用时为6.9秒,貌似一个线程速度还快一些,但是多线程做了其两倍的工作,用时只多了不到1秒,还是效率高了一些,但是可以看到,使用两个线程会有很多次哈希miss的情况,画一个图思考一下为何会出现哈希miss。(本质上是因为两个线程对同一个空间进行写操作,第一个线程写了之后,被第二个线程擦除,所以真正有用的是第二个线程的操作)

为了避免哈希miss,我们需要在对hash table访问时加锁保护,使用下列语句对put和get进行加锁:

1
2
3
4
pthread_mutex_t lock;     // declare a lock
pthread_mutex_init(&lock, NULL); // initialize the lock
pthread_mutex_lock(&lock); // acquire lock
pthread_mutex_unlock(&lock); // release lock

由于get只是读取,因此我们只需要对put进行加锁,即可避免hash miss

反思

粗粒锁与细粒锁

我们可以使用一个锁锁住整个哈希表,也可以对哈希表中的每一个链表单独上一把锁,代码分别如下:

粗颗粒锁

粗颗粒锁对整个哈希表上锁,比较简单,但是控制颗粒度很粗,访问不同列的线程也会导致相互阻塞。

1
2
3
4
5
6
pthread_mutex_t lock;     // declare a lock
pthread_mutex_init(&lock, NULL); // initialize the lock

pthread_mutex_lock(&lock); // acquire lock
... // 临界区
pthread_mutex_unlock(&lock); // release lock

细颗粒锁

细颗粒锁针对哈希表每一列单独上锁,因此访问不同列的线程不会因为争抢锁而陷入等待,效率高一些。

1
2
3
4
5
6
7
pthread_mutex_t locks[NBUCKET];     // declare a lock
for(int i=0;i<NBUCKET; i++)
pthread_mutex_init(&locks[i], NULL); // initialize the lock

pthread_mutex_lock(&locks[i]); // acquire lock
... // 临界区
pthread_mutex_unlock(&locks[i]); // release lock

两种不同锁对比

总的来说粗颗粒锁更简单,细颗粒锁更快一些,但是需要锁的数量会上升,如果对象很多的话,使用细颗粒锁并不方便。(其实就我个人的实验,并没有看出两者之间有何差别)

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Headers
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <sys/time.h>

// Constants
#define SOL
#define NBUCKET 5
#define NKEYS 100000

// Node of a hash table
struct entry {
int key;
int value;
struct entry *next;
};

// Hash value is the array index, and make linear search
// inside the list
struct entry *table[NBUCKET];
// Average list length = NKEYS / NBUCKET
int keys[NKEYS];

int nthread = 1;
volatile int done;

// Get current time
double now(){...}
// Print the hash table
static void print(void){...}

// Insert at beginning
static void insert(int key, int value, struct entry **p, struct entry *n){
struct entry *e = malloc(sizeof(struct entry));
e->key = key;
e->value = value;
e->next = n;
*p = e;
}

static void put(int key, int value){
int i = key % NBUCKET;
insert(key, value, &table[i], table[i]);
}

static struct entry* get(int key)
{
struct entry *e = 0;
for (e = table[key % NBUCKET]; e != 0; e = e->next) { // 进入对应哈希列,然后顺序遍历
if (e->key == key) break;
}
// e == NULL: 搜索到末尾,哈希不命中
return e;
}

// 线程函数
static void *thread(void *xa){
long n = (long) xa;
int i;
int b = NKEYS/nthread; // 每个线程处理的索引数
int k = 0;
double t1, t0;

// printf("b = %d\n", b);
t0 = now();
for (i = 0; i < b; i++) {
// printf("%d: put %d\n", n, b*n+i);
put(keys[b*n + i], n);
}
t1 = now();
printf("%ld: put time = %f\n", n, t1-t0);

// Should use pthread_barrier, but MacOS doesn't support it ...
__sync_fetch_and_add(&done, 1);
while (done < nthread) ;

t0 = now();
for (i = 0; i < NKEYS; i++) {
struct entry *e = get(keys[i]);
if (e == 0) k++;
}
t1 = now();
printf("%ld: get time = %f\n", n, t1-t0);
printf("%ld: %d keys missing\n", n, k);
return NULL;
}

int main(int argc, char *argv[]){
pthread_t *tha;
void *value;
long i;
double t1, t0;

// 参数处理
if (argc < 2) {
fprintf(stderr, "%s: %s nthread\n", argv[0], argv[0]);
exit(-1);
}
// 创建线程,分配n个线程所需的空间
nthread = atoi(argv[1]);
tha = malloc(sizeof(pthread_t) * nthread);

// 设置随机数种子,初始化索引
srandom(0);
assert(NKEYS % nthread == 0);
for (i = 0; i < NKEYS; i++) {
keys[i] = random();
}

t0 = now();
// 创建线程
for(i = 0; i < nthread; i++) {
assert(pthread_create(&tha[i], NULL, thread, (void *) i) == 0);
}
// 等待线程执行结束
for(i = 0; i < nthread; i++) {
assert(pthread_join(tha[i], &value) == 0);
}
t1 = now();
printf("completion time = %f\n", t1-t0);
}

参考文献

0%