00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPullSupplier.h"
00025 #include "EventChannel.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 #include <assert.h>
00030
00031 namespace OmniEvents {
00032
00033
00034
00035
00036
00037 PortableServer::Servant ProxyPullSupplierManager::incarnate(
00038 const PortableServer::ObjectId& oid,
00039 PortableServer::POA_ptr poa
00040 )
00041 {
00042
00043 if(_servants.size()>=_channel.maxNumProxies())
00044 {
00045 ProxyPullSupplier_i* oldest =NULL;
00046 unsigned long age =0;
00047 for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
00048 if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
00049 {
00050 oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
00051 age=oldest->timestamp();
00052 }
00053 DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
00054 try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
00055 }
00056
00057 ProxyPullSupplier_i* result =new ProxyPullSupplier_i(_managedPoa,_queue);
00058 _servants.insert(result);
00059 return result;
00060 }
00061
00062 ProxyPullSupplierManager::ProxyPullSupplierManager(
00063 const EventChannel_i& channel,
00064 PortableServer::POA_ptr parentPoa,
00065 EventQueue& q
00066 )
00067 : ProxyManager(parentPoa),
00068 _queue(q),
00069 _channel(channel)
00070 {
00071 ProxyManager::activate("ProxyPullSupplier");
00072 }
00073
00074 ProxyPullSupplierManager::~ProxyPullSupplierManager()
00075 {
00076 DB(20,"~ProxyPullSupplierManager()")
00077 }
00078
00079 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPullSupplierManager)
00080
00081 CosEventChannelAdmin::ProxyPullSupplier_ptr
00082 ProxyPullSupplierManager::createObject()
00083 {
00084 return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
00085 _managedPoa.in(),
00086 CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
00087 );
00088 }
00089
00090 void ProxyPullSupplierManager::disconnect()
00091 {
00092 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00093 {
00094 ProxyPullSupplier_i* pps =dynamic_cast<ProxyPullSupplier_i*>(*i);
00095
00096
00097 CosEventChannelAdmin::ProxyPullSupplier_var ppsv =pps->_this();
00098 ppsv->disconnect_pull_supplier();
00099
00100 }
00101 }
00102
00103
00104
00105
00106
00107
00108
00109
00110 void ProxyPullSupplier_i::connect_pull_consumer(
00111 CosEventComm::PullConsumer_ptr pullConsumer
00112 )
00113 {
00114 if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00115 throw CosEventChannelAdmin::AlreadyConnected();
00116 touch();
00117 _connected=true;
00118 if(!CORBA::is_nil(pullConsumer))
00119 _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
00120
00121 if(omniEventsLog::exists())
00122 {
00123 WriteLock log;
00124 output(log.os);
00125 }
00126 }
00127
00128 void ProxyPullSupplier_i::disconnect_pull_supplier()
00129 {
00130 DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
00131 touch();
00132 eraseKey("ConsumerAdmin/ProxyPullSupplier");
00133 deactivateObject();
00134 if(!_connected)
00135 {
00136 throw CORBA::OBJECT_NOT_EXIST(
00137 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00138 CORBA::COMPLETED_NO
00139 );
00140 }
00141 else if(!CORBA::is_nil(_target))
00142 {
00143 CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
00144 _target=CosEventComm::PullConsumer::_nil();
00145 req->send_deferred();
00146 Orb::inst().deferredRequest(req._retn());
00147 }
00148 }
00149
00150 CORBA::Any* ProxyPullSupplier_i::pull()
00151 {
00152 if(!_connected)
00153 throw CosEventComm::Disconnected();
00154 touch();
00155 if(moreEvents())
00156 return new CORBA::Any(*nextEvent());
00157 else
00158 throw CORBA::TRANSIENT(
00159 IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
00160 CORBA::COMPLETED_NO
00161 );
00162 }
00163
00164 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
00165 {
00166 if(!_connected)
00167 throw CosEventComm::Disconnected();
00168 touch();
00169 if(moreEvents())
00170 {
00171 has_event=1;
00172 return new CORBA::Any(*nextEvent());
00173 }
00174 else
00175 {
00176 has_event=0;
00177 return new CORBA::Any();
00178 }
00179 }
00180
00181
00182
00183 ProxyPullSupplier_i::ProxyPullSupplier_i(
00184 PortableServer::POA_ptr poa,
00185 EventQueue& q
00186 )
00187 : Proxy(poa),
00188 EventQueue::Reader(q),
00189 _target(CosEventComm::PullConsumer::_nil()),
00190 _connected(false),
00191 _timestamp(0)
00192 {
00193 touch();
00194 }
00195
00196 ProxyPullSupplier_i::~ProxyPullSupplier_i()
00197 {
00198 DB(20,"~ProxyPullSupplier_i()")
00199 }
00200
00201 void ProxyPullSupplier_i::reincarnate(
00202 const string& oid,
00203 const PersistNode& node
00204 )
00205 {
00206 CosEventComm::PullConsumer_var pullConsumer =
00207 string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
00208
00209 activateObjectWithId(oid.c_str());
00210 connect_pull_consumer(pullConsumer.in());
00211 }
00212
00213 void ProxyPullSupplier_i::output(ostream& os)
00214 {
00215 basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
00216 }
00217
00218 inline void ProxyPullSupplier_i::touch()
00219 {
00220 unsigned long nsec;
00221 omni_thread::get_time(&_timestamp,&nsec);
00222 }
00223
00224 };