Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

pushcons.cc

Go to the documentation of this file.
00001 // -*- Mode: C++; -*-
00002 //                            Package   : omniEvents
00003 //   pushcons.cc              Created   : 1/4/98
00004 //                            Author    : Paul Nader (pwn)
00005 //
00006 //    Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
00007 //
00008 //    This file is part of the omniEvents application.
00009 //
00010 //    omniEvents is free software; you can redistribute it and/or
00011 //    modify it under the terms of the GNU Lesser General Public
00012 //    License as published by the Free Software Foundation; either
00013 //    version 2.1 of the License, or (at your option) any later version.
00014 //
00015 //    omniEvents is distributed in the hope that it will be useful,
00016 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //    Lesser General Public License for more details.
00019 //
00020 //    You should have received a copy of the GNU Lesser General Public
00021 //    License along with this library; if not, write to the Free Software
00022 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023 //
00024 // Description:
00025 //    Push Model consumer implementation
00026 //
00027 
00028 /*
00029   $Log: pushcons.cc,v $
00030   Revision 1.12.2.1  2005/06/16 09:39:49  alextingle
00031   Fixed theoretical race caused by sloppy use of condition variable.
00032 
00033   Revision 1.12  2004/10/08 09:06:08  alextingle
00034   More robust exception minor code handling.
00035 
00036   Revision 1.11  2004/08/18 17:49:45  alextingle
00037   Added check for SIGPIPE before trying to use it.
00038 
00039   Revision 1.10  2004/08/06 16:19:23  alextingle
00040   -k & -K options removed.
00041   Naming service names may now be as complex as you like.
00042 
00043   Revision 1.9  2004/04/30 17:54:47  alextingle
00044   Corrected handling of CORBA::Any.
00045 
00046   Revision 1.8  2004/04/20 16:52:17  alextingle
00047   All examples updated for latest version on omniEvents. Server may now be
00048   specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
00049 
00050   Revision 1.7  2004/04/01 22:28:36  alextingle
00051   Corrected usage message.
00052 
00053   Revision 1.6  2004/03/23 19:09:26  alextingle
00054   Fixed typos.
00055 
00056   Revision 1.5  2004/02/21 19:07:45  alextingle
00057   Corrected servants to use POA instead of BOA.
00058 
00059   Revision 1.4  2004/02/04 22:29:55  alextingle
00060   Reworked all C++ examples.
00061   Removed catch(...) as it tends to make it harder to see what's going on.
00062   Now uses POA instead of BOA.
00063   Uses omniORB4's Exception name probing.
00064   No longer uses 'naming.h/cc' utility code.
00065 
00066   Revision 1.3  2003/11/03 22:19:56  alextingle
00067   Removed all platform specific switches. Now uses autoconf, config.h.
00068   Removed stub header in order to allow makefile dependency checking to work
00069   correctly.
00070   Corrected usage of omni_condition/omni_mutex. Mutexes are now always unlocked by
00071   the same thread that locked them.
00072 
00073   Revision 1.1.1.1.2.1  2002/09/28 22:20:51  shamus13
00074   Added ifdefs to enable omniEvents to compile
00075   with both omniORB3 and omniORB4. If __OMNIORB4__
00076   is defined during compilation, omniORB4 headers
00077   and command line option syntax is used, otherwise
00078   fall back to omniORB3 style.
00079 
00080   Revision 1.1.1.1  2002/09/25 19:00:26  shamus13
00081   Import of OmniEvents source tree from release 2.1.1
00082 
00083   Revision 0.13  2000/08/30 04:39:48  naderp
00084   Port to omniORB 3.0.1.
00085 
00086   Revision 0.12  2000/03/16 05:37:27  naderp
00087   Added stdlib.h for getopt.
00088 
00089   Revision 0.11  2000/03/06 13:27:02  naderp
00090   Using util getRootNamingContext function.
00091   Using stub headers.
00092   Fixed error messages.
00093 
00094   Revision 0.10  2000/03/02 03:20:24  naderp
00095   Added retry resiliency for handling COMM_FAUILURE exceptions.
00096 
00097   Revision 0.9  1999/11/02 13:39:15  naderp
00098   Added <signal.h>
00099 
00100   Revision 0.8  1999/11/02 07:57:04  naderp
00101   Updated usage.
00102 
00103 Revision 0.7  99/11/01  18:10:29  18:10:29  naderp (Paul Nader)
00104 Added ahndling of COMM_FAILURE exception for connect_push_consumer.
00105 
00106 Revision 0.6  99/11/01  16:11:03  16:11:03  naderp (Paul Nader)
00107 omniEvents 2.0 Release.
00108 
00109 Revision 0.5  99/10/27  19:46:01  19:46:01  naderp (Paul Nader)
00110 Ignoring Unix SIGPIPE signal.
00111 Catching COMM_FAILURE exception for obtain_push_supplier.
00112 Continuing if it fails to obtain Proxy Supplier.
00113 Try/Catch block for disconnect_push_supplier.
00114 
00115 Revision 0.4  99/04/23  16:05:46  16:05:46  naderp (Paul Nader)
00116 gcc port.
00117 
00118 Revision 0.3  99/04/23  09:34:03  09:34:03  naderp (Paul Nader)
00119 Windows Port.
00120 
00121 Revision 0.2  99/04/21  18:06:26  18:06:26  naderp (Paul Nader)
00122 *** empty log message ***
00123 
00124 Revision 0.1.1.1  98/11/27  16:59:37  16:59:37  naderp (Paul Nader)
00125 Added -s option to sleep after disconnecting.
00126 
00127 Revision 0.1  98/11/25  14:08:21  14:08:21  naderp (Paul Nader)
00128 Initial Revision
00129 
00130 */
00131 
00132 #ifdef HAVE_CONFIG_H
00133 #  include "config.h"
00134 #endif
00135 
00136 #ifdef HAVE_GETOPT
00137 #  include <unistd.h>
00138 extern char* optarg;
00139 extern int optind;
00140 #else
00141 #  include "getopt.h"
00142 #endif
00143 
00144 #ifdef HAVE_IOSTREAM
00145 #  include <iostream>
00146 #else
00147 #  include <iostream.h>
00148 #endif
00149 
00150 #ifdef HAVE_STD_IOSTREAM
00151 using namespace std;
00152 #endif
00153 
00154 #ifdef HAVE_STDLIB_H
00155 #  include <stdlib.h>
00156 #endif
00157 
00158 #ifdef HAVE_SIGNAL_H
00159 #  include <signal.h>
00160 #endif
00161 
00162 #include "CosEventComm.hh"
00163 #include "CosEventChannelAdmin.hh"
00164 #include "naming.h"
00165 
00166 static omni_mutex mutex;
00167 static omni_condition connect_cond(&mutex);
00168 static void usage(int argc, char **argv);
00169 
00170 class Consumer_i : virtual public POA_CosEventComm::PushConsumer {
00171 public:
00172   Consumer_i(long disconnect=0): _disconnect(disconnect) {}
00173 
00174   void push(const CORBA::Any& data);
00175   void disconnect_push_consumer ();
00176 
00177 private:
00178   long _disconnect;
00179 };
00180 
00181 void Consumer_i::push(const CORBA::Any& data) {
00182   CORBA::ULong l;
00183   static int i = 0;
00184 
00185   i++;
00186   if( data>>=l )
00187   {
00188     cout<<"Push Consumer: push() called. Data : "<< l <<endl;
00189 
00190     // Exercise Disconnect
00191     if (i == _disconnect)
00192     {
00193        i = 0;
00194        // NOTE : The proxy_supplier object is disposed at the server
00195        //        during the disconnect_push_supplier call. Do NOT
00196        //        use the proxy_supplier reference after disconnecting.
00197 
00198        // Signal main thread to disconnect and re-connect.
00199        omni_mutex_lock condition_lock(mutex); // ensure main thread in wait()
00200        connect_cond.signal();
00201     }
00202   }
00203   else
00204   {
00205     cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl;
00206   }
00207 }
00208 
00209 void Consumer_i::disconnect_push_consumer () {
00210   cout << "Push Consumer: disconnected." << endl;
00211 }
00212 
00213 int
00214 main(int argc, char **argv)
00215 {
00216   //
00217   // Start orb.
00218   CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00219 
00220   // Process Options
00221   int         discnum       =0;
00222   int         sleepInterval =0;
00223   const char* channelName   ="EventChannel";
00224 
00225   int c;
00226   while ((c = getopt(argc,argv,"hd:s:n:")) != EOF)
00227   {
00228      switch (c)
00229      {
00230         case 'd': discnum = atoi(optarg);
00231                   break;
00232 
00233         case 's': sleepInterval = atoi(optarg);
00234                   break;
00235 
00236         case 'n': channelName = optarg;
00237                   break;
00238 
00239         case 'h': usage(argc,argv);
00240                   exit(0);
00241         default : usage(argc,argv);
00242                   exit(-1);
00243      }
00244   }
00245 
00246 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00247   // Ignore broken pipes
00248   signal(SIGPIPE, SIG_IGN);
00249 #endif
00250 
00251   Consumer_i* consumer = new Consumer_i (discnum);
00252   CosEventChannelAdmin::EventChannel_var channel;
00253 
00254   const char* action=""; // Use this variable to help report errors.
00255   try {
00256     CORBA::Object_var obj;
00257 
00258     action="resolve initial reference 'RootPOA'";
00259     obj=orb->resolve_initial_references("RootPOA");
00260     PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00261     if(CORBA::is_nil(rootPoa))
00262         throw CORBA::OBJECT_NOT_EXIST();
00263 
00264     action="activate the RootPOA's POAManager";
00265     PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00266     pman->activate();
00267 
00268     //
00269     // Obtain object reference to EventChannel
00270     // (from command-line argument or from the Naming Service).
00271     if(optind<argc)
00272     {
00273       action="convert URI from command line into object reference";
00274       obj=orb->string_to_object(argv[optind]);
00275     }
00276     else
00277     {
00278       action="resolve initial reference 'NameService'";
00279       obj=orb->resolve_initial_references("NameService");
00280       CosNaming::NamingContext_var rootContext=
00281         CosNaming::NamingContext::_narrow(obj);
00282       if(CORBA::is_nil(rootContext))
00283           throw CORBA::OBJECT_NOT_EXIST();
00284 
00285       action="find EventChannel in NameService";
00286       cout << action << endl;
00287       obj=rootContext->resolve(str2name(channelName));
00288     }
00289 
00290     action="narrow object reference to event channel";
00291     channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00292     if(CORBA::is_nil(channel))
00293     {
00294        cerr << "Failed to narrow Event Channel reference." << endl;
00295        exit(1);
00296     }
00297 
00298   }
00299   catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
00300      cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00301      exit(1);
00302   }
00303   catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
00304      cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00305      exit(1);
00306   }
00307   catch(CosNaming::NamingContext::NotFound& ex) { // resolve
00308      cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00309      exit(1);
00310   }
00311   catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
00312      cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00313      exit(1);
00314   }
00315   catch(CORBA::TRANSIENT& ex) { // _narrow()
00316      cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00317      exit(1);
00318   }
00319   catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
00320      cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00321      exit(1);
00322   }
00323   catch(CORBA::SystemException& ex) {
00324      cerr<<"Failed to "<<action<<".";
00325 #if defined(HAVE_OMNIORB4)
00326      cerr<<" "<<ex._name();
00327      if(ex.NP_minorString())
00328          cerr<<" ("<<ex.NP_minorString()<<")";
00329 #endif
00330      cerr<<endl;
00331      exit(1);
00332   }
00333   catch(CORBA::Exception& ex) {
00334      cerr<<"Failed to "<<action<<"."
00335 #if defined(HAVE_OMNIORB4)
00336        " "<<ex._name()
00337 #endif
00338        <<endl;
00339      exit(1);
00340   }
00341 
00342   //
00343   // Get Consumer admin interface - retrying on Comms Failure.
00344   CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00345   while (1)
00346   {
00347      try {
00348         consumer_admin = channel->for_consumers ();
00349         if (CORBA::is_nil (consumer_admin))
00350         {
00351            cerr << "Event Channel returned nil Consumer Admin!" << endl;
00352            exit(1);
00353         }
00354         break;
00355      }
00356      catch (CORBA::COMM_FAILURE& ex) {
00357         cerr << "Caught COMM_FAILURE exception "
00358              << "obtaining Consumer Admin! Retrying..."
00359              << endl;
00360         continue;
00361      }
00362   }
00363   cout << "Obtained ConsumerAdmin." << endl;
00364 
00365   omni_mutex_lock condition_lock(mutex);
00366   while (1) {
00367      //
00368      // Get proxy supplier - retrying on Comms Failure.
00369      CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
00370      while (1)
00371      {
00372         try {
00373            proxy_supplier = consumer_admin->obtain_push_supplier ();
00374            if (CORBA::is_nil (proxy_supplier))
00375            {
00376               cerr << "Consumer Admin returned nil proxy_supplier!"
00377                    << endl;
00378               exit (1);
00379            }
00380            break;
00381         }
00382         catch (CORBA::COMM_FAILURE& ex) {
00383            cerr << "Caught COMM_FAILURE Exception "
00384                 << "obtaining Push Supplier! Retrying..."
00385                 << endl;
00386            continue;
00387         }
00388      }
00389      cout << "Obtained ProxyPushSupplier." << endl;
00390    
00391      //
00392      // Connect Push Consumer - retrying on Comms Failure.
00393      while (1)
00394      {
00395         try {
00396            proxy_supplier->connect_push_consumer(consumer->_this());
00397            break;
00398         }
00399         catch (CORBA::BAD_PARAM& ex) {
00400            cerr << "Caught BAD_PARAM Exception connecting Push Consumer!"
00401                 << endl;
00402            exit (1);
00403         }
00404         catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00405            cerr << "Proxy Push Supplier already connected!"
00406                 << endl;
00407            break;
00408         }
00409         catch (CORBA::COMM_FAILURE& ex) {
00410            cerr << "Caught COMM_FAILURE exception "
00411                 << "connecting Push Consumer! Retrying..."
00412                 << endl;
00413            continue;
00414         }
00415      }
00416      cout << "Connected Push Consumer." << endl;
00417 
00418      // Wait for indication to disconnect before re-connecting.
00419      connect_cond.wait();
00420 
00421      // Disconnect - retrying on Comms Failure.
00422      while (1)
00423      {
00424         try {
00425            proxy_supplier->disconnect_push_supplier();
00426            break;
00427         }
00428         catch (CORBA::COMM_FAILURE& ex) {
00429            cerr << "Caught COMM_FAILURE Exception "
00430                 << "disconnecting Push Consumer! Retrying..."
00431                 << endl;
00432            continue;
00433         }
00434      }
00435      cout << "Disconnected Push Consumer." << endl;
00436    
00437      // Yawn
00438      cout << "Sleeping " << sleepInterval << " seconds." << endl;
00439      omni_thread::sleep(sleepInterval);
00440   }
00441 
00442   // NEVER GET HERE
00443   return 0;
00444 }
00445 
00446 static void
00447 usage(int argc, char **argv)
00448 {
00449   cerr<<
00450 "\nCreate a PushConsumer to receive events from a channel.\n"
00451 "syntax: "<<(argc?argv[0]:"pushcons")<<" OPTIONS [CHANNEL_URI]\n"
00452 "\n"
00453 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00454 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00455 "\n"
00456 "OPTIONS:                                         DEFAULT:\n"
00457 " -d NUM   disconnect after receiving NUM events   [0 - never disconnect]\n"
00458 " -s SECS  sleep SECS seconds after disconnecting  [0]\n"
00459 " -n NAME  channel name (if URI is not specified)  [\"EventChannel\"]\n"
00460 " -h       display this help text\n" << endl;
00461 }

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530