儿童白癜风的治疗 http://baidianfeng.39.net/a_yqyy/220603/10939119.html如果想为一个数据结构支持多个读取器,同时防止并发写入,则读/写锁似乎是唯一的方法,但事实并非如此!如果允许数据结构的多个副本存在于内存中,则可以在没有读写锁的情况下实现相同目的。只需要一种删除不再使用的旧副本方法。咱们来用一个基于C++的读写锁示例来看看吧。
使用读写锁
假设有一台具有数十个线程的网络服务器。每个线程将消息广播到数十个连接的客户端。因为新客户端去连接或现有客户端断开连接,因此连接的客户端列表必须更改。我们可以将连接的客户端列表存储在std::vector中,并使用诸如std::shared_mutex的读写锁对其进行保护。
classServer{private:std::shared_mutexm_rwLock;//Read-writelockstd::vectorintm_clients;//Listofconnectedclientspublic:voidbroadcast(constvoid*msg,size_tlen){std::shared_lockstd::shared_mutexshared(m_rwLock);//Sharedlockfor(intfd:m_clients)send(fd,msg,len,0);}voidaddClient(intfd){std::unique_lockstd::shared_mutexexclusive(m_rwLock);//Exclusivelockm_clients.push_back(fd);}...该broadcast函数从已连接的客户端列表中读取,但不会对其进行修改,因此它需要一个读锁(也称为共享锁)。addClient另一方面,需要修改列表,因此它需要写锁定(也称为排他锁定)。现在我们只需要通过允许列表的多个副本同时存在来消除读写锁定。
消除读写锁
首先,我们必须建立一个指向当前列表的原子指针。该指针将保存到已连接客户端的最新列表。
classServer{private:structClientList{std::vectorintclients;};std::atomicClientList*m_currentList;//Themostup-to-datelistpublic:...该broadcast函数将该指针复制到局部变量,然后将该局部变量用于该函数的其余部分。请注意,共享锁已被删除。这减少了对共享内存的修改次数,这对于可伸缩性是更好的。
voidbroadcast(constvoid*msg,size_tlen){ClientList*list=m_currentList.load();//Atomicloadfromm_currentListfor(intfdst-clients)send(fd,msg,len);}该addClient函数的调用频率较低,它会创建列表的新私有副本,修改副本,然后将新副本发布回原子指针。为了简单起见,假设所有的调用addClient都是从单个线程进行的。(如果调用是从多个线程进行的,则需要addClient使用互斥锁或CAS循环进行保护。)
voidaddClient(intfd){ClientList*oldList=m_currentList.load();//Atomicloadfromm_currentListClientList*newList=newClientList{*oldList};//MakeaprivatecopynewList-clients.push_back(fd);//Modifyitm_currentList.store(newList);//Publishthenewcopy//***Note:Mustdosomethingwiththeoldlisthere***}在m_currentList被替换后,其他线程可能仍在使用旧列表。
不过,我们还没有完成。addClient需要对旧列表进行处理。我们无法立即删除旧列表,因为其他线程可能仍在使用它。而且我们不能删除它,因为那样会导致内存泄漏。让我们创建一个新对象MemoryReclaimer,该对象负责在安全的时间点删除旧列表。
classServer{...MemoryReclaimerm_reclaimer;...voidaddClient(intfd){ClientList*oldList=m_currentList.load();//Atomicloadfromm_currentListClientList*newList=newClientList{*oldList};//MakeaprivatecopynewList-clients.push_back(fd);//Modifyitm_currentList.store(newList);//Publishthenewcopym_reclaimer.addCallback([=](){deleteoldList});}...如果用Java,就不需要引入MemoryReclaimer类。我们可以停止引用旧列表,而Java的垃圾回收器最终将其删除。但这是C++,因此我们必须明确清理那些旧列表。
我们MemoryReclaimer通过将回调传递给来通知要删除的对象addCallback。MemoryReclaimer所有线程从旧对象读取完成后,必须在某个时间调用此回调。它还必须确保所有这些线程都不会再次访问旧对象。这就是我们实现两个目标的一种方法。
基于静态状态的回收
我将在这里描述的方法称之位基于静态状态回收,或简称QSBR。该想法是确定每个线程中的静态状态。静态状态有点类似于临界区。这是线程执行中位于该线程执行的所有相关关键部分之外的某些点。例如,broadcast即使不再显式锁定,我们的函数仍包含一个关键部分,因为至关重要的是不要在函数返回之前删除列表。因此,静态至少应处于broadcast函数外部的某个位置。
无论我们选择放置静态状态如何,都必须将其通知给MemoryReclaimer对象。在我们的例子中,我们需要线程来调用onQuiescentState。至少,在调用给定的回调之前,MemoryReclaimer应当等待所有参与线程都onQuiescentState首先调用。一旦满足该条件,就可以保证,如果任何先前的关键部分使用了旧对象,这些关键部分都将结束。
onQuiescentState在每个线程中找到其合适的位置后,它的调用频率要比broadcast函数少得多,否则我们首先会否定消除读写锁定的好处。例如,可以在对进行固定次数的调用后broadcast,或在固定的时间量(以先到者为准)后调用。如果是一个游戏引擎,则可以在主循环的每次迭代或某些其他粗粒度的工作单元上调用它。
间隔
一个简单的实现MemoryReclaimer可以按如下方式工作。代替单独处理每个回调,我们可以引入interval的概念,并按interval将回调分组在一起。一旦调用onQuiescentState了每个线程,就认为当前间隔结束,而新间隔被认为开始。在每个间隔结束时,我们知道调用上一个间隔中添加的所有回调是安全的,因为onQuiescentState自上一个间隔结束以来,每个参与线程都已调用。
这是这种的快速实现MemoryReclaimer。它使用bool向量来跟踪onQuiescentState在当前时间间隔内调用了哪些线程,以及尚未调用的线程。系统中的每个参与线程必须registerThread事先调用。
classMemoryReclaimer{private:std::mutexm_mutex;std::vectorboolm_threadWasQuiescent;std::vectorstd:unctionvoid()m_currentIntervalCallbacks;std::vectorstd:unctionvoid()m_previousIntervalCallbacks;size_tm_numRemaining=0;public:typedefsize_tThreadIndex;ThreadIndexregisterThread(){std::lock_guardstd::mutexguard(m_mutex);ThreadIndexid=m_threadWasQuiescent.size();m_threadWasQuiescent.push_back(false);m_numRemaining++;returnid;}voidaddCallback(conststd:unctionvoid()callback){std::lock_guardstd::mutexguard(m_mutex);m_currentIntervalCallbacks.push_back(callback);}voidonQuiescentState(ThreadIndexid){std::lock_guardstd::mutexguard(m_mutex);if(!m_threadWasQuiescent[id]){m_threadWasQuiescent[id]=true;m_numRemaining--;if(m_numRemaining==0){//Endofinterval.Invokeallcallbacksfromthepreviousinterval.for(constautocallback:m_previousIntervalCallbacks){callback();}//Movecurrentcallbackstopreviousinterval.m_previousIntervalCallbacks=std::move(m_currentIntervalCallbacks);m_currentIntervalCallbacks.clear();//Resetallthreadstatuses.for(size_ti=0;im_threadWasQuiescent.size();i++){m_threadWasQuiescent=false;}m_numRemaining=m_threadWasQuiescent.size();}}}};正常情况下,不仅可以MemoryReclaimer保证前面的关键部分都已结束,还可以确保没有线程将再次使用旧对象。再次考虑服务器的addClient功能。该函数将进行修改m_currentList,并不一定立即对其他线程可见,然后调用addCallback。addCallback锁定互斥锁,然后将其解锁。根据C++标准解锁将与同一个互斥锁的每个后续锁同步,在我们的case中,该互斥锁包括onQuiescentState从其他线程进行的调用。结果,新值ofm_currentList在onQuiescentState被调用时将自动变为对其他线程可见。
这仅仅是MemoryReclaimer基于QSBR的一种实现。或许可能有实现效果更好的版本,如果您知道更好的方式,请在评论区留言告诉我,期待与您的交流。