Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

EventChannel.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // EventChannel.cc            Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003-2005 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "EventChannel.h"
00025 #include "ConsumerAdmin.h"
00026 #include "SupplierAdmin.h"
00027 #include "omniEventsLog.h"
00028 #include "Orb.h"
00029 
00030 #include <list>
00031 
00032 namespace OmniEvents {
00033 
00034 // CORBA interface methods
00035 CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers()
00036 {
00037   if(!_consumerAdmin || _shutdownRequested)
00038       throw CORBA::OBJECT_NOT_EXIST();
00039   return _consumerAdmin->_this();
00040 }
00041 
00042 
00043 CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers()
00044 {
00045   if(!_supplierAdmin || _shutdownRequested)
00046       throw CORBA::OBJECT_NOT_EXIST();
00047   return _supplierAdmin->_this();
00048 }
00049 
00050 
00051 void EventChannel_i::destroy()
00052 {
00053   if(_shutdownRequested)
00054       throw CORBA::OBJECT_NOT_EXIST();
00055 
00056   // Prevent further incoming connections.
00057   _shutdownRequested=true;
00058 
00059   DB(5,"EventChannel_i::destroy()")
00060 
00061   // Send disconnect messages to connected clients.
00062   if(_consumerAdmin)
00063      _consumerAdmin->disconnect();
00064   if(_supplierAdmin)
00065      _supplierAdmin->disconnect();
00066 }
00067 
00068 
00069 EventChannel_i::EventChannel_i(EventChannelStore* store)
00070 : Servant(PortableServer::POA::_nil()),
00071   _eventChannelStore(store),
00072   _consumerAdmin(NULL),
00073   _supplierAdmin(NULL),
00074   _poaManager(),
00075   _shutdownRequested(false),
00076   _properties(),
00077   _mapper(NULL),
00078   _lock(),
00079   _refCount(1)
00080 {}
00081 
00082 
00083 void EventChannel_i::activate(
00084   const char*        channelName,
00085   const PersistNode* node
00086 )
00087 {
00088   // The order of these various initialization methods is very important.
00089   // I've documented dependencies as 'REQUIRES' comments.
00090 
00091   createPoa(channelName);
00092 
00093   if(node)
00094       _properties._attr=node->_attr;
00095 
00096   // REQUIRES: _properties
00097   _consumerAdmin=new ConsumerAdmin_i(*this,_poa);
00098 
00099   // REQUIRES: _consumerAdmin, _properties
00100   _supplierAdmin=new SupplierAdmin_i(*this,_poa);
00101 
00102   if(node)
00103   {
00104     PersistNode* saNode =node->child("SupplierAdmin");
00105     if(saNode)
00106         _supplierAdmin->reincarnate(*saNode);
00107 
00108     PersistNode* caNode =node->child("ConsumerAdmin");
00109     if(caNode)
00110         _consumerAdmin->reincarnate(*caNode);
00111   }
00112 
00113   activateObjectWithId("EventChannel");
00114   
00115   // Remove the constructor's reference. This object will now be destroyed when
00116   // the POA releases it.
00117   _remove_ref();
00118 
00119   // REQUIRES: activate() ...since it uses _this().
00120   setInsName(_properties.attrString("InsName"));
00121 
00122   // Start the channel's thread running.
00123   start_undetached();
00124 }
00125 
00126 
00127 EventChannel_i::~EventChannel_i()
00128 {
00129   DB(20,"~EventChannel_i()")
00130   // Destroy the mapper object, even when the EventChannel is being shut down
00131   // without a call to destroy(). This can happen if the channel is
00132   // implemented through libomniEvents - the channel could be shut down and
00133   // later reincarnated in the same process. The Mapper's lifecycle should
00134   // match that of the EventChannel.
00135   if(_mapper)
00136   {
00137     _mapper->destroy();
00138     _mapper=NULL;
00139   }
00140   if(_consumerAdmin)
00141   {
00142     _consumerAdmin->_remove_ref();
00143     _consumerAdmin=NULL;
00144   }
00145   if(_supplierAdmin)
00146   {
00147     _supplierAdmin->_remove_ref();
00148     _supplierAdmin=NULL;
00149   }
00150 }
00151 
00152 
00153 void* EventChannel_i::run_undetached(void*)
00154 {
00155   // Ensure that activate() is called before start()/run().
00156   assert(!CORBA::is_nil(_poa));
00157 
00158   const char* action="";
00159   try
00160   {
00161     if(_eventChannelStore)
00162     {
00163       action="add this object to the store";
00164       _eventChannelStore->insert(this);
00165     }
00166 
00167     if(omniEventsLog::exists())
00168     {
00169       action="create this object in the persistency database";
00170       WriteLock log;
00171       output(log.os);
00172     }
00173 
00174     // Process events until the channel is destroyed.
00175     action="run main loop";
00176     mainLoop();
00177 
00178     if(_eventChannelStore)
00179     {
00180       action="remove this object from the store";
00181       _eventChannelStore->erase(this);
00182     }
00183 
00184     if(_shutdownRequested)
00185     {
00186       if(omniEventsLog::exists())
00187       {
00188         action="remove record from persistency database";
00189         CORBA::String_var poaName =_poa->the_name();
00190         WriteLock log;
00191         log.os<<"-ecf/"<<poaName.in()<<'\n';
00192       }
00193       action="destroy POA";
00194       _poa->destroy(
00195         CORBA::Boolean(1) /* etherealize_objects */,
00196         CORBA::Boolean(0) /* wait_for_completion */
00197       );
00198       _poa=PortableServer::POA::_nil();
00199 
00200     } // end if(_shutdownRequested)
00201 
00202   }
00203   catch(PortableServer::POAManager::AdapterInactive& ex) {
00204     DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
00205        ", POA deactivated from the outside.")
00206   }
00207   catch (CORBA::SystemException& ex) {
00208     DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
00209        ", System exception: "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")")
00210   }
00211   catch (CORBA::Exception& ex) {
00212     DB(0,"EventChannel_i::run_undetached() - failed to "<<action<<
00213        ", CORBA exception: "<<ex._name())
00214   }
00215 
00216   // Thread now exits, and this object is deleted.
00217   return NULL;
00218 }
00219 
00220 
00221 void EventChannel_i::mainLoop()
00222 {
00223   _poaManager->activate();
00224   unsigned long localCyclePeriod_ns=cyclePeriod_ns();
00225   while(_refCount>0 && !_shutdownRequested)
00226   {
00227     //
00228     // TRANSFER PHASE - transfer events from SupplierAdmin to ConsumerAdmin.
00229     _poaManager->hold_requests(CORBA::Boolean(1) /* wait_for_completion */);
00230 
00231     if(_shutdownRequested) break;
00232 
00233     list<CORBA::Any*> events;
00234     _supplierAdmin->collect(events);
00235     _consumerAdmin->send(events);
00236     assert(events.empty());
00237 
00238     _poaManager->activate();
00239     
00240     //
00241     // COMMUNICATION PHASE - talk with clients' suppliers & consumers.
00242     // Note: On Linux the resolution of nanosleep is a huge 10ms.
00243     omni_thread::sleep(0,localCyclePeriod_ns);
00244   }
00245 }
00246 
00247 
00248 void EventChannel_i::_add_ref()
00249 {
00250 #if OMNIEVENTS__DEBUG_REF_COUNTS
00251   DB(20,"EventChannel_i::_add_ref()")
00252 #endif
00253   omni_mutex_lock pause(_lock);
00254   ++_refCount;
00255 }
00256 
00257 
00258 void EventChannel_i::_remove_ref()
00259 {
00260 #if OMNIEVENTS__DEBUG_REF_COUNTS
00261   DB(20,"EventChannel_i::_remove_ref()")
00262 #endif
00263   int myref;
00264   {
00265     omni_mutex_lock pause(_lock);
00266     myref = --_refCount;
00267   }
00268 
00269   if(myref<0)
00270   {
00271     DB(2,"EventChannel has negative ref count! "<<myref)
00272   }
00273   else if(myref==0)
00274   {
00275     DB(15,"EventChannel has zero ref count -- shutdown.")
00276     join(NULL);
00277   }
00278 }
00279 
00280 
00281 void EventChannel_i::output(ostream& os)
00282 {
00283   CORBA::String_var poaName =_poa->the_name();
00284   string name =string("ecf/")+poaName.in();
00285   _properties.output(os,name);
00286   if(_supplierAdmin)
00287      _supplierAdmin->output(os);
00288   if(_consumerAdmin)
00289      _consumerAdmin->output(os);
00290 }
00291 
00292 
00293 void EventChannel_i::setInsName(const string v)
00294 {
00295   Mapper* newMapper =NULL;
00296   try
00297   {
00298 
00299     // If _insName is set, then create a mapper object to allow clients to
00300     // find this object with a `corbaloc' string.
00301     if(!v.empty())
00302     {
00303       // !! Throws when there is already an object named 'v' in the INSPOA.
00304       CORBA::Object_var obj( _this() );
00305       newMapper=new Mapper(v.c_str(),obj.in());
00306     }
00307     // Deactivate the old _mapper object.
00308     if(_mapper)
00309        _mapper->destroy();
00310     _mapper=newMapper;
00311 
00312   }
00313   catch(...)
00314   {
00315     // Can't use an auto_ptr, because MS VC++ 6 has no auto_ptr::reset()
00316     delete newMapper;
00317     throw;
00318   }
00319 }
00320 
00321 
00322 void EventChannel_i::createPoa(const char* channelName)
00323 {
00324   using namespace PortableServer;
00325   POA_ptr p=Orb::inst()._RootPOA.in();
00326 
00327   // POLICIES:
00328   //  Lifespan          =PERSISTENT             // we can persist
00329   //  Assignment        =USER_ID                // write our own oid
00330   //  Uniqueness        =[default] UNIQUE_ID    // one servant per object
00331   //  ImplicitActivation=[default] IMPLICIT_ACTIVATION // auto activation
00332   //  RequestProcessing =[default] USE_ACTIVE_OBJECT_MAP_ONLY
00333   //  ServantRetention  =[default] RETAIN       // stateless POA
00334   //  Thread            =SINGLE_THREAD_MODEL    // keep it simple
00335 
00336   CORBA::PolicyList policies;
00337   policies.length(3);
00338   policies[0]=p->create_lifespan_policy(PERSISTENT);
00339   policies[1]=p->create_id_assignment_policy(USER_ID);
00340   policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00341 
00342   try // finally
00343   {
00344       try
00345       {
00346         // Create a new POA (and new POAManager) for this channel.
00347         // The POAManager will be used for all of this channel's POAs.
00348         _poa=p->create_POA(channelName,POAManager::_nil(),policies);
00349         _poaManager=_poa->the_POAManager();
00350       }
00351       catch(POA::AdapterAlreadyExists& ex) // create_POA
00352       {
00353         DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists")
00354         throw;
00355       }
00356       catch(POA::InvalidPolicy& ex) // create_POA
00357       {
00358         DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index)
00359         throw;
00360       }
00361   }
00362   catch(...) // finally
00363   {
00364     // Destroy the policy objects (Not strictly necessary in omniORB)
00365     for(CORBA::ULong i=0; i<policies.length(); ++i)
00366         policies[i]->destroy();
00367     throw;
00368   }
00369 
00370   // Destroy the policy objects (Not strictly necessary in omniORB)
00371   for(CORBA::ULong i=0; i<policies.length(); ++i)
00372       policies[i]->destroy();
00373 }
00374 
00375 
00376 //
00377 // class EventChannelStore
00378 //
00379 
00380 
00381 EventChannelStore::EventChannelStore()
00382 :_channels(),_lock()
00383 {}
00384 
00385 EventChannelStore::~EventChannelStore()
00386 {
00387   // ?? IMPLEMENT ME
00388 }
00389 
00390 void EventChannelStore::insert(EventChannel_i* channel)
00391 {
00392   omni_mutex_lock l(_lock);
00393   bool insertOK =_channels.insert(channel).second;
00394   if(!insertOK)
00395       DB(2,"Attempted to store an EventChannel, when it is already stored.");
00396 }
00397 
00398 void EventChannelStore::erase(EventChannel_i* channel)
00399 {
00400   omni_mutex_lock l(_lock);
00401   set<EventChannel_i*>::iterator pos =_channels.find(channel);
00402   if(pos==_channels.end())
00403       DB(2,"Failed to erase unknown EventChannel.")
00404   else
00405       _channels.erase(pos);
00406 }
00407 
00408 void EventChannelStore::output(ostream &os)
00409 {
00410   omni_mutex_lock l(_lock);
00411   for(set<EventChannel_i*>::iterator i=_channels.begin();
00412       i!=_channels.end();
00413       ++i)
00414   {
00415     (*i)->output(os);
00416   }
00417 }
00418 
00419 
00420 }; // end namespace OmniEvents
00421 

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530