interruptible_barrier.cpp

00001 
00002 /***************************************************************************
00003  *  interruptible_barrier.cpp - Interruptible Barrier
00004  *
00005  *  Created: Sat Jan 31 12:30:32 2009
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <core/threading/interruptible_barrier.h>
00025 #include <core/threading/thread_list.h>
00026 #include <core/exceptions/system.h>
00027 #include <core/macros.h>
00028 
00029 #include <core/threading/mutex.h>
00030 #include <core/threading/wait_condition.h>
00031 
00032 namespace fawkes {
00033 #if 0 /* just to make Emacs auto-indent happy */
00034 }
00035 #endif
00036 
00037 
00038 /// @cond INTERNALS
00039 class InterruptibleBarrierData
00040 {
00041  public:
00042   unsigned int   threads_left;
00043   Mutex         *mutex;
00044   WaitCondition *waitcond;
00045   bool           own_mutex;
00046 
00047   InterruptibleBarrierData(Mutex *mutex)
00048   {
00049     if (mutex) {
00050       this->mutex = mutex;
00051       own_mutex   = false;
00052     } else {
00053       this->mutex = new Mutex();
00054       own_mutex   = true;
00055     }
00056     waitcond = new WaitCondition(this->mutex);
00057   }
00058 
00059   ~InterruptibleBarrierData()
00060   {
00061     if (own_mutex)  delete mutex;
00062     delete waitcond;
00063   }
00064 };
00065 /// @endcond
00066 
00067 
/** @class InterruptibleBarrier <core/threading/interruptible_barrier.h>
 * A barrier is a synchronization tool which blocks until a given number of
 * threads have reached the barrier. This particular implementation allows
 * giving a timeout after which the waiting is aborted.
 *
 * For general information on when a barrier is useful see the Barrier class.
 *
 * In addition to the general barrier features, InterruptibleBarrier::wait()
 * can be given a timeout after which the waiting is aborted.
 * Since the POSIX standard does not provide a timed wait for barriers, this
 * implementation uses a Mutex and WaitCondition internally to achieve the
 * desired result.
 *
 * @see Barrier
 * @ingroup Threading
 * @author Tim Niemueller
 */
00085 
00086 
00087 /** Constructor.
00088  * @param count the number of threads to wait for
00089  */
00090 InterruptibleBarrier::InterruptibleBarrier(unsigned int count)
00091   : Barrier()
00092 {
00093   _count = count;
00094   if ( _count == 0 ) {
00095     throw Exception("Barrier count must be at least 1");
00096   }
00097   __data = new InterruptibleBarrierData(NULL);
00098   __data->threads_left = 0;
00099   __passed_threads = RefPtr<ThreadList>(new ThreadList());
00100 
00101   __interrupted = false;
00102   __timeout     = false;
00103 }
00104 
00105 
00106 /** Constructor with custom mutex.
00107  * Use this constructor only if you really know what you are doing. This constructor
00108  * allows to pass a mutex that is used internally for the barrier. Note that in
00109  * this case it is your duty to lock the mutex before the wait() and unlock
00110  * afterwards! It combines features of a barrier and a wait condition.
00111  * @param mutex Mutex to use
00112  * @param count the number of threads to wait for
00113  */
00114 InterruptibleBarrier::InterruptibleBarrier(Mutex *mutex, unsigned int count)
00115   : Barrier()
00116 {
00117   _count = count;
00118   if ( _count == 0 ) {
00119     throw Exception("Barrier count must be at least 1");
00120   }
00121   __data = new InterruptibleBarrierData(mutex);
00122   __data->threads_left = 0;
00123   __passed_threads = RefPtr<ThreadList>(new ThreadList());
00124 
00125   __interrupted = false;
00126   __timeout     = false;
00127 }
00128 
00129 /** Destructor */
00130 InterruptibleBarrier::~InterruptibleBarrier()
00131 {
00132   delete __data;
00133 }
00134 
00135 
00136 /** Get a list of threads that passed the barrier.
00137  * The list contains the threads that passed the barrier. With some book keeping
00138  * outside of the barrier you can determine which threads you expected at the
00139  * barrier but did not pass it.
00140  * @return refptr to list of threads that passed the barrier.
00141  */
00142 RefPtr<ThreadList>
00143 InterruptibleBarrier::passed_threads()
00144 {
00145   return __passed_threads;
00146 }
00147 
00148 
00149 /** Interrupt the barrier.
00150  * This will cause all threads currently waiting on the barrier to
00151  * throw an exception and no further thread will wait.
00152  * You have to call reset() the before you use this barrier
00153  * the next time.
00154  */
00155 void
00156 InterruptibleBarrier::interrupt() throw()
00157 {
00158   if (likely(__data->own_mutex))  __data->mutex->lock();
00159   __interrupted = true;
00160   __data->waitcond->wake_all();
00161   if (likely(__data->own_mutex))  __data->mutex->unlock();
00162 }
00163 
00164 
00165 /** Clears the barrier.
00166  * Call this method when you want to use the barrier the next time after
00167  * an interrupt or timeout occured. Make sure all threads that should have
00168  * passed the barrier the last time did pass it.
00169  */
00170 void
00171 InterruptibleBarrier::reset() throw()
00172 {
00173   if (likely(__data->own_mutex))  __data->mutex->lock();
00174   __interrupted        = false;
00175   __timeout            = false;
00176   __data->threads_left = _count;
00177   __passed_threads.clear();
00178   if (likely(__data->own_mutex))  __data->mutex->unlock();  
00179 }
00180 
00181 
00182 /** Wait for other threads.
00183  * This method will block until as many threads have called wait as you have
00184  * given count to the constructor. Note that if the barrier is interrupted or
00185  * times out you need to call reset() to get the barrier into a re-usable state.
00186  * It is your duty to make sure that all threads using the barrier are in a
00187  * cohesive state.
00188  * @param timeout_sec relative timeout in seconds, added to timeout_nanosec
00189  * @param timeout_nanosec timeout in nanoseconds
00190  * @return true, if the barrier was properly reached, false if the barrier timeout
00191  * was reached and the wait did not finish properly.
00192  * @exception InterruptedException thrown if the barrier was forcefully interrupted
00193  * by calling interrupt().
00194  */
00195 bool
00196 InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
00197 {
00198   if (likely(__data->own_mutex))  __data->mutex->lock();
00199 
00200   if ( __data->threads_left == 0 ) {
00201     // first to come
00202     __timeout = __interrupted = false;
00203     __data->threads_left = _count;
00204     __passed_threads->clear();
00205   } else {
00206     if ( __interrupted || __timeout ) {
00207       // interrupted or timed out threads need to be reset if they should be reused
00208       if (likely(__data->own_mutex))  __data->mutex->unlock();
00209       return true;
00210     }
00211   }
00212   --__data->threads_left;
00213   try {
00214     __passed_threads->push_back_locked(Thread::current_thread());
00215   } catch (Exception &e) {
00216     // Cannot do anything more useful :-/
00217     // to stay fully compatible with Barrier we do *not* re-throw
00218     e.print_trace();
00219   }
00220 
00221   bool local_timeout = false;
00222   while ( __data->threads_left && !__interrupted && !__timeout && ! local_timeout) {
00223     local_timeout = ! __data->waitcond->reltimed_wait(timeout_sec, timeout_nanosec);
00224   }
00225   if (local_timeout) __timeout = true;
00226   if ( __interrupted ) {
00227     if (likely(__data->own_mutex))  __data->mutex->unlock();
00228     throw InterruptedException("InterruptibleBarrier forcefully interrupted, only "
00229                                "%u of %u threads reached the barrier",
00230                                _count - __data->threads_left, _count);
00231   }
00232 
00233   __data->waitcond->wake_all();
00234   if (likely(__data->own_mutex))  __data->mutex->unlock();
00235 
00236   return ! __timeout;
00237 }
00238 
00239 } // end namespace fawkes

Generated on 1 Mar 2011 for Fawkes API by  doxygen 1.6.1