00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPushSupplier.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029
00030 namespace OmniEvents {
00031
00035 class omni_mutex_kcol {
00036 omni_mutex& mutex;
00037 public:
00038 omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); }
00039 ~omni_mutex_kcol(void) { mutex.lock(); }
00040 private:
00041
00042 omni_mutex_kcol(const omni_mutex_kcol&);
00043 omni_mutex_kcol& operator=(const omni_mutex_kcol&);
00044 };
00045
00046
00047
00048
00049
00050
00051 PortableServer::Servant
00052 ProxyPushSupplierManager::incarnate(
00053 const PortableServer::ObjectId& oid,
00054 PortableServer::POA_ptr poa
00055 )
00056 {
00057 ProxyPushSupplier_i* result =new ProxyPushSupplier_i(_managedPoa,_queue);
00058 PauseThenWake p(this);
00059 _servants.insert(result);
00060 return result;
00061 }
00062
00063 void
00064 ProxyPushSupplierManager::etherealize(
00065 const PortableServer::ObjectId& oid,
00066 PortableServer::POA_ptr adapter,
00067 PortableServer::Servant serv,
00068 CORBA::Boolean cleanup_in_progress,
00069 CORBA::Boolean remaining_activations
00070 )
00071 {
00072
00073
00074
00075
00076 omni_mutex_lock pause(_lock);
00077 ProxyPushSupplier_i* narrowed =dynamic_cast<ProxyPushSupplier_i*>(serv);
00078 assert(narrowed!=NULL);
00079 set<Proxy*>::iterator pos =_servants.find(narrowed);
00080 if(pos!=_servants.end())
00081 {
00082 _servants.erase(pos);
00083 narrowed->_remove_ref();
00084 }
00085 else
00086 {
00087 DB(1,"\t\teh? - POA attempted to etherealize unknown servant.");
00088 }
00089 }
00090
00091 ProxyPushSupplierManager::ProxyPushSupplierManager(
00092 PortableServer::POA_ptr parentPoa,
00093 EventQueue& q
00094 )
00095 : ProxyManager(parentPoa),
00096 omni_thread(NULL,PRIORITY_HIGH),
00097 _queue(q),
00098 _lock(),_condition(&_lock),
00099 _refCount(1)
00100 {
00101 ProxyManager::activate("ProxyPushSupplier");
00102 start_undetached();
00103 }
00104
00105 ProxyPushSupplierManager::~ProxyPushSupplierManager()
00106 {
00107 DB(20,"~ProxyPushSupplierManager()")
00108 }
00109
00110 CosEventChannelAdmin::ProxyPushSupplier_ptr
00111 ProxyPushSupplierManager::createObject()
00112 {
00113 return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
00114 _managedPoa.in(),
00115 CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
00116 );
00117 }
00118
00119 void ProxyPushSupplierManager::disconnect()
00120 {
00121 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00122 {
00123 Proxy* p =*i;
00124 ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00125
00126
00127 CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this();
00128 ppsv->disconnect_push_supplier();
00129 }
00130 }
00131
00132 void*
00133 ProxyPushSupplierManager::run_undetached(void*)
00134 {
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 const unsigned long sleepTimeNanosec0 =0x8000;
00151 const unsigned long maxSleepNanosec =0x800000;
00152 unsigned long sleepTimeNanosec =sleepTimeNanosec0;
00153
00154 omni_mutex_lock conditionLock(_lock);
00155 while(true)
00156 {
00157 try {
00158 if(_refCount<1)
00159 break;
00160
00161 bool busy=false;
00162 bool waiting=false;
00163
00164
00165 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00166 {
00167 Proxy* p =*i;
00168 ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00169 pps->trigger(busy,waiting);
00170 }
00171
00172 if(busy)
00173 {
00174
00175
00176
00177 omni_mutex_kcol l(_lock);
00178
00179 sleepTimeNanosec=sleepTimeNanosec0;
00180 }
00181 else if(waiting)
00182 {
00183
00184
00185 if(sleepTimeNanosec<maxSleepNanosec)
00186 sleepTimeNanosec<<=1;
00187 unsigned long sec,nsec;
00188 omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
00189 _condition.timedwait(sec,nsec);
00190 }
00191 else
00192 {
00193
00194 _condition.wait();
00195 }
00196
00197 }
00198 catch (CORBA::SystemException& ex) {
00199 DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
00200 IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00201 }
00202 catch (CORBA::Exception& ex) {
00203 DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
00204 IF_OMNIORB4(": "<<ex._name()<<) ".")
00205 }
00206 catch(...) {
00207 DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
00208 break;
00209 }
00210 }
00211 return NULL;
00212 }
00213
00214 void ProxyPushSupplierManager::_add_ref()
00215 {
00216 #if OMNIEVENTS__DEBUG_REF_COUNTS
00217 DB(20,"ProxyPushSupplierManager::_add_ref()")
00218 #endif
00219 omni_mutex_lock pause(_lock);
00220 ++_refCount;
00221 }
00222
00223 void ProxyPushSupplierManager::_remove_ref()
00224 {
00225 #if OMNIEVENTS__DEBUG_REF_COUNTS
00226 DB(20,"ProxyPushSupplierManager::_remove_ref()")
00227 #endif
00228 int myref;
00229 {
00230 PauseThenWake p(this);
00231 myref = --_refCount;
00232 }
00233 if(myref<0)
00234 {
00235 DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
00236 }
00237 else if(myref==0)
00238 {
00239 DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
00240 join(NULL);
00241 }
00242 }
00243
00244
00245
00246
00247
00248
00249 void ProxyPushSupplier_i::connect_push_consumer(
00250 CosEventComm::PushConsumer_ptr pushConsumer)
00251 {
00252 if(CORBA::is_nil(pushConsumer))
00253 throw CORBA::BAD_PARAM();
00254 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00255 throw CosEventChannelAdmin::AlreadyConnected();
00256 _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
00257
00258
00259
00260 CORBA::Request_var req =_target->_request("_is_a");
00261 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
00262 req->set_return_type(CORBA::_tc_boolean);
00263 req->send_deferred();
00264 Orb::inst().deferredRequest(req._retn(),this);
00265
00266 if(omniEventsLog::exists())
00267 {
00268 WriteLock log;
00269 output(log.os);
00270 }
00271 }
00272
00273
00274 void ProxyPushSupplier_i::disconnect_push_supplier()
00275 {
00276 DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
00277 eraseKey("ConsumerAdmin/ProxyPushSupplier");
00278 deactivateObject();
00279 if(CORBA::is_nil(_target))
00280 {
00281 throw CORBA::OBJECT_NOT_EXIST(
00282 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00283 CORBA::COMPLETED_NO
00284 );
00285 }
00286 else
00287 {
00288 CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00289 _target=CosEventComm::PushConsumer::_nil();
00290 req->send_deferred();
00291 Orb::inst().deferredRequest(req._retn());
00292 }
00293 }
00294
00295
00296 ProxyPushSupplier_i::ProxyPushSupplier_i(
00297 PortableServer::POA_ptr poa,
00298 EventQueue& q
00299 )
00300 : Proxy(poa),
00301 EventQueue::Reader(q),
00302 _target(CosEventComm::PushConsumer::_nil()),
00303 _targetIsProxy(false)
00304 {
00305
00306 }
00307
00308 ProxyPushSupplier_i::~ProxyPushSupplier_i()
00309 {
00310 DB(20,"~ProxyPushSupplier_i()")
00311 }
00312
00313 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPushSupplier_i)
00314
00315 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
00316 {
00317 if(!CORBA::is_nil(_req) && _req->poll_response())
00318 {
00319 CORBA::Environment_ptr env=_req->env();
00320 if(!CORBA::is_nil(env) && env->exception())
00321 {
00322
00323 CORBA::Exception* ex =env->exception();
00324 DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
00325 Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00326 _req=CORBA::Request::_nil();
00327
00328
00329 CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00330 req->send_deferred();
00331 Orb::inst().deferredRequest(req._retn());
00332
00333 _target=CosEventComm::PushConsumer::_nil();
00334 eraseKey("ConsumerAdmin/ProxyPushSupplier");
00335 deactivateObject();
00336 return;
00337 }
00338 _req=CORBA::Request::_nil();
00339 busy=true;
00340 }
00341 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
00342 {
00343 _req=_target->_request("push");
00344 _req->add_in_arg() <<= *(nextEvent());
00345 _req->send_deferred();
00346 busy=true;
00347 }
00348 if(!CORBA::is_nil(_req))
00349 waiting=true;
00350 }
00351
00352
00353 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
00354 {
00355 if(_targetIsProxy)
00356 {
00357
00358
00359 DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
00360 }
00361 else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00362 {
00363 if(_targetIsProxy && omniEventsLog::exists())
00364 {
00365 WriteLock log;
00366 output(log.os);
00367 DB(15,"ProxyPushSupplier is federated.");
00368 }
00369 }
00370 else
00371 {
00372 DB(2,"ProxyPushSupplier got unexpected callback.");
00373 _targetIsProxy=false;
00374 }
00375 }
00376
00377
00378 void ProxyPushSupplier_i::reincarnate(
00379 const string& oid,
00380 const PersistNode& node
00381 )
00382 {
00383 try
00384 {
00385 using namespace CosEventChannelAdmin;
00386
00387 string ior( node.attrString("IOR").c_str() );
00388 CosEventComm::PushConsumer_var pushConsumer =
00389 string_to_<CosEventComm::PushConsumer>(ior.c_str());
00390
00391 activateObjectWithId(oid.c_str());
00392 _remove_ref();
00393 _target=pushConsumer._retn();
00394 _targetIsProxy=bool(node.attrLong("proxy"));
00395
00396
00397 if(_targetIsProxy)
00398 {
00399 DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
00400
00401
00402 ProxyPushConsumer_var proxyCons =
00403 string_to_<ProxyPushConsumer>(ior.c_str());
00404 CosEventComm::PushSupplier_var thisSupp =_this();
00405 proxyCons->connect_push_supplier(thisSupp);
00406 DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
00407 }
00408 }
00409 catch(CosEventChannelAdmin::AlreadyConnected&){
00410
00411 DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
00412 }
00413 catch(CosEventChannelAdmin::TypeError&){
00414
00415 DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
00416 }
00417 catch(CORBA::OBJECT_NOT_EXIST&) {}
00418 catch(CORBA::TRANSIENT& ) {}
00419 catch(CORBA::COMM_FAILURE& ) {}
00420 }
00421
00422
00423 void ProxyPushSupplier_i::output(ostream &os)
00424 {
00425 basicOutput(
00426 os,"ConsumerAdmin/ProxyPushSupplier",
00427 _target.in(),
00428 _targetIsProxy? " proxy=1": NULL
00429 );
00430 }
00431
00432
00433 };