Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

ReadersWritersLock.C

Go to the documentation of this file.
00001 // Copyright (C) 2001, Compaq Computer Corporation
00002 // 
00003 // This file is part of Vesta.
00004 // 
00005 // Vesta is free software; you can redistribute it and/or
00006 // modify it under the terms of the GNU Lesser General Public
00007 // License as published by the Free Software Foundation; either
00008 // version 2.1 of the License, or (at your option) any later version.
00009 // 
00010 // Vesta is distributed in the hope that it will be useful,
00011 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013 // Lesser General Public License for more details.
00014 // 
00015 // You should have received a copy of the GNU Lesser General Public
00016 // License along with Vesta; if not, write to the Free Software
00017 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018 
00019 //
00020 // ReadersWritersLock.C
00021 //
00022 // Readers/writer locks
00023 //
00024 
00025 #include "ReadersWritersLock.H"
00026 #include "Basics.H"
00027 #include "lock_timing.H"
00028 
00029 #include <assert.h>
00030 #include <sys/types.h>
00031 #include <unistd.h>
00032 
00033 #ifndef USE_PTHREAD_RWLOCK
00034 struct RWLock_Queue_Item
00035 {
00036   // The next and previous queue elements.
00037   RWLock_Queue_Item *next, *prev;
00038 
00039   // Is this for a waiting writer?
00040   bool writer;
00041 
00042   // The number of waiting threads (only used by readers).
00043   unsigned int waiting_count;
00044 
00045   // Conditional variable signalled when it's this items turn.
00046   Basics::cond my_turn;
00047 
00048 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00049   // Debug info: enqueue_time and enqueue_thread are set when this
00050   // object is created (and placed in the waiting queue for a
00051   // ReadersWritersLock).
00052   time_t enqueue_time;
00053   pthread_t enqueue_thread;
00054 #endif
00055 
00056   RWLock_Queue_Item(bool write, RWLock_Queue_Item *&tail)
00057     : writer(write), waiting_count(0),
00058       next(0), prev(tail)
00059   {
00060     // We're the tail now.
00061     tail = this;
00062 
00063     // If there was a previous entry, we're it's next entry.
00064     if(prev)
00065       {
00066         assert(prev->next == 0);
00067         prev->next = this;
00068       }
00069 
00070 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00071     // Remember when this object was enqueued.  (This is sometimes
00072     // useful for post-motrem debugging.)
00073     enqueue_time = time(NULL);
00074     enqueue_thread = pthread_self();
00075 #endif
00076   }
00077 
00078   ~RWLock_Queue_Item()
00079   {
00080     // A queue item should only ever be deleted once it has reached
00081     // the head.
00082     assert(prev == 0);
00083 
00084     // Maintain list integrity.
00085     if(next)
00086       {
00087         assert(next->prev == this);
00088         next->prev = 0;
00089       }
00090   }
00091 };
00092 #endif
00093 
00094 ReadersWritersLock::~ReadersWritersLock() throw()
00095 {
00096 #ifdef USE_PTHREAD_RWLOCK
00097   // Loop needed in case another thread holds the lock.
00098   while(1)
00099     {
00100       // Try to destroy the lock.
00101       int result = pthread_rwlock_destroy(&rwlock);
00102       // If we succeeded, we're done.
00103       if(result == 0) break;
00104       // The only error we expect is EBUSY (meaning that another
00105       // thread has locked it since we released it above).
00106       assert(result == EBUSY);
00107       // Acquire a write lock, possibly blocking.  (This should wait
00108       // until no other thread is using it.)
00109       result = pthread_rwlock_wrlock(&rwlock);
00110       assert(result == 0);
00111       // Release the lock we just acquired, as we're about to try to
00112       // destroy it again.
00113       result = pthread_rwlock_unlock(&rwlock);
00114       assert(result == 0);
00115     }
00116 #else
00117     // Ensure that no other thread has it locked or is waiting to lock
00118     // it.
00119   while(1)
00120     {
00121       // Get a write lock.
00122       acquireWrite();
00123       // Check to see if any threads are waiting for the lock.
00124       mu.lock();
00125       bool waiters = (q_head != 0);
00126       mu.unlock();
00127       if(waiters)
00128         {
00129           // Some other thread waiting.  Release the lock to allow
00130           // them to get it and try again.
00131           releaseWrite();
00132         }
00133       else
00134         {
00135           // We have it write locked and no other thread holds the
00136           // lock: safe to delete it.
00137           break;
00138         }
00139     }
00140 #endif
00141 }
00142 
00143 ReadersWritersLock::ReadersWritersLock(bool favorWriters) throw()
00144 {
00145 #ifdef USE_PTHREAD_RWLOCK
00146   write_locked = false;
00147   while(1)
00148     {
00149       int result = pthread_rwlock_init(&rwlock, NULL);
00150       // If we succeeded, we're done.
00151       if(result == 0) break;
00152       // The only error we expect is EAGAIN.
00153       assert(result == EAGAIN);
00154     }
00155 #else
00156     readers = 0;
00157     writers = 0;
00158 
00159     q_head = 0;
00160     q_tail = 0;
00161 
00162 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00163     acquire_time = 0;
00164     acquire_thread = 0;
00165 #endif
00166 
00167 #endif
00168 }
00169 
00170 void ReadersWritersLock::acquireRead() throw()
00171 {
00172   RWLOCK_ACQUIRE_READ_START(this);
00173 #ifdef USE_PTHREAD_RWLOCK
00174   while(1)
00175     {
00176       int result = pthread_rwlock_rdlock(&rwlock);
00177       // If we succeeded, we're done.
00178       if(result == 0) break;
00179       // The only error we expect is EAGAIN.
00180       assert(result == EAGAIN);
00181     }
00182 #else
00183     mu.lock();
00184     // If there's a writer ...
00185     if((writers > 0) ||
00186        // .. or another thread is waiting ...
00187        ((q_head != 0) &&
00188         // ... and it's not the special case of a single group of
00189         // readers some of which haven't acquired the lock yet (in
00190         // which case it's safe to acquire a read lock immediately)
00191         // ...
00192         !((q_head == q_tail) && !q_head->writer)))
00193       // ... wait our turn.
00194       {
00195         RWLock_Queue_Item *q_entry = 0;
00196 
00197         // If the tail is a reader, wait along with it (as the
00198         // condition variable will be broadcast).
00199         if((q_tail != 0) && (!q_tail->writer))
00200           {
00201             assert(q_head != 0);
00202             q_entry = q_tail;
00203             
00204           }
00205         // Otherwise create a new entry and put it at the end of the
00206         // queue.
00207         else
00208           {
00209             q_entry = NEW_CONSTR(RWLock_Queue_Item, (false, this->q_tail));
00210 
00211             // If this is the first entry, it's the head also.
00212             if(q_head == 0)
00213               {
00214                 assert(q_tail->prev == 0);
00215                 q_head = q_entry;
00216               }
00217 
00218             // q_head and q_tail must either both be non-zero or both be
00219             // zero.
00220             assert(((q_head != 0) && (q_tail != 0)) ||
00221                    ((q_head == 0) && (q_tail == 0)));
00222           }
00223 
00224         assert(q_entry != 0);
00225 
00226         q_entry->waiting_count++;
00227         q_entry->my_turn.wait(this->mu);
00228         q_entry->waiting_count--;
00229 
00230         // We should only ever be woken up when this entry is at the
00231         // head and there are no writers.
00232         assert(q_entry == q_head);
00233         assert(writers == 0);
00234 
00235         // If nobody's waiting here any more, remove this entry.
00236         if(q_entry->waiting_count == 0)
00237           {
00238             q_head = q_head->next;
00239 
00240             // If case this was the last entry in the queue, fix
00241             // q_tail as well.
00242             if(q_tail == q_entry)
00243               {
00244                 assert(q_head == 0);
00245                 q_tail = 0;
00246               }
00247 
00248             // q_head and q_tail must either both be non-zero or both be
00249             // zero.
00250             assert(((q_head != 0) && (q_tail != 0)) ||
00251                    ((q_head == 0) && (q_tail == 0)));
00252 
00253             // Delete the entry
00254             delete q_entry;
00255           }
00256       }
00257 
00258     // There's now one more reader.
00259     readers++;
00260 
00261 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00262     if(readers == 1)
00263       {
00264         // If we're the first reader, save the debug info
00265         acquire_time = time(NULL);
00266         acquire_thread = pthread_self();
00267       }
00268 #endif
00269 
00270     mu.unlock();
00271 #endif
00272     RWLOCK_ACQUIRE_READ_DONE(this, false);
00273 }
00274 
00275 void ReadersWritersLock::releaseRead() throw()
00276 {
00277   RWLOCK_RELEASE_START(this);
00278 #ifdef USE_PTHREAD_RWLOCK
00279   int result = pthread_rwlock_unlock(&rwlock);
00280   // We expect no errors.
00281   assert(result == 0);
00282 #else
00283     mu.lock();
00284 
00285     // There's one less reader now
00286     assert(readers > 0);
00287     readers--;
00288 
00289 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00290     if(readers == 0)
00291       {
00292         // If we were the last reader, erase the debug info
00293         acquire_time = 0;
00294         acquire_thread = 0;
00295       }
00296 #endif
00297 
00298     // If there are no more readers and there's a thread waiting its
00299     // turn, signal it to wake up.
00300     if((readers == 0) && (q_head != 0))
00301       {
00302         // There are only two possible cases:
00303 
00304         // 1. This is a thread waiting for a write lock, and thus only
00305         // one thread will be waiting.
00306 
00307         // 2. This is a bundle of readers that this thread was a part
00308         // of, but not all threads in it have gotten their turn yet.
00309 
00310         // We could probably use signal rather than broadcast, but it
00311         // shouldn't hurt to broadcast.
00312         q_head->my_turn.broadcast();
00313       }
00314     mu.unlock();
00315 #endif
00316     RWLOCK_RELEASE_DONE(this);
00317 }
00318 
00319 void ReadersWritersLock::acquireWrite() throw()
00320 {
00321   RWLOCK_ACQUIRE_WRITE_START(this);
00322 #ifdef USE_PTHREAD_RWLOCK
00323   while(1)
00324     {
00325       int result = pthread_rwlock_wrlock(&rwlock);
00326       // If we succeeded, we're done.
00327       if(result == 0) break;
00328       // The only error we expect is EAGAIN.
00329       assert(result == EAGAIN);
00330     }
00331   write_locked = true;
00332 #else
00333     mu.lock();
00334 
00335     // If any threads hold the lock or writer another thread is
00336     // waiting, wait our turn.
00337     if((readers > 0) || (writers > 0) || (q_head != 0))
00338       {
00339         // Allocate a queue entry on the stack.  We can do this for
00340         // writers, as only one thread will use this queue entry.
00341         RWLock_Queue_Item q_entry(true, this->q_tail);
00342 
00343         // If this is the first entry, it's the head also.
00344         if(q_head == 0)
00345           {
00346             assert(q_tail->prev == 0);
00347             q_head = &q_entry;
00348           }
00349 
00350         // q_head and q_tail must either both be non-zero or both be
00351         // zero.
00352         assert(((q_head != 0) && (q_tail != 0)) ||
00353                ((q_head == 0) && (q_tail == 0)));
00354 
00355         // Wait our turn.
00356         q_entry.my_turn.wait(this->mu);
00357 
00358         // We should only ever be woken up when this entry is at the
00359         // head and no thread holds the lock.
00360         assert(&q_entry == q_head);
00361         assert((writers == 0) && (readers == 0));
00362 
00363         // Remove this entry from the head of the queue.
00364         q_head = q_head->next;
00365 
00366         // If case this was the last entry in the queue, fix
00367         // q_tail as well.
00368         if(q_tail == &q_entry)
00369           {
00370             assert(q_head == 0);
00371             q_tail = 0;
00372           }
00373 
00374         // q_head and q_tail must either both be non-zero or both be
00375         // zero.
00376         assert(((q_head != 0) && (q_tail != 0)) ||
00377                ((q_head == 0) && (q_tail == 0)));
00378       }
00379     writers++;
00380 
00381 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00382     // We're the sole writer, so save the debug info
00383     acquire_time = time(NULL);
00384     acquire_thread = pthread_self();
00385 #endif
00386 
00387     mu.unlock();
00388 #endif
00389     RWLOCK_ACQUIRE_WRITE_DONE(this, false);
00390 }
00391 
00392 void ReadersWritersLock::releaseWrite() throw()
00393 {
00394   RWLOCK_RELEASE_START(this);
00395 #ifdef USE_PTHREAD_RWLOCK
00396   assert(write_locked);
00397   write_locked = false;
00398   int result = pthread_rwlock_unlock(&rwlock);
00399   // We expect no errors.
00400   assert(result == 0);
00401 #else
00402     mu.lock();
00403     assert(writers == 1);
00404     writers = 0;
00405 
00406 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00407     // We were the sole writer, so erase the debug info
00408     acquire_time = 0;
00409     acquire_thread = 0;
00410 #endif
00411 
00412     // If there are threads waiting, signal at least one to wake up.
00413     if(q_head != 0)
00414       {
00415         // If it's a writer, there will be only one thread waiting on
00416         // this condition variable.  If it's a reader, there may be
00417         // multiple threads waiting.  Therefore we always broadcast.
00418         q_head->my_turn.broadcast();
00419       }
00420     mu.unlock();
00421 #endif
00422     RWLOCK_RELEASE_DONE(this);
00423 }
00424 
00425 bool ReadersWritersLock::tryRead() throw()
00426 {
00427   RWLOCK_ACQUIRE_READ_START(this);
00428   bool gotit;
00429 #ifdef USE_PTHREAD_RWLOCK
00430   int result = pthread_rwlock_tryrdlock(&rwlock);
00431   gotit = (result == 0);
00432 #else
00433     mu.lock();
00434     // If any thread holds a write lock, or any thread is queued
00435     // waiting for the lock, don't take it.  Otherwise, there can only
00436     // be readers at the moment so acquire the lock.
00437     if((writers > 0) || (q_head != 0)) {
00438         gotit = false;
00439     } else {
00440         gotit = true;
00441         readers++;
00442 
00443 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00444         if(readers == 1)
00445           {
00446             // If we're the first reader, save the debug info
00447             acquire_time = time(NULL);
00448             acquire_thread = pthread_self();
00449           }
00450 #endif
00451     }
00452     mu.unlock();
00453 #endif
00454     RWLOCK_ACQUIRE_READ_DONE(this, !gotit);
00455     return gotit;
00456 }
00457 
00458 bool ReadersWritersLock::tryWrite() throw()
00459 {
00460   bool gotit;
00461   RWLOCK_ACQUIRE_WRITE_START(this);
00462 #ifdef USE_PTHREAD_RWLOCK
00463   int result = pthread_rwlock_trywrlock(&rwlock);
00464   if(result == 0) write_locked = true;
00465   gotit = (result == 0);
00466 #else
00467     mu.lock();
00468     // If any thread holds the lock, or any thread is queued waiting
00469     // for the lock, don't take it.  Otherwise, acquire the lock.
00470     if ((readers > 0) || (writers > 0) || (q_head != 0)) {
00471         gotit = false;
00472     } else {
00473         gotit = true;
00474         writers++;
00475 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00476         // We're the sole writer, so save the debug info
00477         acquire_time = time(NULL);
00478         acquire_thread = pthread_self();
00479 #endif
00480     }
00481     mu.unlock();
00482 #endif
00483     RWLOCK_ACQUIRE_WRITE_DONE(this, !gotit);
00484     return gotit;
00485 }
00486 
00487 bool ReadersWritersLock::release() throw()
00488 {
00489   RWLOCK_RELEASE_START(this);
00490   bool hadwrite;
00491 #ifdef USE_PTHREAD_RWLOCK
00492   hadwrite = write_locked;
00493   if(write_locked) write_locked = false;
00494   int result = pthread_rwlock_unlock(&rwlock);
00495   // We expect no errors.
00496   assert(result == 0);
00497 #else
00498     mu.lock();
00499     hadwrite = writers > 0;
00500     if (hadwrite) {
00501         assert(writers == 1);
00502         writers = 0;
00503     } else {
00504         assert(readers > 0);
00505         readers--;
00506     }
00507 
00508 #if !defined(NO_ACQUIRE_DEBUG_DATA)
00509     if(hadwrite || (readers == 0))
00510       {
00511         // If we were the last one holding the lock, erase the debug
00512         // info
00513         acquire_time = 0;
00514         acquire_thread = 0;
00515       }
00516 #endif
00517 
00518     // If there are threads waiting, and either the lock is no longer
00519     // held or readers are waiting while other threads hold a read
00520     // lock, signal at least one to wake up.
00521     if((q_head != 0) &&
00522        (hadwrite || (readers == 0) || !q_head->writer))
00523       {
00524         // If it's a writer, there will be only one thread waiting on
00525         // this condition variable.  If it's a reader, there may be
00526         // multiple threads waiting.  Therefore we always broadcast.
00527         q_head->my_turn.broadcast();
00528       }
00529     
00530     mu.unlock();
00531 #endif
00532     RWLOCK_RELEASE_DONE(this);
00533     return hadwrite;
00534 }
00535 
00536 /*
00537 void ReadersWritersLock::downgradeWriteToRead() throw()
00538 {
00539 #ifdef USE_PTHREAD_RWLOCK
00540   // We must have the write lock
00541   assert(write_locked);
00542   write_locked = false;
00543   // Release the lock.
00544   int result = pthread_rwlock_unlock(&rwlock);
00545   // We expect no errors.
00546   assert(result == 0);
00547   // Loop to acquire read lock.
00548   while(1)
00549     {
00550       int result = pthread_rwlock_rdlock(&rwlock);
00551       // If we succeeded, we're done.
00552       if(result == 0) break;
00553       // The only error we expect is EAGAIN.
00554       assert(result == EAGAIN);
00555     }
00556 #else
00557     mu.lock();
00558     assert(writers == 1);
00559     writers = 0;
00560     assert(readers == 0);
00561     readers = 1;
00562     mu.unlock();
00563     waitingToRead.broadcast();
00564 #endif
00565 }
00566 
00567 bool ReadersWritersLock::downgrade() throw()
00568 {
00569 #ifdef USE_PTHREAD_RWLOCK
00570   bool hadwrite = write_locked;
00571   if(write_locked) write_locked = false;
00572   int result = pthread_rwlock_unlock(&rwlock);
00573   // We expect no errors.
00574   assert(result == 0);
00575   // Loop to acquire read lock.
00576   while(1)
00577     {
00578       int result = pthread_rwlock_rdlock(&rwlock);
00579       // If we succeeded, we're done.
00580       if(result == 0) break;
00581       // The only error we expect is EAGAIN.
00582       assert(result == EAGAIN);
00583     }
00584   return hadwrite;
00585 #else
00586     bool hadwrite;
00587     mu.lock();
00588     hadwrite = writers > 0;
00589     if (hadwrite) {
00590         assert(writers == 1);
00591         assert(readers == 0);
00592         writers = 0;
00593         readers = 1;
00594     } else {
00595         assert(readers > 0);
00596     }
00597     mu.unlock();
00598     if (hadwrite) {
00599         waitingToRead.broadcast();
00600     }
00601     return hadwrite;
00602 #endif
00603 }
00604 
00605 */

Generated on Mon May 8 00:48:46 2006 for Vesta by  doxygen 1.4.2