# C++ 并行编程

# Refernece

https://nj.gitbooks.io/c/content/content/chapter5/chapter5-chinese.html

https://mq-b.github.io/ModernCpp-ConcurrentProgramming-Tutorial/md/05 内存模型与原子操作.html

# 使用线程

# Hello world

用 thread 来运行 hello world

p
#include <iostream>
#include <thread>
void hello() {
    std::cout << "Hello" << std::endl;
}
int main() {
    std::thread t{ hello };
    t.join();
}

t.join(); 等待线程对象 t 关联的线程执行完毕,否则将一直阻塞。这里的调用是必须的,否则 std::thread 的析构函数将调用 std::terminate() 无法正确析构。

# 多线程求和

当前环境支持并发线程数

p
unsigned int n = std::thread::hardware_concurrency();

可以查询硬件支持的并发数量.

p
#include <iostream>
#include <thread>
#include <vector>
#include <numeric>
template<typename ForwardIt>
auto sum(ForwardIt first, ForwardIt last) {
    using value_type = std::iter_value_t<ForwardIt>;
    std::size_t n = std::thread::hardware_concurrency();
    std::ptrdiff_t total = std::distance(first, last);
    std::vector<value_type> sums(n);
    std::vector<std::thread> threads;
    std::size_t chunk = total / n;
    std::size_t remainder = total % n;
    auto start = first;
    for (int i = 0;i < n; ++i) {
        auto end = std::next(start, chunk + (i<remainder?1:0));
        threads.emplace_back([start, end, &sums, i](){
            sums[i] = std::accumulate(start, end, value_type{});
        });
        start = end;
    }
    for (auto& thread : threads) {
        thread.join();
    }
    return std::accumulate(sums.begin(), sums.end(), value_type{});
}
int main() {
    std::vector<int> x = {1,2,3,4,5,6,7,8,9,10};
    std::cout << sum(x.begin(), x.end()) << std::endl;
}

# 线程管理

我们上一节的示例是传递了一个函数给 std::thread 对象,函数会在新线程中执行。 std::thread 支持的形式还有很多,只要是可调用 (Callable) 对象即可,比如重载了 operator() 的类对象(也可以直接叫函数对象)。

class Task{
public:
    void operator()()const {
        std::cout << "operator()()const\n";
    }
};

# detach

p
#include <iostream>
#include <thread>
struct func {
    int& m_i;
    func(int& i) :m_i{ i } {}
    void operator()(int n)const {
        for (int i = 0; i <= n; ++i) {
            m_i += i;           // 可能悬空引用
        }
    }
};
int main(){
    int n = 0;
    std::thread my_thread{ func{n},100 };
    my_thread.detach();        // 分离,不等待线程结束
}                              // 分离的线程可能还在运行

# RAII

构造函数申请资源,析构函数释放资源,让对象的生命周期和资源绑定

当异常抛出时,C++ 会自动调用对象的析构函数。

p
class thread_guard{
    std::thread& m_t;
public:
    explicit thread_guard(std::thread& t) :m_t{ t } {}
    ~thread_guard(){
        std::puts("析构");     // 打印日志 不用在乎
        if (m_t.joinable()) { // 线程对象当前关联了活跃线程
            m_t.join();
        }
    }
    thread_guard(const thread_guard&) = delete;
    thread_guard& operator=(const thread_guard&) = delete;
};
void f(){
    int n = 0;
    std::thread t{ func{n},10 };
    thread_guard g(t);
    f2(); // 可能抛出异常
}

调用析构函数即使函数 f2 () 抛出了一个异常,这个销毁依然会发生(前提是你捕获了这个异常) 如果异常被抛出但未被捕获那么就会调用 std::terminate

我们要判断 std::thread 线程对象现在是否有关联的活跃线程,如果有,我们才会执行 join() ,阻塞当前线程直到线程对象关联的线程执行完毕。

# 传递参数

向可调用对象传递参数很简单,我们前面也都写了,只需要将这些参数作为 std::thread 的构造参数即可。需要注意的是,这些参数会复制到新线程的内存空间中,即使函数中的参数是引用,依然实际是复制

p
void f(int, const int& a) { //a 并非引用了局部对象 n
    std::cout << &a << '\n'; 
}
int main() {
    int n = 1;
    std::cout << &n << '\n';
    std::thread t{ f, 3, n };
    t.join();
}

想要解决这个问题很简单,我们可以使用标准库的设施 std::ref std::cref 函数模板。

p
void f(int, int& a) {
    std::cout << &a << '\n'; 
}
int main() {
    int n = 1;
    std::cout << &n << '\n';
    std::thread t { f, 3, std::ref(n) };
    t.join();
}

# std::this_thread

这个命名空间包含了管理当前线程的函数。

  1. yield 建议实现重新调度各执行线程。
  2. get_id 返回当前线程 id。
  3. sleep_for 使当前线程停止执行指定时间。
  4. sleep_until 使当前线程执行停止到指定的时间点
p
int main() {
    std::this_thread::sleep_for(std::chrono::seconds(3));
}

yield 减少 CPU 的占用。

p
while (!isDone()){
    std::this_thread::yield();
}

# C++20 std::jthread

std::jthread 相比于 C++11 引入的 std::thread ,只是多了两个功能:

  1. RAII 管理:在析构时自动调用 join()
  2. 线程停止功能:线程的取消 / 停止。

这就是 C++ 的设计哲学,零开销原则你不需要为你没有用到的(特性)付出额外的开销

std::jthread 所谓的线程停止只是一种基于用户代码的控制机制,而不是一种与操作系统系统有关系的线程终止。使用 std::stop_source std::stop_token 提供了一种优雅地请求线程停止的方式,但实际上停止的决定和实现都由用户代码来完成

p
using namespace std::literals::chrono_literals;
void f(std::stop_token stop_token, int value){
    while (!stop_token.stop_requested()){ // 检查是否已经收到停止请求
        std::cout << value++ << ' ' << std::flush;
        std::this_thread::sleep_for(200ms);
    }
    std::cout << std::endl;
}
int main(){
    std::jthread thread{ f, 1 }; // 打印 1..15 大约 3 秒
    std::this_thread::sleep_for(3s);
    //jthread 的析构函数调用 request_stop () 和 join ()。
}

# 共享数据

在多线程的情况下,每个线程都抢着完成自己的任务。在大多数情况下,即使会改变执行顺序,也是良性竞争,这是无所谓的。比如两个线程都要往标准输出输出一段字符,谁先谁后并不会有什么太大影响。

只有在涉及多线程读写相同共享数据的时候,才会导致 “恶性的条件竞争”。

# 使用互斥量

互斥量(Mutex),又常被称为互斥锁、互斥体(或者直接被称作 “锁”),是一种用来保护临界区 [1] 的特殊对象,其相当于实现了一个公共的 “标志位”。它可以处于锁定(locked)状态,也可以处于解锁(unlocked)状态:

p
#include <mutex> // 必要标头
std::mutex m;
void f() {
    m.lock();
    std::cout << std::this_thread::get_id() << '\n';
    m.unlock();
}
int main() {
    std::vector<std::thread>threads;
    for (std::size_t i = 0; i < 10; ++i)
        threads.emplace_back(f);
    for (auto& thread : threads)
        thread.join();
}

# std::lock_guard

不过一般不推荐这样显式的 lock()unlock() ,应该用这个

p
void f() {
    std::lock_guard<std::mutex> lc{ m };
    std::cout << std::this_thread::get_id() << '\n';
}

这是一个 “管理类” 模板,用来管理互斥量的上锁与解锁,

这段代码极其简单,首先管理类,自然不可移动不可复制,我们定义复制构造与复制赋值为弃置函数,同时阻止了移动等函数的隐式定义。

构造函数中初始化这个引用,同时上锁,析构函数中解锁,这是一个非常典型的 RAII 式的管理。

同时它还提供一个有额外 std::adopt_lock_t 参数的构造函数 ,如果使用这个构造函数,则构造函数不会上锁。

p
void f(){
    //code..
    {
        std::lock_guard<std::mutex> lc{ m };
        // 涉及共享资源的修改的代码...
    }
    //code..
}

# try_lock

try_lock 是互斥量中的一种尝试上锁的方式。与常规的 lock 不同, try_lock 会尝试上锁,但如果锁已经被其他线程占用,则不会阻塞当前线程,而是立即返回

它的返回类型是 bool ,如果上锁成功就返回 true ,失败就返回 false

# 保护共享数据注意

互斥量主要也就是为了保护共享数据,上一节的使用互斥量也已经为各位展示了一些。

然而使用互斥量来保护共享数据也并不是在函数中加上一个 std::lock_guard 就万事大吉了。有的时候只需要一个指针或者引用,就能让这种保护形同虚设

  • 简而言之:切勿将受保护数据的指针或引用传递到互斥量作用域之外,不然保护将形同虚设

# 死锁:问题与解决

两个线程需要对它们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个线程的互斥量解锁。因为它们都在等待对方释放互斥量,没有线程工作。 这种情况就是死锁。

  • 多个互斥量才可能遇到死锁问题

避免死锁的一般建议是让两个互斥量以相同的顺序上锁,总在互斥量 B 之前锁住互斥量 A,就通常不会死锁。反面示例

# std::lock

C++ 标准库有很多办法解决这个问题,可以使用 std::lock ,它能一次性锁住多个互斥量,并且没有死锁风险。修改 swap 代码后如下:

p
void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::lock(lhs.m, rhs.m);    // 给两个互斥量上锁
    std::lock_guard<std::mutex> lock1{ lhs.m,std::adopt_lock }; 
    std::lock_guard<std::mutex> lock2{ rhs.m,std::adopt_lock }; 
    swap(lhs.object, rhs.object);
}

# std::scoped_lock

C++17 新增了 std::scoped_lock ,提供此函数的 RAII 包装,通常它比裸调用 std::lock 更好。

p
void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::scoped_lock guard{ lhs.m,rhs.m };
    swap(lhs.object, rhs.object);
}

# std::unique_lock 灵活的锁

std::unique_lock 是 C++11 引入的一种通用互斥包装器,它相比于 std::lock_guard 更加的灵活。当然,它也更加的复杂,尤其它还可以与我们下一章要讲的条件变量一起使用。使用它可以将之前使用 std::lock_guardswap 改写一下:

p
void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::unique_lock<std::mutex> lock1{ lhs.m, std::defer_lock };
    std::unique_lock<std::mutex> lock2{ rhs.m, std::defer_lock };
    std::lock(lock1, lock2);
    swap(lhs.object, rhs.object);
}

_Ownsfalse 表示没有互斥量所有权。并且 std::unique_lock 是有 lock() try_lock() unlock() 成员函数的,所以可以直接传递给 std::lock 、 进行调用。

# std::defer_lock

我们要了解 std::defer_lock 是 “不获得互斥体的所有权”。没有所有权自然构造函数就不会上锁

构造函数不上锁,要求构造之后上锁

# std::adopt_lock

只是不上锁,但是有所有权

构造函数不上锁,要求在构造之前互斥量上锁

# 使用和优点

  • RAII 语义,自动解锁, 也可以手动解锁
  • 你可以声明时不立刻加锁
p
void f() {
    //code..
    
    std::unique_lock<std::mutex> lock{ m };
    // 涉及共享资源的修改的代码...
    lock.unlock(); // 解锁并释放所有权,析构函数不会再 unlock ()
    //code..
}

通常建议优先 std::lock_guard ,当它无法满足你的需求或者显得代码非常繁琐,那么可以考虑使用 std::unique_lock

如果性能非常敏感(极低开销场景), std::lock_guard 更轻量。

如果只需要简单加锁解锁, std::lock_guard 语义更清晰。

# 在不同作用域传递互斥量

首先我们要明白,互斥量满足互斥体 (Mutex) 的要求,不可复制不可移动。所谓的在不同作用域传递互斥量,其实只是传递了它们的指针或者引用罢了。

std::unique_lock 可以获取互斥量的所有权,而互斥量的所有权可以通过移动操作转移给其他的 std::unique_lock 对象。

p
std::unique_lock<std::mutex> get_lock(){
    extern std::mutex some_mutex;
    std::unique_lock<std::mutex> lk{ some_mutex };
    return lk;
}
void process_data(){
    std::unique_lock<std::mutex> lk{ get_lock() };
    // 执行一些任务...
}

我相信你可能对 extern std::mutex some_mutex 有疑问,其实不用感到奇怪,这是一个互斥量的声明,可能别的翻译单元(或 dll 等)有它的定义,成功链接上。

# 保护共享数据的初始化过程

保护共享数据并非必须使用互斥量,互斥量只是其中一种常见的方式而已,对于一些特殊的场景,也有专门的保护方式,比如对于共享数据的初始化过程的保护。我们通常就不会用互斥量,这会造成很多的额外开销

# std::call_once()

比起锁住互斥量并显式检查指针,每个线程只需要使用 std::call_once 就可以。使用 std::call_once 比显式使用互斥量消耗的资源更少,特别是当初始化完成之后

p
std::shared_ptr<some> ptr;
std::once_flag resource_flag;
void init_resource(){
    ptr.reset(new some);
}
void foo(){
    std::call_once(resource_flag, init_resource); // 线程安全的一次初始化
    ptr->do_something();
}

以上代码 std::once_flag 对象是全局命名空间作用域声明,如果你有需要,它也可以是类的成员。用于搭配 std::call_once 使用,保证线程安全的一次初始化。

# 静态局部变量初始化在 C++11 是线程安全

p
class my_class;
inline my_class& get_my_class_instance(){
    static my_class instance;  // 线程安全的初始化过程 初始化严格发生一次
    return instance;
}

# 保护不常更新的数据结构

你有一个数据结构存储了用户的设置信息,每次用户打开程序的时候,都要进行读取,且运行时很多地方都依赖这个数据结构需要读取,所以为了效率,我们使用了多线程读写。这个数据结构很少进行改变,而我们知道,多线程读取,是没有数据竞争的,是安全的,但是有些时候又不可避免的有修改和读取都要工作的时候,所以依然必须得使用互斥量进行保护。

已经想到了:“读写锁

C++ 标准库自然为我们提供了: std::shared_timed_mutex (C14)、 std::shared_mutex (C17)。它们的区别简单来说,前者支持更多的操作方式,后者有更高的性能优势。

# std::shared_mutex

p
class Settings {
private:
    std::map<std::string, std::string> data_;
    mutable std::shared_mutex mutex_; // “M&M 规则”:mutable 与 mutex 一起出现
public:
    void set(const std::string& key, const std::string& value) {
        std::lock_guard<std::shared_mutex> lock{ mutex_ };
        data_[key] = value;
    }
    std::string get(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        auto it = data_.find(key);
        return (it != data_.end()) ? it->second : ""; // 如果没有找到键返回空字符串
    }
};

而那些无需修改数据结构的读线程,可以使用 std::shared_lock 获取访问权,多个线程可以一起读取

# std::shared_timed_mutex

std::shared_timed_mutex 具有 std::shared_mutex 的所有功能,并且额外支持超时功能。所以以上代码可以随意更换这两个互斥量。

# std::recursive_mutex

线程对已经上锁的 std::mutex 再次上锁是错误的,这是未定义行为。然而在某些情况下,一个线程会尝试在释放一个互斥量前多次获取,所以提供了 std::recursive_mutex

std::recursive_mutex 是 C++ 标准库提供的一种互斥量类型,它允许同一线程多次锁定同一个互斥量,而不会造成死锁。只有在解锁与锁定次数相匹配时,互斥量才会真正释放

p
#include <iostream>
#include <thread>
#include <mutex>
std::recursive_mutex mtx;
void recursive_function(int count) {
    // 递归函数,每次递归都会锁定互斥量
    mtx.lock();
    std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl;
    if (count > 0) {
        recursive_function(count - 1); // 递归调用
    }
    mtx.unlock(); // 解锁互斥量
}
int main() {
    std::thread t1(recursive_function, 3);
    std::thread t2(recursive_function, 2);
    t1.join();
    t2.join();
}

同样的,我们也可以使用 std::lock_guardstd::unique_lock 帮我们管理 std::recursive_mutex

p
void recursive_function(int count) {
    std::lock_guard<std::recursive_mutex> lc{ mtx };
    std::cout << "Locked by thread: " << std::this_thread::get_id() << ", count: " << count << std::endl;
    if (count > 0) {
        recursive_function(count - 1);
    }
}

# 线程存储期

线程存储期的对象在线程开始时分配,并在线程结束时释放。每个线程拥有自己独立的对象实例,互不干扰。

# CMake

项目结构

t
your_project/
├── CMakeLists.txt
├── main.cpp
└── include/
    └── myheader.h

构建

l
# 创建构建目录
mkdir build
cd build
# 使用 CMake 生成 Makefile 或项目文件
cmake ..
# 编译项目
cmake --build .