C++多线程编程(二):使用互斥锁

C++多线程编程(二):使用互斥锁

上回书说到,我们可以用C++11中的thread类来创建和管理线程。在多线程编程中,我们常常遇到对线程间共享数据访问的各种线程安全问题。在这篇博文中我将向大家介绍如何利用C++11提供的<mutex>头文件提供的相关工具来保护线程间共享数据。

竞争条件

首先来看一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <iostream>
#include <thread>

using namespace std;

int n = 0;

void func(void)
{
for (int i = 0; i < 10000; i++)
{
int x = n;
n++;
if (x + 1 == n)
{
n--;
}
}
}

int main(void)
{
thread t1(func);
thread t2(func);
thread t3(func);

t1.join();
t2.join();
t3.join();

cout << n << endl;

return 0;
}

读者不妨先猜测一下输出结果。

一种可能的猜测:

在函数func中,我们先把n赋值给x,然后让n递增1。按理说,表达式x + 1 == n的结果应该为true,那么n又会递减1。也就是说,执行一次循环,n的值应该不会改变。所以程序最终的输出结果应该是0。
几句就
我们运行一下这个程序看一下结果:

548 // 第一次运行结果

96 // 第二次运行结果

162 // 第三次运行结果

肿么肥四?为什么每次运行结果都不一致?要想搞清楚这样的结果的原因,首先要从线程调度说起:

操作系统在调度线程时采用的是抢占式调度的方式,也就是说,每一个线程在执行一段时间后会被操作系统中断,然后调用另一个线程。操作系统对线程的调度几乎是随机的。在上面的例子中,当我们的线程在执行n++后,操作系统有可能会终端当前线程去执行另一个线程。而另一个线程如果恰好也执行了n++x + 1 == n就不为true了,所以结果也就不是预期的0了。这就是多线程编程中的竞争条件(也叫竞争冒险竞态条件)

那么如果我们想让结果为0需要怎么做呢?这就需要C++中的mutex来保护循环中的代码,让程序中的三个线程在同一时间只有一个线程执行循环中的代码,来避免竞争条件

mutex类

用途

mutex常被称作互斥锁互斥量,位于<mutex>头文件中。mutex的用途就是对可能出现竞争条件的代码段(临界区)“加锁”。线程要进入临界区,首先要获取锁,如果成功获取锁,线程可以进入临界区执行代码。如果线程想要获取的锁已经被其他线程占用,则线程会阻塞,直至其他线程释放这个锁。

创建mutex对象

mutex类只有一个默认构造函数,它会创建一个没有被加锁的mutex对象。mutex类没有拷贝和移动构造函数,所以mutex不能被拷贝或者移动

锁定mutex

线程调用mutex对象的lock成员函数会尝试获取这个锁。如果这个mutex对象没有被其他线程占有,当前线程就会获取这个锁。如果这个mutex已经被其他线程占用,调用线程会被阻塞直到其他线程释放这个锁。

释放mutex

当线程执行完临界区的代码后,应当释放锁以便让其他线程能够获取这个锁。释放mutex通过调用mutex对象的unlock成员函数。

使用mutex的栗子

下面的代码利用mutex改写了上一节的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <mutex>
#include <thread>

using namespace std;

int n = 0;
mutex g_lock;

void func(void)
{
for (int i = 0; i < 10000; i++)
{
g_lock.lock();
int x = n;
n++;
if (x + 1 == n)
{
n--;
}
g_lock.unlock();
}
}

int main(void)
{
thread t1(func);
thread t2(func);
thread t3(func);

t1.join();
t2.join();
t3.join();

cout << n << endl;

return 0;
}

在上面的代码中,每个线程在执行循环中的代码时都会获取g_lock
在执行完这几行代码后再释放g_lock,所以同一时间只有一个线程进入临界区,所以不会产生竞争条件等问题,程序输出结果始终为0。

尝试锁定mutex

mutex有一个成员函数try_lock,在线程调用该成员函数时,如果mutex没有被其他线程占用,调用线程就会获取该锁并返回true;如果该mutex已被其他线程占用,则该函数返回false,并不会想lock一样阻塞住。

recursive_mutex

如果一个线程已经占有了mutex,那么它在尝试再次为这个mutex加锁时会产生未定义的行为,可能会产生死锁,也可能导致程序崩溃。

如果我们需要对一个互斥量重复加锁,可以使用recursive_mutex
recursive_mutex的用法与mutex基本相同。
当线程在对一个已经持有的recursive_mutex加锁时(调用lock成员函数),该recursive_mutex的计数会加1。当线程调用unlock时计数减1。直到计数为0时(即加锁次数与解锁次数相同)时释放这个recursive_mutex。

timed_mutex

timed_mutex类除了提供mutex类的基本功能之外,还提供了定时的功能。该类添加了两个新的成员函数try_lock_fortry_lock_until

try_lock_for

try_lock_for的函数原型为:

1
2
template <class Rep, class Period>
bool try_lock_for (const chrono::duration<Rep,Period>& rel_time);

当一个线程调用timed_mutex的try_lock_for时,如果该timed_mutex没有被其他线程占用,该线程会立即占有这个timed_mutex并返回true
如果该timed_mutex已经被其他线程占有,调用线程会阻塞,直到其他线程解锁了该timed_mutex(返回true)或超出指定的时间段rel_time(返回false)。

try_lock_until

try_lock_until的函数原型为:

1
2
template <class Clock, class Duration>
bool try_lock_until (const chrono::time_point<Clock,Duration>& abs_time);

当一个线程调用timed_mutex的try_lock_until时,如果该timed_mutex没有被其他线程占用,该线程会立即占有这个timed_mutex并返回true
如果该timed_mutex已经被其他线程占有,调用线程会阻塞,直到其他线程解锁了该timed_mutex(返回true)或到达了指定的时间点abs_time(返回false)。

recursive_timed_mutex

与mutex类似,timed_mutex也不支持递归加锁。如果需要可以递归加锁的timed_mutex,可以使用recursive_timed_mutex
这个类的功能相当于timed_mutex+recursive_mutex,这里不再赘述,可以参考这个网站

lock_guard类模板

用途

如果一个线程在对mutex加锁后没有释放这个锁,其他线程就无法获取这个互斥锁。所以在一个线程在使用完mutex时,必须是释放这个mutex。

然而,人总是会犯错误的,程序员可能会忘记调用unlock释放mutex,尤其是在函数有多个出口的时候。为了解决这个问题,C++11为我们提供了一个lock_guard类模板来解决这个问题。

创建lock_guard对象

在创建lock_guard对象时,我们需要向构造函数中传入一个mutex对象。lock_guard的构造函数会对这个mutex加锁。当lock_guard对象被析构时,析构函数会自动解锁mutex。这样,即使程序代码有多个出口,我们也能保证mutex被正确解锁。

lock_guard是一个类模板:

1
template <class Mutex> class lock_guard;

模板参数Mutex可以是mutexrecursive_mutextimed_mutexrecursive_timed_mutexunique_lock(下文将会提到)。

lock_guard的构造函数原型如下:

1
explicit lock_guard (mutex_type& m);

我们可以用lock_guard改写之前的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <mutex>
#include <thread>

using namespace std;

int n = 0;
mutex g_lock;

void func(void)
{
for (int i = 0; i < 10000; i++)
{
lock_guard<mutex> lg(g_lock);
int x = n;
n++;
if (x + 1 == n)
{
n--;
}
}
}

int main(void)
{
thread t1(func);
thread t2(func);
thread t3(func);

t1.join();
t2.join();
t3.join();

cout << n << endl;

return 0;
}

adopt_lock_t

lock_guard还有一个构造函数。

1
lock_guard (mutex_type& m, adopt_lock_t tag);

向构造函数中传入adopt_lock_t对象可以防止lock_guard构造函数对mutex加锁。这样,我们可以自行对mutex加锁,然后让lock_guard对象代替我们释放锁。adopt_lock_t只是起到了让编译器选定特定构造函数的作用。

<mutex>头文件中预定义了一个adopt_lock_t对象adopt_lock可以直接使用。

unique_lock

C++提供了unique_lock类模板来灵活地管理互斥锁。它比lock_guard提供了更多的灵活性,但是比lock_guard占用更多的可空间并且性能也会相对慢一些。

C++11为unique_lock提供了很多构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
unique_lock() noexcept;

explicit unique_lock (mutex_type& m);

unique_lock (mutex_type& m, try_to_lock_t tag);

unique_lock (mutex_type& m, defer_lock_t tag) noexcept;

unique_lock (mutex_type& m, adopt_lock_t tag);

template <class Rep, class Period>
unique_lock (mutex_type& m, const chrono::duration<Rep,Period>& rel_time);

template <class Clock, class Duration>
unique_lock (mutex_type& m, const chrono::time_point<Clock,Duration>& abs_time);

unique_lock (const unique_lock&) = delete;

unique_lock (unique_lock&& x);

通常我们会向unique_lock的构造函数传入一个互斥锁对象(mutexrecursive_mutextimed_mutexrecursive_timed_mutex),让unique来管理这个锁。此外,还可以传入额外的参数来控制unique_lock的构造函数的行为。例如,传入前面提到的adopt_lock_t对象可以让unique_lock来管理已经被当前线程占有的互斥锁,传入try_to_lock_t对象让unique_lock尝试获取锁(可以用unique_lock的owns_lock判断是否成功获取锁)。传入defer_lock_t对象可以让unique_lock先不获取锁,在之后由程序员自行加锁。当unique_lock被析构时,如果它已经拥有它管理的锁,它可以自动释放该锁。另外,unique_lock还提供了try_lock_fortry_lock_until两个成员函数,这两个函数与timed_mutex用法类似,不再赘述。

unique_lock只有移动构造函数,没有拷贝构造函数,这意味着它只可以被移动,不可以被复制。当unique_lock被移动时,它对锁的拥有状态也相应移动,被移动的对象恢复默认构造状态。

详细的unique_lock的API可以参考这个

call_once

读者不妨想一下,在一个多线程环境中,如何让一个函数只会被调用一次?

一个很自然的想法是可以使用一个变量flag,将这个变量初始化为false,当一个线程需要调用这个函数时,先检测flag的值。如果flagfalse,则将flag设置为true,然后调用该函数;如果flag的值为true,说明这个函数已经被调用过了,那么这个线程就不需要调用这个函数了。为了避免竞争条件,我们可以用mutex将临界区保护起来。

然而,在这种设计中,每个线程都必须获取互斥锁,造成了线程的序列化,影响了程序的性能。C++为我们提供了一个工具call_once,它保证一个函数只会被调用一次,同时还保证了较好的性能。

call_once的函数原型如下

1
2
template <class Fn, class... Args>
void call_once (once_flag& flag, Fn&& fn, Args&&... args);

once_flag是一个用于标识函数调用的类,它的对象可以被默认构造。

举个栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
#include <mutex>

using namespace std;

once_flag flag;

void once(void)
{
cout << "once被调用" << endl;
}

void func(void)
{
call_once(flag, once);
}

int main(void)
{
thread t1(func);
thread t2(func);
thread t3(func);

t1.join();
t2.join();
t3.join();

return 0;
}

以上例子中,"once被调用"只会被输出一次,而不是三次。

一次锁定多个mutex

C++的<mutex>头文件中提供了两个可用于同时锁定多个互斥量的函数:std::try_lockstd::lock。这两个函数都会保证不产生死锁。

std::try_lock

std::try_lock的函数原型为:

1
2
template <class Mutex1, class Mutex2, class... Mutexes>
int try_lock (Mutex1& a, Mutex2& b, Mutexes&... cde);

std::try_lock依次调用每个参数的try_lock成员函数尝试对每个互斥量加锁,如果try_lock成员函数返回false,std::try_lock会释放已经持有的锁并返回false,如果所有互斥量都成功加锁,std::try_lock会返回true

lock

1
2
template <class Mutex1, class Mutex2, class... Mutexes>
void lock (Mutex1& a, Mutex2& b, Mutexes&... cde);

std::lock会尝试对所有的互斥量尝试加锁,如果加锁失败,它会释放所有已拥有的锁并重新尝试加锁,直到获取所有的锁之后返回。

参考

  1. <mutex> - C++ Reference
  2. 《C++并发编程实战》

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!