00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117 #ifdef HAVE_CONFIG_H
00118 # include "config.h"
00119 #endif
00120
00121 #ifdef HAVE_GETOPT
00122 # include <unistd.h>
00123 extern char* optarg;
00124 extern int optind;
00125 #else
00126 # include "getopt.h"
00127 #endif
00128
00129 #ifdef HAVE_IOSTREAM
00130 # include <iostream>
00131 #else
00132 # include <iostream.h>
00133 #endif
00134
00135 #ifdef HAVE_STD_IOSTREAM
00136 using namespace std;
00137 #endif
00138
00139 #ifdef HAVE_STDLIB_H
00140 # include <stdlib.h>
00141 #endif
00142
00143 #ifdef HAVE_SIGNAL_H
00144 # include <signal.h>
00145 #endif
00146
00147 #include "CosEventComm.hh"
00148 #include "CosEventChannelAdmin.hh"
00149 #include "naming.h"
00150
00151 static omni_semaphore connect_cond(0);
00152 static void usage(int argc, char **argv);
00153
00154 class Supplier_i : virtual public POA_CosEventComm::PullSupplier {
00155 public:
00156 Supplier_i (long disconnect = 0) : i(0), _disconnect(disconnect), l(0) {};
00157 CORBA::Any *pull();
00158 CORBA::Any *try_pull(CORBA::Boolean &has_event);
00159 void disconnect_pull_supplier ();
00160
00161 private:
00162 long i;
00163 long _disconnect;
00164 CORBA::ULong l;
00165 };
00166
00167 void
00168 Supplier_i::disconnect_pull_supplier () {
00169 cout << "Pull Supplier: disconnected by channel." << endl;
00170 }
00171
00172 CORBA::Any *
00173 Supplier_i::pull() {
00174 cout << "Pull Supplier: pull() called. Data : ";
00175 CORBA::Any *any = new CORBA::Any();
00176 *any <<= l++;
00177 cout << l-1 << endl;
00178
00179
00180 if ((_disconnect > 0) && (i == _disconnect)) {
00181 i = 0;
00182
00183 connect_cond.post();
00184 }
00185 i++;
00186 return (any);
00187 }
00188
00189 CORBA::Any *
00190 Supplier_i::try_pull(CORBA::Boolean &has_event)
00191 {
00192 cout << "Pull Supplier: try_pull() called. Data : ";
00193 CORBA::Any *any = new CORBA::Any();
00194 *any <<= l++;
00195 cout << l-1 << endl;
00196 has_event = 1;
00197
00198
00199 if ((_disconnect > 0) && (i == _disconnect)) {
00200 i = 0;
00201
00202 connect_cond.post();
00203 }
00204 i++;
00205 return (any);
00206 }
00207
00208
00209 int
00210 main (int argc, char** argv)
00211 {
00212 #if defined(HAVE_OMNIORB4)
00213 CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB4");
00214 #else
00215 CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB3");
00216 #endif
00217
00218
00219 int discnum =0;
00220 int sleepInterval =0;
00221 const char* channelName ="EventChannel";
00222
00223 int c;
00224 while ((c = getopt(argc,argv,"d:s:n:h")) != EOF)
00225 {
00226 switch (c)
00227 {
00228 case 'd': discnum = atoi(optarg);
00229 break;
00230
00231 case 's': sleepInterval = atoi(optarg);
00232 break;
00233
00234 case 'n': channelName = optarg;
00235 break;
00236
00237 case 'h':
00238 default : usage(argc,argv);
00239 exit(-1);
00240 break;
00241 }
00242 }
00243
00244 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00245
00246 signal(SIGPIPE, SIG_IGN);
00247 #endif
00248
00249 Supplier_i* supplier = new Supplier_i (discnum);
00250 CosEventChannelAdmin::EventChannel_var channel;
00251
00252 const char* action="";
00253 try {
00254 CORBA::Object_var obj;
00255
00256 action="resolve initial reference 'RootPOA'";
00257 obj=orb->resolve_initial_references("RootPOA");
00258 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00259 if(CORBA::is_nil(rootPoa))
00260 throw CORBA::OBJECT_NOT_EXIST();
00261
00262 action="activate the RootPOA's POAManager";
00263 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00264 pman->activate();
00265
00266
00267
00268
00269 if(optind<argc)
00270 {
00271 action="convert URI from command line into object reference";
00272 obj=orb->string_to_object(argv[optind]);
00273 }
00274 else
00275 {
00276 action="resolve initial reference 'NameService'";
00277 obj=orb->resolve_initial_references("NameService");
00278 CosNaming::NamingContext_var rootContext=
00279 CosNaming::NamingContext::_narrow(obj);
00280 if(CORBA::is_nil(rootContext))
00281 throw CORBA::OBJECT_NOT_EXIST();
00282
00283 action="find EventChannel in NameService";
00284 cout << action << endl;
00285 obj=rootContext->resolve(str2name(channelName));
00286 }
00287
00288 action="narrow object reference to event channel";
00289 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00290 if(CORBA::is_nil(channel))
00291 {
00292 cerr << "Failed to narrow Event Channel reference." << endl;
00293 exit(1);
00294 }
00295
00296 }
00297 catch(CORBA::ORB::InvalidName& ex) {
00298 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00299 exit(1);
00300 }
00301 catch(CosNaming::NamingContext::InvalidName& ex) {
00302 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00303 exit(1);
00304 }
00305 catch(CosNaming::NamingContext::NotFound& ex) {
00306 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00307 exit(1);
00308 }
00309 catch(CosNaming::NamingContext::CannotProceed& ex) {
00310 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00311 exit(1);
00312 }
00313 catch(CORBA::TRANSIENT& ex) {
00314 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00315 exit(1);
00316 }
00317 catch(CORBA::OBJECT_NOT_EXIST& ex) {
00318 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00319 exit(1);
00320 }
00321 catch(CORBA::SystemException& ex) {
00322 cerr<<"Failed to "<<action<<".";
00323 #if defined(HAVE_OMNIORB4)
00324 cerr<<" "<<ex._name();
00325 if(ex.NP_minorString())
00326 cerr<<" ("<<ex.NP_minorString()<<")";
00327 #endif
00328 cerr<<endl;
00329 exit(1);
00330 }
00331 catch(CORBA::Exception& ex) {
00332 cerr<<"Failed to "<<action<<"."
00333 #if defined(HAVE_OMNIORB4)
00334 " "<<ex._name()
00335 #endif
00336 <<endl;
00337 exit(1);
00338 }
00339
00340
00341
00342 CosEventChannelAdmin::SupplierAdmin_var supplier_admin;
00343 while (1)
00344 {
00345 try {
00346 supplier_admin = channel->for_suppliers ();
00347 if (CORBA::is_nil(supplier_admin))
00348 {
00349 cerr << "Event Channel returned nil Supplier Admin!"
00350 << endl;
00351 exit(1);
00352 }
00353 break;
00354 }
00355 catch (CORBA::COMM_FAILURE& ex) {
00356 cerr << "Caught COMM_FAILURE exception "
00357 << "obtaining Supplier Admin! Retrying..."
00358 << endl;
00359 continue;
00360 }
00361 }
00362 cout << "Obtained SupplierAdmin." << endl;
00363
00364 while (1)
00365 {
00366
00367
00368 CosEventChannelAdmin::ProxyPullConsumer_var proxy_consumer;
00369 while (1)
00370 {
00371 try {
00372 proxy_consumer = supplier_admin->obtain_pull_consumer ();
00373 if (CORBA::is_nil(proxy_consumer))
00374 {
00375 cerr << "Supplier Admin returned nil proxy_consumer!"
00376 << endl;
00377 exit(1);
00378 }
00379 break;
00380 }
00381 catch (CORBA::COMM_FAILURE& ex) {
00382 cerr << "Caught COMM_FAILURE exception "
00383 << "obtaining Proxy Pull Consumer! Retrying..."
00384 << endl;
00385 continue;
00386 }
00387 }
00388 cout << "Obtained ProxyPullConsumer." << endl;
00389
00390
00391 CosEventComm::PullSupplier_var supplierRef =supplier->_this();
00392 while (1)
00393 {
00394 try {
00395 proxy_consumer->connect_pull_supplier(supplierRef.in());
00396 break;
00397 }
00398 catch (CORBA::BAD_PARAM& ex) {
00399 cerr<<"Caught BAD_PARAM Exception connecting Pull Supplier!"<<endl;
00400 exit(1);
00401 }
00402 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00403 cerr << "Pull Supplier already connected!"
00404 << endl;
00405 break;
00406 }
00407 catch (CORBA::COMM_FAILURE& ex) {
00408 cerr << "Caught COMM_FAILURE exception "
00409 << "connecting Pull Supplier! Retrying..."
00410 << endl;
00411 continue;
00412 }
00413 }
00414 cout << "Connected Pull Supplier." << endl;
00415
00416
00417 connect_cond.wait();
00418
00419
00420 while (1)
00421 {
00422 try {
00423 proxy_consumer->disconnect_pull_consumer();
00424 break;
00425 }
00426 catch (CORBA::COMM_FAILURE& ex) {
00427 cerr << "Caught COMM_FAILURE exception "
00428 << "disconnecting Pull Supplier! Retrying..."
00429 << endl;
00430 continue;
00431 }
00432 }
00433 cout << "Disconnected Pull Supplier." << endl;
00434
00435
00436 cout << "Sleeping " << sleepInterval << " seconds." << endl;
00437 omni_thread::sleep(sleepInterval);
00438 }
00439
00440
00441 return 0;
00442 }
00443
/**
 * Write the command-line help text to the standard error stream.
 *
 * @param argc  Argument count; when zero the built-in name "pullsupp" is
 *              shown instead of argv[0].
 * @param argv  Argument vector; only argv[0] is read, and only if argc is
 *              non-zero.
 */
static void
usage(int argc, char **argv)
{
  const char* const progName = (argc != 0) ? argv[0] : "pullsupp";

  cerr << "\nCreate a PullSupplier to send events to a channel.\n"
          "syntax: ";
  cerr << progName;
  cerr << " OPTIONS [CHANNEL_URI]\n"
          "\n"
          "CHANNEL_URI: The event channel may be specified as a URI.\n"
          " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
          "\n"
          "OPTIONS: DEFAULT:\n"
          " -d NUM disconnect after sending NUM events [0 - never disconnect]\n"
          " -s SECS sleep SECS seconds after disconnecting [0]\n"
          " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
          " -h display this help text\n";
  cerr << endl;
}