0%

c++ 设计线程池(1)----锁

线程池的好处

线程的开启也是一个相对缓慢的过程。

  • 如果不用线程池,在系统运行过程中实时开启线程,销毁线程。会极大增加系统的负载。导致系统的实时性减低,业务处理能力也会降低
  • 服务器启动时,就事先创建好线程池,当业务来到时,直接从线程池取出一个线程来执行任务。任务完成后直接把线程归还给池,用于后续重复使用。

线程池的模式

  • 固定大小
  • 可变大小

线程同步

并发编程的基本问题:

  • 互斥
  • 通信

我们希望有一段代码被原子式执行,由于单处理器的任务切换(中断)或者多处理器的并行,我们不能保证。

所以需要一种机制来保证原子性—–锁

操作系统给上层提供了一个API,使得应用程序可以使用锁来达到代码执行的原子性。

1
2
3
4
5
lock_t mutex; // some globally-allocated lock 'mutex'
...
lock(&mutex);
... // 临界区
unlock(&mutex);

调用者调用lock尝试获取锁,如果没有其他现场持有锁,线程就会获得锁,进入临界区。否则,他就会阻塞。直到锁可用。(具体实现比这复杂)

锁的实现原理

锁定实现要求:

  • ​ 提供互斥(保证临界区执行的原子性)
  • 公平性(不会有饿死线程)
  • 性能

方式1:控制中断

缺点:

  • 需要应用进程有特权操作,因为中断需要特权操作
  • 只适合单处理器,多处理器无法保证互斥
  • 会导致中断丢失
  • 效率低。

方式 2:自旋锁

需要硬件指令支持:test and set 指令 (原子交换)

假设无硬件支持,我们很容易想到的一个锁(就是变量)实现是:(测试和设置不是原子操作)

使用一个变量来标志锁是否被占有。这样进程在进入临界区时,会:

  • 检查标志,如果可以进入就进入临界区吗,设置锁标志。不能就阻塞等待。
  • 在退出临界区时,清除标志。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct lock_t { int flag; } lock_t;
void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
mutex->flag = 0;
}
void lock(lock_t *mutex) {
while (mutex->flag == 1) // TEST the flag
; // spin-wait (do nothing)
mutex->flag = 1;
// now SET it!
}
void unlock(lock_t *mutex) {
mutex->flag = 0;
}

不能保证互斥:

  • 如果线程1在测试时,被中断到线程2,

  • 线程2执行测试并成功设置了标志。此时又被中断

  • 线程1,此时测试也会成功。两者同时进入了临界区

性能问题:

自旋会导致白白浪费CPU资源

test and set 硬件(原子交换)指令,我们可以实现一个满足要求的最简单的锁—-自旋锁

test and set 的原理:

1
2
3
4
5
6
7
// 这就是一个交换操作,只不过硬件保证了他的原子性。
int TestAndSet(int *old_ptr, int new) {

int old = *old_ptr; // 获取旧值
*old_ptr = new;// 设置新值
return old;// 返回旧值。
}

自旋锁的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct lock_t
{
int flag;
} lock_t;

void init(lock_t *lock) {
// 0 indicates that lock is available, 1 that it is held
lock->flag = 0;
}
void lock(lock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1); // spin-wait (do nothing)
}

void unlock(lock_t *lock) {
lock->flag = 0;
}

工作场景模拟:

  • 当调用lock时,如果没有线程持有锁,flag=0,原子交换指令flag=1,返回旧flag=0,循环为假,跳出,获得锁。(这是原子操作,不会被中断)

  • 如果在执行完原子指令之后,被中断(此时flag=1,)别的线程如果执行原子交换指令,while循环一致为真,导致自旋。保证了互斥

  • 只有在第一个线程结束执行,调用unlock时,才会解除自旋。

自旋锁:

保证了互斥,但没有保证公平。

有可能某个线程会一直自旋下去,导致饿死。

compare-and-exchange 硬件支持的原子指令

1
2
3
4
5
6
7
8
// 如果满足期望的值,则会将锁交换。否则,什么也不做。返回锁原来的值。
int CompareAndSwap(int *ptr, int expected, int new)
{
int actual = *ptr;
if (actual == expected)
*ptr = new;
return actual;
}

使用这条指令来实现自旋锁和上面的指令差不多:

1
2
3
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1); // spin
}

在无等待同步(wait-free synchronization)时,这个指令比test and set强大、

链接的加载(load-linked)和条件式存储(store-conditional)指令 原子指令

连接加载:从内存中取出一个值到寄存器,(可以理解为解引用一个地址。)

条件式存储:只有上次从地址A取出的值,自从上次取出之后,没有被人家更改过(也就是说这个值还是那个值),才会去操作这个地址。)

ticket 锁

获取并增加fetch-and-add 原子指令

它能原子地返回特定地址的旧值,并且让该值自增一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old + 1;
return old;
}
typedef struct lock_t {
int ticket;//车票
int turn; //(轮到自己)
} lock_t;

void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn 0;
}

void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn); // spin
}

void unlock(lock_t *lock) {
FetchAndAdd(&lock->turn);
}

相当于先买票,再排队。

使用两个变量,构建锁:

获取锁的方式:

  • 先买票(调用获取并增加指令),此时自己的lock->ticket 为1,表示自己有票了。
  • 再去排队(while循环),如果轮到自己了,就可以获得锁。

既可以保证互斥也可以保证公平性。

自旋过长时间的应对方法 —- yield()系统调用

在以上公平互斥锁的基础上,还有一个问题:

如果处于临界区中的线程被中断,其他线程就得长时间自旋等待。

解决办法:

  • yield()系统调用:在线程发现锁被别人持有时,主动调用yield让出锁,是自己进入就绪态。

在两个线程下工作的很好。

缺点:如果在许多线程下反复竞争一把锁时,一个线程长时间持有锁,会导致其他线程全部让出锁,知道持有锁的继续执行。

解决让出过多问题 —- 使用等待队列,睡眠代替自旋。

前面一些方法的真正问题是存在太多的偶然性。调度程序决定如何调度。如果调度不合理,线程或者一直自旋(第一种方法),或者立刻让出 CPU(第二种方法)。无论哪种方法,都可能造成浪费,也能防止饿死。

因此,我们必须显式地施加某种控制,决定锁释放时,谁能抢到锁(排队)。为了做到这一点,我们需要操作系统的更多支持,并需要一个队列来保存等待锁的线程

睡眠与唤醒。

一个线程如果尝试获得锁,失败,自己会陷入睡眠。等到获得锁的线程结束执行时,他会唤醒正在睡眠的线程队列。队列中如果没有人有紧急情况,就会按排队顺序接着下一个线程执行。

例如,Linux 提供了 futex,具体来说,每个 futex 都关联一个特定的物理内存位置,也有一个事先建好的内核队列。调用者通过futex 调用(见下面的描述)来睡眠或者唤醒。
具体来说,有两个调用。调用 futex_wait(address, expected)时,如果 address 处的值等于expected,就会让调线程睡眠。否则,调用立刻返回。调用 futex_wake(address)唤醒等待队列中的一个线程。图 28.9 是 Linux 环境下的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void mutex_lock(int *mutex)
{
int v;
/*首先测试mutixc值为是否0,如果是,说明没人等待,并且锁是空闲的。也就是说只有我自己想要锁。那我直接拿。并把最高位设置为1.*/

if (atomic_bit_test_set(mutex, 31) == 0)
return;


// 否则,等待者加1.
atomic_increment(mutex);
while (1)
{
// 再次循环尝试获得锁。
if (atomic_bit_test_set(mutex, 31) == 0)
{
// 如果,获得了,等待者减一。
atomic_decrement(mutex);
return;
}
/* We have to wait now. First make sure the futex value
we are monitoring is truly negative (i.e. locked).*/
v = *mutex;
if (v >= 0)
continue;
// 如果执行到这,说明锁已经没了,我们需要等待。
futex_wait(mutex, v);
}
}

void mutex_unlock(int *mutex)
{
/* Adding 0x80000000 to the counter results in 0 if and only if
there are not other interested threads */
if (atomic_add_zero(mutex, 0x80000000))
return;
/* There are other threads waiting for this mutex,
wake one of them up.
*/
futex_wake(mutex);
}

这段代码来自 nptl 库(gnu libc 库的一部分)[L09]中 lowlevellock.h,它很有趣。基本上,它利用一个整数,同时记录锁是否被持有(整数的最高位),以及等待者的个数(整数的其余所有位)。因此,如果锁是负的,它就被持有(因为最高位被设置,该位决定了整数的符号)。这段代码的有趣之处还在于,它展示了如何优化常见的情况,即没有竞争时:只有一个线程获取和释放锁,所做的工作很少(获取锁时测试和设置的原子位运算,释放锁时原子的加法)。

两阶段锁

两阶段锁的第一阶段会先自旋一段时间,希望它可以获取锁。
如果第一个自旋阶段没有获得锁,第二阶段调用者会睡眠,直到锁可用。