00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "SupplierAdmin.h"
00025
00026 #include "EventChannel.h"
00027 #include "ProxyPushConsumer.h"
00028 #include "ProxyPullConsumer.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031
// Scaling factors for the time arithmetic in this file: MILLION converts the
// channel's cycle period from nanoseconds to milliseconds (and milliseconds
// to nanoseconds), BILLION is the nanoseconds-per-second threshold used when
// carrying overflow from the nanosecond part of _nextPull into its seconds.
00032 #define MILLION 1000000
00033 #define BILLION 1000000000
00034
00035 namespace OmniEvents {
00036
00037 CosEventChannelAdmin::ProxyPushConsumer_ptr
00038 SupplierAdmin_i::obtain_push_consumer()
00039 {
00040 return _pushConsumer->createObject();
00041 }
00042
00043
00044 CosEventChannelAdmin::ProxyPullConsumer_ptr
00045 SupplierAdmin_i::obtain_pull_consumer()
00046 {
00047 if(!_pullConsumer)
00048 _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00049 return _pullConsumer->createObject();
00050 }
00051
00052
00053 SupplierAdmin_i::SupplierAdmin_i(
00054 const EventChannel_i& channel,
00055 PortableServer::POA_ptr poa
00056 )
00057 : Servant(poa),
00058 _channel(channel),
00059 _pushConsumer(NULL),
00060 _pullConsumer(NULL),
00061 _queue(),
00062 _nextPull(0,0)
00063 {
00064
00065
00066 if(_channel.pullRetryPeriod_ms() > (_channel.cyclePeriod_ns()/MILLION))
00067 {
00068 omni_thread::get_time(&(_nextPull.first),&(_nextPull.second));
00069 }
00070
00071
00072
00073
00074
00075
00076
00077 _pushConsumer=new ProxyPushConsumer_i(_poa,_queue,_channel.consumerAdmin());
00078
00079 activateObjectWithId("SupplierAdmin");
00080 }
00081
00082
00083 SupplierAdmin_i::~SupplierAdmin_i()
00084 {
00085 DB(20,"~SupplierAdmin_i()")
00086 if(_pullConsumer)
00087 {
00088 _pullConsumer->_remove_ref();
00089 _pullConsumer=NULL;
00090 }
00091 if(_pushConsumer)
00092 {
00093 delete _pushConsumer;
00094 _pushConsumer=NULL;
00095 }
00096 for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i)
00097 delete *i;
00098 }
00099
00100
// NOTE(review): this macro is defined outside this file; presumably it emits
// the reference-count debugging definitions for SupplierAdmin_i - confirm.
00101 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(SupplierAdmin_i)
00102
00103
00104 void SupplierAdmin_i::collect(list<CORBA::Any*>& events)
00105 {
00106 if(_pullConsumer)
00107 {
00108 _pullConsumer->collect();
00109 if(0==_nextPull.first)
00110 {
00111 _pullConsumer->triggerRequest();
00112 }
00113 else
00114 {
00115 pair<unsigned long,unsigned long> now;
00116 omni_thread::get_time(&(now.first),&(now.second));
00117 if(now>=_nextPull)
00118 {
00119 _pullConsumer->triggerRequest();
00120
00121 CORBA::ULong p =_channel.pullRetryPeriod_ms();
00122 do{
00123 _nextPull.second += (p%1000)*MILLION;
00124 _nextPull.first += p/1000 + _nextPull.second/BILLION;
00125 _nextPull.second %= BILLION;
00126 } while(now>=_nextPull);
00127 }
00128 }
00129 }
00130 _pushConsumer->trigger();
00131
00132 events=_queue;
00133 _queue.clear();
00134 }
00135
00136
00137 void SupplierAdmin_i::disconnect()
00138 {
00139 if(_pushConsumer)
00140 _pushConsumer->disconnect();
00141 if(_pullConsumer)
00142 _pullConsumer->disconnect();
00143 }
00144
00145
00146 void SupplierAdmin_i::reincarnate(const PersistNode& node)
00147 {
00148
00149 PersistNode* pushcNode =node.child("ProxyPushConsumer");
00150 if(pushcNode && !pushcNode->_child.empty())
00151 {
00152 assert(_pushConsumer!=NULL);
00153 _pushConsumer->reincarnate(*pushcNode);
00154 }
00155
00156
00157 PersistNode* pullcNode =node.child("ProxyPullConsumer");
00158 if(pullcNode && !pullcNode->_child.empty())
00159 {
00160 if(!_pullConsumer)
00161 _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00162 _pullConsumer->reincarnate(*pullcNode);
00163 }
00164 }
00165
00166
00167 void SupplierAdmin_i::output(ostream& os)
00168 {
00169 if(_pushConsumer)
00170 _pushConsumer->output(os);
00171 if(_pullConsumer)
00172 _pullConsumer->output(os);
00173 }
00174
00175
00176 };