base_thread.cpp

00001 
00002 /***************************************************************************
00003  *  base_thread.cpp - FireVision Base Thread
00004  *
00005  *  Created: Tue May 29 16:41:50 2007
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU Library General Public License for more details.
00019  *
00020  *  Read the full text in the LICENSE.GPL file in the doc directory.
00021  */
00022 
00023 #include "base_thread.h"
00024 #include "acquisition_thread.h"
00025 #include "aqt_vision_threads.h"
00026 
00027 #include <core/threading/thread.h>
00028 #include <core/threading/mutex.h>
00029 #include <core/threading/mutex_locker.h>
00030 #include <core/threading/barrier.h>
00031 #include <utils/logging/logger.h>
00032 
00033 #include <fvutils/system/camargp.h>
00034 #include <fvutils/ipc/shm_image.h>
00035 #include <fvutils/ipc/shm_lut.h>
00036 #include <cams/factory.h>
00037 #include <cams/cam_exceptions.h>
00038 #include <cams/control/factory.h>
00039 #include <core/exceptions/software.h>
00040 
00041 #include <aspect/vision.h>
00042 
00043 #include <algorithm>
00044 #include <unistd.h>
00045 
00046 using namespace fawkes;
00047 using namespace firevision;
00048 
00049 /** @class FvBaseThread "base_thread.h"
00050  * FireVision base thread.
00051  * This implements the functionality of the FvBasePlugin.
00052  * @author Tim Niemueller
00053  */
00054 
00055 /** Constructor. */
00056 FvBaseThread::FvBaseThread()
00057   : Thread("FvBaseThread", Thread::OPMODE_WAITFORWAKEUP),
00058     BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR),
00059     VisionMasterAspect(this)
00060 {
00061   // default to 30 seconds
00062   __aqt_timeout = 30;
00063   __aqt_barrier = new Barrier(1);
00064 }
00065 
00066 
00067 /** Destructor. */
00068 FvBaseThread::~FvBaseThread()
00069 {
00070   delete __aqt_barrier;
00071 }
00072 
00073 
00074 void
00075 FvBaseThread::init()
00076 {
00077   // wipe all previously existing FireVision shared memory segments
00078   // that are orphaned
00079   SharedMemoryImageBuffer::cleanup(/* use lister */ false);
00080   SharedMemoryLookupTable::cleanup(/* use lister */ false);
00081 }
00082 
00083 
00084 void
00085 FvBaseThread::finalize()
00086 {
00087   __aqts.lock();
00088   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00089     thread_collector->remove(__ait->second);
00090     delete __ait->second;
00091   }
00092   __aqts.clear();
00093   __aqts.unlock();
00094   __owned_controls.lock();
00095   LockList<CameraControl *>::iterator i;
00096   for (i = __owned_controls.begin(); i != __owned_controls.end(); ++i) {
00097     delete *i;
00098   }
00099   __owned_controls.clear();
00100   __owned_controls.unlock();
00101 }
00102 
00103 
00104 /** Thread loop. */
00105 void
00106 FvBaseThread::loop()
00107 {
00108   __aqts.lock();
00109 
00110   try {
00111     for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00112       __ait->second->set_vt_prepfin_hold(true);
00113     }
00114   } catch (Exception &e) {
00115     logger->log_warn(name(), "Cannot get prepfin hold status, skipping this loop");
00116     for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00117       __ait->second->set_vt_prepfin_hold(false);
00118     }
00119     __aqts.unlock();
00120     return;
00121   }
00122 
00123   // Wakeup all cyclic acquisition threads and wait for them
00124   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00125     if ( __ait->second->aqtmode() == FvAcquisitionThread::AqtCyclic ) {
00126       //logger->log_debug(name(), "Waking Thread %s", __ait->second->name());
00127       __ait->second->wakeup(__aqt_barrier);
00128     }
00129   }
00130 
00131   __aqt_barrier->wait();
00132 
00133   // Check for aqt timeouts
00134   for (__ait = __aqts.begin(); __ait != __aqts.end();) {
00135     if ( __ait->second->vision_threads->empty() &&
00136          (__ait->second->vision_threads->empty_time() > __aqt_timeout) ) {
00137 
00138       logger->log_info(name(), "Acquisition thread %s timed out, destroying",
00139                        __ait->second->name());
00140 
00141 
00142       thread_collector->remove(__ait->second);
00143       delete __ait->second;
00144       __aqts.erase(__ait++);
00145     } else {
00146       ++__ait;
00147     }
00148   }
00149 
00150   __started_threads.lock();
00151   fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stit = __started_threads.begin();
00152   while (stit != __started_threads.end()) {
00153 
00154     logger->log_info(name(), "Thread %s has been started, %zu",
00155                      stit->second->name(), __started_threads.size());
00156 
00157     // if the thread is registered in that aqt mark it running
00158     stit->second->vision_threads->set_thread_running(stit->first);
00159 
00160     if ( stit->second->vision_threads->has_cyclic_thread() ) {
00161       if (stit->second->aqtmode() != FvAcquisitionThread::AqtCyclic ) {
00162         logger->log_info(name(), "Switching acquisition thread %s to cyclic mode",
00163                          stit->second->name());
00164 
00165         stit->second->prepare_finalize();
00166         stit->second->cancel();
00167         stit->second->join();
00168         stit->second->set_aqtmode(FvAcquisitionThread::AqtCyclic);
00169         stit->second->start();
00170         stit->second->cancel_finalize();
00171       }
00172     } else if (stit->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
00173       logger->log_info(name(), "Switching acquisition thread %s to continuous mode",
00174                        stit->second->name());
00175       stit->second->prepare_finalize();
00176       stit->second->cancel();
00177       stit->second->join();
00178       stit->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
00179       stit->second->start();
00180       stit->second->cancel_finalize();
00181     }
00182 
00183     // Make thread actually capture data
00184     stit->second->set_enabled(true);
00185 
00186     fawkes::LockMap<Thread *, FvAcquisitionThread *>::iterator stittmp = stit;
00187     ++stit;
00188     __started_threads.erase( stittmp );
00189   }
00190   __started_threads.unlock();
00191 
00192   // Re-create barrier as necessary after _adding_ threads
00193   unsigned int num_cyclic_threads = 0;
00194   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00195     if ( __ait->second->vision_threads->has_cyclic_thread() ) {
00196       ++num_cyclic_threads;
00197     }
00198   }
00199   cond_recreate_barrier(num_cyclic_threads);
00200 
00201   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00202     __ait->second->set_vt_prepfin_hold(false);
00203   }
00204 
00205   __aqts.unlock();
00206 }
00207 
00208 
00209 /** Get vision master.
00210  * @return vision master
00211  */
00212 VisionMaster *
00213 FvBaseThread::vision_master()
00214 {
00215   return this;
00216 }
00217 
00218 
00219 Camera *
00220 FvBaseThread::register_for_camera(const char *camera_string, Thread *thread,
00221                                   colorspace_t cspace)
00222 {
00223   Camera *c = NULL;
00224   __aqts.lock();
00225 
00226   logger->log_info(name(), "Thread '%s' registers for camera '%s'", thread->name(), camera_string);
00227 
00228   VisionAspect *vision_thread = dynamic_cast<VisionAspect *>(thread);
00229   if ( vision_thread == NULL ) {
00230     throw TypeMismatchException("Thread is not a vision thread");
00231   }
00232 
00233   CameraArgumentParser *cap = new CameraArgumentParser(camera_string);
00234   try {
00235     std::string id = cap->cam_type() + "." + cap->cam_id();
00236     if ( __aqts.find(id) != __aqts.end() ) {
00237       // this camera has already been loaded
00238       c = __aqts[id]->camera_instance(cspace,
00239                                     (vision_thread->vision_thread_mode() ==
00240                                      VisionAspect::CONTINUOUS));
00241 
00242       __aqts[id]->vision_threads->add_waiting_thread(thread);
00243 
00244     } else {
00245       Camera *cam = NULL;
00246       try {
00247         cam = CameraFactory::instance(cap);
00248         cam->open();
00249         cam->start();
00250       } catch (Exception &e) {
00251         delete cam;
00252         e.append("Could not open or start camera");
00253         throw;
00254       }
00255 
00256       FvAcquisitionThread *aqt = new FvAcquisitionThread(id.c_str(), cam, logger, clock);
00257 
00258       c = aqt->camera_instance(cspace, (vision_thread->vision_thread_mode() ==
00259                                         VisionAspect::CONTINUOUS));
00260 
00261       aqt->vision_threads->add_waiting_thread(thread);
00262 
00263       __aqts[id] = aqt;
00264       thread_collector->add(aqt);
00265 
00266       // no need to recreate barrier, by default aqts operate in continuous mode
00267 
00268       logger->log_info(name(), "Acquisition thread '%s' started for thread '%s' and camera '%s'",
00269                        aqt->name(), thread->name(), id.c_str());
00270 
00271     }
00272 
00273     thread->add_notification_listener(this);
00274 
00275   } catch (UnknownCameraTypeException &e) {
00276     delete cap;
00277     e.append("FvBaseVisionMaster: could not instantiate camera");
00278     __aqts.unlock();
00279     throw;
00280   } catch (Exception &e) {
00281     delete cap;
00282     e.append("FvBaseVisionMaster: could not open or start camera");
00283     __aqts.unlock();
00284     throw;
00285   }
00286 
00287   delete cap;
00288 
00289   __aqts.unlock();
00290   return c;
00291 }
00292 
00293 
00294 Camera *
00295 FvBaseThread::register_for_raw_camera(const char *camera_string, Thread *thread)
00296 {
00297   Camera *camera = register_for_camera(camera_string, thread, CS_UNKNOWN);
00298   CameraArgumentParser cap(camera_string);
00299   try {
00300     std::string id = cap.cam_type() + "." + cap.cam_id();
00301     __aqts.lock();
00302     if ( __aqts.find(id) != __aqts.end() ) {
00303       __aqts[id]->raw_subscriber_thread = thread;
00304     }
00305     __aqts.unlock();
00306   } catch (Exception &e) {
00307     __aqts.unlock();
00308     throw;
00309   }
00310   return camera;
00311 }
00312 
00313 CameraControl *
00314 FvBaseThread::create_camctrl(const char *camera_string)
00315 {
00316   CameraControl *cc = CameraControlFactory::instance(camera_string);
00317   if (cc) {
00318     __owned_controls.lock();
00319     __owned_controls.push_back(cc);
00320     __owned_controls.sort();
00321     __owned_controls.unique();
00322     __owned_controls.unlock();
00323     return cc;
00324   } else {
00325     throw Exception("Cannot create camera control of desired type");
00326   }
00327 }
00328 
00329 CameraControl *
00330 FvBaseThread::acquire_camctrl(const char *cam_string)
00331 {
00332   CameraArgumentParser cap(cam_string);
00333   std::string id = cap.cam_type() + "." + cap.cam_id();
00334 
00335   // Has this camera been loaded?
00336   MutexLocker lock(__aqts.mutex());
00337   if (__aqts.find(id) != __aqts.end()) {
00338     return CameraControlFactory::instance(__aqts[id]->get_camera());
00339   } else {
00340     return create_camctrl(cam_string);
00341   }
00342 }
00343 
00344 
00345 CameraControl *
00346 FvBaseThread::acquire_camctrl(const char *cam_string,
00347                               const std::type_info &typeinf)
00348 {
00349   CameraArgumentParser cap(cam_string);
00350   std::string id = cap.cam_type() + "." + cap.cam_id();
00351 
00352   // Has this camera been loaded?
00353   MutexLocker lock(__aqts.mutex());
00354   if (__aqts.find(id) != __aqts.end()) {
00355     return CameraControlFactory::instance(typeinf, __aqts[id]->get_camera());
00356   } else {
00357     return create_camctrl(cam_string);
00358   }
00359 }
00360 
00361 
00362 void
00363 FvBaseThread::release_camctrl(CameraControl *cc)
00364 {
00365   __owned_controls.lock();
00366   LockList<CameraControl *>::iterator f;
00367   if ((f = std::find(__owned_controls.begin(), __owned_controls.end(), cc)) != __owned_controls.end()) {
00368     delete *f;
00369     __owned_controls.erase(f);
00370   }
00371   __owned_controls.unlock();
00372 }
00373 
00374 
00375 /** Conditionally re-create barriers.
00376  * Re-create barriers if the number of cyclic threads has changed.
00377  * @param num_cyclic_threads new number of cyclic threads
00378  */
00379 void
00380 FvBaseThread::cond_recreate_barrier(unsigned int num_cyclic_threads)
00381 {
00382   if ( (num_cyclic_threads + 1) != __aqt_barrier->count() ) {
00383     delete __aqt_barrier;
00384     __aqt_barrier = new Barrier( num_cyclic_threads + 1 ); // +1 for base thread
00385   }
00386 }
00387 
00388 
00389 void
00390 FvBaseThread::unregister_thread(Thread *thread)
00391 {
00392   __aqts.lock();
00393   unsigned int num_cyclic_threads = 0;
00394 
00395   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00396 
00397     // Remove thread from all aqts
00398     __ait->second->vision_threads->remove_thread(thread);
00399 
00400     if (__ait->second->raw_subscriber_thread == thread) {
00401       __ait->second->raw_subscriber_thread = NULL;
00402     }
00403 
00404     if ( __ait->second->vision_threads->has_cyclic_thread() ) {
00405       ++num_cyclic_threads;
00406 
00407     } else if (__ait->second->aqtmode() != FvAcquisitionThread::AqtContinuous ) {
00408       logger->log_info(name(), "Switching acquisition thread %s to continuous mode "
00409                                "on unregister", __ait->second->name());
00410 
00411       __ait->second->prepare_finalize();
00412       __ait->second->cancel();
00413       __ait->second->join();
00414       __ait->second->set_aqtmode(FvAcquisitionThread::AqtContinuous);
00415       __ait->second->start();
00416       __ait->second->cancel_finalize();
00417     }
00418   }
00419   // Recreate as necessary after _removing_ threads
00420   cond_recreate_barrier(num_cyclic_threads);
00421 
00422   __aqts.unlock();
00423 }
00424 
00425 
00426 bool
00427 FvBaseThread::thread_started(Thread *thread) throw()
00428 {
00429   __aqts.lock();
00430   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00431     if (__ait->second->vision_threads->has_waiting_thread(thread)) {
00432       __started_threads.lock();
00433       __started_threads[thread] = __ait->second;
00434       __started_threads.unlock();
00435     }
00436   }
00437   __aqts.unlock();
00438 
00439   return false;
00440 }
00441 
00442 
00443 bool
00444 FvBaseThread::thread_init_failed(Thread *thread) throw()
00445 {
00446   __aqts.lock();
00447   for (__ait = __aqts.begin(); __ait != __aqts.end(); ++__ait) {
00448     __ait->second->vision_threads->remove_waiting_thread(thread);
00449   }
00450   __aqts.unlock();
00451 
00452   return false;
00453 }

Generated on 1 Mar 2011 for Fawkes API by  doxygen 1.6.1