00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121 #ifdef HAVE_CONFIG_H
00122 # include "config.h"
00123 #endif
00124
00125 #ifdef HAVE_GETOPT
00126 # include <unistd.h>
00127 extern char* optarg;
00128 extern int optind;
00129 #else
00130 # include "getopt.h"
00131 #endif
00132
00133 #ifdef HAVE_IOSTREAM
00134 # include <iostream>
00135 #else
00136 # include <iostream.h>
00137 #endif
00138
00139 #ifdef HAVE_STD_IOSTREAM
00140 using namespace std;
00141 #endif
00142
00143 #ifdef HAVE_STDLIB_H
00144 # include <stdlib.h>
00145 #endif
00146
00147 #ifdef HAVE_SIGNAL_H
00148 # include <signal.h>
00149 #endif
00150
00151 #include "CosEventComm.hh"
00152 #include "CosEventChannelAdmin.hh"
00153 #include "naming.h"
00154
00155 static void usage(int argc, char **argv);
00156
00157 class Consumer_i : virtual public POA_CosEventComm::PullConsumer {
00158 public:
00159 Consumer_i () {};
00160 void disconnect_pull_consumer ();
00161 };
00162
00163 void Consumer_i::disconnect_pull_consumer () {
00164 cout << "Pull Consumer: disconnected." << endl;
00165 }
00166
00167 int
00168 main(int argc, char **argv)
00169 {
00170
00171
00172 CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00173
00174
00175 bool trymode =false;
00176 int discnum =0;
00177 bool refnil =false;
00178 int sleepInterval =0;
00179 const char* channelName ="EventChannel";
00180
00181 int c;
00182 while ((c = getopt(argc,argv,"td:rs:n:h")) != EOF)
00183 {
00184 switch (c)
00185 {
00186 case 't': trymode = true;
00187 break;
00188
00189 case 'd': discnum = atoi(optarg);
00190 break;
00191
00192 case 'r': refnil = true;
00193 break;
00194
00195 case 's': sleepInterval = atoi(optarg);
00196 break;
00197
00198 case 'n': channelName = optarg;
00199 break;
00200
00201 case 'h':
00202 default : usage(argc,argv);
00203 exit(-1);
00204 break;
00205 }
00206 }
00207
00208 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00209
00210 signal(SIGPIPE, SIG_IGN);
00211 #endif
00212
00213 Consumer_i* consumer =NULL;
00214 CosEventChannelAdmin::EventChannel_var channel;
00215
00216 const char* action="";
00217 try {
00218 CORBA::Object_var obj;
00219
00220
00221
00222
00223
00224 if (! refnil)
00225 {
00226 consumer=new Consumer_i();
00227
00228 action="resolve initial reference 'RootPOA'";
00229 obj=orb->resolve_initial_references("RootPOA");
00230 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00231 if(CORBA::is_nil(rootPoa))
00232 throw CORBA::OBJECT_NOT_EXIST();
00233
00234 action="activate the RootPOA's POAManager";
00235 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00236 pman->activate();
00237 }
00238
00239
00240
00241
00242 if(optind<argc)
00243 {
00244 action="convert URI from command line into object reference";
00245 obj=orb->string_to_object(argv[optind]);
00246 }
00247 else
00248 {
00249 action="resolve initial reference 'NameService'";
00250 obj=orb->resolve_initial_references("NameService");
00251 CosNaming::NamingContext_var rootContext=
00252 CosNaming::NamingContext::_narrow(obj);
00253 if(CORBA::is_nil(rootContext))
00254 throw CORBA::OBJECT_NOT_EXIST();
00255
00256 action="find EventChannel in NameService";
00257 cout << action << endl;
00258 obj=rootContext->resolve(str2name(channelName));
00259 }
00260
00261 action="narrow object reference to event channel";
00262 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00263 if(CORBA::is_nil(channel))
00264 {
00265 cerr << "Failed to narrow Event Channel reference." << endl;
00266 exit(1);
00267 }
00268
00269 }
00270 catch(CORBA::ORB::InvalidName& ex) {
00271 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00272 exit(1);
00273 }
00274 catch(CosNaming::NamingContext::InvalidName& ex) {
00275 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00276 exit(1);
00277 }
00278 catch(CosNaming::NamingContext::NotFound& ex) {
00279 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00280 exit(1);
00281 }
00282 catch(CosNaming::NamingContext::CannotProceed& ex) {
00283 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00284 exit(1);
00285 }
00286 catch(CORBA::TRANSIENT& ex) {
00287 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00288 exit(1);
00289 }
00290 catch(CORBA::OBJECT_NOT_EXIST& ex) {
00291 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00292 exit(1);
00293 }
00294 catch(CORBA::SystemException& ex) {
00295 cerr<<"Failed to "<<action<<".";
00296 #if defined(HAVE_OMNIORB4)
00297 cerr<<" "<<ex._name();
00298 if(ex.NP_minorString())
00299 cerr<<" ("<<ex.NP_minorString()<<")";
00300 #endif
00301 cerr<<endl;
00302 exit(1);
00303 }
00304 catch(CORBA::Exception& ex) {
00305 cerr<<"Failed to "<<action<<"."
00306 #if defined(HAVE_OMNIORB4)
00307 " "<<ex._name()
00308 #endif
00309 <<endl;
00310 exit(1);
00311 }
00312
00313
00314
00315 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00316 while (1)
00317 {
00318 try {
00319 consumer_admin = channel->for_consumers ();
00320 if (CORBA::is_nil (consumer_admin))
00321 {
00322 cerr << "Event Channel returned nil Consumer Admin!" << endl;
00323 exit (1);
00324 }
00325 break;
00326 }
00327 catch (CORBA::COMM_FAILURE& ex) {
00328 cerr << "Caught COMM_FAILURE exception "
00329 << "obtaining Consumer Admin! Retrying..."
00330 << endl;
00331 continue;
00332 }
00333 }
00334 cout << "Obtained Consumer Admin." << endl;
00335
00336 while (1)
00337 {
00338
00339
00340 CosEventChannelAdmin::ProxyPullSupplier_var proxy_supplier;
00341 while (1)
00342 {
00343 try {
00344 proxy_supplier = consumer_admin->obtain_pull_supplier ();
00345 if (CORBA::is_nil (proxy_supplier))
00346 {
00347 cerr << "Consumer Admin returned nil Proxy Supplier!" << endl;
00348 exit (1);
00349 }
00350 break;
00351 }
00352 catch (CORBA::COMM_FAILURE& ex) {
00353 cerr << "Caught COMM_FAILURE Exception "
00354 << "obtaining Pull Supplier! Retrying..."
00355 << endl;
00356 continue;
00357 }
00358 }
00359 cout << "Obtained ProxyPullSupplier." << endl;
00360
00361
00362
00363 CosEventComm::PullConsumer_ptr cptr =CosEventComm::PullConsumer::_nil();
00364 if (! refnil) {
00365 cptr=consumer->_this();
00366 }
00367
00368 while (1)
00369 {
00370 try {
00371 proxy_supplier->connect_pull_consumer(cptr);
00372 break;
00373 }
00374 catch (CORBA::BAD_PARAM& ex) {
00375 cerr << "Caught BAD_PARAM exception connecting Pull Consumer!"<<endl;
00376 exit (1);
00377 }
00378 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00379 cerr << "Proxy Pull Supplier already connected!"
00380 << endl;
00381 break;
00382 }
00383 catch (CORBA::COMM_FAILURE& ex) {
00384 cerr << "Caught COMM_FAILURE Exception "
00385 << "connecting Pull Consumer! Retrying..."
00386 << endl;
00387 continue;
00388 }
00389 }
00390 cout << "Connected Pull Consumer." << endl;
00391
00392
00393 CORBA::Any *data;
00394 CORBA::ULong l = 0;
00395 for (int i=0; (discnum == 0) || (i < discnum); i++)
00396 {
00397 if(trymode)
00398 {
00399 try {
00400 CORBA::Boolean has_event;
00401 data = proxy_supplier->try_pull(has_event);
00402 cout << "Consumer: try_pull() called. Data : " << flush;
00403 if (has_event)
00404 {
00405 l = 0;
00406 *data >>= l;
00407 cout << l << endl;
00408 delete data;
00409 }
00410 else
00411 {
00412 cout << "None" << endl;
00413 }
00414 }
00415 catch (CosEventComm::Disconnected& ex) {
00416 cout << endl;
00417 cerr << "Failed. Caught Disconnected Exception !" << endl;
00418 }
00419 catch (CORBA::COMM_FAILURE& ex) {
00420 cout << endl;
00421 cerr << "Failed. Caught COMM_FAILURE Exception !" << endl;
00422 }
00423 catch (CORBA::Exception& ex) {
00424 cout << endl;
00425 cerr<<"CORBA exception, unable to try_pull()"
00426 #ifdef HAVE_OMNIORB4
00427 <<": "<<ex._name()
00428 #endif
00429 << endl;
00430 }
00431 }
00432 else
00433 {
00434 try {
00435 cout << "Pull Consumer: pull() called. ";
00436 cout.flush();
00437 data = proxy_supplier->pull();
00438 l = 0;
00439 *data >>= l;
00440 cout << "Data : " << l << endl;
00441 delete data;
00442 }
00443 catch(CORBA::TRANSIENT&) {
00444 cout << "caught TRANSIENT." << endl;
00445 omni_thread::sleep(1);
00446 }
00447 catch (CosEventComm::Disconnected& ex) {
00448 cout << endl;
00449 cerr << "Failed. Caught Disconnected exception!" << endl;
00450 exit(1);
00451 }
00452 catch (CORBA::COMM_FAILURE& ex) {
00453 cout << endl;
00454 cerr << "Failed. Caught COMM_FAILURE exception!" << endl;
00455 exit(1);
00456 }
00457 catch (CORBA::SystemException& ex) {
00458 cout << endl;
00459 cerr<<"System exception, unable to pull()";
00460 #ifdef HAVE_OMNIORB4
00461 cerr<<": "<<ex._name();
00462 if(ex.NP_minorString())
00463 cerr<<" ("<<ex.NP_minorString()<<")";
00464 #endif
00465 cerr<< endl;
00466 exit(1);
00467 }
00468 catch (CORBA::Exception& ex) {
00469 cout << endl;
00470 cerr<<"CORBA exception, unable to pull()"
00471 #ifdef HAVE_OMNIORB4
00472 <<": "<<ex._name()
00473 #endif
00474 << endl;
00475 exit(1);
00476 }
00477 }
00478 }
00479
00480
00481 while (1)
00482 {
00483 try {
00484 proxy_supplier->disconnect_pull_supplier();
00485 break;
00486 }
00487 catch (CORBA::COMM_FAILURE& ex) {
00488 cerr << "Caught COMM_FAILURE exception "
00489 << "disconnecting Pull Consumer! Retrying..."
00490 << endl;
00491 continue;
00492 }
00493 }
00494 cout << "Disconnected Pull Consumer." << endl;
00495
00496
00497 cout << "Sleeping " << sleepInterval << " seconds." << endl;
00498 omni_thread::sleep(sleepInterval);
00499 }
00500
00501
00502 return 0;
00503 }
00504
00505 static void
00506 usage(int argc, char **argv)
00507 {
00508 cerr<<
00509 "\nCreate a PullConsumer to receive events from a channel.\n"
00510 "syntax: "<<(argc?argv[0]:"pullcons")<<" OPTIONS [CHANNEL_URI]\n"
00511 "\n"
00512 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00513 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00514 "\n"
00515 "OPTIONS: DEFAULT:\n"
00516 " -t enable try_pull mode\n"
00517 " -r connect using a nil reference\n"
00518 " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n"
00519 " -s SECS sleep SECS seconds after disconnecting [0]\n"
00520 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
00521 " -h display this help text\n" << endl;
00522 }