00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef OMNIEVENTS__PROXYPUSHSUPPLIER_H
00025 #define OMNIEVENTS__PROXYPUSHSUPPLIER_H
00026
00027 #ifdef HAVE_CONFIG_H
00028 # include "config.h"
00029 #endif
00030
00031 #ifdef HAVE_IOSTREAM
00032 # include <iostream>
00033 #else
00034 # include <iostream.h>
00035 #endif
00036
00037 #include "Callback.h"
00038 #include "EventQueue.h"
00039 #include "ProxyManager.h"
00040
00041 #include "CosEventChannelAdmin.hh"
00042
00043 #ifdef HAVE_STD_IOSTREAM
00044 using namespace std;
00045 #endif
00046
00047 namespace OmniEvents {
00048
00049 class ProxyPushSupplierManager
00050 : public ProxyManager,
00051 public omni_thread
00052 {
00053 public:
00054 PortableServer::Servant incarnate(
00055 const PortableServer::ObjectId& oid,
00056 PortableServer::POA_ptr poa
00057 );
00059 void etherealize(
00060 const PortableServer::ObjectId& oid,
00061 PortableServer::POA_ptr adapter,
00062 PortableServer::Servant serv,
00063 CORBA::Boolean cleanup_in_progress,
00064 CORBA::Boolean remaining_activations
00065 );
00066 public:
00067 ProxyPushSupplierManager(PortableServer::POA_ptr parentPoa,EventQueue& q);
00068 ~ProxyPushSupplierManager();
00069 CosEventChannelAdmin::ProxyPushSupplier_ptr createObject();
00070
00072 void disconnect();
00073
00074 void* run_undetached(void*);
00075 void _add_ref();
00076 void _remove_ref();
00077
00078 omni_mutex _lock;
00079 omni_condition _condition;
00080
00085 class PauseThenWake
00086 {
00087 ProxyPushSupplierManager* _p;
00088 PauseThenWake(const PauseThenWake&);
00089 PauseThenWake();
00090 public:
00091 inline PauseThenWake(ProxyPushSupplierManager* p);
00092 inline ~PauseThenWake();
00093 };
00094
00095 private:
00096 EventQueue& _queue;
00097 int _refCount;
00098 };
00099
00100
00101 class ProxyPushSupplier_i
00102 : public virtual POA_CosEventChannelAdmin::ProxyPushSupplier,
00103 public Proxy,
00104 public EventQueue::Reader,
00105 public Callback
00106 {
00107 public:
00108 void connect_push_consumer(CosEventComm::PushConsumer_ptr pushConsumer);
00109 void disconnect_push_supplier();
00110 public:
00111 ProxyPushSupplier_i(PortableServer::POA_ptr poa, EventQueue& q);
00112 ~ProxyPushSupplier_i();
00113 OMNIEVENTS__DEBUG_REF_COUNTS__DECL
00114
00118 inline void trigger(bool& busy, bool& waiting);
00119
00121 void callback(CORBA::Request_ptr req);
00122 void reincarnate(const string& oid, const PersistNode& node);
00123 void output(ostream &os);
00124 private:
00125 CosEventComm::PushConsumer_var _target;
00126 bool _targetIsProxy;
00127 };
00128
00129
00130
00131
00132
00133
00134 inline ProxyPushSupplierManager::PauseThenWake::PauseThenWake(
00135 ProxyPushSupplierManager* p
00136 ):_p(p)
00137 {
00138 if(_p)
00139 _p->_lock.lock();
00140 }
00141
00142 inline ProxyPushSupplierManager::PauseThenWake::~PauseThenWake()
00143 {
00144 if(_p)
00145 {
00146 _p->_lock.unlock();
00147 _p->_condition.signal();
00148 }
00149 }
00150
00151
00152 };
00153
00154 #endif // OMNIEVENTS__PROXYPUSHSUPPLIER_H