线程同步

 

线程同步

为什么做同步

多个线程访问共享资源时,必须要一个一个来,特别是对于写操作来说,边读边写也是危险的。

// 功能:两个函数同时数数,对number修改,每个线程数5次,期望最终得到10
#include <iostream>
#include <thread>
#include <unistd.h>

int MAX(5);
int number = 0;

void funcA_num() {
    int cur;
    for (int i = 0; i < MAX; ++i) {
        cur = number;
        cur++;
        usleep(5);
        number = cur;
        std::cout << "Thread A, id = " << std::this_thread::get_id() << " number = " << number << std::endl;
    }
}

void funcB_num() {
    int cur;
    for (int i = 0; i < MAX; ++i) {
        cur = number;
        cur++;
        usleep(3);
        number = cur;
        std::cout << "Thread B, id = " << std::this_thread::get_id() << " number = " << number << std::endl;
    }
}

int main() {
    std::thread t1(funcA_num);
    std::thread t2(funcB_num);
    t1.join();
    t2.join();
}

/*
Thread B, id = 0x16b56f000 number = 1
Thread A, id = 0x16b4e3000 number = 1
Thread B, id = 0x16b56f000 number = 2
Thread A, id = 0x16b4e3000 number = 2
Thread B, id = 0x16b56f000 number = 3
Thread B, id = 0x16b56f000 number = 4
Thread A, id = 0x16b4e3000 number = 3
Thread B, id = 0x16b56f000 number = 5
Thread A, id = 0x16b4e3000 number = 6
Thread A, id = 0x16b4e3000 number = 7
*/ // 为什么不是10?

解释:

img

由于引入了缓存,所以举个例子:

  1. A抢到时间片数了3次,其中1、2都写回到了内存,但是3没来得及写回去,保存在了寄存器当中
  2. 此时B抢到了时间片,他读取到内存中是2,从2开始数,导致错误❌,假设数到了6,但只有5已经存回到了内存
  3. 那么此时A从5开始数。

因此会有重复数数的问题。由此我们必须引入线程同步

同步的方式有四种

  1. 互斥锁
  2. 读写锁
  3. 条件变量
  4. 信号量

线程同步方法1–互斥锁[^1.2.4^](#1)

共享资源也被称为临界资源

如何算临界区:如下图,number作为共享资源,一切==和它有关==以及==和它的有关有关==以及==更深层的有关==的代码都算在临界区内(株连九族)。

imgx

重要函数原型

#include <mutex>
class mutex {
public:
	constexpr mutex() = default;
  mutex(const mutex&) = delete;
  mutex& operator=(const mutex&) = delete;
}

img

使用类来管理资源–lock_guard、unique_lock

img01

img02

类模板 描述 策略
std::lock_guard 严格基于作用域(scope-based)的锁管理类==模板==,构造时是否加锁是可选的(不加锁时假定当前线程已经获得锁的所有权—使用std::adopt_lock策略),析构时自动释放锁,所有权不可转移,==对象生存期内不允许手动加锁和释放锁== std::adopt_lock
std::unique_lock 更加灵活的锁管理类==模板==,构造时是否加锁是可选的,在对象析构时如果持有锁会自动释放锁,所有权可以转移。对象生命期内允许手动加锁和释放锁 std::adopt_lock std::defer_lock std::try_to_lock

样例

// 升级前面的数数的多线程代码
#include <iostream>
#include <thread>
#include <unistd.h>
#include <mutex>

std::mutex lock_;
int MAX(5);
int number = 0;

void funcA_num() {
    int cur;
    for (int i = 0; i < MAX; ++i) {
        {
            std::lock_guard<std::mutex> lg(lock_);
            cur = number;
            cur++;
            number = cur;
            std::cout << "Thread A, id = " << std::this_thread::get_id() << " number = " << number << std::endl;
        }
      	usleep(3); // 加上这个不要让某个线程总是抢到时间片,
      						 // 主要是为了后续打印的结果可以看出来是随机抢时间片的,确实是并行的
    }
}

void funcB_num() {
    int cur;
    for (int i = 0; i < MAX; ++i) {
        {
            std::lock_guard<std::mutex> lg(lock_);
            cur = number;
            cur++;
            number = cur;
            std::cout << "Thread B, id = " << std::this_thread::get_id() << " number = " << number << std::endl;
        }
      	usleep(5);
    }
}

int main() {
    std::thread t1(funcA_num);
    std::thread t2(funcB_num);
    t1.join();
    t2.join();
}

/*
Thread B, id = 0x16fe47000 number = 1
Thread A, id = 0x16fdbb000 number = 2
Thread B, id = 0x16fe47000 number = 3
Thread A, id = 0x16fdbb000 number = 4
Thread B, id = 0x16fe47000 number = 5
Thread B, id = 0x16fe47000 number = 6
Thread A, id = 0x16fdbb000 number = 7
Thread A, id = 0x16fdbb000 number = 8
Thread B, id = 0x16fe47000 number = 9
Thread A, id = 0x16fdbb000 number = 10
*/

// 本例适合用 lock_guard 是因为刚好临界区是一整个代码块,在这个代码块中新建 lock_guard 类
// 退出代码块后自然就调用lock_guard 的析构函数进行解锁

死锁

锁的个数 = 共享资源的个数 != 线程的个数

  1. 重复加锁
  2. A拿着X的钥匙想访问Y,B拿着Y的钥匙想访问X。即A拿着X房间的钥匙但被锁在Y房间,B拿着Y房间的钥匙却被锁在了X房间。

线程同步方法2–读写锁[^4.5.6.7^](#2)

头文件为 shared_mutex。c++14通过 shared_timed_mutex 提供读写锁,c++17通过 shared_mutex 提供读写锁

拥有两个访问级别:

  1. 共享–多个线程能共享同一互斥的所有权
  2. 独占性–仅一个线程能占有互斥

读写锁优点:可以同时读,即读操作是可以并行的。但是写操作还是串行的

当前锁的状态 读锁请求 写锁请求
无锁 可以 可以
读锁 可以 阻塞
写锁 阻塞 阻塞

重要函数原型

class shared_timed_mutex{
  	shared_timed_mutex();
    ~shared_timed_mutex() = default;
    shared_timed_mutex(const shared_timed_mutex&) = delete;
    shared_timed_mutex& operator=(const shared_timed_mutex&) = delete;
}

使用类来管理资源–shared_lock、unique_lock

  1. shared_lock 是==共享==互斥所有权包装器
  2. unique_lock 是==独占==互斥所有权包装器
因此使用 shared_lock 来管理读锁,使用 unique_lock 来管理写锁。

样例

// 5个读线程,3个写线程 操作同一个全局变量
#include <iostream>
#include <thread>
#include <unistd.h>
#include <shared_mutex>
#include <vector>

std::shared_timed_mutex sLock;
int number = 0;
const int MAX(3);

void read_num() {
    int cur;
    for (int i = 0; i < MAX; ++i) {
        {
            std::shared_lock<std::shared_timed_mutex> slk(sLock); // 使用 shared_lock 管理锁,自动解锁并且不阻塞读线程
            std::cout << "Thread read, id = " << std::this_thread::get_id() << " number = " << number << std::endl; // 临界区只有这一行
        }
        usleep(rand()%5);
    }
}

void write_num() {
    for (int i = 0; i < MAX; ++i) {
        {
            std::unique_lock<std::shared_timed_mutex> ulk(sLock); // 使用 unique_lock 管理锁,自动解锁并且阻塞其他读写线程
            number++;
            std::cout << "Thread write, id = " << std::this_thread::get_id() << " number = " << number << std::endl; // 临界区只有这一行
        }
        usleep(rand()%5);
    }
}

int main() {
    std::vector<std::thread> read_t(5);
    std::vector<std::thread> write_t(3);
    for (int i = 0; i < 5; ++i) {
        read_t[i] = std::thread (read_num);
    }
    for (int i = 0; i < 3; ++i) {
        write_t[i] = std::thread (write_num);
    }

    for(int i = 0; i < 5; ++i) {
        read_t[i].join();
    }
    for (int i = 0; i < 3; ++i) {
        write_t[i].join();
    }
  	return 0;
}

/*
Thread read, id = 0x16b4e7000 number = 0
Thread read, id = 0x16b68b000 number = 0
Thread read, id = 0x16b4e7000 number = Thread read, id = 0x16b717000 number = 0
0
Thread read, id = Thread read, id = 0x16b5ff0000x16b573000 number =  number = 00

Thread write, id = 0x16b7a3000 number = 1
Thread write, id = 0x16b8bb000 number = 2
Thread read, id = 0x16b4e7000 number = 2
Thread read, id = Thread read, id = 0x16b573000 number = 20x16b5ff000 number = 2

Thread read, id = 0x16b68b000 number = 2
Thread write, id = 0x16b82f000 number = 3
Thread write, id = 0x16b7a3000 number = 4
Thread write, id = 0x16b82f000 number = 5
Thread write, id = 0x16b8bb000 number = 6
Thread read, id = 0x16b68b000 number = 6
Thread write, id = 0x16b7a3000 number = 7
Thread read, id = 0x16b5ff000 number = 7
Thread read, id = 0x16b573000 number = 7
Thread write, id = 0x16b82f000 number = 8
Thread read, id = 0x16b717000 number = 8
Thread write, id = 0x16b8bb000 number = 9
Thread read, id = 0x16b717000 number = 9
*/

// 从结果可以看出
// read 时输出会有错乱的情况,说明他们是并行的
// write 时的输出必定是独占一行的,说明 write_num 的线程是独占的,会将其他的线程阻塞
// 并且3个写线程每一个对 number 递增3次,最终得到 number = 9

线程同步方法3–条件变量[^4.8.9^](#3)

考虑==单生产者-单消费者==模型

按照上面两个同步技术:

  1. 生产者抢到时间片,写一个双端队列后,解锁。恢复就绪状态
  2. 此时生产者和消费者都处于就绪状态,共同抢时间片,假设消费者抢到了,发现队列有一个数据,然后消费掉。放弃cpu,恢复就绪态
  3. 此后假设一直是消费者抢到了时间片,那么就一直在做无用功(加锁-判断为空-解锁恢复就绪态)

出现此现象的根本原因在于生产者和消费者之间没有通信机制,造成盲目的抢时间片。因此通过条件变量来使它们可以通信。

条件变量是线程的另外一种有效同步机制。这些同步对象为线程提供了==交互==的场所(一个线程给==另外==的一个或者多个线程发送消息),我们指定在条件变量这个地方发生。==一个线程==用于修改这个条件变量使其满足其它线程继续往下执行的条件,==其它线程(可以是多个)==则等待接收条件变量已经发生改变的信号。当条件变量同互斥锁一起使用时,条件变量允许线程以一种无竞争的方式等待任意条件的发生。

重要函数原型

#include <condition_variable>
class condition_variable {
public:
    constexpr condition_variable() noexcept = default;
    condition_variable(const condition_variable&) = delete;
    condition_variable& operator=(const condition_variable&) = delete;
  
  	void notify_one() noexcept;
    void notify_all() noexcept;
  
  	void wait(unique_lock<mutex>& __lk) noexcept;
}

条件变量操作函数

使用类来管理资源–unique_lock

一般只会使用 unique_lock。因为条件变量的锁的控制比较灵活,而 lock_guard 没有提供 lock()、unlok() 等接口,shared_lock 根本不是这个情形下使用的东西。

样例

// 5个生产者和5个消费者共同读写一个双端队列,队列最大长度为10
#include <iostream>
#include <thread>
#include <unistd.h>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <vector>

std::condition_variable cond_empty;
std::condition_variable cond_full;
std::mutex lock_;
std::deque<int> q;
const int MAX_SIZE = 10;

void prod_fun() {
    int number;
    while(1) {
        number = rand() % 10;
        std::unique_lock<std::mutex> ulk(lock_);
        while(q.size() == MAX_SIZE) { // 使用while而不是if的理由见后面的while语句块
            cond_full.wait(ulk); // 拿到锁后若队列是满的,通过条件变量wait方法释放锁,进入休眠状态等待唤醒
        }
        q.push_back(number);
        std::cout << "Thread producer, id: " << std::this_thread::get_id() << " insert " << number << std::endl;
        ulk.unlock(); // 因为需要灵活 unlock,所以只能使用 unique_lock,而不能使用 lock_guard
        cond_empty.notify_all(); // 唤醒消费者,可以开始消费了。另ps:这一行可以和上一行换行,不影响
        sleep(rand()%2);
    }
}

void cons_fun() {
    int number;
    while(1) {
        std::unique_lock<std::mutex> ulk(lock_);
        while (q.empty()) { // 使用while而不能是if:因为被唤醒后可能队列先被先前抢到时间片的消																			// 费者消费完了,此时若不再次判断,直接往下执行,
          									// 就会发现 q 中没有资源而报错
            cond_empty.wait(ulk); // 释放已经锁上的lock_,若释放后其他消费者抢到了,也都会被堵在这
          												// 只有被生产者抢到了,生产了资源,才能够继续往下执行
        }
        number = q[0];
        q.pop_front();
        std::cout << "Thread consumer, id: " << std::this_thread::get_id() << " delete " << number << std::endl;
        ulk.unlock();
        cond_full.notify_all(); // 唤醒所有的生产者可以开始生产了,因为消费了一个,自然就有空位了
        sleep(rand()%2);
    }

}

int main() {
    std::vector<std::thread> producer(5);
    std::vector<std::thread> consumer(3);
    for (int i = 0; i < 5; ++i) {
        producer[i] = std::thread (prod_fun);
        consumer[i] = std::thread (cons_fun);
    }
    for(int i = 0; i < 5; ++i) {
        producer[i].join();
        consumer[i].join();
    }
    return 0;
}

/*
Thread producer, id: 0x16db77000 insert 7
Thread producer, id: 0x16dda7000 insert 8
Thread consumer, id: 0x16dc03000 delete 7
Thread producer, id: 0x16dda7000 insert 2
Thread consumer, id: 0x16dc03000 delete 8
Thread consumer, id: 0x16de33000 delete 2
Thread producer, id: 0x16dc8f000 insert 9
Thread producer, id: 0x16dfd7000 insert 2
Thread consumer, id: 0x16de33000 delete 9
Thread consumer, id: 0x16e063000 delete 2
Thread producer, id: 0x16dfd7000 insert 7
Thread producer, id: 0x16debf000 insert 9
Thread consumer, id: 0x16dd1b000 delete 7
*/

线程同步方法4–信号量[^10.11^](#4)

c++20 才开始支持信号量,先不进行了解了。之后有需要可以看参考资料 10.11 进行完善。

参考资料

  1. C++ 多线程互斥锁(mutex,lock,lock_guard)
  2. C++多线程并发(二)—线程同步之互斥锁
  3. c++11多线程系列–生产者消费者
  4. linux教程by爱编程的大丙
  5. c++ shared_mutex 读写锁
  6. c++多线程—读写锁shared_lock/shared_mutex
  7. C++多线程——unique_lock
  8. C++多线程并发(三)—线程同步之条件变量
  9. c++ 11 多线线程系列———–生产者消费者
  10. C++ 多线程(七):信号量 Semaphore 及 C++ 11 实现
  11. C++ 多线程编程(四) 信号量