Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

ProxyPushConsumer.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPushConsumer.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003,2005 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPushConsumer.h"
00025 #include "ConsumerAdmin.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 
00030 #include <assert.h>
00031 
00032 namespace OmniEvents {
00033 
00034 void ProxyPushConsumer_i::connect_push_supplier(
00035   CosEventComm::PushSupplier_ptr pushSupplier)
00036 {
00037   // pushSupplier is permitted to be nil.
00038   if(CORBA::is_nil(pushSupplier))
00039       return;
00040 
00041   string oidstr =currentObjectId();
00042   Connections_t::iterator pos =_connections.find(oidstr);
00043 
00044   if(pos!=_connections.end())
00045       throw CosEventChannelAdmin::AlreadyConnected();
00046 
00047   Connection* newConnection =
00048     new Connection(
00049           _channelName.in(),
00050           oidstr,
00051           CosEventComm::PushSupplier::_duplicate(pushSupplier)
00052         );
00053   _connections.insert( Connections_t::value_type(oidstr,newConnection) );
00054 
00055   // Test to see whether pushSupplier is a ProxyPushSupplier.
00056   // If so, then we will aggressively try to reconnect, when we are reincarnated
00057   CORBA::Request_var req =pushSupplier->_request("_is_a");
00058   req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
00059   req->set_return_type(CORBA::_tc_boolean);
00060   req->send_deferred();
00061   Orb::inst().deferredRequest(req._retn(),newConnection); // Register callback
00062 
00063   if(omniEventsLog::exists())
00064   {
00065     WriteLock log;
00066     newConnection->output(log.os);
00067   }
00068 }
00069 
00070 
00071 void ProxyPushConsumer_i::disconnect_push_consumer()
00072 {
00073 #ifdef HAVE_OMNIORB4
00074   DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()")
00075   string oidstr =currentObjectId();
00076   Connections_t::iterator pos =_connections.find(oidstr);
00077 
00078   if(pos!=_connections.end())
00079   {
00080     CORBA::Request_var req =
00081       pos->second->_target->_request("disconnect_push_supplier");
00082     pos->second->_remove_ref();
00083     _connections.erase(pos);
00084     // The following line could result in a reentrant callback, if this call was
00085     // not made through the POA => must erase the connection BEFORE this point.
00086     req->send_deferred();
00087     Orb::inst().deferredRequest(req._retn());
00088     if(omniEventsLog::exists())
00089     {
00090       // Erase this connection from the log file.
00091       WriteLock log;
00092       log.os<<"-ecf/"<<_channelName.in();
00093       log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n';
00094     }
00095   }
00096 #else /* Silently ignore disconnects with omniORB3 */
00097   DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
00098 #endif
00099 }
00100 
00101 
00102 void ProxyPushConsumer_i::push(const CORBA::Any& event)
00103 {
00104 #ifdef OMNIEVENTS_REAL_TIME_PUSH
00105   if(!_useLocalQueue)
00106   {
00107     _consumerAdmin.send(new CORBA::Any(event));
00108     _useLocalQueue=true;
00109   }
00110   else
00111 #endif
00112     _queue.push_back(new CORBA::Any(event));
00113 }
00114 
00115 
00116 ProxyPushConsumer_i::ProxyPushConsumer_i(
00117   PortableServer::POA_ptr p,
00118   list<CORBA::Any*>&      q,
00119   ConsumerAdmin_i&        consumerAdmin
00120 )
00121 : Servant(PortableServer::POA::_nil()),
00122   _connections(),
00123   _channelName(p->the_name()),
00124   _consumerAdmin(consumerAdmin),
00125   _queue(q),
00126   _useLocalQueue(false)
00127 {
00128   _consumerAdmin._add_ref();
00129 
00130   using namespace PortableServer;
00131 
00132   // POLICIES:
00133   //  Lifespan          =PERSISTENT             // we can persist
00134   //  Assignment        =USER_ID                // write our own oid
00135   //  Uniqueness        =MULTIPLE_ID            // only one servant
00136   //  ImplicitActivation=NO_IMPLICIT_ACTIVATION // disable auto activation
00137   //  RequestProcessing =USE_DEFAULT_SERVANT    // only one servant
00138   //  ServantRetention  =NON_RETAIN             // stateless POA
00139   //  Thread            =SINGLE_THREAD_MODEL    // keep it simple
00140 
00141   CORBA::PolicyList policies;
00142   policies.length(7);
00143   policies[0]=p->create_lifespan_policy(PERSISTENT);
00144   policies[1]=p->create_id_assignment_policy(USER_ID);
00145   policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
00146   policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
00147   policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
00148   policies[5]=p->create_servant_retention_policy(NON_RETAIN);
00149   policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00150 
00151   try
00152   {  
00153     // Create a POA for this proxy type in this channel.
00154     string          poaName =string(_channelName.in())+".ProxyPushConsumer";
00155     POAManager_var  parentManager =p->the_POAManager();
00156     _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
00157   }
00158   catch(POA::AdapterAlreadyExists&) // create_POA
00159   {
00160     DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00161           "POA::AdapterAlreadyExists")
00162   }
00163   catch(POA::InvalidPolicy& ex) // create_POA
00164   {
00165     DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00166           "POA::InvalidPolicy: "<<ex.index)
00167   }
00168 
00169   // Destroy the policy objects (Not strictly necessary in omniORB)
00170   for(CORBA::ULong i=0; i<policies.length(); ++i)
00171       policies[i]->destroy();
00172 
00173   // This object is the POA's default servant.
00174   _poa->set_servant(this);
00175 }
00176 
00177 
00178 ProxyPushConsumer_i::~ProxyPushConsumer_i()
00179 {
00180   DB(20,"~ProxyPushConsumer_i()")
00181   for(Connections_t::iterator i =_connections.begin();
00182                               i!=_connections.end();
00183                             ++i)
00184   {
00185     i->second->_remove_ref();
00186   }
00187   _connections.clear();
00188 
00189   _consumerAdmin._remove_ref();
00190 }
00191 
00192 
00193 CosEventChannelAdmin::ProxyPushConsumer_ptr
00194 ProxyPushConsumer_i::createObject()
00195 {
00196   return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
00197            _poa.in(),
00198            CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00199          );
00200 }
00201 
00202 
00203 void ProxyPushConsumer_i::disconnect()
00204 {
00205   // Note. We are (probably) in the EventChannel's thread.
00206   Connections_t::iterator curr,next=_connections.begin();
00207   while(next!=_connections.end())
00208   {
00209     curr=next++;
00210     CORBA::Request_var req =
00211       curr->second->_target->_request("disconnect_push_supplier");
00212     curr->second->_remove_ref();
00213     _connections.erase(curr);
00214     // The following line could result in a reentrant callback
00215     // => must erase the connection BEFORE this point.
00216     req->send_deferred();
00217     Orb::inst().deferredRequest(req._retn());
00218   }
00219 }
00220 
00221 
00222 void ProxyPushConsumer_i::reincarnate(const PersistNode& node)
00223 {
00224   // Reincarnate all connections from node's children.
00225   for(map<string,PersistNode*>::const_iterator i=node._child.begin();
00226       i!=node._child.end();
00227       ++i)
00228   {
00229     const char* oidstr =i->first.c_str();
00230     string      ior( i->second->attrString("IOR") );
00231     bool        isProxy( i->second->attrLong("proxy") );
00232     assert(_connections.find(oidstr)==_connections.end());
00233     try
00234     {
00235       using namespace CosEventComm;
00236       using namespace CosEventChannelAdmin;
00237 
00238       PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
00239       _connections.insert(Connections_t::value_type(
00240         oidstr,
00241         new Connection(_channelName.in(),oidstr,supp._retn(),isProxy)
00242       ));
00243       DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr)
00244 
00245       // If supp is a ProxyPushSupplier, then try to reconnect.
00246       if(isProxy)
00247       {
00248         DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
00249         // This will only work if the proxy is implemented in the same way as
00250         // omniEvents, so connect_() automatically creates a proxy.
00251         ProxyPushSupplier_var proxySupp =
00252           string_to_<ProxyPushSupplier>(ior.c_str());
00253         PortableServer::ObjectId_var objectId =
00254           PortableServer::string_to_ObjectId(oidstr);
00255         CORBA::Object_var obj =
00256           _poa->create_reference_with_id(
00257             objectId.in(),
00258             CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00259           );
00260         PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
00261         proxySupp->connect_push_consumer(thisCons.in());
00262         DB(7,"Reconnected ProxyPushConsumer: "<<oidstr)
00263       }
00264     }
00265     catch(CORBA::BAD_PARAM&) {
00266       // This will happen when IOR fails to narrow.
00267       DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
00268     }
00269     catch(CosEventChannelAdmin::AlreadyConnected&){ //connect_push_consumer()
00270       // The supplier doesn't need to be reconnected.
00271       DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr)
00272     }
00273     catch(CosEventChannelAdmin::TypeError&){ // connect_push_consumer()
00274       // Don't know what to make of this...
00275       DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
00276     }
00277     catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'supp' not responding.
00278     catch(CORBA::TRANSIENT&       ) {} // object 'supp' not responding.
00279     catch(CORBA::COMM_FAILURE&    ) {} // object 'supp' not responding.
00280   } // end loop for(i)
00281 }
00282 
00283 
00284 void ProxyPushConsumer_i::output(ostream& os) const
00285 {
00286   for(Connections_t::const_iterator i=_connections.begin();
00287       i!=_connections.end();
00288       ++i)
00289   {
00290     i->second->output(os);
00291   }
00292 }
00293 
00294 
00295 string ProxyPushConsumer_i::currentObjectId() const
00296 {
00297 #ifdef HAVE_OMNIORB4
00298   try
00299   {
00300     using namespace PortableServer;
00301     ObjectId_var oid =Orb::inst()._POACurrent->get_object_id();
00302     CORBA::String_var oidStr =ObjectId_to_string(oid.in());
00303     return string(oidStr.in());
00304   }
00305   catch(PortableServer::Current::NoContext&) // get_object_id()
00306   {
00307     DB(0,"No context!!")
00308   }
00309   catch(CORBA::BAD_PARAM&) // ObjectId_to_string()
00310   {
00311     // Should never get here in omniORB, because ObjectID is a char*.
00312     assert(0);
00313   }
00314   return "ERROR";
00315 #else
00316   throw CORBA::NO_IMPLEMENT();
00317 #endif
00318 }
00319 
00320 
00321 //
00322 //  ProxyPushConsumer_i::Connection
00323 //
00324 
00325 #if OMNIEVENTS__DEBUG_SERVANT
00326 int ProxyPushConsumer_i::Connection::_objectCount =0;
00327 #endif
00328 
00329 ProxyPushConsumer_i::Connection::Connection(
00330   const char*                    channelName,
00331   const string&                  oidstr,
00332   CosEventComm::PushSupplier_ptr pushSupplier,
00333   bool                           isProxy
00334 ):Callback(),
00335   _channelName(channelName),
00336   _oidstr(oidstr),
00337   _target(pushSupplier),
00338   _targetIsProxy(isProxy)
00339 {
00340 #if OMNIEVENTS__DEBUG_SERVANT
00341   ++_objectCount;
00342   DB(21,"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount)
00343 #endif
00344 }
00345 
00346 ProxyPushConsumer_i::Connection::~Connection()
00347 {
00348 #if OMNIEVENTS__DEBUG_SERVANT
00349   --_objectCount;
00350   DB(20,"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount)
00351 #else
00352   DB(20,"ProxyPushConsumer_i::Connection::~Connection()")
00353 #endif
00354 }
00355 
00356 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPushConsumer_i::Connection)
00357 
00358 void ProxyPushConsumer_i::Connection::callback(CORBA::Request_ptr req)
00359 {
00360   bool save =_targetIsProxy;
00361   if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00362   {
00363     if(_targetIsProxy && omniEventsLog::exists())
00364     {
00365       WriteLock log;
00366       output(log.os);
00367       DB(15,"ProxyPushConsumer is federated.");
00368     }
00369   }
00370   else
00371   {
00372     DB(2,"ProxyPushConsumer got unexpected callback.");
00373     _targetIsProxy=save; // Reset it just to be sure.
00374   }
00375 }
00376 
00377 void ProxyPushConsumer_i::Connection::output(ostream& os) const
00378 {
00379   os<<"ecf/"<<_channelName;
00380   os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
00381 
00382   if(!CORBA::is_nil(_target.in()))
00383   {
00384     CORBA::String_var iorstr;
00385     iorstr = Orb::inst()._orb->object_to_string(_target.in());
00386     os<<" IOR="<<iorstr.in();
00387     if(_targetIsProxy)
00388         os<<" proxy=1";
00389   }
00390   os<<" ;;\n";
00391 }
00392 
00393 
00394 }; // end namespace OmniEvents

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530