Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

ProxyPushSupplier.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPushSupplier.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003,2005 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPushSupplier.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029 
00030 namespace OmniEvents {
00031 
00035 class omni_mutex_kcol {
00036     omni_mutex& mutex;
00037 public:
00038     omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); }
00039     ~omni_mutex_kcol(void) { mutex.lock(); }
00040 private:
00041     // dummy copy constructor and operator= to prevent copying
00042     omni_mutex_kcol(const omni_mutex_kcol&);
00043     omni_mutex_kcol& operator=(const omni_mutex_kcol&);
00044 };
00045 
00046 
00047 //
00048 //  ProxyPushSupplierManager
00049 //
00050 
00051 PortableServer::Servant
00052 ProxyPushSupplierManager::incarnate(
00053   const PortableServer::ObjectId& oid,
00054   PortableServer::POA_ptr         poa
00055 )
00056 {
00057   ProxyPushSupplier_i* result =new ProxyPushSupplier_i(_managedPoa,_queue);
00058   PauseThenWake p(this);
00059   _servants.insert(result);
00060   return result;
00061 }
00062 
00063 void
00064 ProxyPushSupplierManager::etherealize(
00065   const PortableServer::ObjectId& oid,
00066   PortableServer::POA_ptr         adapter,
00067   PortableServer::Servant         serv,
00068   CORBA::Boolean                  cleanup_in_progress,
00069   CORBA::Boolean                  remaining_activations
00070 )
00071 {
00072   // This etherealize method needs a special implementation because
00073   // ProxyPushSupplier_i objects are freed with _remove_ref() rather than
00074   // delete.
00075   // Otherwise, this method strongly resembles ProxyManager::etherealize().
00076   omni_mutex_lock pause(_lock);
00077   ProxyPushSupplier_i* narrowed =dynamic_cast<ProxyPushSupplier_i*>(serv);
00078   assert(narrowed!=NULL);
00079   set<Proxy*>::iterator pos =_servants.find(narrowed);
00080   if(pos!=_servants.end())
00081   {
00082     _servants.erase(pos);
00083     narrowed->_remove_ref();
00084   }
00085   else
00086   {
00087     DB(1,"\t\teh? - POA attempted to etherealize unknown servant.");
00088   }
00089 }
00090 
00091 ProxyPushSupplierManager::ProxyPushSupplierManager(
00092   PortableServer::POA_ptr parentPoa,
00093   EventQueue& q
00094 )
00095 : ProxyManager(parentPoa),
00096   omni_thread(NULL,PRIORITY_HIGH),
00097   _queue(q),
00098   _lock(),_condition(&_lock),
00099   _refCount(1)
00100 {
00101   ProxyManager::activate("ProxyPushSupplier");
00102   start_undetached();
00103 }
00104 
00105 ProxyPushSupplierManager::~ProxyPushSupplierManager()
00106 {
00107   DB(20,"~ProxyPushSupplierManager()")
00108 }
00109 
00110 CosEventChannelAdmin::ProxyPushSupplier_ptr
00111 ProxyPushSupplierManager::createObject()
00112 {  
00113   return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
00114            _managedPoa.in(),
00115            CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
00116          );
00117 }
00118 
00119 void ProxyPushSupplierManager::disconnect()
00120 {
00121   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00122   {
00123     Proxy* p =*i; // Sun's CC requires this temporary.
00124     ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00125     // We are in the EventChannel's thread.
00126     // Make sure all calls go though the ProxyPushSupplier POA.
00127     CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this(); 
00128     ppsv->disconnect_push_supplier();
00129   }
00130 }
00131 
00132 void*
00133 ProxyPushSupplierManager::run_undetached(void*)
00134 {
00135   // This loop repeatedly triggers all of the servants in turn. As long as
00136   // something happens each time, then we loop as fast as we can.
00137   // As soon as activity dries up, we start to wait longer and longer between
00138   // loops (up to a maximum). When there is no work to do, just block until
00139   // a new event arrives.
00140   //
00141   // Rationale: The faster we loop the more events we can deliver to each
00142   // consumer per second. However, when nothing is happening, this busy loop
00143   // just soaks up CPU and kills performance. The optimum sleep time varies
00144   // wildly from platform to platform, and also depends upon the typical ping
00145   // time to the consumers.
00146   //
00147   // This dynamic approach should deliver reasonable performance when things
00148   // are hectic, but not soak up too much CPU when not much is happening.
00149   //
00150   const unsigned long sleepTimeNanosec0 =0x8000;   // 33us (doubled before use)
00151   const unsigned long maxSleepNanosec   =0x800000; // 8.4ms
00152   unsigned long sleepTimeNanosec =sleepTimeNanosec0;
00153 
00154   omni_mutex_lock conditionLock(_lock);
00155   while(true)
00156   {
00157     try {
00158       if(_refCount<1)
00159           break;
00160 
00161       bool busy=false;
00162       bool waiting=false;
00163 
00164       // Trigger each servant in turn.
00165       for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00166       {
00167         Proxy* p =*i; // Sun's CC requires this temporary.
00168         ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00169         pps->trigger(busy,waiting);
00170       }
00171 
00172       if(busy)
00173       {
00174         // Something happened last time round. So we'll be optimistic and
00175         // immediately go round for another go. Briefly unlock the mutex first,
00176         // just to let the other kids get in if they need to.
00177         omni_mutex_kcol l(_lock); // 'lock' reversed!
00178         // Reset the sleep time.
00179         sleepTimeNanosec=sleepTimeNanosec0;
00180       }
00181       else if(waiting)
00182       {
00183         // Nothing happened, so we'll wait for a bit and then give it another
00184         // go. Each time we wait for twice as long, up to the maximum.
00185         if(sleepTimeNanosec<maxSleepNanosec)
00186             sleepTimeNanosec<<=1; // (multiply by 2)
00187         unsigned long sec,nsec;
00188         omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
00189         _condition.timedwait(sec,nsec);
00190       }
00191       else
00192       {
00193         // There is nothing to do, so block until a new event arrives.
00194         _condition.wait();
00195       }
00196 
00197     }
00198     catch (CORBA::SystemException& ex) {
00199       DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
00200          IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00201     }
00202     catch (CORBA::Exception& ex) {
00203       DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
00204          IF_OMNIORB4(": "<<ex._name()<<) ".")
00205     }
00206     catch(...) {
00207       DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
00208       break;
00209     }
00210   }
00211   return NULL;
00212 }
00213 
00214 void ProxyPushSupplierManager::_add_ref()
00215 {
00216 #if OMNIEVENTS__DEBUG_REF_COUNTS
00217   DB(20,"ProxyPushSupplierManager::_add_ref()")
00218 #endif
00219   omni_mutex_lock pause(_lock);
00220   ++_refCount;
00221 }
00222 
00223 void ProxyPushSupplierManager::_remove_ref()
00224 {
00225 #if OMNIEVENTS__DEBUG_REF_COUNTS
00226   DB(20,"ProxyPushSupplierManager::_remove_ref()")
00227 #endif
00228   int myref;
00229   {
00230     PauseThenWake p(this);
00231     myref = --_refCount;
00232   }
00233   if(myref<0)
00234   {
00235     DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
00236   }
00237   else if(myref==0)
00238   {
00239     DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
00240     join(NULL);
00241   }
00242 }
00243 
00244 
00245 //
00246 //  ProxyPushSupplier_i
00247 //
00248 
00249 void ProxyPushSupplier_i::connect_push_consumer(
00250   CosEventComm::PushConsumer_ptr pushConsumer)
00251 {
00252   if(CORBA::is_nil(pushConsumer))
00253       throw CORBA::BAD_PARAM();
00254   if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00255       throw CosEventChannelAdmin::AlreadyConnected();
00256   _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
00257 
00258   // Test to see whether pushSupplier is a ProxyPushSupplier.
00259   // If so, then we will aggressively try to reconnect, when we are reincarnated
00260   CORBA::Request_var req =_target->_request("_is_a");
00261   req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
00262   req->set_return_type(CORBA::_tc_boolean);
00263   req->send_deferred();
00264   Orb::inst().deferredRequest(req._retn(),this); // Register for callback
00265 
00266   if(omniEventsLog::exists())
00267   {
00268     WriteLock log;
00269     output(log.os);
00270   }
00271 }
00272 
00273 
00274 void ProxyPushSupplier_i::disconnect_push_supplier()
00275 {
00276   DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
00277   eraseKey("ConsumerAdmin/ProxyPushSupplier");
00278   deactivateObject();
00279   if(CORBA::is_nil(_target))
00280   {
00281     throw CORBA::OBJECT_NOT_EXIST(
00282       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00283       CORBA::COMPLETED_NO
00284     );
00285   }
00286   else
00287   {
00288     CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00289     _target=CosEventComm::PushConsumer::_nil();
00290     req->send_deferred();
00291     Orb::inst().deferredRequest(req._retn());
00292   }
00293 }
00294 
00295 
00296 ProxyPushSupplier_i::ProxyPushSupplier_i(
00297   PortableServer::POA_ptr poa,
00298   EventQueue&             q
00299 )
00300 : Proxy(poa),
00301   EventQueue::Reader(q),
00302   _target(CosEventComm::PushConsumer::_nil()),
00303   _targetIsProxy(false)
00304 {
00305   // pass
00306 }
00307 
00308 ProxyPushSupplier_i::~ProxyPushSupplier_i()
00309 {
00310   DB(20,"~ProxyPushSupplier_i()")
00311 }
00312 
00313 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPushSupplier_i)
00314 
00315 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
00316 {
00317   if(!CORBA::is_nil(_req) && _req->poll_response()) // response has arrived
00318   {
00319     CORBA::Environment_ptr env=_req->env(); // No need to free environment.
00320     if(!CORBA::is_nil(env) && env->exception())
00321     {
00322       // Shut down the connection
00323       CORBA::Exception* ex =env->exception(); // No need to free exception.
00324       DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
00325       Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00326       _req=CORBA::Request::_nil();
00327 
00328       // Try to notify the Consumer that the connection is closing.
00329       CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00330       req->send_deferred();
00331       Orb::inst().deferredRequest(req._retn());
00332 
00333       _target=CosEventComm::PushConsumer::_nil(); // disconnected.
00334       eraseKey("ConsumerAdmin/ProxyPushSupplier");
00335       deactivateObject();
00336       return; // No more work to do
00337     }
00338     _req=CORBA::Request::_nil();
00339     busy=true;
00340   }
00341   if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
00342   {
00343     _req=_target->_request("push");
00344     _req->add_in_arg() <<= *(nextEvent());
00345     _req->send_deferred();
00346     busy=true;
00347   }
00348   if(!CORBA::is_nil(_req)) // More work to do, if _req NOT nil.
00349       waiting=true;
00350 }
00351 
00352 
00353 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
00354 {
00355   if(_targetIsProxy)
00356   {
00357     // There should only ever be one of these callbacks per proxy,
00358     // because each proxy should only be connected once.
00359     DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
00360   }
00361   else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00362   {
00363     if(_targetIsProxy && omniEventsLog::exists())
00364     {
00365       WriteLock log;
00366       output(log.os);
00367       DB(15,"ProxyPushSupplier is federated.");
00368     }
00369   }
00370   else
00371   {
00372     DB(2,"ProxyPushSupplier got unexpected callback.");
00373     _targetIsProxy=false; // Reset it just to be sure.
00374   }
00375 }
00376 
00377 
00378 void ProxyPushSupplier_i::reincarnate(
00379   const string&      oid,
00380   const PersistNode& node
00381 )
00382 {
00383   try
00384   {
00385     using namespace CosEventChannelAdmin;
00386 
00387     string ior( node.attrString("IOR").c_str() );
00388     CosEventComm::PushConsumer_var pushConsumer =
00389       string_to_<CosEventComm::PushConsumer>(ior.c_str());
00390     // Do not activate until we know that we have read a valid target.
00391     activateObjectWithId(oid.c_str());
00392     _remove_ref();
00393     _target=pushConsumer._retn();
00394     _targetIsProxy=bool(node.attrLong("proxy"));
00395 
00396     // If pushConsumer is a proxy, then try to reconnect.
00397     if(_targetIsProxy)
00398     {
00399       DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
00400       // This will only work if the proxy is implemented in the same way as
00401       // omniEvents, so connect_() automatically creates a proxy.
00402       ProxyPushConsumer_var proxyCons =
00403         string_to_<ProxyPushConsumer>(ior.c_str());
00404       CosEventComm::PushSupplier_var thisSupp =_this();
00405       proxyCons->connect_push_supplier(thisSupp);
00406       DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
00407     }
00408   }
00409   catch(CosEventChannelAdmin::AlreadyConnected&){ // connect_push_supplier()
00410     // The supplier doesn't need to be reconnected.
00411     DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
00412   }
00413   catch(CosEventChannelAdmin::TypeError&){ // connect_push_supplier()
00414     // Don't know what to make of this...
00415     DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
00416   }
00417   catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'pushConsumer' not responding.
00418   catch(CORBA::TRANSIENT&       ) {} // object 'pushConsumer' not responding.
00419   catch(CORBA::COMM_FAILURE&    ) {} // object 'pushConsumer' not responding.
00420 }
00421 
00422 
00423 void ProxyPushSupplier_i::output(ostream &os)
00424 {
00425   basicOutput(
00426     os,"ConsumerAdmin/ProxyPushSupplier",
00427     _target.in(),
00428     _targetIsProxy? " proxy=1": NULL
00429   );
00430 }
00431 
00432 
00433 }; // end namespace OmniEvents

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530