Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

ProxyPullSupplier.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPullSupplier.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003-2005 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPullSupplier.h"
00025 #include "EventChannel.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 #include <assert.h>
00030 
00031 namespace OmniEvents {
00032 
00033 //
00034 //  ProxyPullSupplierManager
00035 //
00036 
00037 PortableServer::Servant ProxyPullSupplierManager::incarnate(
00038   const PortableServer::ObjectId& oid,
00039   PortableServer::POA_ptr         poa
00040 )
00041 {
00042   // Evict the oldest proxy servant, if we have reached the maximum number.
00043   if(_servants.size()>=_channel.maxNumProxies())
00044   {
00045     ProxyPullSupplier_i* oldest =NULL;
00046     unsigned long        age    =0;
00047     for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
00048         if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
00049         {
00050           oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
00051           age=oldest->timestamp();
00052         }
00053     DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
00054     try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
00055   }
00056   // Make a new servant.
00057   ProxyPullSupplier_i* result =new ProxyPullSupplier_i(_managedPoa,_queue);
00058   _servants.insert(result);
00059   return result;
00060 }
00061 
00062 ProxyPullSupplierManager::ProxyPullSupplierManager(
00063   const EventChannel_i&   channel,
00064   PortableServer::POA_ptr parentPoa,
00065   EventQueue&             q
00066 )
00067 : ProxyManager(parentPoa),
00068   _queue(q),
00069   _channel(channel)
00070 {
00071   ProxyManager::activate("ProxyPullSupplier");
00072 }
00073 
00074 ProxyPullSupplierManager::~ProxyPullSupplierManager()
00075 {
00076   DB(20,"~ProxyPullSupplierManager()")
00077 }
00078 
00079 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPullSupplierManager)
00080 
00081 CosEventChannelAdmin::ProxyPullSupplier_ptr
00082 ProxyPullSupplierManager::createObject()
00083 {  
00084   return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
00085            _managedPoa.in(),
00086            CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
00087          );
00088 }
00089 
00090 void ProxyPullSupplierManager::disconnect()
00091 {
00092   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00093   {
00094     ProxyPullSupplier_i* pps =dynamic_cast<ProxyPullSupplier_i*>(*i);
00095     // We are in the EventChannel's thread.
00096     // Make sure all calls go though the ProxyPullSupplier POA.
00097     CosEventChannelAdmin::ProxyPullSupplier_var ppsv =pps->_this(); 
00098     ppsv->disconnect_pull_supplier();
00099 
00100   }
00101 }
00102 
00103 
00104 //
00105 //  ProxyPullSupplier_i
00106 //
00107 
00108 // CORBA interface methods
00109 
00110 void ProxyPullSupplier_i::connect_pull_consumer(
00111   CosEventComm::PullConsumer_ptr pullConsumer
00112 )
00113 {
00114   if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00115       throw CosEventChannelAdmin::AlreadyConnected();
00116   touch();
00117   _connected=true;
00118   if(!CORBA::is_nil(pullConsumer))
00119       _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
00120 
00121   if(omniEventsLog::exists())
00122   {
00123     WriteLock log;
00124     output(log.os);
00125   }
00126 }
00127 
00128 void ProxyPullSupplier_i::disconnect_pull_supplier()
00129 {
00130   DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
00131   touch();
00132   eraseKey("ConsumerAdmin/ProxyPullSupplier");
00133   deactivateObject();
00134   if(!_connected)
00135   {
00136     throw CORBA::OBJECT_NOT_EXIST(
00137       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00138       CORBA::COMPLETED_NO
00139     );
00140   }
00141   else if(!CORBA::is_nil(_target))
00142   {
00143     CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
00144     _target=CosEventComm::PullConsumer::_nil();
00145     req->send_deferred();
00146     Orb::inst().deferredRequest(req._retn());
00147   }
00148 }
00149 
00150 CORBA::Any* ProxyPullSupplier_i::pull()
00151 {
00152   if(!_connected)
00153       throw CosEventComm::Disconnected();
00154   touch();
00155   if(moreEvents())
00156       return new CORBA::Any(*nextEvent());
00157   else
00158       throw CORBA::TRANSIENT(
00159         IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
00160         CORBA::COMPLETED_NO
00161       );
00162 }
00163 
00164 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
00165 {
00166   if(!_connected)
00167       throw CosEventComm::Disconnected();
00168   touch();
00169   if(moreEvents())
00170   {
00171     has_event=1;
00172     return new CORBA::Any(*nextEvent());
00173   }
00174   else
00175   {
00176     has_event=0;
00177     return new CORBA::Any();
00178   }
00179 }
00180 
00181 //
00182 
00183 ProxyPullSupplier_i::ProxyPullSupplier_i(
00184   PortableServer::POA_ptr poa,
00185   EventQueue& q
00186 )
00187 : Proxy(poa),
00188   EventQueue::Reader(q),
00189   _target(CosEventComm::PullConsumer::_nil()),
00190   _connected(false),
00191   _timestamp(0)
00192 {
00193   touch();
00194 }
00195 
00196 ProxyPullSupplier_i::~ProxyPullSupplier_i()
00197 {
00198   DB(20,"~ProxyPullSupplier_i()")
00199 }
00200 
00201 void ProxyPullSupplier_i::reincarnate(
00202   const string&      oid,
00203   const PersistNode& node
00204 )
00205 {
00206   CosEventComm::PullConsumer_var pullConsumer =
00207     string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
00208   // Do not activate until we know that we have read a valid target.
00209   activateObjectWithId(oid.c_str());
00210   connect_pull_consumer(pullConsumer.in());
00211 }
00212 
00213 void ProxyPullSupplier_i::output(ostream& os)
00214 {
00215   basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
00216 }
00217 
00218 inline void ProxyPullSupplier_i::touch()
00219 {
00220   unsigned long nsec; // dummy
00221   omni_thread::get_time(&_timestamp,&nsec);
00222 }
00223 
00224 }; // end namespace OmniEvents

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530