00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ConsumerAdmin.h"
00025
00026 #include "EventChannel.h"
00027 #include "ProxyPushSupplier.h"
00028 #include "ProxyPullSupplier.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031 #include "Filter.h"
00032
00033 namespace OmniEvents {
00034
00035
00036 CosEventChannelAdmin::ProxyPushSupplier_ptr
00037 ConsumerAdmin_i::obtain_push_supplier()
00038 {
00039 if(!_pushSupplier)
00040 _pushSupplier=new ProxyPushSupplierManager(_poa,_queue);
00041 return _pushSupplier->createObject();
00042 }
00043
00044
00045 CosEventChannelAdmin::ProxyPullSupplier_ptr
00046 ConsumerAdmin_i::obtain_pull_supplier()
00047 {
00048 if(!_pullSupplier)
00049 _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue);
00050 return _pullSupplier->createObject();
00051 }
00052
00053
00054 ConsumerAdmin_i::ConsumerAdmin_i(
00055 const EventChannel_i& channel,
00056 PortableServer::POA_ptr poa
00057 )
00058 : Servant(poa),
00059 _channel(channel),
00060 _queue(channel.maxQueueLength()),
00061 _pushSupplier(NULL),
00062 _pullSupplier(NULL)
00063 {
00064 if(_channel.properties().hasAttr("FilterId"))
00065 {
00066 string rid =_channel.properties().attrString("FilterId");
00067 _queue.setFilter(new FilterByRepositoryId(rid.c_str()));
00068 }
00069 else if(_channel.properties().hasAttr("FilterKind"))
00070 {
00071 CORBA::TCKind kind =
00072 CORBA::TCKind(_channel.properties().attrLong("FilterKind"));
00073 _queue.setFilter(new FilterByTCKind(kind));
00074 }
00075
00076 activateObjectWithId("ConsumerAdmin");
00077 }
00078
00079
00080 ConsumerAdmin_i::~ConsumerAdmin_i()
00081 {
00082 DB(20,"~ConsumerAdmin_i()")
00083 if(_pushSupplier)
00084 {
00085 _pushSupplier->_remove_ref();
00086 _pushSupplier=NULL;
00087 }
00088 if(_pullSupplier)
00089 {
00090 _pullSupplier->_remove_ref();
00091 _pullSupplier=NULL;
00092 }
00093 }
00094
00095
00096 OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(ConsumerAdmin_i)
00097
00098
00099 void ConsumerAdmin_i::send(CORBA::Any* event)
00100 {
00101 ProxyPushSupplierManager::PauseThenWake p(_pushSupplier);
00102 _queue.append(event);
00103 }
00104
00105
00106 void ConsumerAdmin_i::send(list<CORBA::Any*>& events)
00107 {
00108 if(!events.empty())
00109 {
00110 ProxyPushSupplierManager::PauseThenWake p(_pushSupplier);
00111 for(list<CORBA::Any*>::iterator i=events.begin(); i!=events.end(); ++i)
00112 _queue.append( *i );
00113 events.clear();
00114 }
00115 }
00116
00117
00118 void ConsumerAdmin_i::disconnect()
00119 {
00120 if(_pushSupplier)
00121 _pushSupplier->disconnect();
00122 if(_pullSupplier)
00123 _pullSupplier->disconnect();
00124 }
00125
00126
00127 void ConsumerAdmin_i::reincarnate(const PersistNode& node)
00128 {
00129
00130 PersistNode* pushsNode =node.child("ProxyPushSupplier");
00131 if(pushsNode && !pushsNode->_child.empty())
00132 {
00133 _pushSupplier=new ProxyPushSupplierManager(_poa,_queue);
00134 _pushSupplier->reincarnate(*pushsNode);
00135 }
00136
00137
00138 PersistNode* pullsNode =node.child("ProxyPullSupplier");
00139 if(pullsNode && !pullsNode->_child.empty())
00140 {
00141 _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue);
00142 _pullSupplier->reincarnate(*pullsNode);
00143 }
00144 }
00145
00146
00147 void ConsumerAdmin_i::output(ostream& os)
00148 {
00149 if(_pushSupplier)
00150 {
00151 omni_mutex_lock l(_pushSupplier->_lock);
00152 _pushSupplier->output(os);
00153 }
00154 if(_pullSupplier)
00155 {
00156 _pullSupplier->output(os);
00157 }
00158 }
00159
00160
00161 };