00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "EventChannel.h"
00025 #include "ConsumerAdmin.h"
00026 #include "SupplierAdmin.h"
00027 #include "omniEventsLog.h"
00028 #include "Orb.h"
00029
00030 #include <list>
00031
00032 namespace OmniEvents {
00033
00034
00035 CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers()
00036 {
00037 if(!_consumerAdmin || _shutdownRequested)
00038 throw CORBA::OBJECT_NOT_EXIST();
00039 return _consumerAdmin->_this();
00040 }
00041
00042
00043 CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers()
00044 {
00045 if(!_supplierAdmin || _shutdownRequested)
00046 throw CORBA::OBJECT_NOT_EXIST();
00047 return _supplierAdmin->_this();
00048 }
00049
00050
00051 void EventChannel_i::destroy()
00052 {
00053 if(_shutdownRequested)
00054 throw CORBA::OBJECT_NOT_EXIST();
00055
00056
00057 _shutdownRequested=true;
00058
00059 DB(5,"EventChannel_i::destroy()")
00060
00061
00062 if(_consumerAdmin)
00063 _consumerAdmin->disconnect();
00064 if(_supplierAdmin)
00065 _supplierAdmin->disconnect();
00066 }
00067
00068
00069 EventChannel_i::EventChannel_i(EventChannelStore* store)
00070 : Servant(PortableServer::POA::_nil()),
00071 _eventChannelStore(store),
00072 _consumerAdmin(NULL),
00073 _supplierAdmin(NULL),
00074 _poaManager(),
00075 _shutdownRequested(false),
00076 _properties(),
00077 _mapper(NULL),
00078 _lock(),
00079 _refCount(1)
00080 {}
00081
00082
00083 void EventChannel_i::activate(
00084 const char* channelName,
00085 const PersistNode* node
00086 )
00087 {
00088
00089
00090
00091 createPoa(channelName);
00092
00093 if(node)
00094 _properties._attr=node->_attr;
00095
00096
00097 _consumerAdmin=new ConsumerAdmin_i(*this,_poa);
00098
00099
00100 _supplierAdmin=new SupplierAdmin_i(*this,_poa);
00101
00102 if(node)
00103 {
00104 PersistNode* saNode =node->child("SupplierAdmin");
00105 if(saNode)
00106 _supplierAdmin->reincarnate(*saNode);
00107
00108 PersistNode* caNode =node->child("ConsumerAdmin");
00109 if(caNode)
00110 _consumerAdmin->reincarnate(*caNode);
00111 }
00112
00113 activateObjectWithId("EventChannel");
00114
00115
00116
00117 _remove_ref();
00118
00119
00120 setInsName(_properties.attrString("InsName"));
00121
00122
00123 start_undetached();
00124 }
00125
00126
00127 EventChannel_i::~EventChannel_i()
00128 {
00129 DB(20,"~EventChannel_i()")
00130
00131
00132
00133
00134
00135 if(_mapper)
00136 {
00137 _mapper->destroy();
00138 _mapper=NULL;
00139 }
00140 if(_consumerAdmin)
00141 {
00142 _consumerAdmin->_remove_ref();
00143 _consumerAdmin=NULL;
00144 }
00145 if(_supplierAdmin)
00146 {
00147 _supplierAdmin->_remove_ref();
00148 _supplierAdmin=NULL;
00149 }
00150 }
00151
00152
00153 void* EventChannel_i::run_undetached(void*)
00154 {
00155
00156 assert(!CORBA::is_nil(_poa));
00157
00158 const char* action="";
00159 try
00160 {
00161 if(_eventChannelStore)
00162 {
00163 action="add this object to the store";
00164 _eventChannelStore->insert(this);
00165 }
00166
00167 if(omniEventsLog::exists())
00168 {
00169 action="create this object in the persistency database";
00170 WriteLock log;
00171 output(log.os);
00172 }
00173
00174
00175 action="run main loop";
00176 mainLoop();
00177
00178 if(_eventChannelStore)
00179 {
00180 action="remove this object from the store";
00181 _eventChannelStore->erase(this);
00182 }
00183
00184 if(_shutdownRequested)
00185 {
00186 if(omniEventsLog::exists())
00187 {
00188 action="remove record from persistency database";
00189 CORBA::String_var poaName =_poa->the_name();
00190 WriteLock log;
00191 log.os<<"-ecf/"<<poaName.in()<<'\n';
00192 }
00193 action="destroy POA";
00194 _poa->destroy(
00195 CORBA::Boolean(1) ,
00196 CORBA::Boolean(0)
00197 );
00198 _poa=PortableServer::POA::_nil();
00199
00200 }
00201
00202 }
00203 catch(PortableServer::POAManager::AdapterInactive& ex) {
00204 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
00205 ", POA deactivated from the outside.")
00206 }
00207 catch (CORBA::SystemException& ex) {
00208 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
00209 ", System exception: "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")")
00210 }
00211 catch (CORBA::Exception& ex) {
00212 DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
00213 ", CORBA exception: "<<ex._name())
00214 }
00215
00216
00217 return NULL;
00218 }
00219
00220
00221 void EventChannel_i::mainLoop()
00222 {
00223 _poaManager->activate();
00224 unsigned long localCyclePeriod_ns=cyclePeriod_ns();
00225 while(_refCount>0 && !_shutdownRequested)
00226 {
00227
00228
00229 _poaManager->hold_requests(CORBA::Boolean(1) );
00230
00231 if(_shutdownRequested) break;
00232
00233 list<CORBA::Any*> events;
00234 _supplierAdmin->collect(events);
00235 _consumerAdmin->send(events);
00236 assert(events.empty());
00237
00238 _poaManager->activate();
00239
00240
00241
00242
00243 omni_thread::sleep(0,localCyclePeriod_ns);
00244 }
00245 }
00246
00247
00248 void EventChannel_i::_add_ref()
00249 {
00250 #if OMNIEVENTS__DEBUG_REF_COUNTS
00251 DB(20,"EventChannel_i::_add_ref()")
00252 #endif
00253 omni_mutex_lock pause(_lock);
00254 ++_refCount;
00255 }
00256
00257
00258 void EventChannel_i::_remove_ref()
00259 {
00260 #if OMNIEVENTS__DEBUG_REF_COUNTS
00261 DB(20,"EventChannel_i::_remove_ref()")
00262 #endif
00263 int myref;
00264 {
00265 omni_mutex_lock pause(_lock);
00266 myref = --_refCount;
00267 }
00268
00269 if(myref<0)
00270 {
00271 DB(2,"EventChannel has negative ref count! "<<myref)
00272 }
00273 else if(myref==0)
00274 {
00275 DB(15,"EventChannel has zero ref count -- shutdown.")
00276 join(NULL);
00277 }
00278 }
00279
00280
00281 void EventChannel_i::output(ostream& os)
00282 {
00283 CORBA::String_var poaName =_poa->the_name();
00284 string name =string("ecf/")+poaName.in();
00285 _properties.output(os,name);
00286 if(_supplierAdmin)
00287 _supplierAdmin->output(os);
00288 if(_consumerAdmin)
00289 _consumerAdmin->output(os);
00290 }
00291
00292
00293 void EventChannel_i::setInsName(const string v)
00294 {
00295 Mapper* newMapper =NULL;
00296 try
00297 {
00298
00299
00300
00301 if(!v.empty())
00302 {
00303
00304 CORBA::Object_var obj( _this() );
00305 newMapper=new Mapper(v.c_str(),obj.in());
00306 }
00307
00308 if(_mapper)
00309 _mapper->destroy();
00310 _mapper=newMapper;
00311
00312 }
00313 catch(...)
00314 {
00315
00316 delete newMapper;
00317 throw;
00318 }
00319 }
00320
00321
00322 void EventChannel_i::createPoa(const char* channelName)
00323 {
00324 using namespace PortableServer;
00325 POA_ptr p=Orb::inst()._RootPOA.in();
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336 CORBA::PolicyList policies;
00337 policies.length(3);
00338 policies[0]=p->create_lifespan_policy(PERSISTENT);
00339 policies[1]=p->create_id_assignment_policy(USER_ID);
00340 policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00341
00342 try
00343 {
00344 try
00345 {
00346
00347
00348 _poa=p->create_POA(channelName,POAManager::_nil(),policies);
00349 _poaManager=_poa->the_POAManager();
00350 }
00351 catch(POA::AdapterAlreadyExists& ex)
00352 {
00353 DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists")
00354 throw;
00355 }
00356 catch(POA::InvalidPolicy& ex)
00357 {
00358 DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index)
00359 throw;
00360 }
00361 }
00362 catch(...)
00363 {
00364
00365 for(CORBA::ULong i=0; i<policies.length(); ++i)
00366 policies[i]->destroy();
00367 throw;
00368 }
00369
00370
00371 for(CORBA::ULong i=0; i<policies.length(); ++i)
00372 policies[i]->destroy();
00373 }
00374
00375
00376
00377
00378
00379
00380
00381 EventChannelStore::EventChannelStore()
00382 :_channels(),_lock()
00383 {}
00384
00385 EventChannelStore::~EventChannelStore()
00386 {
00387
00388 }
00389
00390 void EventChannelStore::insert(EventChannel_i* channel)
00391 {
00392 omni_mutex_lock l(_lock);
00393 bool insertOK =_channels.insert(channel).second;
00394 if(!insertOK)
00395 DB(2,"Attempted to store an EventChannel, when it is already stored.");
00396 }
00397
00398 void EventChannelStore::erase(EventChannel_i* channel)
00399 {
00400 omni_mutex_lock l(_lock);
00401 set<EventChannel_i*>::iterator pos =_channels.find(channel);
00402 if(pos==_channels.end())
00403 DB(2,"Failed to erase unknown EventChannel.")
00404 else
00405 _channels.erase(pos);
00406 }
00407
00408 void EventChannelStore::output(ostream &os)
00409 {
00410 omni_mutex_lock l(_lock);
00411 for(set<EventChannel_i*>::iterator i=_channels.begin();
00412 i!=_channels.end();
00413 ++i)
00414 {
00415 (*i)->output(os);
00416 }
00417 }
00418
00419
00420 };
00421