00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPushConsumer.h"
00025 #include "ConsumerAdmin.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029
00030 #include <assert.h>
00031
00032 namespace OmniEvents {
00033
00034 void ProxyPushConsumer_i::connect_push_supplier(
00035 CosEventComm::PushSupplier_ptr pushSupplier)
00036 {
00037
00038 if(CORBA::is_nil(pushSupplier))
00039 return;
00040
00041 string oidstr =currentObjectId();
00042 Connections_t::iterator pos =_connections.find(oidstr);
00043
00044 if(pos!=_connections.end())
00045 throw CosEventChannelAdmin::AlreadyConnected();
00046
00047 Connection* newConnection =
00048 new Connection(
00049 _channelName.in(),
00050 oidstr,
00051 CosEventComm::PushSupplier::_duplicate(pushSupplier)
00052 );
00053 _connections.insert( Connections_t::value_type(oidstr,newConnection) );
00054
00055
00056
00057 CORBA::Request_var req =pushSupplier->_request("_is_a");
00058 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
00059 req->set_return_type(CORBA::_tc_boolean);
00060 req->send_deferred();
00061 Orb::inst().deferredRequest(req._retn(),newConnection);
00062
00063 if(omniEventsLog::exists())
00064 {
00065 WriteLock log;
00066 newConnection->output(log.os);
00067 }
00068 }
00069
00070
00071 void ProxyPushConsumer_i::disconnect_push_consumer()
00072 {
00073 #ifdef HAVE_OMNIORB4
00074 DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()")
00075 string oidstr =currentObjectId();
00076 Connections_t::iterator pos =_connections.find(oidstr);
00077
00078 if(pos!=_connections.end())
00079 {
00080 CORBA::Request_var req =
00081 pos->second->_target->_request("disconnect_push_supplier");
00082 pos->second->_remove_ref();
00083 _connections.erase(pos);
00084
00085
00086 req->send_deferred();
00087 Orb::inst().deferredRequest(req._retn());
00088 if(omniEventsLog::exists())
00089 {
00090
00091 WriteLock log;
00092 log.os<<"-ecf/"<<_channelName.in();
00093 log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n';
00094 }
00095 }
00096 #else
00097 DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
00098 #endif
00099 }
00100
00101
00102 void ProxyPushConsumer_i::push(const CORBA::Any& event)
00103 {
00104 #ifdef OMNIEVENTS_REAL_TIME_PUSH
00105 if(!_useLocalQueue)
00106 {
00107 _consumerAdmin.send(new CORBA::Any(event));
00108 _useLocalQueue=true;
00109 }
00110 else
00111 #endif
00112 _queue.push_back(new CORBA::Any(event));
00113 }
00114
00115
00116 ProxyPushConsumer_i::ProxyPushConsumer_i(
00117 PortableServer::POA_ptr p,
00118 list<CORBA::Any*>& q,
00119 ConsumerAdmin_i& consumerAdmin
00120 )
00121 : Servant(PortableServer::POA::_nil()),
00122 _connections(),
00123 _channelName(p->the_name()),
00124 _consumerAdmin(consumerAdmin),
00125 _queue(q),
00126 _useLocalQueue(false)
00127 {
00128 _consumerAdmin._add_ref();
00129
00130 using namespace PortableServer;
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141 CORBA::PolicyList policies;
00142 policies.length(7);
00143 policies[0]=p->create_lifespan_policy(PERSISTENT);
00144 policies[1]=p->create_id_assignment_policy(USER_ID);
00145 policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
00146 policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
00147 policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
00148 policies[5]=p->create_servant_retention_policy(NON_RETAIN);
00149 policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00150
00151 try
00152 {
00153
00154 string poaName =string(_channelName.in())+".ProxyPushConsumer";
00155 POAManager_var parentManager =p->the_POAManager();
00156 _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
00157 }
00158 catch(POA::AdapterAlreadyExists&)
00159 {
00160 DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00161 "POA::AdapterAlreadyExists")
00162 }
00163 catch(POA::InvalidPolicy& ex)
00164 {
00165 DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00166 "POA::InvalidPolicy: "<<ex.index)
00167 }
00168
00169
00170 for(CORBA::ULong i=0; i<policies.length(); ++i)
00171 policies[i]->destroy();
00172
00173
00174 _poa->set_servant(this);
00175 }
00176
00177
00178 ProxyPushConsumer_i::~ProxyPushConsumer_i()
00179 {
00180 DB(20,"~ProxyPushConsumer_i()")
00181 for(Connections_t::iterator i =_connections.begin();
00182 i!=_connections.end();
00183 ++i)
00184 {
00185 i->second->_remove_ref();
00186 }
00187 _connections.clear();
00188
00189 _consumerAdmin._remove_ref();
00190 }
00191
00192
00193 CosEventChannelAdmin::ProxyPushConsumer_ptr
00194 ProxyPushConsumer_i::createObject()
00195 {
00196 return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
00197 _poa.in(),
00198 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00199 );
00200 }
00201
00202
00203 void ProxyPushConsumer_i::disconnect()
00204 {
00205
00206 Connections_t::iterator curr,next=_connections.begin();
00207 while(next!=_connections.end())
00208 {
00209 curr=next++;
00210 CORBA::Request_var req =
00211 curr->second->_target->_request("disconnect_push_supplier");
00212 curr->second->_remove_ref();
00213 _connections.erase(curr);
00214
00215
00216 req->send_deferred();
00217 Orb::inst().deferredRequest(req._retn());
00218 }
00219 }
00220
00221
00222 void ProxyPushConsumer_i::reincarnate(const PersistNode& node)
00223 {
00224
00225 for(map<string,PersistNode*>::const_iterator i=node._child.begin();
00226 i!=node._child.end();
00227 ++i)
00228 {
00229 const char* oidstr =i->first.c_str();
00230 string ior( i->second->attrString("IOR") );
00231 bool isProxy( i->second->attrLong("proxy") );
00232 assert(_connections.find(oidstr)==_connections.end());
00233 try
00234 {
00235 using namespace CosEventComm;
00236 using namespace CosEventChannelAdmin;
00237
00238 PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
00239 _connections.insert(Connections_t::value_type(
00240 oidstr,
00241 new Connection(_channelName.in(),oidstr,supp._retn(),isProxy)
00242 ));
00243 DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr)
00244
00245
00246 if(isProxy)
00247 {
00248 DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
00249
00250
00251 ProxyPushSupplier_var proxySupp =
00252 string_to_<ProxyPushSupplier>(ior.c_str());
00253 PortableServer::ObjectId_var objectId =
00254 PortableServer::string_to_ObjectId(oidstr);
00255 CORBA::Object_var obj =
00256 _poa->create_reference_with_id(
00257 objectId.in(),
00258 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00259 );
00260 PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
00261 proxySupp->connect_push_consumer(thisCons.in());
00262 DB(7,"Reconnected ProxyPushConsumer: "<<oidstr)
00263 }
00264 }
00265 catch(CORBA::BAD_PARAM&) {
00266
00267 DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
00268 }
00269 catch(CosEventChannelAdmin::AlreadyConnected&){
00270
00271 DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr)
00272 }
00273 catch(CosEventChannelAdmin::TypeError&){
00274
00275 DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
00276 }
00277 catch(CORBA::OBJECT_NOT_EXIST&) {}
00278 catch(CORBA::TRANSIENT& ) {}
00279 catch(CORBA::COMM_FAILURE& ) {}
00280 }
00281 }
00282
00283
00284 void ProxyPushConsumer_i::output(ostream& os) const
00285 {
00286 for(Connections_t::const_iterator i=_connections.begin();
00287 i!=_connections.end();
00288 ++i)
00289 {
00290 i->second->output(os);
00291 }
00292 }
00293
00294
00295 string ProxyPushConsumer_i::currentObjectId() const
00296 {
00297 #ifdef HAVE_OMNIORB4
00298 try
00299 {
00300 using namespace PortableServer;
00301 ObjectId_var oid =Orb::inst()._POACurrent->get_object_id();
00302 CORBA::String_var oidStr =ObjectId_to_string(oid.in());
00303 return string(oidStr.in());
00304 }
00305 catch(PortableServer::Current::NoContext&)
00306 {
00307 DB(0,"No context!!")
00308 }
00309 catch(CORBA::BAD_PARAM&)
00310 {
00311
00312 assert(0);
00313 }
00314 return "ERROR";
00315 #else
00316 throw CORBA::NO_IMPLEMENT();
00317 #endif
00318 }
00319
00320
00321
00322
00323
00324
00325 #if OMNIEVENTS__DEBUG_SERVANT
00326 int ProxyPushConsumer_i::Connection::_objectCount =0;
00327 #endif
00328
00329 ProxyPushConsumer_i::Connection::Connection(
00330 const char* channelName,
00331 const string& oidstr,
00332 CosEventComm::PushSupplier_ptr pushSupplier,
00333 bool isProxy
00334 ):Callback(),
00335 _channelName(channelName),
00336 _oidstr(oidstr),
00337 _target(pushSupplier),
00338 _targetIsProxy(isProxy)
00339 {
00340 #if OMNIEVENTS__DEBUG_SERVANT
00341 ++_objectCount;
00342 DB(21,"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount)
00343 #endif
00344 }
00345
00346 ProxyPushConsumer_i::Connection::~Connection()
00347 {
00348 #if OMNIEVENTS__DEBUG_SERVANT
00349 --_objectCount;
00350 DB(20,"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount)
00351 #else
00352 DB(20,"ProxyPushConsumer_i::Connection::~Connection()")
00353 #endif
00354 }
00355
00356 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPushConsumer_i::Connection)
00357
00358 void ProxyPushConsumer_i::Connection::callback(CORBA::Request_ptr req)
00359 {
00360 bool save =_targetIsProxy;
00361 if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00362 {
00363 if(_targetIsProxy && omniEventsLog::exists())
00364 {
00365 WriteLock log;
00366 output(log.os);
00367 DB(15,"ProxyPushConsumer is federated.");
00368 }
00369 }
00370 else
00371 {
00372 DB(2,"ProxyPushConsumer got unexpected callback.");
00373 _targetIsProxy=save;
00374 }
00375 }
00376
00377 void ProxyPushConsumer_i::Connection::output(ostream& os) const
00378 {
00379 os<<"ecf/"<<_channelName;
00380 os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
00381
00382 if(!CORBA::is_nil(_target.in()))
00383 {
00384 CORBA::String_var iorstr;
00385 iorstr = Orb::inst()._orb->object_to_string(_target.in());
00386 os<<" IOR="<<iorstr.in();
00387 if(_targetIsProxy)
00388 os<<" proxy=1";
00389 }
00390 os<<" ;;\n";
00391 }
00392
00393
00394 };