原文链接:

CON53-CPP. Avoid deadlock by locking in a predefined order


互斥量一般被用来防止多个线程同时访问同一个共享资源造成的数据竞争。有时,当锁定互斥量时,多个线程会相互持有对方的锁,导致程序产生死锁。死锁需要满足以下四个条件:

  • 互斥性(至少一个不可共享的资源被持有);
  • 持有和等待(一个线程在等待另一个资源可用的同时持有一个资源);
  • 不可抢占性(一个使用中的资源不能被其他线程抢占);
  • 环路等待(一个线程必须等待另一个线程持有的资源,而另一个线程又在等待第一个线程持有的资源)。

因为死锁需要满足这四个条件,所以避免死锁需要阻止其中任何一个条件的发生。一个简单的解决方法是按预定义的顺序锁定互斥量,这样就可以防止环路等待。

不合规代码示例

这个不合规代码示例的行为取决于运行时环境和平台的调度器。如果线程 thr1 尝试在 deposit() 函数中的同时锁定 ba2 的互斥量,而线程 thr2 尝试在同时锁定 ba1 的互斥量时,程序就容易出现死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <mutex>
#include <thread>

class BankAccount {
int balance;
public:
std::mutex balanceMutex;
BankAccount() = delete;
explicit BankAccount(int initialAmount) : balance(initialAmount) {}
int get_balance() const { return balance; }
void set_balance(int amount) { balance = amount; }
};

int deposit(BankAccount *from, BankAccount *to, int amount) {
std::lock_guard<std::mutex> from_lock(from->balanceMutex);

// Not enough balance to transfer.
if (from->get_balance() < amount) {
return -1; // Indicate error
}
std::lock_guard<std::mutex> to_lock(to->balanceMutex);

from->set_balance(from->get_balance() - amount);
to->set_balance(to->get_balance() + amount);

return 0;
}

void f(BankAccount *ba1, BankAccount *ba2) {
// Perform the deposits.
std::thread thr1(deposit, ba1, ba2, 100);
std::thread thr2(deposit, ba2, ba1, 100);
thr1.join();
thr2.join();
}

合规解决方案(手动排序)

这个符合规范的解决方案通过在 deposit() 函数中建立预定义的锁定顺序来消除环路等待条件。每个线程将根据 BankAccount 的 ID 锁定,这个 ID 是在创建 BankAccount 对象时设置的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <atomic>
#include <mutex>
#include <thread>

class BankAccount {
static std::atomic<unsigned int> globalId;
const unsigned int id;
int balance;
public:
std::mutex balanceMutex;
BankAccount() = delete;
explicit BankAccount(int initialAmount) : id(globalId++), balance(initialAmount) {}
unsigned int get_id() const { return id; }
int get_balance() const { return balance; }
void set_balance(int amount) { balance = amount; }
};

std::atomic<unsigned int> BankAccount::globalId(1);

int deposit(BankAccount *from, BankAccount *to, int amount) {
std::mutex *first;
std::mutex *second;

if (from->get_id() == to->get_id()) {
return -1; // Indicate error
}

// Ensure proper ordering for locking.
if (from->get_id() < to->get_id()) {
first = &from->balanceMutex;
second = &to->balanceMutex;
} else {
first = &to->balanceMutex;
second = &from->balanceMutex;
}
std::lock_guard<std::mutex> firstLock(*first);
std::lock_guard<std::mutex> secondLock(*second);

// Check for enough balance to transfer.
if (from->get_balance() >= amount) {
from->set_balance(from->get_balance() - amount);
to->set_balance(to->get_balance() + amount);
return 0;
}
return -1;
}

void f(BankAccount *ba1, BankAccount *ba2) {
// Perform the deposits.
std::thread thr1(deposit, ba1, ba2, 100);
std::thread thr2(deposit, ba2, ba1, 100);
thr1.join();
thr2.join();
}

合规解决方案 (std::lock())

这个合规解决方案使用标准模板库的功能来确保由于循环等待条件而不会发生死锁。std::lock() 函数接受可变数量的可锁定对象,并尝试锁定它们,使死锁不会发生 [ISO/IEC 14882-2014]。在典型的实现中,这是通过使用 lock()try_lock()unlock() 的组合来尝试锁定对象并在未获得锁定时进行回退来实现的,这可能比显式地预定义锁定顺序的解决方案性能要差。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <mutex>
#include <thread>

class BankAccount {
int balance;
public:
std::mutex balanceMutex;
BankAccount() = delete;
explicit BankAccount(int initialAmount) : balance(initialAmount) {}
int get_balance() const { return balance; }
void set_balance(int amount) { balance = amount; }
};

int deposit(BankAccount *from, BankAccount *to, int amount) {
// Create lock objects but defer locking them until later.
std::unique_lock<std::mutex> lk1(from->balanceMutex, std::defer_lock);
std::unique_lock<std::mutex> lk2(to->balanceMutex, std::defer_lock);

// Lock both of the lock objects simultaneously.
std::lock(lk1, lk2);

if (from->get_balance() >= amount) {
from->set_balance(from->get_balance() - amount);
to->set_balance(to->get_balance() + amount);
return 0;
}
return -1;
}

void f(BankAccount *ba1, BankAccount *ba2) {
// Perform the deposits.
std::thread thr1(deposit, ba1, ba2, 100);
std::thread thr2(deposit, ba2, ba1, 100);
thr1.join();
thr2.join();
}

Risk Assessment

Deadlock prevents multiple threads from progressing, halting program execution. A denial-of-service attack is possible if the attacker can create the conditions for deadlock.

Rule Severity Likelihood Remediation Cost Priority Level
CON53-CPP Low Probable Medium P4 L3

Automated Detection

Tool Version Checker Description
CodeSonar 7.3p0 CONCURRENCY.LOCK.ORDER Conflicting lock order
Coverity 6.5 DEADLOCK Fully implemented
Helix QAC 2023.1 C++1772, C++1773
Parasoft C/C++test 2022.2 CERT_CPP-CON53-a Do not acquire locks in different order
Polyspace Bug Finder R2023a CERT C++: CON53-CPP Checks for deadlocks
PRQA QA-C++ 4.4 1772, 1773 Enforced by MTA

Search for vulnerabilities resulting from the violation of this rule on the CERT website.

CERT Oracle Secure Coding Standard for Java LCK07-J. Avoid deadlock by requesting and releasing locks in the same order
SEI CERT C Coding Standard CON35-C. Avoid deadlock by locking in a predefined order
MITRE CWE CWE-764, Multiple Locks of a Critical Resource

Bibliography

[ISO/IEC 14882-2014] Subclause 30.4, “Mutual Exclusion” Subclause 30.4.3, “Generic Locking Algorithms”

img img img