Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

ConsumerAdmin.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ConsumerAdmin.cc           Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003-2005 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ConsumerAdmin.h"
00025 
00026 #include "EventChannel.h"
00027 #include "ProxyPushSupplier.h"
00028 #include "ProxyPullSupplier.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031 #include "Filter.h"
00032 
00033 namespace OmniEvents {
00034 
00035 
00036 CosEventChannelAdmin::ProxyPushSupplier_ptr
00037 ConsumerAdmin_i::obtain_push_supplier()
00038 {
00039   if(!_pushSupplier)
00040       _pushSupplier=new ProxyPushSupplierManager(_poa,_queue);
00041   return _pushSupplier->createObject();
00042 }
00043 
00044 
00045 CosEventChannelAdmin::ProxyPullSupplier_ptr
00046 ConsumerAdmin_i::obtain_pull_supplier()
00047 {
00048   if(!_pullSupplier)
00049       _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue);
00050   return _pullSupplier->createObject();
00051 }
00052 
00053 
00054 ConsumerAdmin_i::ConsumerAdmin_i(
00055   const EventChannel_i&   channel,
00056   PortableServer::POA_ptr poa
00057 )
00058 : Servant(poa),
00059   _channel(channel),
00060   _queue(channel.maxQueueLength()),
00061   _pushSupplier(NULL),
00062   _pullSupplier(NULL)
00063 {
00064   if(_channel.properties().hasAttr("FilterId"))
00065   {
00066     string rid =_channel.properties().attrString("FilterId");
00067     _queue.setFilter(new FilterByRepositoryId(rid.c_str()));
00068   }
00069   else if(_channel.properties().hasAttr("FilterKind"))
00070   {
00071     CORBA::TCKind kind =
00072       CORBA::TCKind(_channel.properties().attrLong("FilterKind"));
00073     _queue.setFilter(new FilterByTCKind(kind));
00074   }
00075 
00076   activateObjectWithId("ConsumerAdmin");
00077 }
00078 
00079 
00080 ConsumerAdmin_i::~ConsumerAdmin_i()
00081 {
00082   DB(20,"~ConsumerAdmin_i()")
00083   if(_pushSupplier)
00084   {
00085     _pushSupplier->_remove_ref(); // terminates thread.
00086     _pushSupplier=NULL;
00087   }
00088   if(_pullSupplier)
00089   {
00090     _pullSupplier->_remove_ref();
00091     _pullSupplier=NULL;
00092   }
00093 }
00094 
00095 
00096 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ConsumerAdmin_i) // NOTE(review): macro is defined elsewhere; presumably emits reference-count debugging definitions for ConsumerAdmin_i - confirm against its definition.
00097 
00098 
00099 void ConsumerAdmin_i::send(CORBA::Any* event)
00100 {
00101   ProxyPushSupplierManager::PauseThenWake p(_pushSupplier);
00102   _queue.append(event);
00103 }
00104 
00105 
00106 void ConsumerAdmin_i::send(list<CORBA::Any*>& events)
00107 {
00108   if(!events.empty())
00109   {
00110     ProxyPushSupplierManager::PauseThenWake p(_pushSupplier);
00111     for(list<CORBA::Any*>::iterator i=events.begin(); i!=events.end(); ++i)
00112         _queue.append( *i );
00113     events.clear();
00114   }
00115 }
00116 
00117 
00118 void ConsumerAdmin_i::disconnect()
00119 {
00120   if(_pushSupplier)
00121      _pushSupplier->disconnect();
00122   if(_pullSupplier)
00123      _pullSupplier->disconnect();
00124 }
00125 
00126 
00127 void ConsumerAdmin_i::reincarnate(const PersistNode& node)
00128 {
00129   // Build Push Supplier proxies
00130   PersistNode* pushsNode =node.child("ProxyPushSupplier");
00131   if(pushsNode && !pushsNode->_child.empty())
00132   {
00133     _pushSupplier=new ProxyPushSupplierManager(_poa,_queue);
00134     _pushSupplier->reincarnate(*pushsNode);
00135   }
00136 
00137   // Build Pull Supplier proxies
00138   PersistNode* pullsNode =node.child("ProxyPullSupplier");
00139   if(pullsNode && !pullsNode->_child.empty())
00140   {
00141     _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue);
00142     _pullSupplier->reincarnate(*pullsNode);
00143   }
00144 }
00145 
00146 
00147 void ConsumerAdmin_i::output(ostream& os)
00148 {
00149   if(_pushSupplier)
00150   {
00151     omni_mutex_lock l(_pushSupplier->_lock);
00152     _pushSupplier->output(os);
00153   }
00154   if(_pullSupplier)
00155   {
00156     _pullSupplier->output(os);
00157   }
00158 }
00159 
00160 
00161 }; // end namespace OmniEvents

Generated on Fri Aug 26 20:56:13 2005 for OmniEvents by  doxygen 1.4.3-20050530