Main Page | Namespace List | Class Hierarchy | Class List | Directories | File List | Namespace Members | Class Members | File Members

events.cc

Go to the documentation of this file.
00001 // -*- Mode: C++; -*-
00002 //                            Package   : omniEvents
00003 //   events.cc                Created   : 2004/05/02
00004 //                            Author    : Alex Tingle
00005 //
00006 //    Copyright (C) 2004 Alex Tingle
00007 //
00008 //    This file is part of the omniEvents application.
00009 //
00010 //    omniEvents is free software; you can redistribute it and/or
00011 //    modify it under the terms of the GNU Lesser General Public
00012 //    License as published by the Free Software Foundation; either
00013 //    version 2.1 of the License, or (at your option) any later version.
00014 //
00015 //    omniEvents is distributed in the hope that it will be useful,
00016 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //    Lesser General Public License for more details.
00019 //
00020 //    You should have received a copy of the GNU Lesser General Public
00021 //    License along with this library; if not, write to the Free Software
00022 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023 //
00024 // Description:
00025 //    Push Model streamer.
00026 //      
00027 
00028 #ifdef HAVE_CONFIG_H
00029 #  include "config.h"
00030 #endif
00031 
00032 #ifdef HAVE_GETOPT
00033 #  include <unistd.h>
00034 extern char* optarg;
00035 extern int optind;
00036 #else
00037 #  include "getopt.h"
00038 #endif
00039 
00040 #ifdef HAVE_IOSTREAM
00041 #  include <iostream>
00042 #else
00043 #  include <iostream.h>
00044 #endif
00045 
00046 #ifdef HAVE_STD_IOSTREAM
00047 using namespace std;
00048 #endif
00049 
00050 #ifdef HAVE_STDLIB_H
00051 #  include <stdlib.h>
00052 #endif
00053 
00054 #include <stdio.h>
00055 
00056 #if defined HAVE_UNISTD_H
00057 #  include <unistd.h> // read(), write()
00058 #elif defined __WIN32__
00059 #  include <io.h>
00060 #  define write(fd,buf,count) _write(fd,buf,count)
00061 #  define read(fd,buf,count)  _read(fd,buf,count)
00062 #  define ssize_t int
00063 #endif
00064 
00065 #ifdef HAVE_SIGNAL_H
00066 #  include <signal.h>
00067 #endif
00068 
00069 #include "CosEventComm.hh"
00070 #include "CosEventChannelAdmin.hh"
00071 #include "naming.h"
00072 
00073 #ifndef STDIN_FILENO
00074 # define STDIN_FILENO 0
00075 # define STDOUT_FILENO 1
00076 #endif
00077 
00078 CORBA::ORB_ptr orb;
00079 
00080 static void usage(int argc, char **argv);
00081 
00082 //
00083 // Time
00084 //
00085 
00086 #define BILLION 1000000000
00087 
00088 class Time;
00089 class Time
00090 {
00091 private:
00092   CORBA::ULong _sec;
00093   CORBA::ULong _nano;
00094 public:
00095   static Time current()
00096   {
00097     Time result;
00098     unsigned long sec,nano;
00099     omni_thread::get_time(&sec,&nano);
00100     result._sec=sec;
00101     result._nano=nano;
00102     return result;
00103   }
00104   static void sleepUntil(const Time& futureTime)
00105   {
00106     Time now =current();
00107     if(now<futureTime)
00108     {
00109       Time offset=futureTime-now;
00110       omni_thread::sleep(offset._sec,offset._nano);
00111     }
00112   }
00113   //
00114   Time():_sec(0),_nano(0){}
00115   Time(CORBA::ULong sec,CORBA::ULong nano):_sec(sec),_nano(nano){}
00116   Time(const Time& right):_sec(right._sec),_nano(right._nano){}
00117   Time& operator=(const Time& right)
00118   {
00119     if(this!=&right)
00120     {
00121       _sec =right._sec;
00122       _nano=right._nano;
00123     }
00124     return *this;
00125   }
00126   bool operator<(const Time& right) const
00127   {
00128     if(_sec==right._sec)
00129         return _nano<right._nano;
00130     else
00131         return _sec<right._sec;
00132   }
00133   Time& operator+=(const Time& right)
00134   {
00135     _sec +=right._sec;
00136     _nano+=right._nano;
00137     if(_nano>BILLION)
00138     {
00139       _nano=_nano%BILLION;
00140       ++_sec;
00141     }
00142     return *this;
00143   }
00144   Time operator+(const Time& right) const
00145   {
00146     Time result(*this);
00147     result+=right;
00148     return result;
00149   }
00150   Time& operator-=(const Time& right)
00151   {
00152     if(operator<(right))
00153     {
00154       cerr<<"Negative time!"<<endl;
00155       throw CORBA::BAD_PARAM();
00156     }
00157     _sec-=right._sec;
00158     if(_nano<right._nano)
00159     {
00160       _nano+=BILLION;
00161       --_sec;
00162     }
00163     _nano-=right._nano;
00164     return *this;
00165   }
00166   Time operator-(const Time& right) const
00167   {
00168     Time result(*this);
00169     result-=right;
00170     return result;
00171   }
00172   void operator>>=(cdrMemoryStream& s) const
00173   {
00174     _sec>>=s;
00175     _nano>>=s;
00176   }
00177   void operator<<=(cdrMemoryStream& s)
00178   {
00179     _sec<<=s;
00180     _nano<<=s;
00181   }
00182   bool is_nil() const { return(_sec==0 && _nano==0); }
00183 }; // end class Time
00184 
00185 
00186 //
00187 // Consumer_i
00188 //
00189 
00190 class Consumer_i : virtual public POA_CosEventComm::PushConsumer
00191 {
00192 public:
00193   Consumer_i(long disconnect=0): _memstream() {}
00194   void push(const CORBA::Any& data)
00195   {
00196     // Record the event timestamp.
00197     Time now=Time::current();
00198     now>>=_memstream;
00199     // stream event data.
00200     data>>=_memstream;
00201     // Write to file.
00202     write(STDOUT_FILENO,_memstream.bufPtr(),_memstream.bufSize());
00203     // Reset.
00204     _memstream.rewindPtrs();
00205   }
00206   void disconnect_push_consumer()
00207   {
00208     cout<<"disconnected"<<endl;
00209     orb->shutdown(0);
00210   }
00211   void consume(
00212     CosEventChannelAdmin::EventChannel_ptr channel,
00213     const char*& action)
00214   {
00215     action="get ConsumerAdmin";
00216     CosEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00217       channel->for_consumers();
00218 
00219     action="get ProxyPushSupplier";
00220     CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier =
00221       consumer_admin->obtain_push_supplier();
00222 
00223     action="connect to ProxyPushSupplier";
00224     proxy_supplier->connect_push_consumer(_this());
00225   }
00226 private:
00227   cdrMemoryStream _memstream;
00228 };
00229 
00230 
00231 //
00232 // Supplier_i
00233 //
00234 
00235 class Supplier_i : virtual public POA_CosEventComm::PushSupplier
00236 {
00237 public:
00238   Supplier_i(): _connected(true) {}
00239   void disconnect_push_supplier()
00240   {
00241     cout<<"disconnected"<<endl;
00242     _connected=false;
00243   }
00244   void supply(
00245     CosEventChannelAdmin::EventChannel_ptr channel,
00246     const char*& action)
00247   {
00248     action="get SupplierAdmin";
00249     CosEventChannelAdmin::SupplierAdmin_var supplier_admin =
00250       channel->for_suppliers();
00251 
00252     action="get ProxyPushConsumer";
00253     CosEventChannelAdmin::ProxyPushConsumer_var proxy_consumer =
00254       supplier_admin->obtain_push_consumer();
00255 
00256     action="connect to ProxyPushConsumer";
00257     proxy_consumer->connect_push_supplier(_this());
00258 
00259     char buf[1024];
00260     ssize_t len;
00261     action="read standard input";
00262     // Stream start time (seconds,nanoseconds)
00263     Time offsetTime;
00264     while(_connected && (len=read(STDIN_FILENO,buf,1024)))
00265     {
00266       CORBA::Any any;
00267       cdrMemoryStream memstr;
00268       action="put_octet_array";
00269       memstr.put_octet_array( (_CORBA_Octet*)buf, (int)len );
00270       while(_connected && memstr.currentInputPtr()<memstr.bufSize())
00271       {
00272         action="unmarshal";
00273         Time eventTime;
00274         eventTime<<=memstr;
00275         any<<=memstr;
00276 
00277         if(offsetTime.is_nil()) // first time special.
00278            offsetTime=Time::current()-eventTime;
00279         Time::sleepUntil(eventTime+offsetTime);
00280 
00281         action="push";
00282         proxy_consumer->push(any);
00283       }
00284     }
00285   }
00286 private:
00287   bool _connected;
00288 };
00289 
00290 
00291 //
00292 // main()
00293 //
00294 
00295 int main(int argc, char **argv)
00296 {
00297   //
00298   // Start orb.
00299 #if defined(HAVE_OMNIORB4)
00300   orb=CORBA::ORB_init(argc,argv,"omniORB4");
00301 #else
00302   orb=CORBA::ORB_init(argc,argv,"omniORB3");
00303 #endif
00304 
00305   // Process Options
00306   bool supplierMode =false;
00307   const char* channelName ="EventChannel";
00308 
00309   int c;
00310   while ((c = getopt(argc,argv,"shn:")) != EOF)
00311   {
00312      switch (c)
00313      {
00314         case 's': supplierMode=true;
00315                   break;
00316 
00317         case 'n': channelName = optarg;
00318                   break;
00319 
00320         case 'h': usage(argc,argv);
00321                   exit(0);
00322         default : usage(argc,argv);
00323                   exit(-1);
00324      }
00325   }
00326 
00327 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00328   // Ignore broken pipes
00329   signal(SIGPIPE, SIG_IGN);
00330 #endif
00331 
00332   const char* action=""; // Use this variable to help report errors.
00333   try {
00334     CORBA::Object_var obj;
00335 
00336     action="resolve initial reference 'RootPOA'";
00337     obj=orb->resolve_initial_references("RootPOA");
00338     PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00339     if(CORBA::is_nil(rootPoa))
00340         throw CORBA::OBJECT_NOT_EXIST();
00341 
00342     action="activate the RootPOA's POAManager";
00343     PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00344     pman->activate();
00345 
00346     //
00347     // Obtain object reference to EventChannel
00348     // (from command-line argument or from the Naming Service).
00349     if(optind<argc)
00350     {
00351       action="convert URI from command line into object reference";
00352       obj=orb->string_to_object(argv[optind]);
00353     }
00354     else
00355     {
00356       action="resolve initial reference 'NameService'";
00357       obj=orb->resolve_initial_references("NameService");
00358       CosNaming::NamingContext_var rootContext=
00359         CosNaming::NamingContext::_narrow(obj);
00360       if(CORBA::is_nil(rootContext))
00361           throw CORBA::OBJECT_NOT_EXIST();
00362 
00363       action="find EventChannel in NameService";
00364       cout << action << endl;
00365       obj=rootContext->resolve(str2name(channelName));
00366     }
00367 
00368     action="narrow object reference to event channel";
00369     CosEventChannelAdmin::EventChannel_var channel =
00370       CosEventChannelAdmin::EventChannel::_narrow(obj);
00371     if(CORBA::is_nil(channel))
00372     {
00373        cerr << "Failed to narrow Event Channel reference." << endl;
00374        exit(1);
00375     }
00376 
00377     if(supplierMode)
00378     {
00379       action="construct PushSupplier";
00380       Supplier_i* supplier =new Supplier_i();
00381       supplier->supply(channel,action);
00382     }
00383     else
00384     {
00385       action="construct PushConsumer";
00386       Consumer_i* consumer =new Consumer_i();
00387       consumer->consume(channel,action);
00388 
00389       action="run ORB";
00390       orb->run();
00391     }
00392 
00393     return 0;
00394 
00395   }
00396   catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
00397      cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00398   }
00399   catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
00400      cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00401   }
00402   catch(CosNaming::NamingContext::NotFound& ex) { // resolve
00403      cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00404   }
00405   catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
00406      cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00407   }
00408   catch(CORBA::TRANSIENT& ex) { // _narrow()
00409      cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00410   }
00411   catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
00412      cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00413   }
00414   catch(CORBA::SystemException& ex) {
00415      cerr<<"Failed to "<<action<<"."
00416 #if defined(HAVE_OMNIORB4)
00417        " "<<ex._name()<<" ("<<ex.NP_minorString()<<")"
00418 #endif
00419        <<endl;
00420   }
00421   catch(CORBA::Exception& ex) {
00422      cerr<<"Failed to "<<action<<"."
00423 #if defined(HAVE_OMNIORB4)
00424        " "<<ex._name()
00425 #endif
00426        <<endl;
00427   }
00428 
00429   return 1;
00430 }
00431 
00432 static void usage(int argc, char **argv)
00433 {
00434   cerr<<
00435 "\nStream events from a channel to stdout, or (-s) from stdin to a channel.\n"
00436 "syntax: "<<(argc?argv[0]:"events")<<" OPTIONS [CHANNEL_URI]\n"
00437 "\n"
00438 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00439 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00440 "\n"
00441 "OPTIONS:                                         DEFAULT:\n"
00442 " -s       supply mode. Read events from stdin.\n"
00443 " -n NAME  channel name (if URI is not specified)  [\"EventChannel\"]\n"
00444 " -h       display this help text\n" << endl;
00445 }

Generated on Fri Aug 26 20:56:14 2005 for OmniEvents by  doxygen 1.4.3-20050530