server_thread.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <netcomm/fawkes/server_thread.h>
00025 #include <netcomm/fawkes/server_client_thread.h>
00026 #include <netcomm/utils/acceptor_thread.h>
00027 #include <netcomm/fawkes/message.h>
00028 #include <netcomm/fawkes/handler.h>
00029 #include <netcomm/fawkes/message_queue.h>
00030 #include <netcomm/fawkes/message_content.h>
00031 #include <core/threading/thread_collector.h>
00032 #include <core/threading/mutex.h>
00033 #include <core/exception.h>
00034
00035 #include <unistd.h>
00036
00037 namespace fawkes {
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052 FawkesNetworkServerThread::FawkesNetworkServerThread(unsigned int fawkes_port,
00053 ThreadCollector *thread_collector)
00054 : Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP)
00055 {
00056 this->thread_collector = thread_collector;
00057 clients.clear();
00058 next_client_id = 1;
00059 inbound_messages = new FawkesNetworkMessageQueue();
00060
00061 acceptor_thread = new NetworkAcceptorThread(this, fawkes_port,
00062 "FawkesNetworkAcceptorThread");
00063 if ( thread_collector ) {
00064 thread_collector->add(acceptor_thread);
00065 } else {
00066 acceptor_thread->start();
00067 }
00068 }
00069
00070
00071
00072 FawkesNetworkServerThread::~FawkesNetworkServerThread()
00073 {
00074 for (cit = clients.begin(); cit != clients.end(); ++cit) {
00075 if ( thread_collector ) {
00076 thread_collector->remove((*cit).second);
00077 } else {
00078 (*cit).second->cancel();
00079 (*cit).second->join();
00080 }
00081 delete (*cit).second;
00082 }
00083 if ( thread_collector ) {
00084 thread_collector->remove(acceptor_thread);
00085 } else {
00086 acceptor_thread->cancel();
00087 acceptor_thread->join();
00088 }
00089 delete acceptor_thread;
00090
00091 delete inbound_messages;
00092 }
00093
00094
00095
00096
00097
00098
00099 void
00100 FawkesNetworkServerThread::add_connection(StreamSocket *s) throw()
00101 {
00102 FawkesNetworkServerClientThread *client = new FawkesNetworkServerClientThread(s, this);
00103
00104 clients.lock();
00105 client->set_clid(next_client_id);
00106 if ( thread_collector ) {
00107 thread_collector->add(client);
00108 } else {
00109 client->start();
00110 }
00111 clients[next_client_id] = client;
00112 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
00113 (*hit).second->client_connected(next_client_id);
00114 }
00115 ++next_client_id;
00116 clients.unlock();
00117
00118 wakeup();
00119 }
00120
00121
00122
00123
00124
00125 void
00126 FawkesNetworkServerThread::add_handler(FawkesNetworkHandler *handler)
00127 {
00128 handlers.lock();
00129 if ( handlers.find(handler->id()) != handlers.end()) {
00130 handlers.unlock();
00131 throw Exception("Handler already registered");
00132 }
00133 handlers[handler->id()] = handler;
00134 handlers.unlock();
00135 }
00136
00137
00138
00139
00140
00141 void
00142 FawkesNetworkServerThread::remove_handler(FawkesNetworkHandler *handler)
00143 {
00144 handlers.lock();
00145 if( handlers.find(handler->id()) != handlers.end() ) {
00146 handlers.erase(handler->id());
00147 }
00148 handlers.unlock();
00149 }
00150
00151
00152
00153
00154
00155
00156
00157
00158 void
00159 FawkesNetworkServerThread::loop()
00160 {
00161 clients.lock();
00162
00163
00164 cit = clients.begin();
00165 while (cit != clients.end()) {
00166 if ( ! cit->second->alive() ) {
00167 if ( thread_collector ) {
00168 thread_collector->remove((*cit).second);
00169 } else {
00170 cit->second->cancel();
00171 cit->second->join();
00172 }
00173 usleep(5000);
00174 delete cit->second;
00175 unsigned int clid = (*cit).first;
00176 ++cit;
00177 clients.erase(clid);
00178 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
00179 (*hit).second->client_disconnected(clid);
00180 }
00181 } else {
00182 ++cit;
00183 }
00184 }
00185
00186
00187 inbound_messages->lock();
00188 while ( ! inbound_messages->empty() ) {
00189 FawkesNetworkMessage *m = inbound_messages->front();
00190 if ( handlers.find(m->cid()) != handlers.end()) {
00191 handlers[m->cid()]->handle_network_message(m);
00192 }
00193 m->unref();
00194 inbound_messages->pop();
00195 }
00196 inbound_messages->unlock();
00197
00198 clients.unlock();
00199 }
00200
00201
00202
00203 void
00204 FawkesNetworkServerThread::force_send()
00205 {
00206 clients.lock();
00207 for (cit = clients.begin(); cit != clients.end(); ++cit) {
00208 (*cit).second->force_send();
00209 }
00210 clients.unlock();
00211 }
00212
00213
00214
00215
00216
00217
00218
00219
00220 void
00221 FawkesNetworkServerThread::broadcast(FawkesNetworkMessage *msg)
00222 {
00223 for (cit = clients.begin(); cit != clients.end(); ++cit) {
00224 if ( (*cit).second->alive() ) {
00225 msg->ref();
00226 (*cit).second->enqueue(msg);
00227 }
00228 }
00229 msg->unref();
00230 }
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241 void
00242 FawkesNetworkServerThread::broadcast(unsigned short int component_id,
00243 unsigned short int msg_id,
00244 void *payload, unsigned int payload_size)
00245 {
00246 FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id,
00247 payload, payload_size);
00248 broadcast(m);
00249 }
00250
00251
00252
00253
00254
00255
00256 void
00257 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id)
00258 {
00259 FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id);
00260 broadcast(m);
00261 }
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275 void
00276 FawkesNetworkServerThread::send(FawkesNetworkMessage *msg)
00277 {
00278 unsigned int clid = msg->clid();
00279 if ( clients.find(clid) != clients.end() ) {
00280 if ( clients[clid]->alive() ) {
00281 clients[clid]->enqueue(msg);
00282 } else {
00283 throw Exception("Client %u not alive", clid);
00284 }
00285 } else {
00286 throw Exception("Client %u not found", clid);
00287 }
00288 }
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300 void
00301 FawkesNetworkServerThread::send(unsigned int to_clid,
00302 unsigned short int component_id, unsigned short int msg_id,
00303 void *payload, unsigned int payload_size)
00304 {
00305 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
00306 payload, payload_size);
00307 send(m);
00308 }
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319 void
00320 FawkesNetworkServerThread::send(unsigned int to_clid,
00321 unsigned short int component_id, unsigned short int msg_id,
00322 FawkesNetworkMessageContent *content)
00323 {
00324 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id,
00325 content);
00326 send(m);
00327 }
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338 void
00339 FawkesNetworkServerThread::send(unsigned int to_clid,
00340 unsigned short int component_id, unsigned short int msg_id)
00341 {
00342 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id);
00343 send(m);
00344 }
00345
00346
00347
00348
00349
00350
00351
00352
00353 void
00354 FawkesNetworkServerThread::dispatch(FawkesNetworkMessage *msg)
00355 {
00356 msg->ref();
00357 inbound_messages->push_locked(msg);
00358 }
00359
00360 }