00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 #include "omniEventsLog.h"
00151
00152 #ifdef HAVE_CONFIG_H
00153 # include "config.h"
00154 #endif
00155
00156 #include <stdio.h>
00157
00158 #ifdef HAVE_STDLIB_H
00159 # include <stdlib.h>
00160 #endif
00161
00162 #ifdef HAVE_SYS_TYPES_H
00163 # include <sys/types.h>
00164 #endif
00165
00166 #ifdef HAVE_SYS_STAT_H
00167 # include <sys/stat.h>
00168 #endif
00169
00170 #ifdef HAVE_FCNTL_H
00171 # include <fcntl.h>
00172 #endif
00173
00174 #if defined(__VMS) && __CRTL_VER < 70000000
00175 # include <omniVMS/unlink.hxx>
00176 #endif
00177
00178 #ifdef __WIN32__
00179 # include <io.h>
00180 # include <winbase.h>
00181 # define stat(x,y) _stat(x,y)
00182 # define unlink(x) _unlink(x)
00183 # define STRUCT_STAT struct _stat
00184 #else
00185 # define STRUCT_STAT struct stat
00186 #endif // __WIN32__
00187
00188 #ifdef HAVE_UNISTD_H
00189 # include <unistd.h>
00190 #endif
00191
00192 #ifdef HAVE_LIBC_H
00193 # include <libc.h>
00194 #endif
00195
00196 #ifdef HAVE_SYS_PARAM_H
00197 # include <sys/param.h>
00198 #endif
00199
00200 #include <errno.h>
00201 #include <time.h>
00202 #include <assert.h>
00203 #include "gethostname.h"
00204
00205 #include "EventChannelFactory.h"
00206 #include "Orb.h"
00207 #include "defaults.h"
00208
00209
00210
00211
00212
00213 #if defined(HAVE_FSTREAM_OPEN)
00214 # define FLAG_TRUNCATE ios::trunc
00215 # define FLAG_APPEND ios::app
00216 # define FLAG_SYNC 0
00217 #elif defined(HAVE_FSTREAM_ATTACH)
00218 # if defined(__WIN32__)
00219 # define FLAG_SYNC 0
00220 # elif defined(O_SYNC)
00221 # define FLAG_SYNC O_SYNC
00222 # else
00223 # define FLAG_SYNC O_FSYNC // FreeBSD 3.2 does not have O_SYNC???
00224 # endif
00225 # define FLAG_TRUNCATE O_CREAT|O_TRUNC
00226 # define FLAG_APPEND O_APPEND
00227 #else
00228 # error "Can't open a file without ofstream::open() or ofstream::attach()"
00229 #endif
00230
00231
00232
00233
00234
00235 #ifdef __VMS
00236 # define VMS_SEMICOLON ";"
00237 #else
00238 # define VMS_SEMICOLON
00239 #endif
00240
00241 extern int yyparse();
00242 extern int yydebug;
00243 extern FILE *yyin;
00244
00245 namespace OmniEvents {
00246
/**
 * Produces a "[<ctime text>]  " prefix for log messages.
 *
 * The 24 significant characters of the most recent ctime() text are cached
 * in `str`, so that t() only returns a non-empty prefix when the time text
 * has changed since the previous call.
 */
class timestamp
{
  // Layout: '[' + 24 chars of ctime() text + ']' + two spaces + '\0'.
  char str[29];
public:
  timestamp(void)
  {
    str[0] = '[';
    // An empty cached time guarantees that the first call to t() never
    // matches; str[28] is the terminator for the fully formatted prefix.
    str[1] = str[28] = '\0';
  }

  /**
   * Returns the bracketed current time, or "" when the time text equals the
   * one stored by the previous call (or when the time cannot be formatted).
   * The returned pointer refers to internal storage that the next call may
   * overwrite.
   */
  const char* t(void)
  {
    time_t now = time(NULL);
    const char* text = ctime(&now);
    // ctime() may yield a null pointer when the time cannot be converted;
    // emit no prefix instead of dereferencing it.
    if(text == NULL)
      return "";
    if(strncmp(text, &str[1], 24) == 0)
      return "";
    strncpy(&str[1], text, 24);
    str[25] = ']';
    str[26] = ' ';
    str[27] = ' ';
    return str;
  }
};

/** Timestamp generator used by the log messages in this file. */
timestamp ts;
00276
00277
00278
00279
00280
00281 omniEventsLog *omniEventsLog::theLog = NULL;
00282
00283 omniEventsLog::omniEventsLog(const char* logdir) :
00284 _logstream(),
00285 _activeFilename(NULL),
00286 _backupFilename(NULL),
00287 _checkpointFilename(NULL),
00288 _workerThread(NULL),
00289 _factory(NULL),
00290 _checkpointNeeded(true),
00291 _lock()
00292 {
00293 omniEventsLog::theLog = this;
00294 initializeFileNames(logdir);
00295 }
00296
00297
00298 omniEventsLog::~omniEventsLog()
00299 {
00300 DB(20, "omniEventsLog::~omniEventsLog()");
00301
00302
00303
00304
00305
00306
00307
00308 if(NULL != _factory)
00309 {
00310 _factory->_remove_ref();
00311 _factory = NULL;
00312 }
00313 omniEventsLog::theLog = NULL;
00314 }
00315
00316
00317 bool omniEventsLog::fileExists(const char* filename) const
00318 {
00319 STRUCT_STAT sb;
00320 return(::stat(filename,&sb) == 0);
00321 }
00322
00323
00324 PersistNode* omniEventsLog::bootstrap(int port, const char* endPointNoListen)
00325 {
00326
00327
00328 PersistNode* initialState=new PersistNode();
00329 PersistNode* ecf =initialState->addnode("ecf");
00330 ecf->addattr("port",port);
00331 if(endPointNoListen && endPointNoListen[0])
00332 ecf->addattr(string("endPointNoListen=")+endPointNoListen);
00333 return initialState;
00334 }
00335
00336
00337 PersistNode* omniEventsLog::parse()
00338 {
00339
00340
00341 ifstream persiststream(_activeFilename);
00342 if(!persiststream)
00343 {
00344 cerr << "Error: cannot read database file '"
00345 << _activeFilename << "'." << endl;
00346 if( fileExists(_backupFilename) )
00347 {
00348 cerr <<
00349 " Backup file '" << _backupFilename << "' exists.\n"
00350 " Either rename it to '" << _activeFilename << "' to\n"
00351 " to recover the server's state, or delete it to create a new\n"
00352 " database file." << endl;
00353 }
00354 exit(1);
00355 }
00356 PersistNode* initialState=new PersistNode(persiststream);
00357 persiststream.close();
00358
00359
00360
00361 const char* errorStr =NULL;
00362 PersistNode* ecf=initialState->child("ecf");
00363 if(!ecf)
00364 errorStr="Can't find EventChannelFactory.";
00365 else if(ecf->attrLong("port",-1)<=0)
00366 errorStr="EventChannelFactory is not assigned a valid port.";
00367
00368 if(errorStr)
00369 {
00370 cerr<<"Error parsing database '"<<_activeFilename<<"'.\n"
00371 <<errorStr<<" Try deleting the file (and any backup)."<<endl;
00372 exit(1);
00373 }
00374
00375 return initialState;
00376 }
00377
00378
00379 void omniEventsLog::incarnateFactory(PersistNode* initialState)
00380 {
00381 assert(initialState!=NULL);
00382
00383
00384
00385 try
00386 {
00387 openOfstream(_logstream,_activeFilename,FLAG_APPEND);
00388 }
00389 catch (IOError& ex)
00390 {
00391 cerr << "Error: cannot "
00392 << (fileExists(_activeFilename)?"write to":"create new")
00393 << " database file '" << _activeFilename
00394 << "': " << strerror(errno) << endl;
00395 cerr << "\nUse option '-l' or set the environment variable "
00396 << OMNIEVENTS_LOGDIR_ENV_VAR
00397 << "\nto specify the directory where the files are kept.\n"
00398 << endl;
00399 _logstream.close();
00400 unlink(_activeFilename);
00401 exit(1);
00402 }
00403
00404
00405
00406 PersistNode* ecf=initialState->child("ecf");
00407 assert(ecf!=NULL);
00408 _factory =new EventChannelFactory_i(*ecf);
00409 CORBA::Object_var obj;
00410 assert(!CORBA::is_nil(obj = _factory->_this()));
00411 }
00412
00413
00414 void omniEventsLog::runWorker()
00415 {
00416 assert(_factory!=NULL);
00417
00418 _workerThread=new omniEventsLogWorker(
00419 this,
00420 &omniEventsLog::checkpoint,
00421 omni_thread::PRIORITY_NORMAL
00422 );
00423 }
00424
00425
00426 void omniEventsLog::output(ostream& os)
00427 {
00428 _factory->output(os);
00429 os<<endl;
00430 }
00431
00432
00433 void omniEventsLog::checkpoint(void)
00434 {
00435 int idle_time_btw_chkpt;
00436 static int firstCheckPoint = 1;
00437 char *itbc = getenv("OMNIEVENTS_ITBC");
00438 if (itbc == NULL || sscanf(itbc,"%d",&idle_time_btw_chkpt) != 1)
00439 {
00440 idle_time_btw_chkpt=OMNIEVENTS_LOG_CHECKPOINT_PERIOD;
00441 }
00442
00443 omni_mutex mutex;
00444 omni_condition cond(&mutex);
00445
00446 mutex.lock();
00447 while (1) {
00448
00449
00450
00451
00452
00453 if (! firstCheckPoint)
00454 {
00455 unsigned long s, n;
00456 omni_thread::get_time(&s, &n, idle_time_btw_chkpt);
00457 cond.timedwait(s,n);
00458
00459 _lock.lock();
00460 if(!_checkpointNeeded)
00461 {
00462 _lock.unlock();
00463 continue;
00464 }
00465 }
00466 else
00467 {
00468 _lock.lock();
00469 firstCheckPoint = 0;
00470 }
00471
00472 DB(1,ts.t() << "Checkpointing Phase 1: Prepare.")
00473
00474 ofstream ckpf;
00475 int fd = -1;
00476
00477 try
00478 {
00479 try
00480 {
00481 openOfstream(ckpf,_checkpointFilename,FLAG_TRUNCATE|FLAG_SYNC,&fd);
00482 }
00483 catch(IOError& ex)
00484 {
00485 DB(0,ts.t() << "Error: cannot open checkpoint file '"
00486 << _checkpointFilename << "' for writing.")
00487 throw;
00488 }
00489
00490 output(ckpf);
00491
00492 ckpf.close();
00493 if(!ckpf)
00494 throw IOError();
00495
00496
00497 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00498 if(close(fd) < 0)
00499 throw IOError();
00500 #endif
00501
00502 }
00503 catch(IOError& ex)
00504 {
00505 DB(0,ts.t()<<"I/O error writing checkpoint file: "<<strerror(errno)
00506 <<"\nAbandoning checkpoint")
00507 ckpf.close();
00508
00509 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00510 close(fd);
00511 #endif
00512 unlink(_checkpointFilename);
00513 _lock.unlock();
00514 continue;
00515 }
00516
00517
00518
00519
00520
00521 DB(1,ts.t() << "Checkpointing Phase 2: Commit.")
00522
00523
00524 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00525 close(_logstream.rdbuf()->fd());
00526 #endif
00527
00528 _logstream.close();
00529
00530 unlink(_backupFilename);
00531
00532 #if defined(__WIN32__)
00533 if(rename(_activeFilename, _backupFilename) != 0)
00534 #elif defined(__VMS)
00535 if(rename(_activeFilename, _backupFilename) < 0)
00536 #else
00537 if(link(_activeFilename,_backupFilename) < 0)
00538 #endif
00539 {
00540
00541 DB(0,ts.t() << "Error: failed to link backup file '"
00542 << _backupFilename << "' to old log file '"
00543 << _activeFilename << "'.")
00544 exit(1);
00545 }
00546
00547 #if !defined( __VMS) && !defined(__WIN32__)
00548 if(unlink(_activeFilename) < 0)
00549 {
00550
00551 DB(0,ts.t() << "Error: failed to unlink old log file '"
00552 << _activeFilename << "': " << strerror(errno))
00553 exit(1);
00554 }
00555 #endif
00556
00557 #if defined(__WIN32__)
00558 if(rename(_checkpointFilename,_activeFilename) != 0)
00559 #elif defined(__VMS)
00560 if(rename(_checkpointFilename,_activeFilename) < 0)
00561 #else
00562 if(link(_checkpointFilename,_activeFilename) < 0)
00563 #endif
00564 {
00565
00566 DB(0,ts.t() << "Error: failed to link log file '" << _activeFilename
00567 << "' to checkpoint file '" << _checkpointFilename << "'.")
00568 exit(1);
00569 }
00570
00571 #if !defined( __VMS) && !defined(__WIN32__)
00572 if (unlink(_checkpointFilename) < 0)
00573 {
00574
00575 DB(0,ts.t() << "Error: failed to unlink checkpoint file '"
00576 << _checkpointFilename << "'.")
00577 exit(1);
00578 }
00579 #endif
00580
00581 try
00582 {
00583 openOfstream(_logstream,_activeFilename,FLAG_APPEND|FLAG_SYNC,&fd);
00584 }
00585 catch (IOError& ex)
00586 {
00587 DB(0,ts.t() << "Error: cannot open new log file '" << _activeFilename
00588 << "' for writing.")
00589 exit(1);
00590 }
00591
00592 DB(1,ts.t() << "Checkpointing completed.")
00593
00594 _checkpointNeeded=false;
00595 _lock.unlock();
00596 }
00597 mutex.unlock();
00598 }
00599
00600
00611 void omniEventsLog::initializeFileNames(const char* logdir)
00612 {
00613 if(!logdir)
00614 logdir=getenv(OMNIEVENTS_LOGDIR_ENV_VAR);
00615 if(!logdir)
00616 logdir=OMNIEVENTS_LOG_DEFAULT_LOCATION;
00617
00618 const char* logname ="omnievents-";
00619 char hostname[MAXHOSTNAMELEN];
00620 if (0!=gethostname(hostname,MAXHOSTNAMELEN))
00621 {
00622 cerr << "Error: cannot get the name of this host." << endl;
00623 exit(1);
00624 }
00625 const char* sep ="";
00626
00627 #if defined(__WIN32__)
00628 sep="\\";
00629 #elif defined(__VMS)
00630 char last( logdir[strlen(logdir)-1] );
00631 if (last != ':' && last != ']')
00632 {
00633 cerr << "Error: " << OMNIEVENTS_LOGDIR_ENV_VAR << " (" << logdir
00634 << ") is not a directory name." << endl;
00635 exit(1);
00636 }
00637 #else // Unix
00638 if (logdir[0] != '/')
00639 {
00640 cerr << "Error: " << OMNIEVENTS_LOGDIR_ENV_VAR << " (" << logdir
00641 << ") is not an absolute path name." << endl;
00642 exit(1);
00643 }
00644 if (logdir[strlen(logdir)-1] != '/')
00645 sep="/";
00646 #endif
00647
00648
00649
00650
00651 setFilename(_activeFilename,logdir,sep,logname,hostname,".log" VMS_SEMICOLON);
00652 setFilename(_backupFilename,logdir,sep,logname,hostname,".bak" VMS_SEMICOLON);
00653 setFilename(
00654 _checkpointFilename,logdir,sep,logname,hostname,".ckp" VMS_SEMICOLON);
00655 }
00656
00657
00661 void omniEventsLog::setFilename(
00662 char*& filename, const char* logdir, const char* sep,
00663 const char* logname, const char* hostname, const char* ext)
00664 {
00665 size_t len=1+
00666 strlen(logdir)+strlen(sep)+strlen(logname)+strlen(hostname)+strlen(ext);
00667 filename=new char[len];
00668 sprintf(filename,"%s%s%s%s%s",logdir,sep,logname,hostname,ext);
00669 }
00670
00671
00685 void omniEventsLog::openOfstream(
00686 ofstream& s, const char* filename, int flags, int* fd)
00687 {
00688 #if defined(HAVE_FSTREAM_OPEN)
00689 # ifdef HAVE_STD_IOSTREAM
00690 ios::openmode openmodeflags =ios::out|ios::openmode(flags);
00691 # else
00692 int openmodeflags =ios::out|flags;
00693 # endif
00694
00695 # ifdef FSTREAM_OPEN_PROT
00696 s.open(filename,openmodeflags,0644);
00697 # else
00698 s.open(filename,openmodeflags);
00699 # endif
00700 if (!s)
00701 throw IOError();
00702
00703 #elif defined(HAVE_FSTREAM_ATTACH)
00704 # ifdef __WIN32__
00705 int localFd = _open(filename, O_WRONLY | flags, _S_IWRITE);
00706 # else
00707 int localFd = open(filename, O_WRONLY | flags, 0644);
00708 # endif
00709 if (localFd < 0)
00710 throw IOError();
00711 if(fd)
00712 (*fd)=localFd;
00713 s.attach(localFd);
00714 #endif
00715 }
00716
00717
00718
00719
00720
00721 omniEventsLogWorker::omniEventsLogWorker(
00722 omniEventsLog* object,
00723 Method method,
00724 priority_t priority
00725 ):omni_thread(NULL,priority)
00726 {
00727 DB(15, "omniEventsLogWorker::omniEventsLogWorker()");
00728
00729 _method=method;
00730 _object=object;
00731
00732 start_undetached();
00733 }
00734
00735
00736 void* omniEventsLogWorker::run_undetached(void *)
00737 {
00738 try {
00739 DB(15, "omniEventsLogWorker : run_undetached Start");
00740 (_object->*_method)();
00741 DB(15, "omniEventsLogWorker : run_undetached End");
00742 }
00743 catch (CORBA::SystemException& ex) {
00744 DB(0,"omniEventsLogWorker killed by CORBA system exception"
00745 IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00746 }
00747 catch (CORBA::Exception& ex) {
00748 DB(0,"omniEventsLogWorker killed by CORBA exception"
00749 IF_OMNIORB4(": "<<ex._name()<<) ".")
00750 }
00751 catch(...) {
00752 DB(0,"omniEventsLogWorker killed by unknown exception.")
00753 }
00754 return NULL;
00755 }
00756
00757 omniEventsLogWorker::~omniEventsLogWorker()
00758 {
00759 DB(20, "omniEventsLogWorker::~omniEventsLogWorker()");
00760 }
00761
00762
00763 };