多线程同步问题 - 条件变量和Event

网络是二十一世纪的图标。

本文记录Poco::Condition使用过程中遇到的坑点,并且延伸到Windows Event和C++11中的std::condition_variable相关内容。

Poco::Condition实现

首先让我们看看Poco::Condition中的wait/signal两个操作的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template <class Mtx>
void wait(Mtx& mutex) {
ScopedUnlock<Mtx> unlock(mutex, false);
Event event;
{
FastMutex::ScopedLock lock(mutex_);
mutex.unlock();
enqueue(event);
}
event.wait();
}
void Condition::signal() {
FastMutex::ScopedLock lock(mutex_);
if (!waitQueue_.empty())
{
waitQueue_.front()->set();
dequeue();
}
}

可以看出,Poco::Condition底层是基于Poco::Event来实现的,wait操作将Poco::Event对象加入到队列中,并等待其相应置位,signal操作从队列中pop出Poco::Event并置位,wait操作和signal操作通过FastMutex对象来保证对队列的互斥操作。

问题再现

在程序中使用了Poco的Condition类,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Poco::Condition cond;
Poco::Mutex mtx;
void wait() {
mtx.lock();
cond.wait(mtx);
mtx.unlock();
}
void signal() {
cond.signal();
}
void execute() {
std::thread([](){
dosomething();
signal();
});
wait();
}

由于多线程执行顺序的不可预期性,上面的代码存在一定的概率会导致cond.signal操作调用之后,cond.wait才开始执行,导致cond.wait函数一直block。

修改代码

我们按照linux下的条件变量的调用方法,将上述代码做一定的修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Poco::Condition cond;
Poco::Mutex mtx;
void lock(){
mtx.lock();
}
void wait() {
cond.wait(mtx);
mtx.unlock();
}
void signal() {
mtx.lock();
cond.signal();
mtx.unlock();
}
void execute() {
lock();
std::thread([](){
dosomething();
signal();
});
wait();
}

要使wait函数能够成功返回的最重要的一点是:要保证先执行wait函数,然后再执行signal函数。这个保证能够确保在cond.signal操作执行时,Poco::Condition的等待队列中一定会存在一个Poco::Event对象。只有队列中存在Poco::Event对象,cond.signal中的置位操作才会执行。

由于多线程执行顺序的不确定性,具体的函数执行顺序可能如下:

wait函数中enqueue(event)执行完毕 → 切换时间片 → signal函数中waitQueue_.front()->set()执行完毕 → wait函数中event.wait()开始执行。

这个时候面临另外一个问题:在Poco::Event对象置位时,该对象可能并没有处于wait状态,那么此时,会不会遗漏该置位请求,导致cond.wait操作卡死?由于Poco::Event在Windows下的实现就是基于Event,所以翻译过来就是:如果Event对象调用setEventPulseEvent时,此时并没有线程WaitForSingleObject等待该Event置位,这个setEventPulseEvent请求会不会遗失掉?事实是不会的,在Event对象被置位后,调用WaitForSingleObject函数时会立即返回的。

Event的置位请求有没有可能丢失?

在《Win32多线程编程》一书中有说到:

要求苏醒的请求并不会被存储起来,可能会遗失掉。如果一个AutoReset event对象调用SetEvent或PulseEvent,而此时并没有线程在等待,这个event会被遗失。如Wait…()函数还没来得及调用就发生了Context Switch,这个时候SetEvent,这个要求苏醒的请求会被遗失,然后调用Wait…()函数线程卡死。

我一直以为事实就是如此,不过代码证明结果并不是这样的,看看下面代码:

1
2
3
4
5
6
7
8
9
#include <Windows.h>
HANDLE ghWriteEvent;
int main()
{
ghWriteEvent = CreateEvent(NULL, TRUE, FALSE, TEXT("WriteEvent"));
SetEvent(ghWriteEvent);
DWORD dwWaitResult = WaitForSingleObject(ghWriteEvent, INFINITE);
return 0;
}

这段代码并不会卡死在WaitForSingleObject函数,该函数会立即返回。因此采用上文中修改后的代码能够解决Poco::Condition存在的问题,虽然不如直接使用Poco::Event优雅。

std::condition_variable的notify请求有没有可能丢失?

我们现在可以确定Event的置位请求不会丢失,那么std::condition_variable的notify请求会不会丢失呢?我们来看一段简单的代码:

1
2
3
4
5
std::condition_variable cv;
std::mutex mtx;
cv.notify_one();
std::unique_lock <std::mutex> lck(mtx);
cv.wait(lck);

执行这段代码会发现,cv.wait操作会block。目前还没有深入了解condition_variable的底层实现,具体原因未知。不过可以找到不少方案保证cv.notify_one操作执行之前,cv.wait操作一定已经执行。譬如增加标志位,标志notify_one操作是否已经执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
void do_print_id(int id) {
std::unique_lock <std::mutex> lck(mtx);
while (!ready) cv.wait(lck);
std::cout << "thread " << id << '\n';
}
void go() {
std::unique_lock <std::mutex> lck(mtx);
ready = true;
cv.notify_all();
}
int main() {
std::thread threads[10];
for (int i = 0; i < 10; ++i) threads[i] = std::thread(do_print_id, i);
go();
for (auto & th : threads) th.join();
return 0;
}

《C++并发编程》一书中给出了另外一个例子,使用std::condition_variable的wait操作的第二个版本来解决该问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
}
void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(
lk,[]{return !data_queue.empty();});
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
if(is_last_chunk(data))
break;
}
}

上述代码中,wait()会去检查这些条件(通过调用所提供的lambda函数),当条件满足(lambda函数返回true)时返回。如果条件不满足(lambda函数返回false),wait()函数将解锁互斥量,并且将这个线程(上段提到的处理数据的线程)置于阻塞或等待状态。当准备数据的线程调用notify_one()通知条件变量时,处理数据的线程从睡眠状态中苏醒,重新获取互斥锁,并且对条件再次检查,在条件满足的情况下,从wait()返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并且重新开始等待。

必须通过额外的辅助条件才能很好地利用条件变量。

总结

对于Poco库的使用还是存在不少坑点,只有踩坑之后才能更好的掌握。同时,对于过去掌握的知识点,一定不能轻信,书本的内容也可能存在错误,有机会自己写点demo,验证所学习的内容。

Event和std::condition_variable对于置位/notify请求的处理是不一样的。这在平时使用的时候需要注意。


本文作者:ZeroJiu
本文链接: http://www.freehacker.cn/foundation/cpp-thread-sync-event-condition_varaiable/
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!