Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

ProxyPullConsumer.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPullConsumer.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003-2005 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPullConsumer.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029 
00030 namespace OmniEvents {
00031 
00032 //
00033 //  ProxyPullConsumerManager
00034 //
00035 
00036 PortableServer::Servant
00037 ProxyPullConsumerManager::incarnate(
00038   const PortableServer::ObjectId& oid,
00039   PortableServer::POA_ptr         poa
00040 )
00041 {
00042   DB(20,"ProxyPullConsumerManager::incarnate()")
00043   ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue);
00044   _servants.insert(result);
00045   return result;
00046 }
00047 
00048 ProxyPullConsumerManager::ProxyPullConsumerManager(
00049   PortableServer::POA_ptr parentPoa,
00050   list<CORBA::Any*>&      q
00051 )
00052 : ProxyManager(parentPoa),
00053   _queue(q)
00054 {
00055   ProxyManager::activate("ProxyPullConsumer");
00056 }
00057 
00058 ProxyPullConsumerManager::~ProxyPullConsumerManager()
00059 {
00060   DB(20,"~ProxyPullConsumerManager()")
00061 }
00062 
00063 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPullConsumerManager)
00064 
00065 CosEventChannelAdmin::ProxyPullConsumer_ptr
00066 ProxyPullConsumerManager::createObject()
00067 {
00068   return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
00069            _managedPoa.in(),
00070            CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
00071          );
00072 }
00073 
00074 void ProxyPullConsumerManager::collect()
00075 {
00076   // Collect events from each servant in turn.
00077   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00078   {
00079     ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00080     proxy->collect();
00081   }
00082 }
00083 
00084 void ProxyPullConsumerManager::triggerRequest()
00085 {
00086   // Trigger each servant in turn.
00087   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00088   {
00089     ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00090     proxy->triggerRequest();
00091   }
00092 }
00093 
00094 void ProxyPullConsumerManager::disconnect()
00095 {
00096   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00097   {
00098     Proxy* p =*i; // Sun's CC requires this temporary.
00099     ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
00100     // We are in the EventChannel's thread.
00101     // Make sure all calls go though the ProxyPullConsumer POA.
00102     CosEventChannelAdmin::ProxyPullConsumer_var ppcv =ppc->_this(); 
00103     ppcv->disconnect_pull_consumer();
00104   }
00105 }
00106 
00107 
00108 //
00109 //  ProxyPullConsumer_i
00110 //
00111 
00112 // CORBA interface methods
00113 
00114 void ProxyPullConsumer_i::connect_pull_supplier(
00115   CosEventComm::PullSupplier_ptr pullSupplier
00116 )
00117 {
00118   if(CORBA::is_nil(pullSupplier))
00119       throw CORBA::BAD_PARAM();
00120   if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00121       throw CosEventChannelAdmin::AlreadyConnected();
00122   _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
00123 
00124   if(omniEventsLog::exists())
00125   {
00126     WriteLock log;
00127     output(log.os);
00128   }
00129 }
00130 
00131 void ProxyPullConsumer_i::disconnect_pull_consumer()
00132 {
00133   DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
00134   eraseKey("SupplierAdmin/ProxyPullConsumer");
00135   deactivateObject();
00136   if(CORBA::is_nil(_target))
00137   {
00138     throw CORBA::OBJECT_NOT_EXIST(
00139       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00140       CORBA::COMPLETED_NO
00141     );
00142   }
00143   else
00144   {
00145     CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00146     _target=CosEventComm::PullSupplier::_nil();
00147     req->send_deferred();
00148     Orb::inst().deferredRequest(req._retn());
00149   }
00150 }
00151 
00152 //
00153 
00154 ProxyPullConsumer_i::ProxyPullConsumer_i(
00155   PortableServer::POA_ptr poa,
00156   list<CORBA::Any*>&      q
00157 )
00158 : Proxy(poa),
00159   _target(CosEventComm::PullSupplier::_nil()),
00160   _queue(q),
00161   _mode(Pull), // Prefer 'pull' method calls.
00162   _exceptionCount(0)
00163 {}
00164 
00165 ProxyPullConsumer_i::~ProxyPullConsumer_i()
00166 {
00167   DB(20,"~ProxyPullConsumer_i()")
00168 }
00169 
00170 void ProxyPullConsumer_i::collect()
00171 {
00172   if(!CORBA::is_nil(_req) && _req->poll_response()) 
00173   {
00174     const char* opname =_req->operation();
00175     assert(opname);
00176     CORBA::Environment_ptr env =_req->env(); // No need to release environment.
00177 
00178     if(!CORBA::is_nil(env) && env->exception()) 
00179     {
00180       CORBA::Exception* ex =env->exception(); // No need to free exception.
00181       DB(10,"ProxyPullConsumer got exception"
00182            IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
00183       if(0==strcmp("pull",opname) || 0==strcmp("try_pull",opname))
00184       {
00185         ++_exceptionCount;
00186         _mode=( _mode==Pull? TryPull: Pull ); // Try something else next time.
00187       }
00188       else
00189           DB(2,"Ignoring unrecognised response. operation:"<<opname);
00190       if(_exceptionCount>=4)
00191       {
00192         Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00193 
00194         // Try to notify the Supplier that the connection is closing.
00195         CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00196         req->send_deferred();
00197         Orb::inst().deferredRequest(req._retn());
00198 
00199         _target=CosEventComm::PullSupplier::_nil(); // disconnected
00200         eraseKey("SupplierAdmin/ProxyPullConsumer");
00201         deactivateObject();
00202       }
00203     }
00204     else  
00205     {
00206       // Do we have an event?
00207       bool hasEvent=false;
00208       if(0==strcmp("pull",opname))
00209       {
00210         hasEvent=true;
00211       }
00212       else if(0==strcmp("try_pull",opname))
00213       {
00214         CORBA::NVList_ptr args=_req->arguments(); // No need to release args.
00215         if(args->count()==1)
00216         {
00217           CORBA::NamedValue_var hasEventArg=args->item(0);
00218           if(0==strcmp(hasEventArg->name(),"has_event"))
00219           {
00220             CORBA::Any* a =hasEventArg->value();
00221             CORBA::Boolean b;
00222             CORBA::Any::to_boolean tb(b); //MS VC++6 is on drugs!
00223             hasEvent=(((*a)>>=tb) && b);
00224           }
00225         }
00226       }
00227       // Pick up an event, if we have one.
00228       if(hasEvent)
00229       {
00230         CORBA::Any* event =new CORBA::Any();
00231         _req->return_value() >>= (*event);
00232         _queue.push_back(event);
00233       }
00234       // Reset the exception count.
00235       _exceptionCount=0;
00236     }
00237     _req=CORBA::Request::_nil();
00238   }
00239 } // ProxyPullConsumer_i::end collect()
00240 
00241 void ProxyPullConsumer_i::triggerRequest()
00242 {
00243   if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
00244   {
00245     switch(_mode)
00246     {
00247       case Pull:
00248           _req=_target->_request("pull");
00249           break;
00250       case TryPull:
00251           _req=_target->_request("try_pull");
00252           _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
00253           break;
00254       default:
00255           assert(0);
00256     }
00257     _req->set_return_type(CORBA::_tc_any);
00258     _req->send_deferred();
00259   }
00260 }
00261 
00262 void ProxyPullConsumer_i::reincarnate(
00263   const string&      oid,
00264   const PersistNode& node
00265 )
00266 {
00267   CosEventComm::PullSupplier_var pullSupplier =
00268     string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
00269   // Do not activate until we know that we have read a valid target.
00270   activateObjectWithId(oid.c_str());
00271   connect_pull_supplier(pullSupplier.in());
00272 }
00273 
00274 void ProxyPullConsumer_i::output(ostream& os)
00275 {
00276   basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
00277 }
00278 
00279 }; // end namespace OmniEvents

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530