Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

pullsupp.cc

Go to the documentation of this file.
00001 // -*- Mode: C++; -*-
00002 //                            Package   : omniEvents
00003 //   pullsupp.cc              Created   : 1/4/98
00004 //                            Author    : Paul Nader (pwn)
00005 //
00006 //    Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
00007 //
00008 //    This file is part of the omniEvents application.
00009 //
00010 //    omniEvents is free software; you can redistribute it and/or
00011 //    modify it under the terms of the GNU Lesser General Public
00012 //    License as published by the Free Software Foundation; either
00013 //    version 2.1 of the License, or (at your option) any later version.
00014 //
00015 //    omniEvents is distributed in the hope that it will be useful,
00016 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //    Lesser General Public License for more details.
00019 //
00020 //    You should have received a copy of the GNU Lesser General Public
00021 //    License along with this library; if not, write to the Free Software
00022 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023 //
00024 // Description:
00025 //    Pull Model supplier implementation.
00026 //      
00027 
00028 /*
00029   $Log: pullsupp.cc,v $
00030   Revision 1.9  2004/10/08 09:06:08  alextingle
00031   More robust exception minor code handling.
00032 
00033   Revision 1.8  2004/08/18 17:49:45  alextingle
00034   Added check for SIGPIPE before trying to use it.
00035 
00036   Revision 1.7  2004/08/06 16:19:23  alextingle
00037   -k & -K options removed.
00038   Naming service names may now be as complex as you like.
00039 
00040   Revision 1.6  2004/04/20 16:52:05  alextingle
00041   All examples updated for latest version on omniEvents. Server may now be
00042   specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
00043 
00044   Revision 1.5  2004/02/04 22:29:55  alextingle
00045   Reworked all C++ examples.
00046   Removed catch(...) as it tends to make it harder to see what's going on.
00047   Now uses POA instead of BOA.
00048   Uses omniORB4's Exception name probing.
00049   No longer uses 'naming.h/cc' utility code.
00050 
00051   Revision 1.4  2003/11/17 08:47:09  alextingle
00052   Corrected typo.
00053 
00054   Revision 1.2  2003/11/16 23:24:22  alex
00055   Corrected typo.
00056 
00057   Revision 1.1.1.1  2003/11/11 22:28:56  alex
00058   Import of testing 2.3.0
00059 
00060   Revision 1.3  2003/11/03 22:20:26  alextingle
00061   Removed all platform specific switches. Now uses autoconf, config.h.
00062   Removed stub header in order to allow makefile dependency checking to work
00063   correctly.
00064 
00065   Revision 1.1.1.1.2.1  2002/09/28 22:20:51  shamus13
00066   Added ifdefs to enable omniEvents to compile
00067   with both omniORB3 and omniORB4. If __OMNIORB4__
00068   is defined during compilation, omniORB4 headers
00069   and command line option syntax is used, otherwise
00070   fall back to omniORB3 style.
00071 
00072   Revision 1.1.1.1  2002/09/25 19:00:26  shamus13
00073   Import of OmniEvents source tree from release 2.1.1
00074 
00075   Revision 0.11  2000/08/30 04:39:48  naderp
00076   Port to omniORB 3.0.1.
00077 
00078   Revision 0.10  2000/03/16 05:37:27  naderp
00079   Added stdlib.h for getopt.
00080 
00081   Revision 0.9  2000/03/06 13:26:32  naderp
00082   Using util getRootNamingContext function.
00083   Using stub headers.
00084   Fixed error messages.
00085 
00086   Revision 0.8  2000/03/02 03:16:26  naderp
00087   Added retry resiliency for handling COMM_FAUILURE exceptions.
00088   Replaced condition variable by counting semaphore.
00089 
00090   Revision 0.7  1999/11/02 13:39:13  naderp
00091   Added <signal.h>
00092 
00093   Revision 0.6  1999/11/02 07:57:22  naderp
00094   Updated usage.
00095 
00096 Revision 0.5  99/11/01  16:10:12  16:10:12  naderp (Paul Nader)
00097 omniEvents 2.0 Release.
00098 Ignoring SIGPIPE for UNIX platforms.
00099 
00100 Revision 0.4  99/04/23  16:05:44  16:05:44  naderp (Paul Nader)
00101 gcc port.
00102 
00103 Revision 0.3  99/04/23  09:34:01  09:34:01  naderp (Paul Nader)
00104 Windows Port.
00105 
00106 Revision 0.2  99/04/21  18:06:25  18:06:25  naderp (Paul Nader)
00107 *** empty log message ***
00108 
00109 Revision 0.1.1.1  98/11/27  16:59:33  16:59:33  naderp (Paul Nader)
00110 Added -s option to sleep after disconnecting.
00111 
00112 Revision 0.1  98/11/25  14:08:07  14:08:07  naderp (Paul Nader)
00113 Initial Revision
00114 
00115 */
00116 
00117 #ifdef HAVE_CONFIG_H
00118 #  include "config.h"
00119 #endif
00120 
00121 #ifdef HAVE_GETOPT
00122 #  include <unistd.h>
00123 extern char* optarg;
00124 extern int optind;
00125 #else
00126 #  include "getopt.h"
00127 #endif
00128 
00129 #ifdef HAVE_IOSTREAM
00130 #  include <iostream>
00131 #else
00132 #  include <iostream.h>
00133 #endif
00134 
00135 #ifdef HAVE_STD_IOSTREAM
00136 using namespace std;
00137 #endif
00138 
00139 #ifdef HAVE_STDLIB_H
00140 #  include <stdlib.h>
00141 #endif
00142 
00143 #ifdef HAVE_SIGNAL_H
00144 #  include <signal.h>
00145 #endif
00146 
00147 #include "CosEventComm.hh"
00148 #include "CosEventChannelAdmin.hh"
00149 #include "naming.h"
00150 
00151 static omni_semaphore connect_cond(0);
00152 static void usage(int argc, char **argv);
00153 
00154 class Supplier_i : virtual public POA_CosEventComm::PullSupplier {
00155 public:
00156   Supplier_i (long disconnect = 0) : i(0), _disconnect(disconnect), l(0) {};
00157   CORBA::Any *pull();
00158   CORBA::Any *try_pull(CORBA::Boolean &has_event);
00159   void disconnect_pull_supplier ();
00160 
00161 private:
00162   long i;
00163   long _disconnect;
00164   CORBA::ULong l;
00165 };
00166 
00167 void
00168 Supplier_i::disconnect_pull_supplier () {
00169   cout << "Pull Supplier: disconnected by channel." << endl;
00170 }
00171 
00172 CORBA::Any *
00173 Supplier_i::pull() {
00174    cout << "Pull Supplier: pull() called. Data : ";
00175    CORBA::Any *any = new CORBA::Any();
00176    *any <<= l++;
00177    cout << l-1 << endl;
00178 
00179    // Exercise Disconnect
00180    if ((_disconnect > 0) && (i == _disconnect)) {
00181       i = 0;
00182       // Signal main thread to disconnect and re-connect.
00183       connect_cond.post();
00184    }
00185    i++;
00186    return (any);
00187 }
00188 
00189 CORBA::Any *
00190 Supplier_i::try_pull(CORBA::Boolean &has_event)
00191 {
00192    cout << "Pull Supplier: try_pull() called. Data : ";
00193    CORBA::Any *any = new CORBA::Any();
00194    *any <<= l++;
00195    cout << l-1 << endl;
00196    has_event = 1;
00197 
00198    // Exercise Disconnect
00199    if ((_disconnect > 0) && (i == _disconnect)) {
00200       i = 0;
00201       // Signal main thread to disconnect and re-connect.
00202       connect_cond.post();
00203    }
00204    i++;
00205    return (any);
00206 }
00207 
00208 
00209 int
00210 main (int argc, char** argv)
00211 {
00212 #if defined(HAVE_OMNIORB4)
00213   CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB4");
00214 #else
00215   CORBA::ORB_var orb =CORBA::ORB_init(argc,argv,"omniORB3");
00216 #endif
00217 
00218   // Process Options
00219   int         discnum       =0;
00220   int         sleepInterval =0;
00221   const char* channelName   ="EventChannel";
00222 
00223   int c;
00224   while ((c = getopt(argc,argv,"d:s:n:h")) != EOF)
00225   {
00226      switch (c)
00227      {
00228         case 'd': discnum = atoi(optarg);
00229                   break;
00230 
00231         case 's': sleepInterval = atoi(optarg);
00232                   break;
00233 
00234         case 'n': channelName = optarg;
00235                   break;
00236 
00237         case 'h':
00238         default : usage(argc,argv);
00239                   exit(-1);
00240                   break;
00241      }
00242   }
00243 
00244 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00245   // Ignore broken pipes
00246   signal(SIGPIPE, SIG_IGN);
00247 #endif
00248 
00249   Supplier_i* supplier = new Supplier_i (discnum);
00250   CosEventChannelAdmin::EventChannel_var channel;
00251 
00252   const char* action=""; // Use this variable to help report errors.
00253   try {
00254     CORBA::Object_var obj;
00255 
00256     action="resolve initial reference 'RootPOA'";
00257     obj=orb->resolve_initial_references("RootPOA");
00258     PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00259     if(CORBA::is_nil(rootPoa))
00260         throw CORBA::OBJECT_NOT_EXIST();
00261 
00262     action="activate the RootPOA's POAManager";
00263     PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00264     pman->activate();
00265 
00266     //
00267     // Obtain object reference to EventChannel
00268     // (from command-line argument or from the Naming Service).
00269     if(optind<argc)
00270     {
00271       action="convert URI from command line into object reference";
00272       obj=orb->string_to_object(argv[optind]);
00273     }
00274     else
00275     {
00276       action="resolve initial reference 'NameService'";
00277       obj=orb->resolve_initial_references("NameService");
00278       CosNaming::NamingContext_var rootContext=
00279         CosNaming::NamingContext::_narrow(obj);
00280       if(CORBA::is_nil(rootContext))
00281           throw CORBA::OBJECT_NOT_EXIST();
00282 
00283       action="find EventChannel in NameService";
00284       cout << action << endl;
00285       obj=rootContext->resolve(str2name(channelName));
00286     }
00287 
00288     action="narrow object reference to event channel";
00289     channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00290     if(CORBA::is_nil(channel))
00291     {
00292        cerr << "Failed to narrow Event Channel reference." << endl;
00293        exit(1);
00294     }
00295 
00296   }
00297   catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
00298      cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00299      exit(1);
00300   }
00301   catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
00302      cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00303      exit(1);
00304   }
00305   catch(CosNaming::NamingContext::NotFound& ex) { // resolve
00306      cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00307      exit(1);
00308   }
00309   catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
00310      cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00311      exit(1);
00312   }
00313   catch(CORBA::TRANSIENT& ex) { // _narrow()
00314      cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00315      exit(1);
00316   }
00317   catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
00318      cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00319      exit(1);
00320   }
00321   catch(CORBA::SystemException& ex) {
00322      cerr<<"Failed to "<<action<<".";
00323 #if defined(HAVE_OMNIORB4)
00324      cerr<<" "<<ex._name();
00325      if(ex.NP_minorString())
00326          cerr<<" ("<<ex.NP_minorString()<<")";
00327 #endif
00328      cerr<<endl;
00329      exit(1);
00330   }
00331   catch(CORBA::Exception& ex) {
00332      cerr<<"Failed to "<<action<<"."
00333 #if defined(HAVE_OMNIORB4)
00334        " "<<ex._name()
00335 #endif
00336        <<endl;
00337      exit(1);
00338   }
00339 
00340   //
00341   // Get Supplier Admin interface - retrying on Comms Failure.
00342   CosEventChannelAdmin::SupplierAdmin_var supplier_admin;
00343   while (1)
00344   {
00345      try {
00346         supplier_admin = channel->for_suppliers ();
00347         if (CORBA::is_nil(supplier_admin))
00348         {
00349            cerr << "Event Channel returned nil Supplier Admin!"
00350                 << endl;
00351            exit(1);
00352         }
00353         break;
00354      }
00355      catch (CORBA::COMM_FAILURE& ex) {
00356         cerr << "Caught COMM_FAILURE exception "
00357              << "obtaining Supplier Admin! Retrying..."
00358              << endl;
00359         continue;
00360      }
00361   }
00362   cout << "Obtained SupplierAdmin." << endl;
00363 
00364   while (1)
00365   {
00366      //
00367      // Get proxy consumer - retrying on Comms Failure.
00368      CosEventChannelAdmin::ProxyPullConsumer_var proxy_consumer;
00369      while (1)
00370      {
00371         try {
00372            proxy_consumer = supplier_admin->obtain_pull_consumer ();
00373            if (CORBA::is_nil(proxy_consumer))
00374            {
00375               cerr << "Supplier Admin returned nil proxy_consumer!"
00376                    << endl;
00377               exit(1);
00378            }
00379            break;
00380         }
00381         catch (CORBA::COMM_FAILURE& ex) {
00382            cerr << "Caught COMM_FAILURE exception "
00383                 << "obtaining Proxy Pull Consumer! Retrying..."
00384                 << endl;
00385            continue;
00386         }
00387      }
00388      cout << "Obtained ProxyPullConsumer." << endl;
00389    
00390      // Connect Pull Supplier - retrying on Comms Failure.
00391      CosEventComm::PullSupplier_var supplierRef =supplier->_this();
00392      while (1)
00393      {
00394         try {
00395            proxy_consumer->connect_pull_supplier(supplierRef.in());
00396            break;
00397         }
00398         catch (CORBA::BAD_PARAM& ex) {
00399            cerr<<"Caught BAD_PARAM Exception connecting Pull Supplier!"<<endl;
00400            exit(1);
00401         }
00402         catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00403            cerr << "Pull Supplier already connected!"
00404                 << endl;
00405            break;
00406         }
00407         catch (CORBA::COMM_FAILURE& ex) {
00408            cerr << "Caught COMM_FAILURE exception "
00409                 << "connecting Pull Supplier! Retrying..."
00410                 << endl;
00411            continue;
00412         }
00413      }
00414      cout << "Connected Pull Supplier." << endl;
00415 
00416      // Wait for indication to disconnect before re-connecting.
00417      connect_cond.wait();
00418 
00419      // Disconnect - retrying on Comms Failure.
00420      while (1)
00421      {
00422         try {
00423            proxy_consumer->disconnect_pull_consumer();
00424            break;
00425         }
00426         catch (CORBA::COMM_FAILURE& ex) {
00427            cerr << "Caught COMM_FAILURE exception "
00428                 << "disconnecting Pull Supplier! Retrying..."
00429                 << endl;
00430            continue;
00431         }
00432      }
00433      cout << "Disconnected Pull Supplier." << endl;
00434 
00435      // Yawn.
00436      cout << "Sleeping " << sleepInterval << " seconds." << endl;
00437      omni_thread::sleep(sleepInterval);
00438   }
00439 
00440   // Not Reached
00441   return 0;
00442 }
00443 
00444 static void
00445 usage(int argc, char **argv)
00446 {
00447   cerr<<
00448 "\nCreate a PullSupplier to send events to a channel.\n"
00449 "syntax: "<<(argc?argv[0]:"pullsupp")<<" OPTIONS [CHANNEL_URI]\n"
00450 "\n"
00451 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00452 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00453 "\n"
00454 "OPTIONS:                                         DEFAULT:\n"
00455 " -d NUM   disconnect after sending NUM events     [0 - never disconnect]\n"
00456 " -s SECS  sleep SECS seconds after disconnecting  [0]\n"
00457 " -n NAME  channel name (if URI is not specified)  [\"EventChannel\"]\n"
00458 " -h       display this help text\n" << endl;
00459 }

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530