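Data races and synchronization bugs are a persistent challenge in multithreaded programming. The atomic operations introduced in C++11 are a powerful tool for dealing with them. This article walks through several important atomic functions, atomic_store, atomic_compare_exchange_strong_explicit, atomic_load_explicit, and atomic_fetch_sub_explicit, to help you use them correctly in multithreaded code.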
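What Is an Atomic Operation
An atomic operation is indivisible: other threads observe it either as not yet started or as fully completed, never in a half-finished state. Concurrent atomic accesses to the same variable therefore do not form a data race. Atomicity alone says nothing about how accesses to other variables are ordered; that is the job of memory ordering, covered next.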
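To make the definition concrete, here is a minimal sketch that increments one shared std::atomic<int> from several threads. The helper name count_with_atomic and its parameters are illustrative and not part of any library. Because each fetch_add is indivisible, no increment is lost; the same loop on a plain int would be a data race with undefined behavior.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper: increments one shared atomic from several threads
// and returns the final total.
int count_with_atomic(int num_threads, int iterations) {
    std::atomic<int> total{0};
    std::vector<std::thread> workers;
    for (int t = 0; t < num_threads; ++t) {
        workers.emplace_back([&total, iterations]() {
            for (int i = 0; i < iterations; ++i) {
                // Indivisible read-modify-write: no increment can be lost.
                total.fetch_add(1, std::memory_order_relaxed);
            }
        });
    }
    for (auto& w : workers) w.join();
    return total.load();
}
```

Calling count_with_atomic(4, 10000) always yields 40000, whatever the thread interleaving.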
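Memory Order: Why It Is Needed
Background: Problems Caused by CPU and Compiler Optimizations
To improve performance, modern CPUs and compilers apply optimizations that can change the order in which a program's operations execute or become visible:
1. Compiler reordering
```cpp
// Source order as written
int a = 1;
int b = 2;
int c = a + b;

// An order the compiler may emit instead (shown as a separate snippet):
// a and b are independent, so swapping them cannot be observed by one thread
int b = 2;
int a = 1;
int c = a + b;
```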
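2. CPU out-of-order execution
To increase throughput, modern CPUs use:
- Out-of-order execution: instructions may not execute in program order
- Store buffers: a write may sit in a buffer for a while before it reaches memory and becomes visible to other cores
- Cache coherence latency: synchronizing caches across cores is not instantaneous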
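3. Problems in a multithreaded environment
```cpp
// Thread 1
data = 42;
ready = true;

// Thread 2
if (ready) {
    use(data);
}
```
If data and ready are ordinary variables, reordering can make the write to ready visible before the write to data, so thread 2 may see ready == true and still read a stale value of data.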
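What Is Memory Order
Memory ordering is a set of rules that controls the order in which memory operations become visible across threads. It tells the compiler and the CPU:
- Which reorderings are allowed
- Which reorderings are forbidden
- When the effects of memory operations must be made visible to other threads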
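Evolution of the Memory Model
```cpp
// Before C++11: the language had no memory model, so code relied on
// platform APIs such as pthreads
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int shared_data;

void thread_function() {
    pthread_mutex_lock(&mutex);
    shared_data++;
    pthread_mutex_unlock(&mutex);
}

// C++11 and later (an alternative to the code above, not an addition):
// a standardized memory model with std::atomic
std::atomic<int> shared_data{0};

void thread_function() {
    shared_data.fetch_add(1, std::memory_order_seq_cst);
}
```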
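The Six Memory Orders in C++11
```cpp
enum memory_order {
    memory_order_relaxed,  // atomicity only, no ordering constraints
    memory_order_consume,  // dependency-ordered load; compilers typically treat it as acquire
    memory_order_acquire,  // load: later reads and writes cannot move before it
    memory_order_release,  // store: earlier reads and writes cannot move after it
    memory_order_acq_rel,  // read-modify-write: acquire and release combined
    memory_order_seq_cst   // acquire/release plus one total order over all seq_cst operations
};
```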
Why Different Strengths of Memory Order Are Needed
1. Performance
```cpp
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<long long> counter{0};

void benchmark_memory_orders() {
    const int iterations = 1000000;
    const int num_threads = 4;

    // Phase 1: relaxed increments
    auto start = std::chrono::high_resolution_clock::now();
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&]() {
            for (int j = 0; j < iterations; ++j) {
                counter.fetch_add(1, std::memory_order_relaxed);
            }
        });
    }
    for (auto& t : threads) t.join();
    auto end = std::chrono::high_resolution_clock::now();
    auto relaxed_time =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start);

    // Phase 2: sequentially consistent increments
    counter.store(0);
    threads.clear();
    start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&]() {
            for (int j = 0; j < iterations; ++j) {
                counter.fetch_add(1, std::memory_order_seq_cst);
            }
        });
    }
    for (auto& t : threads) t.join();
    end = std::chrono::high_resolution_clock::now();
    auto seq_cst_time =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start);

    // Note: on x86 both variants compile to the same locked instruction, so
    // the ratio is close to 1 there; weakly ordered CPUs such as ARM can differ.
    std::cout << "Relaxed: " << relaxed_time.count() << " μs" << std::endl;
    std::cout << "Seq_cst: " << seq_cst_time.count() << " μs" << std::endl;
    std::cout << "Slowdown (seq_cst / relaxed): "
              << (double)seq_cst_time.count() / relaxed_time.count() << "x"
              << std::endl;
}
```
2. Different scenarios need different strengths of synchronization
```cpp
#include <atomic>
#include <thread>
#include <iostream>

// Scenario 1: a plain statistics counter; atomicity alone is enough
std::atomic<int> hit_counter{0};

void increment_counter() {
    hit_counter.fetch_add(1, std::memory_order_relaxed);
}

// Scenario 2: producer/consumer hand-off; needs release/acquire
std::atomic<bool> data_ready{false};
std::atomic<int> data{0};

void producer() {
    data.store(42, std::memory_order_relaxed);
    data_ready.store(true, std::memory_order_release);  // publishes the write to data
}

void consumer() {
    while (!data_ready.load(std::memory_order_acquire)) {
        std::this_thread::yield();
    }
    int value = data.load(std::memory_order_relaxed);  // guaranteed to read 42
    std::cout << "Read: " << value << std::endl;
}

// Scenario 3: a global state change whose order all threads must agree on
std::atomic<int> global_state{0};

void critical_state_change() {
    global_state.store(1, std::memory_order_seq_cst);
}
```
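To illustrate what seq_cst adds beyond release/acquire, the sketch below runs the classic "store buffering" litmus test. The function names are illustrative. With seq_cst on all four operations, the outcome in which both threads read 0 is forbidden by the standard; with release stores and acquire loads that outcome would be allowed, and store buffers make it observable on real hardware, x86 included.

```cpp
#include <atomic>
#include <thread>

// Hypothetical litmus test (store buffering). Returns true if, in one run,
// both threads read 0 from the variable the other thread wrote.
bool both_read_zero_once() {
    std::atomic<int> x{0}, y{0};
    int r1 = -1, r2 = -1;  // written by the threads, read after join()
    std::thread a([&]() {
        x.store(1, std::memory_order_seq_cst);
        r1 = y.load(std::memory_order_seq_cst);
    });
    std::thread b([&]() {
        y.store(1, std::memory_order_seq_cst);
        r2 = x.load(std::memory_order_seq_cst);
    });
    a.join();
    b.join();
    return r1 == 0 && r2 == 0;
}

// Repeats the litmus test and counts how often both threads read 0.
int count_both_zero(int runs) {
    int hits = 0;
    for (int i = 0; i < runs; ++i) {
        if (both_read_zero_once()) ++hits;
    }
    return hits;
}
```

Because all seq_cst operations fall into one total order, whichever store comes first in that order must be seen by the other thread's later load, so count_both_zero returns 0 for any number of runs.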
How Memory Orders Map to Hardware
```cpp
// x86 has a strong (TSO) memory model, so most orderings come for free
class X86MemoryOrdering {
public:
    void relaxed_store(std::atomic<int>& var, int value) {
        var.store(value, std::memory_order_relaxed);  // typically a plain MOV
    }
    void release_store(std::atomic<int>& var, int value) {
        var.store(value, std::memory_order_release);  // typically also a plain MOV
    }
    void seq_cst_store(std::atomic<int>& var, int value) {
        var.store(value, std::memory_order_seq_cst);  // typically XCHG, or MOV + MFENCE
    }
};

// ARM has a weak memory model, so ordering costs explicit barriers
class ARMMemoryOrdering {
public:
    void relaxed_store(std::atomic<int>& var, int value) {
        var.store(value, std::memory_order_relaxed);  // typically a plain STR
    }
    void release_store(std::atomic<int>& var, int value) {
        var.store(value, std::memory_order_release);  // DMB + STR on ARMv7, STLR on ARMv8
    }
    void seq_cst_store(std::atomic<int>& var, int value) {
        var.store(value, std::memory_order_seq_cst);  // DMB + STR + DMB on ARMv7, STLR on ARMv8
    }
};
```
What Happens Without Proper Memory Ordering
```cpp
#include <atomic>
#include <thread>
#include <iostream>

std::atomic<bool> flag{false};
int normal_variable = 0;  // ordinary, non-atomic variable

// Broken: relaxed ordering creates no happens-before relationship
void writer() {
    normal_variable = 42;
    flag.store(true, std::memory_order_relaxed);
}

void reader() {
    while (!flag.load(std::memory_order_relaxed)) {
        std::this_thread::yield();
    }
    // Data race on normal_variable: may print 0 or 42, formally undefined behavior
    std::cout << normal_variable << std::endl;
}

// Correct: the release store publishes everything written before it
void correct_writer() {
    normal_variable = 42;
    flag.store(true, std::memory_order_release);
}

// Correct: the acquire load sees everything published by the release store
void correct_reader() {
    while (!flag.load(std::memory_order_acquire)) {
        std::this_thread::yield();
    }
    std::cout << normal_variable << std::endl;  // guaranteed to print 42
}
```
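Summary: Why Memory Order Matters
- The reality of modern hardware: CPU and compiler optimizations mean that simple-looking code can behave in complicated ways
- Balancing performance and correctness: memory orders of different strengths leave room for optimization
- Portability: a single memory model gives code the same guaranteed behavior across architectures
- Predictability: explicit ordering semantics make the behavior of multithreaded programs predictable
Understanding memory order lets us:
- Write correct multithreaded code
- Find the right balance between performance and correctness
- Avoid subtle concurrency bugs
- Make full use of modern hardware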
1. atomic_store and atomic_store_explicit
Basic Usage
atomic_store atomically stores a value into an atomic variable.
```cpp
#include <atomic>
#include <thread>
#include <iostream>

std::atomic<int> counter{0};

void atomic_store_example() {
    // Default form: uses memory_order_seq_cst
    std::atomic_store(&counter, 42);

    // Explicit form: the caller chooses the memory order
    std::atomic_store_explicit(&counter, 100, std::memory_order_release);
}
```
The Effect of Memory Order
```cpp
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<bool> ready{false};
std::atomic<int> data{0};

void producer() {
    data.store(42, std::memory_order_relaxed);
    // Release: the write to data cannot be reordered after this store
    ready.store(true, std::memory_order_release);
}

void consumer() {
    // Acquire: pairs with the release store in producer()
    while (!ready.load(std::memory_order_acquire)) {
        std::this_thread::yield();
    }
    int value = data.load(std::memory_order_relaxed);  // guaranteed to be 42
    std::cout << "Value read: " << value << std::endl;
}
```
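The same release/acquire pairing can be written with the free functions this section is about. The following is a minimal sketch, with an illustrative helper name handoff_with_free_functions, in which the payload is an ordinary non-atomic int and only the flag is atomic.

```cpp
#include <atomic>
#include <thread>

// Hypothetical helper: hands one value from a producer thread to a consumer
// thread and returns what the consumer observed.
int handoff_with_free_functions(int payload_value) {
    int payload = 0;                 // ordinary, non-atomic data
    std::atomic<bool> ready{false};  // publication flag
    int observed = -1;

    std::thread consumer([&]() {
        // The acquire load pairs with the release store in the producer.
        while (!std::atomic_load_explicit(&ready, std::memory_order_acquire)) {
            std::this_thread::yield();
        }
        observed = payload;  // safe: happens after the producer's write
    });
    std::thread producer([&]() {
        payload = payload_value;
        std::atomic_store_explicit(&ready, true, std::memory_order_release);
    });

    producer.join();
    consumer.join();
    return observed;
}
```

The call handoff_with_free_functions(42) returns 42. Weakening both orders to relaxed would turn the access to payload into a data race.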
2. atomic_load and atomic_load_explicit
Basic Usage
atomic_load atomically reads the value of an atomic variable.
```cpp
#include <atomic>
#include <thread>
#include <iostream>

std::atomic<int> shared_value{10};

void atomic_load_example() {
    // Default form: uses memory_order_seq_cst
    int value1 = std::atomic_load(&shared_value);

    // Explicit form: the caller chooses the memory order
    int value2 = std::atomic_load_explicit(&shared_value, std::memory_order_acquire);

    std::cout << "Value1: " << value1 << ", Value2: " << value2 << std::endl;
}
```
Practical Use: Checking State
```cpp
#include <atomic>
#include <thread>
#include <chrono>
#include <iostream>

class ThreadSafeCounter {
private:
    std::atomic<int> count_{0};
    std::atomic<bool> stop_flag_{false};

public:
    // Worker loop: keeps counting until another thread calls stop()
    void increment() {
        while (!stop_flag_.load(std::memory_order_acquire)) {
            count_.fetch_add(1, std::memory_order_relaxed);
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

    void stop() {
        stop_flag_.store(true, std::memory_order_release);
    }

    int get_count() const {
        return count_.load(std::memory_order_relaxed);
    }
};
```
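3. atomic_compare_exchange_strong_explicit and atomic_compare_exchange_weak_explicit
Basic Concept
These are compare-and-swap (CAS) operations: they atomically compare the variable's value with an expected value and, if the two are equal, replace it with a new value. C++ provides two versions:
```cpp
// Strong version (signature simplified)
template <class T>
bool atomic_compare_exchange_strong_explicit(
    atomic<T>* obj,        // the atomic object to operate on
    T* expected,           // in: expected value; out: actual value on failure
    T desired,             // value to store on success
    memory_order success,  // ordering used when the exchange succeeds
    memory_order failure   // ordering used when it fails
);

// Weak version (signature simplified)
template <class T>
bool atomic_compare_exchange_weak_explicit(
    atomic<T>* obj,
    T* expected,
    T desired,
    memory_order success,
    memory_order failure
);
```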
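How the Functions Behave
Both functions perform the same basic operation, as one atomic step:
- Atomically read the current value of obj
- Compare the current value with *expected
- If they are equal: write desired to obj and return true
- If they are not equal: write the current value to *expected and return false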
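The steps above can be modelled in ordinary code. The sketch below is only an illustration of the semantics, not how std::atomic is implemented: ModelAtomicInt is a hypothetical type, and a mutex stands in for the atomicity that the hardware normally provides.

```cpp
#include <mutex>

// Illustrative model only: a mutex stands in for hardware atomicity.
// ModelAtomicInt is a hypothetical type, not part of the standard library.
class ModelAtomicInt {
public:
    explicit ModelAtomicInt(int initial) : value_(initial) {}

    bool compare_exchange(int& expected, int desired) {
        std::lock_guard<std::mutex> lock(mutex_);  // the whole sequence is one step
        if (value_ == expected) {
            value_ = desired;  // equal: install the new value
            return true;
        }
        expected = value_;     // not equal: report what was actually there
        return false;
    }

    int load() {
        std::lock_guard<std::mutex> lock(mutex_);
        return value_;
    }

private:
    std::mutex mutex_;
    int value_;
};
```

This model behaves like the strong version: it never fails when the values match. The weak version differs only in that it may also return false in the "equal" case.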
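The Key Difference Between strong and weak
strong version:
- If the comparison is equal, the exchange is guaranteed to succeed
- Returns false only when the actual value differs from the expected value
- Suited to single-attempt scenarios
weak version:
- Even if the comparison is equal, the exchange may fail (a spurious failure)
- May return false even though the actual value equals the expected value
- Spurious failures usually have hardware causes (for example, a Load-Link/Store-Conditional sequence being interrupted)
- Suited to retry loops, where it can perform better on some architectures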
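A typical retry loop that tolerates spurious failures is an "atomic maximum". The helper fetch_max_sketch below is a hypothetical name chosen for this illustration. Each failed attempt, real or spurious, refreshes current with the actual value, so the loop simply re-checks its condition and tries again.

```cpp
#include <atomic>

// Hypothetical helper: atomically raises target to at least candidate.
// Returns the value that target held before the update, or its current
// value if no update was needed.
int fetch_max_sketch(std::atomic<int>& target, int candidate) {
    int current = target.load(std::memory_order_relaxed);
    // On failure (real or spurious), current is refreshed with the actual
    // value, so the condition is re-evaluated against fresh data.
    while (current < candidate &&
           !target.compare_exchange_weak(current, candidate,
                                         std::memory_order_acq_rel,
                                         std::memory_order_relaxed)) {
    }
    return current;
}
```

Since the loop already exists to handle contention, a spurious failure costs one extra iteration and nothing else, which is why the weak version fits here.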
Basic Usage of the strong Version
```cpp
#include <atomic>
#include <thread>
#include <iostream>

std::atomic<int> value{10};

void compare_exchange_strong_example() {
    int expected = 10;
    int desired = 20;

    bool success = std::atomic_compare_exchange_strong_explicit(
        &value,
        &expected,
        desired,
        std::memory_order_acq_rel,  // ordering on success
        std::memory_order_acquire   // ordering on failure
    );

    if (success) {
        std::cout << "Exchange succeeded, new value: " << value.load() << std::endl;
    } else {
        // On failure, expected has been overwritten with the value the CAS saw
        std::cout << "Exchange failed, value seen by the CAS: " << expected
                  << ", current value: " << value.load() << std::endl;
    }
}
```
Basic Usage of the weak Version
```cpp
#include <atomic>
#include <thread>
#include <iostream>

std::atomic<int> counter{0};

void compare_exchange_weak_example() {
    int expected = 0;
    int desired = 1;

    // The weak version belongs in a loop because it can fail spuriously
    while (!std::atomic_compare_exchange_weak_explicit(
        &counter,
        &expected,
        desired,
        std::memory_order_acq_rel,
        std::memory_order_acquire)) {
        // On a real failure expected now holds the actual value;
        // stop if another thread has already moved the counter far enough
        if (expected >= desired) {
            break;
        }
        // Otherwise (spurious failure or smaller value) retry
    }

    std::cout << "Final value: " << counter.load() << std::endl;
}
```
When to Use the weak Version
The weak version is mainly used inside loops, because it can perform better on some hardware architectures:
```cpp
#include <atomic>
#include <thread>
#include <vector>

std::atomic<int> shared_counter{0};

// Recommended: weak version inside a retry loop
void atomic_increment_weak() {
    int current = shared_counter.load(std::memory_order_relaxed);
    while (!shared_counter.compare_exchange_weak(
        current, current + 1,
        std::memory_order_acq_rel,
        std::memory_order_relaxed)) {
        // current was refreshed with the latest value; just retry
    }
}

// Also correct, but may be slightly slower on LL/SC architectures
void atomic_increment_strong() {
    int current = shared_counter.load(std::memory_order_relaxed);
    while (!shared_counter.compare_exchange_strong(
        current, current + 1,
        std::memory_order_acq_rel,
        std::memory_order_relaxed)) {
        // current was refreshed with the latest value; just retry
    }
}
```
Detailed Behavior: How value Compares with expected
The following example walks through how the two functions behave in each case:
```cpp
#include <atomic>
#include <iostream>

void detailed_behavior_analysis() {
    std::atomic<int> value{10};
    int expected;
    int desired = 20;
    bool result;

    std::cout << "=== Case analysis ===" << std::endl;
    std::cout << "Initial state: value = " << value.load() << std::endl;

    // Case 1: value equals expected
    std::cout << "\n--- Case 1: value(10) == expected(10) ---" << std::endl;

    value.store(10);
    expected = 10;
    result = value.compare_exchange_strong(expected, desired);
    std::cout << "strong result:" << std::endl;
    std::cout << "  return value: " << (result ? "true" : "false") << std::endl;
    std::cout << "  value is now: " << value.load() << std::endl;
    std::cout << "  expected is now: " << expected << std::endl;
    std::cout << "  conclusion: equal, exchange succeeded, true is guaranteed" << std::endl;

    value.store(10);
    expected = 10;
    result = value.compare_exchange_weak(expected, desired);
    std::cout << "\nweak result:" << std::endl;
    std::cout << "  return value: " << (result ? "true" : "false") << std::endl;
    std::cout << "  value is now: " << value.load() << std::endl;
    std::cout << "  expected is now: " << expected << std::endl;
    if (result) {
        std::cout << "  conclusion: equal, exchange succeeded" << std::endl;
    } else {
        if (expected == 10) {
            std::cout << "  conclusion: equal but failed spuriously; "
                         "value and expected are unchanged" << std::endl;
        }
    }

    // Case 2: value differs from expected
    std::cout << "\n--- Case 2: value(15) != expected(10) ---" << std::endl;

    value.store(15);
    expected = 10;
    result = value.compare_exchange_strong(expected, desired);
    std::cout << "strong result:" << std::endl;
    std::cout << "  return value: " << (result ? "true" : "false") << std::endl;
    std::cout << "  value stays: " << value.load() << std::endl;
    std::cout << "  expected is now: " << expected << std::endl;
    std::cout << "  conclusion: not equal, no exchange, expected updated to the actual value" << std::endl;

    value.store(15);
    expected = 10;
    result = value.compare_exchange_weak(expected, desired);
    std::cout << "\nweak result:" << std::endl;
    std::cout << "  return value: " << (result ? "true" : "false") << std::endl;
    std::cout << "  value stays: " << value.load() << std::endl;
    std::cout << "  expected is now: " << expected << std::endl;
    std::cout << "  conclusion: not equal, no exchange, expected updated to the actual value (same as strong)" << std::endl;
}
```
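Complete Behavior Comparison Table
In every row, desired is 20.

| Scenario | Version | value (before call) | expected (before call) | Return value | value (after call) | expected (after call) | Notes |
|---|---|---|---|---|---|---|---|
| Equal | strong | 10 | 10 | true | 20 | 10 | Exchange guaranteed to succeed |
| Equal | weak | 10 | 10 | true or false | 20 or 10 | 10 | May succeed or fail spuriously |
| Not equal | strong | 15 | 10 | false | 15 | 15 | No exchange; expected is updated |
| Not equal | weak | 15 | 10 | false | 15 | 15 | No exchange; expected is updated |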
How to Detect a Spurious Failure
```cpp
#include <atomic>
#include <iostream>

void detect_spurious_failure() {
    std::atomic<int> value{10};
    int expected = 10;
    int desired = 20;

    // Remember the state before the call
    int value_before = value.load();
    int expected_before = expected;

    bool result = value.compare_exchange_weak(expected, desired);

    if (!result) {
        if (expected == expected_before) {
            // A real mismatch would have overwritten expected with a different value
            std::cout << "Spurious failure!" << std::endl;
            std::cout << "  value before the call: " << value_before << std::endl;
            std::cout << "  expected before the call: " << expected_before << std::endl;
            std::cout << "  value after the call: " << value.load() << std::endl;
            std::cout << "  expected after the call: " << expected << std::endl;
            std::cout << "  conclusion: the values matched but the exchange failed; "
                         "expected is unchanged" << std::endl;
        } else {
            std::cout << "Real failure: the values did not match" << std::endl;
            std::cout << "  expected value: " << expected_before << std::endl;
            std::cout << "  actual value: " << expected << std::endl;
        }
    } else {
        std::cout << "Exchange succeeded" << std::endl;
    }
}
```
Handling Patterns in Practice
```cpp
#include <atomic>
#include <iostream>

// Pattern 1: single attempt, use strong
bool try_once_with_strong(std::atomic<int>& counter, int expected_val, int new_val) {
    int expected = expected_val;
    if (counter.compare_exchange_strong(expected, new_val)) {
        std::cout << "Changed " << expected_val << " to " << new_val << std::endl;
        return true;
    } else {
        // expected now holds the value that was actually stored
        std::cout << "Failed: expected " << expected_val
                  << ", but found " << expected << std::endl;
        return false;
    }
}

// Pattern 2: retry until success, use weak
void retry_with_weak(std::atomic<int>& counter, int increment) {
    int current = counter.load();
    while (!counter.compare_exchange_weak(current, current + increment)) {
        // Real or spurious failure: current was refreshed, so just retry
    }
    std::cout << "Added " << increment
              << ", current value: " << counter.load() << std::endl;
}
```
Performance Comparison and Selection Advice
```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
#include <iostream>

class PerformanceTest {
private:
    std::atomic<long long> counter_{0};

public:
    // Increment using the weak version
    void test_weak(int iterations) {
        for (int i = 0; i < iterations; ++i) {
            long long current = counter_.load(std::memory_order_relaxed);
            while (!counter_.compare_exchange_weak(
                current, current + 1, std::memory_order_relaxed)) {
                // retry with the refreshed value
            }
        }
    }

    // Increment using the strong version
    void test_strong(int iterations) {
        for (int i = 0; i < iterations; ++i) {
            long long current = counter_.load(std::memory_order_relaxed);
            while (!counter_.compare_exchange_strong(
                current, current + 1, std::memory_order_relaxed)) {
                // retry with the refreshed value
            }
        }
    }

    void reset() { counter_.store(0, std::memory_order_relaxed); }

    long long get_value() const {
        return counter_.load(std::memory_order_relaxed);
    }
};

void performance_comparison() {
    PerformanceTest test;
    const int iterations = 100000;
    const int num_threads = 4;

    // Time the weak version
    test.reset();
    auto start = std::chrono::high_resolution_clock::now();
    std::vector<std::thread> threads;
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&test, iterations]() {
            test.test_weak(iterations);
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    auto end = std::chrono::high_resolution_clock::now();
    auto weak_duration =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "Weak version took: " << weak_duration.count() << " μs" << std::endl;
    std::cout << "Weak version final value: " << test.get_value() << std::endl;

    // Time the strong version
    test.reset();
    threads.clear();
    start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back([&test, iterations]() {
            test.test_strong(iterations);
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    end = std::chrono::high_resolution_clock::now();
    auto strong_duration =
        std::chrono::duration_cast<std::chrono::microseconds>(end - start);
    std::cout << "Strong version took: " << strong_duration.count() << " μs" << std::endl;
    std::cout << "Strong version final value: " << test.get_value() << std::endl;
}
```
Practical Use: Two Implementations of a Lock-Free Stack
Lock-Free Stack Using the strong Version
```cpp
#include <atomic>
#include <memory>

// Note: std::atomic<std::shared_ptr<T>> requires C++20
template<typename T>
class LockFreeStackStrong {
private:
    struct Node {
        T data;
        std::shared_ptr<Node> next;
        Node(T const& data_) : data(data_) {}
    };

    std::atomic<std::shared_ptr<Node>> head_;

public:
    void push(T const& data) {
        auto new_node = std::make_shared<Node>(data);
        new_node->next = head_.load();
        // On failure new_node->next is refreshed with the current head
        while (!head_.compare_exchange_strong(
            new_node->next, new_node,
            std::memory_order_release,
            std::memory_order_relaxed)) {
        }
    }

    // Returns an empty pointer if the stack is empty
    std::shared_ptr<T> pop() {
        auto old_head = head_.load();
        while (old_head && !head_.compare_exchange_strong(
            old_head, old_head->next,
            std::memory_order_acquire,
            std::memory_order_relaxed)) {
        }
        return old_head ? std::make_shared<T>(old_head->data) : std::shared_ptr<T>();
    }
};
```
Lock-Free Stack Using the weak Version (Recommended)
```cpp
#include <atomic>
#include <memory>

// Note: std::atomic<std::shared_ptr<T>> requires C++20
template<typename T>
class LockFreeStackWeak {
private:
    struct Node {
        T data;
        std::shared_ptr<Node> next;
        Node(T const& data_) : data(data_) {}
    };

    std::atomic<std::shared_ptr<Node>> head_;

public:
    void push(T const& data) {
        auto new_node = std::make_shared<Node>(data);
        new_node->next = head_.load();
        // weak is fine here: a spurious failure just costs one more iteration
        while (!head_.compare_exchange_weak(
            new_node->next, new_node,
            std::memory_order_release,
            std::memory_order_relaxed)) {
        }
    }

    // Returns an empty pointer if the stack is empty
    std::shared_ptr<T> pop() {
        auto old_head = head_.load();
        while (old_head && !head_.compare_exchange_weak(
            old_head, old_head->next,
            std::memory_order_acquire,
            std::memory_order_relaxed)) {
        }
        return old_head ? std::make_shared<T>(old_head->data) : std::shared_ptr<T>();
    }
};
```
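Both stacks above depend on std::atomic<std::shared_ptr<T>>, which is only available from C++20. As a hedged alternative for C++11 toolchains, the sketch below uses raw pointers and restricts removal to a single pop_all operation. DrainableStack and its members are hypothetical names chosen for this illustration. Because elements leave only through one exchange that detaches the whole list, no thread ever dereferences a node that another thread might free, which sidesteps the reclamation and ABA problems of a per-element pop.

```cpp
#include <atomic>
#include <vector>

// Hypothetical C++11 sketch: a stack supporting push and pop_all only.
template <typename T>
class DrainableStack {
    struct Node {
        T data;
        Node* next;
    };
    std::atomic<Node*> head_{nullptr};

public:
    DrainableStack() = default;
    DrainableStack(const DrainableStack&) = delete;
    DrainableStack& operator=(const DrainableStack&) = delete;
    ~DrainableStack() { pop_all(); }

    void push(const T& value) {
        Node* node = new Node{value, head_.load(std::memory_order_relaxed)};
        // On failure node->next is refreshed with the current head.
        while (!head_.compare_exchange_weak(node->next, node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
        }
    }

    // Removes every element; the most recently pushed element comes first.
    std::vector<T> pop_all() {
        // One exchange detaches the whole list from the shared head.
        Node* node = head_.exchange(nullptr, std::memory_order_acquire);
        std::vector<T> out;
        while (node) {
            out.push_back(node->data);
            Node* next = node->next;
            delete node;
            node = next;
        }
        return out;
    }
};
```

After pushing 1, 2, and 3, pop_all returns them in the order 3, 2, 1 and leaves the stack empty. The trade-off is that consumers take everything at once, which suits batch-style work queues but not a general-purpose stack.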
Decision Guide: strong or weak
```cpp
#include <atomic>

class DecisionGuide {
public:
    // Case 1: a single attempt, use strong
    bool try_once_operation() {
        std::atomic<int> value{10};
        int expected = 10;
        return value.compare_exchange_strong(
            expected, 20,
            std::memory_order_acq_rel,
            std::memory_order_acquire);
    }

    // Case 2: loop until success, use weak
    void loop_until_success() {
        std::atomic<int> counter{0};
        int current = counter.load();
        while (!counter.compare_exchange_weak(
            current, current + 1, std::memory_order_relaxed)) {
            // retry with the refreshed value
        }
    }

    // Case 3: the failure path inspects the actual value, use strong
    bool complex_failure_handling() {
        std::atomic<int> state{0};
        int expected = 0;
        if (!state.compare_exchange_strong(
                expected, 1,
                std::memory_order_acq_rel,
                std::memory_order_acquire)) {
            // With strong, failure always means the value really differed
            if (expected == 2) {
                return handle_special_case();
            }
            return false;
        }
        return true;
    }

private:
    bool handle_special_case() { return false; }
};
```
The Influence of Hardware Architecture
```cpp
#include <atomic>

class ArchitectureComparison {
public:
    // LL/SC architectures (ARM, PowerPC): weak maps to a single
    // load-linked/store-conditional attempt, with no inner retry loop
    void arm_optimized_increment(std::atomic<int>& counter) {
        int current = counter.load(std::memory_order_relaxed);
        while (!counter.compare_exchange_weak(
            current, current + 1, std::memory_order_relaxed)) {
        }
    }

    // x86: both versions compile to LOCK CMPXCHG, which never fails spuriously
    void x86_increment(std::atomic<int>& counter) {
        int current = counter.load(std::memory_order_relaxed);
        while (!counter.compare_exchange_strong(
            current, current + 1, std::memory_order_relaxed)) {
        }
    }
};
```
4. atomic_fetch_sub_explicit
Basic Usage
atomic_fetch_sub_explicit atomically subtracts a value from the variable and returns the value held before the operation.
```cpp
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>

std::atomic<int> counter{1000};

void fetch_sub_example() {
    // Subtract 5 and get back the value from before the subtraction
    int old_value = std::atomic_fetch_sub_explicit(
        &counter, 5, std::memory_order_acq_rel
    );

    std::cout << "Value before: " << old_value << std::endl;
    std::cout << "Value after: " << counter.load() << std::endl;
}
```
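Returning the previous value is what makes fetch_sub useful for "last one out" detection: exactly one thread sees the count go from 1 to 0. The sketch below demonstrates this with an illustrative helper, count_last_finishers, which is not from any library.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper: starts num_workers threads and returns how many of
// them observed that they were the last one to finish.
int count_last_finishers(int num_workers) {
    std::atomic<int> remaining{num_workers};
    std::atomic<int> last_seen{0};
    std::vector<std::thread> workers;
    for (int i = 0; i < num_workers; ++i) {
        workers.emplace_back([&]() {
            // fetch_sub returns the value *before* the subtraction, so the
            // thread that sees 1 is the one that brought the count to 0.
            int before = std::atomic_fetch_sub_explicit(
                &remaining, 1, std::memory_order_acq_rel);
            if (before == 1) {
                last_seen.fetch_add(1, std::memory_order_relaxed);
            }
        });
    }
    for (auto& w : workers) w.join();
    return last_seen.load();
}
```

However the threads interleave, the function returns 1. Reading the counter in a separate load after the subtraction would not give this guarantee, because another thread could decrement in between.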
Practical Use: A Resource Counter
```cpp
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
#include <iostream>

class ResourcePool {
private:
    std::atomic<int> available_resources_{100};

public:
    // Takes one resource if any is left. A CAS loop is used instead of a
    // bare fetch_sub so the count can never drop below zero.
    bool acquire_resource() {
        int current = available_resources_.load(std::memory_order_relaxed);
        while (current > 0) {
            if (available_resources_.compare_exchange_strong(
                    current, current - 1,
                    std::memory_order_acq_rel,
                    std::memory_order_relaxed)) {
                return true;
            }
            // CAS failed: current was refreshed, so re-check and retry
        }
        return false;
    }

    void release_resource() {
        available_resources_.fetch_add(1, std::memory_order_acq_rel);
    }

    int get_available_count() const {
        return available_resources_.load(std::memory_order_relaxed);
    }
};

void worker(ResourcePool& pool, int worker_id) {
    for (int i = 0; i < 10; ++i) {
        if (pool.acquire_resource()) {
            std::cout << "Worker " << worker_id << " acquired a resource" << std::endl;
            // Simulate work
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            pool.release_resource();
            std::cout << "Worker " << worker_id << " released a resource" << std::endl;
        } else {
            std::cout << "Worker " << worker_id << " could not acquire a resource" << std::endl;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}
```
Putting It Together: Thread-Safe Reference Counting
```cpp
#include <atomic>
#include <memory>
#include <iostream>

// A simplified shared pointer whose reference count is atomic.
// Only the count is thread-safe; a single AtomicSharedPtr instance
// must not be modified from several threads at once.
template<typename T>
class AtomicSharedPtr {
private:
    struct ControlBlock {
        T* ptr;
        std::atomic<int> ref_count;
        ControlBlock(T* p) : ptr(p), ref_count(1) {}
    };

    ControlBlock* control_block_;

public:
    explicit AtomicSharedPtr(T* ptr = nullptr)
        : control_block_(ptr ? new ControlBlock(ptr) : nullptr) {}

    AtomicSharedPtr(const AtomicSharedPtr& other)
        : control_block_(other.control_block_) {
        if (control_block_) {
            // Relaxed is enough: the source already holds a reference
            control_block_->ref_count.fetch_add(1, std::memory_order_relaxed);
        }
    }

    AtomicSharedPtr& operator=(const AtomicSharedPtr& other) {
        if (this != &other) {
            reset();
            control_block_ = other.control_block_;
            if (control_block_) {
                control_block_->ref_count.fetch_add(1, std::memory_order_relaxed);
            }
        }
        return *this;
    }

    ~AtomicSharedPtr() { reset(); }

    void reset() {
        if (control_block_) {
            // acq_rel: every earlier use of the object happens before the delete
            int old_count =
                control_block_->ref_count.fetch_sub(1, std::memory_order_acq_rel);
            if (old_count == 1) {
                // This was the last reference
                delete control_block_->ptr;
                delete control_block_;
            }
            control_block_ = nullptr;
        }
    }

    T* get() const {
        return control_block_ ? control_block_->ptr : nullptr;
    }

    int use_count() const {
        return control_block_
            ? control_block_->ref_count.load(std::memory_order_relaxed)
            : 0;
    }
};
```
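Performance Considerations and Best Practices
1. Choose an appropriate memory order
```cpp
// Simple counters: relaxed is enough
std::atomic<int> counter{0};
counter.fetch_add(1, std::memory_order_relaxed);

// Flag-based synchronization: pair release with acquire
std::atomic<bool> ready{false};
ready.store(true, std::memory_order_release);       // producer side
while (!ready.load(std::memory_order_acquire)) {}   // consumer side
```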
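2. Avoid false sharing
```cpp
// Bad: both counters are likely to share one cache line
struct BadCounters {
    std::atomic<int> counter1;
    std::atomic<int> counter2;
};

// Better: padding pushes counter2 onto the next cache line
// (4 bytes of counter1 + 60 bytes of padding = 64, assuming 64-byte lines)
struct alignas(64) GoodCounters {
    std::atomic<int> counter1;
    char padding[60];
    std::atomic<int> counter2;
};
```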
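Manual padding depends on knowing sizeof(std::atomic<int>). A variant that lets the compiler compute the padding is sketched below. It assumes 64-byte cache lines, which is common but not universal; kCacheLine and PaddedCounters are names invented for this illustration. C++17 also defines std::hardware_destructive_interference_size in <new> for this purpose, though compiler support for it varies.

```cpp
#include <atomic>
#include <cstddef>

// Assumption: 64-byte cache lines (common, but check your target CPU).
constexpr std::size_t kCacheLine = 64;

struct PaddedCounters {
    // Each member is aligned to its own cache line; the compiler inserts
    // whatever padding is required between and after them.
    alignas(kCacheLine) std::atomic<int> counter1{0};
    alignas(kCacheLine) std::atomic<int> counter2{0};
};

static_assert(alignof(PaddedCounters) == kCacheLine,
              "the struct starts on a cache-line boundary");
static_assert(sizeof(PaddedCounters) == 2 * kCacheLine,
              "each counter occupies its own cache line");
```

The static_assert lines document the intended layout and fail at compile time if a change to the struct breaks it.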
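3. Use an appropriate atomic type
```cpp
// Boolean flag
std::atomic<bool> stop_flag{false};

// Counter
std::atomic<size_t> request_count{0};

// Pointer (Node stands for the node type of your own data structure)
struct Node;
std::atomic<Node*> head{nullptr};
```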
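Summary
Atomic operations are an important tool in multithreaded programming. Used correctly, they help us:
- Avoid data races: atomic operations are indivisible
- Provide synchronization: memory orders control the order in which operations become visible
- Implement lock-free data structures: improving the program's concurrent performance
- Simplify communication between threads: reducing reliance on mutexes
Keep the following key points in mind:
- Choose a memory order that balances performance and correctness
- Understand each operation's semantics and return value, especially how compare_exchange treats its expected parameter
- Prefer the weak version in loops and the strong version for single attempts
- Take the target architecture into account: on LL/SC architectures such as ARM, the weak version can perform better
- Watch out for the ABA problem when designing lock-free algorithms
- Consider higher-level facilities (such as std::atomic<std::shared_ptr<T>>, available since C++20)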
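The ABA problem mentioned above arises when a value changes from A to B and back to A between a thread's read and its CAS, so the CAS succeeds even though the state was modified. One common mitigation is to pair the value with a version counter that is bumped on every update. The sketch below shows the idea under simplifying assumptions: VersionedIndex is a hypothetical class, the protected value is a 32-bit index, and both fields are packed into one 64-bit word so that a single CAS covers them.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical sketch: a 32-bit index plus a 32-bit version counter,
// packed into one 64-bit word so a single CAS covers both.
class VersionedIndex {
    std::atomic<std::uint64_t> word_{0};

    static std::uint64_t pack(std::uint32_t index, std::uint32_t version) {
        return (static_cast<std::uint64_t>(version) << 32) | index;
    }

public:
    // Returns the packed (version, index) pair as seen right now.
    std::uint64_t snapshot() const {
        return word_.load(std::memory_order_acquire);
    }

    static std::uint32_t index_of(std::uint64_t snap) {
        return static_cast<std::uint32_t>(snap);
    }

    // Succeeds only if neither the index nor the version changed since
    // snap was taken; every successful update bumps the version.
    bool try_update(std::uint64_t snap, std::uint32_t new_index) {
        std::uint32_t next_version = static_cast<std::uint32_t>(snap >> 32) + 1;
        return word_.compare_exchange_strong(snap, pack(new_index, next_version),
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire);
    }
};
```

If the index goes from 0 to 7 and back to 0, a snapshot taken before those two updates is still rejected, because the version has advanced from 0 to 2. The counter can wrap around after 2^32 updates, so this reduces the risk of ABA rather than eliminating it.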
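Summary of compare_exchange Recommendations

| Scenario | Recommended version | Reason |
|---|---|---|
| Retry loop | weak | Better performance; spurious failures are harmless |
| Single attempt | strong | Avoids unnecessary spurious failures |
| Complex failure handling | strong | A failure must mean the value really differed |
| ARM, PowerPC, and similar architectures | weak | Maps directly to LL/SC instructions |
| x86 | Either | The performance difference is very small |

Once you are comfortable with these atomic functions, and in particular with the difference between the strong and weak versions of compare_exchange, you will be able to write multithreaded programs that are both more efficient and safer.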