00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028 #ifdef HAVE_CONFIG_H
00029 # include "config.h"
00030 #endif
00031
00032 #ifdef HAVE_GETOPT
00033 # include <unistd.h>
00034 extern char* optarg;
00035 extern int optind;
00036 #else
00037 # include "getopt.h"
00038 #endif
00039
00040 #ifdef HAVE_IOSTREAM
00041 # include <iostream>
00042 #else
00043 # include <iostream.h>
00044 #endif
00045
00046 #ifdef HAVE_STD_IOSTREAM
00047 using namespace std;
00048 #endif
00049
00050 #ifdef HAVE_STDLIB_H
00051 # include <stdlib.h>
00052 #endif
00053
00054 #include <stdio.h>
00055
00056 #if defined HAVE_UNISTD_H
00057 # include <unistd.h>
00058 #elif defined __WIN32__
00059 # include <io.h>
00060 # define write(fd,buf,count) _write(fd,buf,count)
00061 # define read(fd,buf,count) _read(fd,buf,count)
00062 # define ssize_t int
00063 #endif
00064
00065 #ifdef HAVE_SIGNAL_H
00066 # include <signal.h>
00067 #endif
00068
00069 #include "CosEventComm.hh"
00070 #include "CosEventChannelAdmin.hh"
00071 #include "naming.h"
00072
00073 #ifndef STDIN_FILENO
00074 # define STDIN_FILENO 0
00075 # define STDOUT_FILENO 1
00076 #endif
00077
00078 CORBA::ORB_ptr orb;
00079
00080 static void usage(int argc, char **argv);
00081
00082
00083
00084
00085
00086 #define BILLION 1000000000
00087
00088 class Time;
00089 class Time
00090 {
00091 private:
00092 CORBA::ULong _sec;
00093 CORBA::ULong _nano;
00094 public:
00095 static Time current()
00096 {
00097 Time result;
00098 unsigned long sec,nano;
00099 omni_thread::get_time(&sec,&nano);
00100 result._sec=sec;
00101 result._nano=nano;
00102 return result;
00103 }
00104 static void sleepUntil(const Time& futureTime)
00105 {
00106 Time now =current();
00107 if(now<futureTime)
00108 {
00109 Time offset=futureTime-now;
00110 omni_thread::sleep(offset._sec,offset._nano);
00111 }
00112 }
00113
00114 Time():_sec(0),_nano(0){}
00115 Time(CORBA::ULong sec,CORBA::ULong nano):_sec(sec),_nano(nano){}
00116 Time(const Time& right):_sec(right._sec),_nano(right._nano){}
00117 Time& operator=(const Time& right)
00118 {
00119 if(this!=&right)
00120 {
00121 _sec =right._sec;
00122 _nano=right._nano;
00123 }
00124 return *this;
00125 }
00126 bool operator<(const Time& right) const
00127 {
00128 if(_sec==right._sec)
00129 return _nano<right._nano;
00130 else
00131 return _sec<right._sec;
00132 }
00133 Time& operator+=(const Time& right)
00134 {
00135 _sec +=right._sec;
00136 _nano+=right._nano;
00137 if(_nano>BILLION)
00138 {
00139 _nano=_nano%BILLION;
00140 ++_sec;
00141 }
00142 return *this;
00143 }
00144 Time operator+(const Time& right) const
00145 {
00146 Time result(*this);
00147 result+=right;
00148 return result;
00149 }
00150 Time& operator-=(const Time& right)
00151 {
00152 if(operator<(right))
00153 {
00154 cerr<<"Negative time!"<<endl;
00155 throw CORBA::BAD_PARAM();
00156 }
00157 _sec-=right._sec;
00158 if(_nano<right._nano)
00159 {
00160 _nano+=BILLION;
00161 --_sec;
00162 }
00163 _nano-=right._nano;
00164 return *this;
00165 }
00166 Time operator-(const Time& right) const
00167 {
00168 Time result(*this);
00169 result-=right;
00170 return result;
00171 }
00172 void operator>>=(cdrMemoryStream& s) const
00173 {
00174 _sec>>=s;
00175 _nano>>=s;
00176 }
00177 void operator<<=(cdrMemoryStream& s)
00178 {
00179 _sec<<=s;
00180 _nano<<=s;
00181 }
00182 bool is_nil() const { return(_sec==0 && _nano==0); }
00183 };
00184
00185
00186
00187
00188
00189
00190 class Consumer_i : virtual public POA_CosEventComm::PushConsumer
00191 {
00192 public:
00193 Consumer_i(long disconnect=0): _memstream() {}
00194 void push(const CORBA::Any& data)
00195 {
00196
00197 Time now=Time::current();
00198 now>>=_memstream;
00199
00200 data>>=_memstream;
00201
00202 write(STDOUT_FILENO,_memstream.bufPtr(),_memstream.bufSize());
00203
00204 _memstream.rewindPtrs();
00205 }
00206 void disconnect_push_consumer()
00207 {
00208 cout<<"disconnected"<<endl;
00209 orb->shutdown(0);
00210 }
00211 void consume(
00212 CosEventChannelAdmin::EventChannel_ptr channel,
00213 const char*& action)
00214 {
00215 action="get ConsumerAdmin";
00216 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin =
00217 channel->for_consumers();
00218
00219 action="get ProxyPushSupplier";
00220 CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier =
00221 consumer_admin->obtain_push_supplier();
00222
00223 action="connect to ProxyPushSupplier";
00224 proxy_supplier->connect_push_consumer(_this());
00225 }
00226 private:
00227 cdrMemoryStream _memstream;
00228 };
00229
00230
00231
00232
00233
00234
00235 class Supplier_i : virtual public POA_CosEventComm::PushSupplier
00236 {
00237 public:
00238 Supplier_i(): _connected(true) {}
00239 void disconnect_push_supplier()
00240 {
00241 cout<<"disconnected"<<endl;
00242 _connected=false;
00243 }
00244 void supply(
00245 CosEventChannelAdmin::EventChannel_ptr channel,
00246 const char*& action)
00247 {
00248 action="get SupplierAdmin";
00249 CosEventChannelAdmin::SupplierAdmin_var supplier_admin =
00250 channel->for_suppliers();
00251
00252 action="get ProxyPushConsumer";
00253 CosEventChannelAdmin::ProxyPushConsumer_var proxy_consumer =
00254 supplier_admin->obtain_push_consumer();
00255
00256 action="connect to ProxyPushConsumer";
00257 proxy_consumer->connect_push_supplier(_this());
00258
00259 char buf[1024];
00260 ssize_t len;
00261 action="read standard input";
00262
00263 Time offsetTime;
00264 while(_connected && (len=read(STDIN_FILENO,buf,1024)))
00265 {
00266 CORBA::Any any;
00267 cdrMemoryStream memstr;
00268 action="put_octet_array";
00269 memstr.put_octet_array( (_CORBA_Octet*)buf, (int)len );
00270 while(_connected && memstr.currentInputPtr()<memstr.bufSize())
00271 {
00272 action="unmarshal";
00273 Time eventTime;
00274 eventTime<<=memstr;
00275 any<<=memstr;
00276
00277 if(offsetTime.is_nil())
00278 offsetTime=Time::current()-eventTime;
00279 Time::sleepUntil(eventTime+offsetTime);
00280
00281 action="push";
00282 proxy_consumer->push(any);
00283 }
00284 }
00285 }
00286 private:
00287 bool _connected;
00288 };
00289
00290
00291
00292
00293
00294
00295 int main(int argc, char **argv)
00296 {
00297
00298
00299 #if defined(HAVE_OMNIORB4)
00300 orb=CORBA::ORB_init(argc,argv,"omniORB4");
00301 #else
00302 orb=CORBA::ORB_init(argc,argv,"omniORB3");
00303 #endif
00304
00305
00306 bool supplierMode =false;
00307 const char* channelName ="EventChannel";
00308
00309 int c;
00310 while ((c = getopt(argc,argv,"shn:")) != EOF)
00311 {
00312 switch (c)
00313 {
00314 case 's': supplierMode=true;
00315 break;
00316
00317 case 'n': channelName = optarg;
00318 break;
00319
00320 case 'h': usage(argc,argv);
00321 exit(0);
00322 default : usage(argc,argv);
00323 exit(-1);
00324 }
00325 }
00326
00327 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00328
00329 signal(SIGPIPE, SIG_IGN);
00330 #endif
00331
00332 const char* action="";
00333 try {
00334 CORBA::Object_var obj;
00335
00336 action="resolve initial reference 'RootPOA'";
00337 obj=orb->resolve_initial_references("RootPOA");
00338 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00339 if(CORBA::is_nil(rootPoa))
00340 throw CORBA::OBJECT_NOT_EXIST();
00341
00342 action="activate the RootPOA's POAManager";
00343 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00344 pman->activate();
00345
00346
00347
00348
00349 if(optind<argc)
00350 {
00351 action="convert URI from command line into object reference";
00352 obj=orb->string_to_object(argv[optind]);
00353 }
00354 else
00355 {
00356 action="resolve initial reference 'NameService'";
00357 obj=orb->resolve_initial_references("NameService");
00358 CosNaming::NamingContext_var rootContext=
00359 CosNaming::NamingContext::_narrow(obj);
00360 if(CORBA::is_nil(rootContext))
00361 throw CORBA::OBJECT_NOT_EXIST();
00362
00363 action="find EventChannel in NameService";
00364 cout << action << endl;
00365 obj=rootContext->resolve(str2name(channelName));
00366 }
00367
00368 action="narrow object reference to event channel";
00369 CosEventChannelAdmin::EventChannel_var channel =
00370 CosEventChannelAdmin::EventChannel::_narrow(obj);
00371 if(CORBA::is_nil(channel))
00372 {
00373 cerr << "Failed to narrow Event Channel reference." << endl;
00374 exit(1);
00375 }
00376
00377 if(supplierMode)
00378 {
00379 action="construct PushSupplier";
00380 Supplier_i* supplier =new Supplier_i();
00381 supplier->supply(channel,action);
00382 }
00383 else
00384 {
00385 action="construct PushConsumer";
00386 Consumer_i* consumer =new Consumer_i();
00387 consumer->consume(channel,action);
00388
00389 action="run ORB";
00390 orb->run();
00391 }
00392
00393 return 0;
00394
00395 }
00396 catch(CORBA::ORB::InvalidName& ex) {
00397 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00398 }
00399 catch(CosNaming::NamingContext::InvalidName& ex) {
00400 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00401 }
00402 catch(CosNaming::NamingContext::NotFound& ex) {
00403 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00404 }
00405 catch(CosNaming::NamingContext::CannotProceed& ex) {
00406 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00407 }
00408 catch(CORBA::TRANSIENT& ex) {
00409 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00410 }
00411 catch(CORBA::OBJECT_NOT_EXIST& ex) {
00412 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00413 }
00414 catch(CORBA::SystemException& ex) {
00415 cerr<<"Failed to "<<action<<"."
00416 #if defined(HAVE_OMNIORB4)
00417 " "<<ex._name()<<" ("<<ex.NP_minorString()<<")"
00418 #endif
00419 <<endl;
00420 }
00421 catch(CORBA::Exception& ex) {
00422 cerr<<"Failed to "<<action<<"."
00423 #if defined(HAVE_OMNIORB4)
00424 " "<<ex._name()
00425 #endif
00426 <<endl;
00427 }
00428
00429 return 1;
00430 }
00431
/// Writes the command line help text to cerr.
/// @param argc  Argument count; when zero the program is shown as "events".
/// @param argv  Argument vector; argv[0] is shown as the program name.
static void usage(int argc, char **argv)
{
  // Fall back to a fixed name when no argument vector is available.
  const char* programName ="events";
  if(argc)
      programName=argv[0];

  cerr<<"\nStream events from a channel to stdout, or (-s) from stdin to a channel.\n";
  cerr<<"syntax: "<<programName<<" OPTIONS [CHANNEL_URI]\n";
  cerr<<"\n";
  cerr<<"CHANNEL_URI: The event channel may be specified as a URI.\n";
  cerr<<" This may be an IOR, or a corbaloc::: or corbaname::: URI.\n";
  cerr<<"\n";
  cerr<<"OPTIONS: DEFAULT:\n";
  cerr<<" -s supply mode. Read events from stdin.\n";
  cerr<<" -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n";
  cerr<<" -h display this help text\n";
  cerr<<endl;
}