00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 #ifdef HAVE_CONFIG_H
00133 # include "config.h"
00134 #endif
00135
00136 #ifdef HAVE_GETOPT
00137 # include <unistd.h>
00138 extern char* optarg;
00139 extern int optind;
00140 #else
00141 # include "getopt.h"
00142 #endif
00143
00144 #ifdef HAVE_IOSTREAM
00145 # include <iostream>
00146 #else
00147 # include <iostream.h>
00148 #endif
00149
00150 #ifdef HAVE_STD_IOSTREAM
00151 using namespace std;
00152 #endif
00153
00154 #ifdef HAVE_STDLIB_H
00155 # include <stdlib.h>
00156 #endif
00157
00158 #ifdef HAVE_SIGNAL_H
00159 # include <signal.h>
00160 #endif
00161
00162 #include "CosEventComm.hh"
00163 #include "CosEventChannelAdmin.hh"
00164 #include "naming.h"
00165
00166 static omni_mutex mutex;
00167 static omni_condition connect_cond(&mutex);
00168 static void usage(int argc, char **argv);
00169
00170 class Consumer_i : virtual public POA_CosEventComm::PushConsumer {
00171 public:
00172 Consumer_i(long disconnect=0): _disconnect(disconnect) {}
00173
00174 void push(const CORBA::Any& data);
00175 void disconnect_push_consumer ();
00176
00177 private:
00178 long _disconnect;
00179 };
00180
00181 void Consumer_i::push(const CORBA::Any& data) {
00182 CORBA::ULong l;
00183 static int i = 0;
00184
00185 i++;
00186 if( data>>=l )
00187 {
00188 cout<<"Push Consumer: push() called. Data : "<< l <<endl;
00189
00190
00191 if (i == _disconnect)
00192 {
00193 i = 0;
00194
00195
00196
00197
00198
00199 omni_mutex_lock condition_lock(mutex);
00200 connect_cond.signal();
00201 }
00202 }
00203 else
00204 {
00205 cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl;
00206 }
00207 }
00208
00209 void Consumer_i::disconnect_push_consumer () {
00210 cout << "Push Consumer: disconnected." << endl;
00211 }
00212
00213 int
00214 main(int argc, char **argv)
00215 {
00216
00217
00218 CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00219
00220
00221 int discnum =0;
00222 int sleepInterval =0;
00223 const char* channelName ="EventChannel";
00224
00225 int c;
00226 while ((c = getopt(argc,argv,"hd:s:n:")) != EOF)
00227 {
00228 switch (c)
00229 {
00230 case 'd': discnum = atoi(optarg);
00231 break;
00232
00233 case 's': sleepInterval = atoi(optarg);
00234 break;
00235
00236 case 'n': channelName = optarg;
00237 break;
00238
00239 case 'h': usage(argc,argv);
00240 exit(0);
00241 default : usage(argc,argv);
00242 exit(-1);
00243 }
00244 }
00245
00246 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00247
00248 signal(SIGPIPE, SIG_IGN);
00249 #endif
00250
00251 Consumer_i* consumer = new Consumer_i (discnum);
00252 CosEventChannelAdmin::EventChannel_var channel;
00253
00254 const char* action="";
00255 try {
00256 CORBA::Object_var obj;
00257
00258 action="resolve initial reference 'RootPOA'";
00259 obj=orb->resolve_initial_references("RootPOA");
00260 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00261 if(CORBA::is_nil(rootPoa))
00262 throw CORBA::OBJECT_NOT_EXIST();
00263
00264 action="activate the RootPOA's POAManager";
00265 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00266 pman->activate();
00267
00268
00269
00270
00271 if(optind<argc)
00272 {
00273 action="convert URI from command line into object reference";
00274 obj=orb->string_to_object(argv[optind]);
00275 }
00276 else
00277 {
00278 action="resolve initial reference 'NameService'";
00279 obj=orb->resolve_initial_references("NameService");
00280 CosNaming::NamingContext_var rootContext=
00281 CosNaming::NamingContext::_narrow(obj);
00282 if(CORBA::is_nil(rootContext))
00283 throw CORBA::OBJECT_NOT_EXIST();
00284
00285 action="find EventChannel in NameService";
00286 cout << action << endl;
00287 obj=rootContext->resolve(str2name(channelName));
00288 }
00289
00290 action="narrow object reference to event channel";
00291 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00292 if(CORBA::is_nil(channel))
00293 {
00294 cerr << "Failed to narrow Event Channel reference." << endl;
00295 exit(1);
00296 }
00297
00298 }
00299 catch(CORBA::ORB::InvalidName& ex) {
00300 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00301 exit(1);
00302 }
00303 catch(CosNaming::NamingContext::InvalidName& ex) {
00304 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00305 exit(1);
00306 }
00307 catch(CosNaming::NamingContext::NotFound& ex) {
00308 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00309 exit(1);
00310 }
00311 catch(CosNaming::NamingContext::CannotProceed& ex) {
00312 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00313 exit(1);
00314 }
00315 catch(CORBA::TRANSIENT& ex) {
00316 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00317 exit(1);
00318 }
00319 catch(CORBA::OBJECT_NOT_EXIST& ex) {
00320 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00321 exit(1);
00322 }
00323 catch(CORBA::SystemException& ex) {
00324 cerr<<"Failed to "<<action<<".";
00325 #if defined(HAVE_OMNIORB4)
00326 cerr<<" "<<ex._name();
00327 if(ex.NP_minorString())
00328 cerr<<" ("<<ex.NP_minorString()<<")";
00329 #endif
00330 cerr<<endl;
00331 exit(1);
00332 }
00333 catch(CORBA::Exception& ex) {
00334 cerr<<"Failed to "<<action<<"."
00335 #if defined(HAVE_OMNIORB4)
00336 " "<<ex._name()
00337 #endif
00338 <<endl;
00339 exit(1);
00340 }
00341
00342
00343
00344 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00345 while (1)
00346 {
00347 try {
00348 consumer_admin = channel->for_consumers ();
00349 if (CORBA::is_nil (consumer_admin))
00350 {
00351 cerr << "Event Channel returned nil Consumer Admin!" << endl;
00352 exit(1);
00353 }
00354 break;
00355 }
00356 catch (CORBA::COMM_FAILURE& ex) {
00357 cerr << "Caught COMM_FAILURE exception "
00358 << "obtaining Consumer Admin! Retrying..."
00359 << endl;
00360 continue;
00361 }
00362 }
00363 cout << "Obtained ConsumerAdmin." << endl;
00364
00365 omni_mutex_lock condition_lock(mutex);
00366 while (1) {
00367
00368
00369 CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
00370 while (1)
00371 {
00372 try {
00373 proxy_supplier = consumer_admin->obtain_push_supplier ();
00374 if (CORBA::is_nil (proxy_supplier))
00375 {
00376 cerr << "Consumer Admin returned nil proxy_supplier!"
00377 << endl;
00378 exit (1);
00379 }
00380 break;
00381 }
00382 catch (CORBA::COMM_FAILURE& ex) {
00383 cerr << "Caught COMM_FAILURE Exception "
00384 << "obtaining Push Supplier! Retrying..."
00385 << endl;
00386 continue;
00387 }
00388 }
00389 cout << "Obtained ProxyPushSupplier." << endl;
00390
00391
00392
00393 while (1)
00394 {
00395 try {
00396 proxy_supplier->connect_push_consumer(consumer->_this());
00397 break;
00398 }
00399 catch (CORBA::BAD_PARAM& ex) {
00400 cerr << "Caught BAD_PARAM Exception connecting Push Consumer!"
00401 << endl;
00402 exit (1);
00403 }
00404 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00405 cerr << "Proxy Push Supplier already connected!"
00406 << endl;
00407 break;
00408 }
00409 catch (CORBA::COMM_FAILURE& ex) {
00410 cerr << "Caught COMM_FAILURE exception "
00411 << "connecting Push Consumer! Retrying..."
00412 << endl;
00413 continue;
00414 }
00415 }
00416 cout << "Connected Push Consumer." << endl;
00417
00418
00419 connect_cond.wait();
00420
00421
00422 while (1)
00423 {
00424 try {
00425 proxy_supplier->disconnect_push_supplier();
00426 break;
00427 }
00428 catch (CORBA::COMM_FAILURE& ex) {
00429 cerr << "Caught COMM_FAILURE Exception "
00430 << "disconnecting Push Consumer! Retrying..."
00431 << endl;
00432 continue;
00433 }
00434 }
00435 cout << "Disconnected Push Consumer." << endl;
00436
00437
00438 cout << "Sleeping " << sleepInterval << " seconds." << endl;
00439 omni_thread::sleep(sleepInterval);
00440 }
00441
00442
00443 return 0;
00444 }
00445
/**
 * Print the command-line help text to stderr.
 *
 * @param argc argument count; when it is zero the program name shown in the
 *             syntax line falls back to "pushcons".
 * @param argv argument vector; argv[0] is shown as the program name when
 *             argc is non-zero.
 */
static void
usage(int argc, char **argv)
{
  const char* progName = "pushcons";
  if (argc)
    progName = argv[0];

  cerr << "\nCreate a PushConsumer to receive events from a channel.\n";
  cerr << "syntax: " << progName << " OPTIONS [CHANNEL_URI]\n";
  cerr << "\n";
  cerr << "CHANNEL_URI: The event channel may be specified as a URI.\n";
  cerr << " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n";
  cerr << "\n";
  cerr << "OPTIONS: DEFAULT:\n";
  cerr << " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n";
  cerr << " -s SECS sleep SECS seconds after disconnecting [0]\n";
  cerr << " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n";
  cerr << " -h display this help text\n" << endl;
}