Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

SupplierAdmin.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // SupplierAdmin.cc           Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003-2005 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "SupplierAdmin.h"
00025 
00026 #include "EventChannel.h"
00027 #include "ProxyPushConsumer.h"
00028 #include "ProxyPullConsumer.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031 
// Scale factors used by the pull-retry timing code in this file.
// MILLION: nanoseconds per millisecond (converts the channel's ms/ns periods).
#define MILLION 1000000
// BILLION: nanoseconds per second (carries nanosecond overflow into seconds).
#define BILLION 1000000000
00034 
00035 namespace OmniEvents {
00036 
00037 CosEventChannelAdmin::ProxyPushConsumer_ptr
00038 SupplierAdmin_i::obtain_push_consumer()
00039 {
00040   return _pushConsumer->createObject();
00041 }
00042 
00043 
00044 CosEventChannelAdmin::ProxyPullConsumer_ptr
00045 SupplierAdmin_i::obtain_pull_consumer()
00046 {
00047   if(!_pullConsumer)
00048       _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00049   return _pullConsumer->createObject();
00050 }
00051 
00052 
00053 SupplierAdmin_i::SupplierAdmin_i(
00054   const EventChannel_i&   channel,
00055   PortableServer::POA_ptr poa
00056 )
00057 : Servant(poa),
00058   _channel(channel),
00059   _pushConsumer(NULL),
00060   _pullConsumer(NULL),
00061   _queue(),
00062   _nextPull(0,0)
00063 {
00064   // Initialise _nextPull. Only set it if the cycle period is LESS than the
00065   // pull retry period - otherwise just pull every cycle.
00066   if(_channel.pullRetryPeriod_ms() > (_channel.cyclePeriod_ns()/MILLION))
00067   {
00068     omni_thread::get_time(&(_nextPull.first),&(_nextPull.second));
00069   }
00070 
00071   // Always create the ProxyPushConsumer_i default servant. This allows
00072   // lazy clients to connect suppliers without having to go through the
00073   // proper procedure - they can make up an appropriate ObjectId, call push()
00074   // and it will just work (TM).
00075   // Note: A SupplierAdmin_i is always created by the EventChannel to allow this
00076   // behaviour.
00077   _pushConsumer=new ProxyPushConsumer_i(_poa,_queue,_channel.consumerAdmin());
00078   
00079   activateObjectWithId("SupplierAdmin");
00080 }
00081 
00082 
00083 SupplierAdmin_i::~SupplierAdmin_i()
00084 {
00085   DB(20,"~SupplierAdmin_i()")
00086   if(_pullConsumer)
00087   {
00088     _pullConsumer->_remove_ref();
00089     _pullConsumer=NULL;
00090   }
00091   if(_pushConsumer)
00092   {
00093     delete _pushConsumer;
00094     _pushConsumer=NULL;
00095   }
00096   for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i)
00097       delete *i;
00098 }
00099 
00100 
// Expands the project's OMNIEVENTS__DEBUG_REF_COUNTS__DEFN macro for
// SupplierAdmin_i.
// NOTE(review): presumably emits reference-count debugging definitions for
// this class - confirm against the macro's definition.
OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(SupplierAdmin_i)
00102 
00103 
00104 void SupplierAdmin_i::collect(list<CORBA::Any*>& events)
00105 {
00106   if(_pullConsumer)
00107   {
00108     _pullConsumer->collect();
00109     if(0==_nextPull.first)
00110     { // No delay between pulls.
00111       _pullConsumer->triggerRequest();
00112     }
00113     else
00114     { // Only trigger new pull() calls if `pullRetry' ms have passed.
00115       pair<unsigned long,unsigned long> now;
00116       omni_thread::get_time(&(now.first),&(now.second));
00117       if(now>=_nextPull)
00118       {
00119         _pullConsumer->triggerRequest();
00120 
00121         CORBA::ULong p =_channel.pullRetryPeriod_ms();
00122         do{
00123           _nextPull.second += (p%1000)*MILLION;                    // nsec
00124           _nextPull.first  +=  p/1000 + _nextPull.second/BILLION;  // sec
00125           _nextPull.second %= BILLION;                             // nsec
00126         } while(now>=_nextPull);
00127       }
00128     }
00129   }
00130   _pushConsumer->trigger();
00131   // Pick up events from both pull & push consumers.
00132   events=_queue;
00133   _queue.clear();
00134 }
00135 
00136 
00137 void SupplierAdmin_i::disconnect()
00138 {
00139   if(_pushConsumer)
00140      _pushConsumer->disconnect();
00141   if(_pullConsumer)
00142      _pullConsumer->disconnect();
00143 }
00144 
00145 
00146 void SupplierAdmin_i::reincarnate(const PersistNode& node)
00147 {
00148   // Build Push Consumer proxies
00149   PersistNode* pushcNode =node.child("ProxyPushConsumer");
00150   if(pushcNode && !pushcNode->_child.empty())
00151   {
00152     assert(_pushConsumer!=NULL);
00153     _pushConsumer->reincarnate(*pushcNode);
00154   }
00155 
00156   // Build Pull Consumer proxies
00157   PersistNode* pullcNode =node.child("ProxyPullConsumer");
00158   if(pullcNode && !pullcNode->_child.empty())
00159   {
00160     if(!_pullConsumer)
00161         _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00162     _pullConsumer->reincarnate(*pullcNode);
00163   }
00164 }
00165 
00166 
00167 void SupplierAdmin_i::output(ostream& os)
00168 {
00169   if(_pushConsumer)
00170      _pushConsumer->output(os);
00171   if(_pullConsumer)
00172      _pullConsumer->output(os);
00173 }
00174 
00175 
00176 }; // end namespace OmniEvents

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530