fuse_client.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <fvutils/net/fuse_client.h>
00025
00026 #include <fvutils/net/fuse_transceiver.h>
00027 #include <fvutils/net/fuse_message_queue.h>
00028 #include <fvutils/net/fuse_message.h>
00029 #include <fvutils/net/fuse_client_handler.h>
00030
00031 #include <core/threading/mutex.h>
00032 #include <core/threading/wait_condition.h>
00033 #include <core/exceptions/software.h>
00034 #include <netcomm/socket/stream.h>
00035 #include <netcomm/utils/exceptions.h>
00036
00037 #include <cstring>
00038 #include <netinet/in.h>
00039 #include <cstdlib>
00040 #include <unistd.h>
00041
00042 using namespace fawkes;
00043
00044 namespace firevision {
00045 #if 0
00046 }
00047 #endif
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064 FuseClient::FuseClient(const char *hostname, unsigned short int port,
00065 FuseClientHandler *handler)
00066 : Thread("FuseClient")
00067 {
00068 __hostname = strdup(hostname);
00069 __port = port;
00070 __handler = handler;
00071
00072 __wait_timeout = 10;
00073
00074 __inbound_msgq = new FuseNetworkMessageQueue();
00075 __outbound_msgq = new FuseNetworkMessageQueue();
00076
00077 __mutex = new Mutex();
00078 __recv_mutex = new Mutex();
00079 __recv_waitcond = new WaitCondition(__recv_mutex);
00080 __socket = new StreamSocket();
00081 __greeting_mutex = new Mutex();
00082 __greeting_waitcond = new WaitCondition(__greeting_mutex);
00083
00084 __alive = true;
00085 __greeting_received = false;
00086 }
00087
00088
00089
00090 FuseClient::~FuseClient()
00091 {
00092 free(__hostname);
00093
00094 while ( ! __inbound_msgq->empty() ) {
00095 FuseNetworkMessage *m = __inbound_msgq->front();
00096 m->unref();
00097 __inbound_msgq->pop();
00098 }
00099 delete __inbound_msgq;
00100
00101 while ( ! __outbound_msgq->empty() ) {
00102 FuseNetworkMessage *m = __outbound_msgq->front();
00103 m->unref();
00104 __outbound_msgq->pop();
00105 }
00106 delete __outbound_msgq;
00107
00108 delete __mutex;
00109 delete __recv_mutex;
00110 delete __recv_waitcond;
00111 delete __socket;
00112 delete __greeting_mutex;
00113 delete __greeting_waitcond;
00114 }
00115
00116
00117
00118 void
00119 FuseClient::connect()
00120 {
00121 __socket->connect(__hostname, __port);
00122
00123 FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00124 greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00125 __outbound_msgq->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00126 greetmsg, sizeof(FUSE_greeting_message_t)));
00127 }
00128
00129
00130
00131 void
00132 FuseClient::disconnect()
00133 {
00134 __mutex->lock();
00135 delete __socket;
00136 __socket = new StreamSocket();
00137 __alive = false;
00138 __mutex->unlock();
00139 }
00140
00141
00142
00143 void
00144 FuseClient::send()
00145 {
00146 try {
00147 FuseNetworkTransceiver::send(__socket, __outbound_msgq);
00148 } catch (ConnectionDiedException &e) {
00149 e.print_trace();
00150 __socket->close();
00151 __alive = false;
00152 __handler->fuse_connection_died();
00153 __recv_waitcond->wake_all();
00154 }
00155 }
00156
00157
00158
00159 void
00160 FuseClient::recv()
00161 {
00162 __recv_mutex->lock();
00163 try {
00164 while ( __socket->available() ) {
00165 FuseNetworkTransceiver::recv(__socket, __inbound_msgq);
00166 }
00167 } catch (ConnectionDiedException &e) {
00168 e.print_trace();
00169 __socket->close();
00170 __alive = false;
00171 __handler->fuse_connection_died();
00172 __recv_waitcond->wake_all();
00173 }
00174 __recv_mutex->unlock();
00175 }
00176
00177
00178
00179
00180
00181
00182
00183 void
00184 FuseClient::enqueue(FuseNetworkMessage *m)
00185 {
00186 __outbound_msgq->push_locked(m);
00187 }
00188
00189
00190
00191
00192
00193
00194
00195 void
00196 FuseClient::enqueue(FUSE_message_type_t type, void *payload, size_t payload_size)
00197 {
00198 FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
00199 __outbound_msgq->push_locked(m);
00200 }
00201
00202
00203
00204
00205
00206 void
00207 FuseClient::enqueue(FUSE_message_type_t type)
00208 {
00209 FuseNetworkMessage *m = new FuseNetworkMessage(type);
00210 __outbound_msgq->push_locked(m);
00211 }
00212
00213
00214
00215
00216
00217
00218
00219
00220 void
00221 FuseClient::enqueue_and_wait(FuseNetworkMessage *m)
00222 {
00223 __recv_mutex->lock();
00224 __outbound_msgq->push_locked(m);
00225 __recv_waitcond->wait();
00226 __recv_mutex->unlock();
00227 }
00228
00229
00230
00231
00232
00233
00234
00235
00236 void
00237 FuseClient::enqueue_and_wait(FUSE_message_type_t type, void *payload, size_t payload_size)
00238 {
00239 FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
00240 __recv_mutex->lock();
00241 __outbound_msgq->push_locked(m);
00242 __recv_waitcond->wait();
00243 __recv_mutex->unlock();
00244 }
00245
00246
00247
00248
00249
00250
00251 void
00252 FuseClient::enqueue_and_wait(FUSE_message_type_t type)
00253 {
00254 FuseNetworkMessage *m = new FuseNetworkMessage(type);
00255 __recv_mutex->lock();
00256 __outbound_msgq->push_locked(m);
00257 __recv_waitcond->wait();
00258 __recv_mutex->unlock();
00259 }
00260
00261
00262
00263
00264
00265
00266
00267
00268 void
00269 FuseClient::sleep()
00270 {
00271 try {
00272 __socket->poll(__wait_timeout , Socket::POLL_IN);
00273 } catch (Exception &e) {
00274 }
00275 }
00276
00277
00278
00279
00280
00281 void
00282 FuseClient::loop()
00283 {
00284 __mutex->lock();
00285
00286 if ( ! __alive ) {
00287 __mutex->unlock();
00288 usleep(10000);
00289 return;
00290 }
00291
00292 bool wake = false;
00293
00294 send();
00295 sleep();
00296 recv();
00297
00298
00299
00300 __inbound_msgq->lock();
00301 while ( ! __inbound_msgq->empty() ) {
00302 FuseNetworkMessage *m = __inbound_msgq->front();
00303
00304 if ( m->type() == FUSE_MT_GREETING ) {
00305 FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00306 if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00307 __handler->fuse_invalid_server_version(FUSE_CURRENT_VERSION, ntohl(gm->version));
00308 __alive = false;
00309 } else {
00310 __greeting_mutex->lock();
00311 __greeting_received = true;
00312 __greeting_waitcond->wake_all();
00313 __greeting_mutex->unlock();
00314 __handler->fuse_connection_established();
00315 }
00316 } else {
00317 __handler->fuse_inbound_received(m);
00318 wake = true;
00319 }
00320
00321 m->unref();
00322 __inbound_msgq->pop();
00323 }
00324 __inbound_msgq->unlock();
00325
00326 if ( wake ) {
00327 __recv_waitcond->wake_all();
00328 }
00329 __mutex->unlock();
00330 }
00331
00332
00333
00334
00335
00336
00337 void
00338 FuseClient::wait()
00339 {
00340 __recv_mutex->lock();
00341 __recv_waitcond->wait();
00342 __recv_mutex->unlock();
00343 }
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353 void
00354 FuseClient::wait_greeting()
00355 {
00356 __greeting_mutex->lock();
00357 while (! __greeting_received) {
00358 __greeting_waitcond->wait();
00359 }
00360 __greeting_mutex->unlock();
00361 }
00362
00363 }