00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPullConsumer.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029
00030 namespace OmniEvents {
00031
00032
00033
00034
00035
00036 PortableServer::Servant
00037 ProxyPullConsumerManager::incarnate(
00038 const PortableServer::ObjectId& oid,
00039 PortableServer::POA_ptr poa
00040 )
00041 {
00042 DB(20,"ProxyPullConsumerManager::incarnate()")
00043 ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue);
00044 _servants.insert(result);
00045 return result;
00046 }
00047
00048 ProxyPullConsumerManager::ProxyPullConsumerManager(
00049 PortableServer::POA_ptr parentPoa,
00050 list<CORBA::Any*>& q
00051 )
00052 : ProxyManager(parentPoa),
00053 _queue(q)
00054 {
00055 ProxyManager::activate("ProxyPullConsumer");
00056 }
00057
00058 ProxyPullConsumerManager::~ProxyPullConsumerManager()
00059 {
00060 DB(20,"~ProxyPullConsumerManager()")
00061 }
00062
// NOTE(review): this macro is defined outside this file; presumably it emits
// the debug-only reference-counting definitions for ProxyPullConsumerManager
// - confirm against its definition.
00063 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ProxyPullConsumerManager)
00064
00065 CosEventChannelAdmin::ProxyPullConsumer_ptr
00066 ProxyPullConsumerManager::createObject()
00067 {
00068 return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
00069 _managedPoa.in(),
00070 CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
00071 );
00072 }
00073
00074 void ProxyPullConsumerManager::collect()
00075 {
00076
00077 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00078 {
00079 ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00080 proxy->collect();
00081 }
00082 }
00083
00084 void ProxyPullConsumerManager::triggerRequest()
00085 {
00086
00087 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00088 {
00089 ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00090 proxy->triggerRequest();
00091 }
00092 }
00093
00094 void ProxyPullConsumerManager::disconnect()
00095 {
00096 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00097 {
00098 Proxy* p =*i;
00099 ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
00100
00101
00102 CosEventChannelAdmin::ProxyPullConsumer_var ppcv =ppc->_this();
00103 ppcv->disconnect_pull_consumer();
00104 }
00105 }
00106
00107
00108
00109
00110
00111
00112
00113
00114 void ProxyPullConsumer_i::connect_pull_supplier(
00115 CosEventComm::PullSupplier_ptr pullSupplier
00116 )
00117 {
00118 if(CORBA::is_nil(pullSupplier))
00119 throw CORBA::BAD_PARAM();
00120 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00121 throw CosEventChannelAdmin::AlreadyConnected();
00122 _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
00123
00124 if(omniEventsLog::exists())
00125 {
00126 WriteLock log;
00127 output(log.os);
00128 }
00129 }
00130
00131 void ProxyPullConsumer_i::disconnect_pull_consumer()
00132 {
00133 DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
00134 eraseKey("SupplierAdmin/ProxyPullConsumer");
00135 deactivateObject();
00136 if(CORBA::is_nil(_target))
00137 {
00138 throw CORBA::OBJECT_NOT_EXIST(
00139 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00140 CORBA::COMPLETED_NO
00141 );
00142 }
00143 else
00144 {
00145 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00146 _target=CosEventComm::PullSupplier::_nil();
00147 req->send_deferred();
00148 Orb::inst().deferredRequest(req._retn());
00149 }
00150 }
00151
00152
00153
00154 ProxyPullConsumer_i::ProxyPullConsumer_i(
00155 PortableServer::POA_ptr poa,
00156 list<CORBA::Any*>& q
00157 )
00158 : Proxy(poa),
00159 _target(CosEventComm::PullSupplier::_nil()),
00160 _queue(q),
00161 _mode(Pull),
00162 _exceptionCount(0)
00163 {}
00164
00165 ProxyPullConsumer_i::~ProxyPullConsumer_i()
00166 {
00167 DB(20,"~ProxyPullConsumer_i()")
00168 }
00169
00170 void ProxyPullConsumer_i::collect()
00171 {
00172 if(!CORBA::is_nil(_req) && _req->poll_response())
00173 {
00174 const char* opname =_req->operation();
00175 assert(opname);
00176 CORBA::Environment_ptr env =_req->env();
00177
00178 if(!CORBA::is_nil(env) && env->exception())
00179 {
00180 CORBA::Exception* ex =env->exception();
00181 DB(10,"ProxyPullConsumer got exception"
00182 IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
00183 if(0==strcmp("pull",opname) || 0==strcmp("try_pull",opname))
00184 {
00185 ++_exceptionCount;
00186 _mode=( _mode==Pull? TryPull: Pull );
00187 }
00188 else
00189 DB(2,"Ignoring unrecognised response. operation:"<<opname);
00190 if(_exceptionCount>=4)
00191 {
00192 Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00193
00194
00195 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00196 req->send_deferred();
00197 Orb::inst().deferredRequest(req._retn());
00198
00199 _target=CosEventComm::PullSupplier::_nil();
00200 eraseKey("SupplierAdmin/ProxyPullConsumer");
00201 deactivateObject();
00202 }
00203 }
00204 else
00205 {
00206
00207 bool hasEvent=false;
00208 if(0==strcmp("pull",opname))
00209 {
00210 hasEvent=true;
00211 }
00212 else if(0==strcmp("try_pull",opname))
00213 {
00214 CORBA::NVList_ptr args=_req->arguments();
00215 if(args->count()==1)
00216 {
00217 CORBA::NamedValue_var hasEventArg=args->item(0);
00218 if(0==strcmp(hasEventArg->name(),"has_event"))
00219 {
00220 CORBA::Any* a =hasEventArg->value();
00221 CORBA::Boolean b;
00222 CORBA::Any::to_boolean tb(b);
00223 hasEvent=(((*a)>>=tb) && b);
00224 }
00225 }
00226 }
00227
00228 if(hasEvent)
00229 {
00230 CORBA::Any* event =new CORBA::Any();
00231 _req->return_value() >>= (*event);
00232 _queue.push_back(event);
00233 }
00234
00235 _exceptionCount=0;
00236 }
00237 _req=CORBA::Request::_nil();
00238 }
00239 }
00240
00241 void ProxyPullConsumer_i::triggerRequest()
00242 {
00243 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
00244 {
00245 switch(_mode)
00246 {
00247 case Pull:
00248 _req=_target->_request("pull");
00249 break;
00250 case TryPull:
00251 _req=_target->_request("try_pull");
00252 _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
00253 break;
00254 default:
00255 assert(0);
00256 }
00257 _req->set_return_type(CORBA::_tc_any);
00258 _req->send_deferred();
00259 }
00260 }
00261
00262 void ProxyPullConsumer_i::reincarnate(
00263 const string& oid,
00264 const PersistNode& node
00265 )
00266 {
00267 CosEventComm::PullSupplier_var pullSupplier =
00268 string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
00269
00270 activateObjectWithId(oid.c_str());
00271 connect_pull_supplier(pullSupplier.in());
00272 }
00273
00274 void ProxyPullConsumer_i::output(ostream& os)
00275 {
00276 basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
00277 }
00278
00279 };