00001
00002 #include <memory>
00003 #include <iostream>
00004
00005 #include <string.h>
00006
00007 #include "SysPlusPlus/Tools.h"
00008 #include "SysPlusPlus/GenCfg.h"
00009 #include "SysPlusPlus/syscall.h"
00010 #include "ComPlusPlus/Comm.h"
00011 #include "ComPlusPlus/File.h"
00012 #include "ComPlusPlus/Poll.h"
00013 #include "SysPlusPlus/ComException.h"
00014
00015 #include "ComPlusPlus/Socket.h"
00016 #include "ComPlusPlus/SocketUdp.h"
00017 #include "ComPlusPlus/SocketTcp.h"
00018
00019
00029 #define MAXUDP 65536
00030
00031 compp::Comm::Comm ( unsigned int readaheadbuffersize ) {
00032
00033 this->fd=-1;
00034
00035 compp::GenCfg * Cfg = (compp::GenCfg*) compp::GenCfg::Instance();
00036
00037
00038 this->ReadAhead = false;
00039
00040 this->ReadAheadBuffer = NULL;
00041 this->ReadAheadBufferTotalSize = readaheadbuffersize ;
00042 this->ReadAheadBufferSize = 0;
00043 this->ReadAheadBufferIndex = 0;
00044
00045 this->fd_attached = Cfg->GetCommFDAttached ();
00046 this->WriteTimeout = Cfg->GetCommWriteTimeout() ;
00047 this->ReadTimeout = Cfg->GetCommReadTimeout() ;
00048 this->MaxNumBytesToRead = Cfg->GetCommMaxNumBytesToRead();
00049 this->LineDelimiter = '\n';
00050
00051 IsOpen=false;
00052 FdClosed=false;
00053
00054 }
00055
00062 compp::Comm::~Comm ( ) {
00063
00064 if ( NULL != ReadAheadBuffer ) {
00065 delete ReadAheadBuffer;
00066 }
00067
00068 if ( this->fd == -1 )
00069 return ;
00070
00071 if ( FD_IsAttached() )
00072 this->Close ();
00073
00074 }
00075
00081 void compp::Comm::AttachFD() {
00082 this->fd_attached = true;
00083 }
00084
00085
00086
00087
00088 void compp::Comm::DetachFD () {
00089 this->fd_attached = false;
00090 }
00091
00096 bool compp::Comm::FD_IsAttached() {
00097 return this->fd_attached;
00098 }
00099
00104 void compp::Comm::Close ( ) {
00105
00106 if ( this->fd==-1 )
00107 return ;
00108
00109 syspp::Call::Close(this->fd);
00110
00111 IsOpen=false;
00112 this->fd=-1;
00113 this->ReadAheadBufferSize = 0;
00114 this->ReadAheadBufferIndex = 0;
00115
00116 }
00117
00122 bool compp::Comm::PollSnd ( int usecs ) {
00123
00124 if ( this->fd == -1 ) {
00125 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00126 }
00127
00128 fd_set wrset, errset;
00129 struct timeval tv, *tvp;
00130
00131 FD_ZERO( & wrset );
00132 FD_ZERO( & errset );
00133
00134 if ( usecs >= 0 ) {
00135 memset (&tv, 0, sizeof(tv));
00136 tv.tv_sec = usecs / 1000000;
00137 tv.tv_usec = usecs % 1000000;
00138 tvp = &tv;
00139 } else
00140 tvp = NULL;
00141
00142 int sfd = this->fd;
00143
00144 FD_SET( sfd, &wrset);
00145 FD_SET( sfd, &errset);
00146
00147 switch ( syspp::Call::Select ( sfd + 1, NULL, &wrset, &errset, tvp ) ) {
00148 case -1: {
00149 throw syspp::ComException ( "compp::Comm::PollSnd. Select failed" );
00150 break;
00151 }
00152 case 0: {
00153 return false;
00154 }
00155 case 1: {
00156 if ( FD_ISSET( sfd, &wrset ) )
00157 return true;
00158 else {
00159 return false ;
00160 }
00161 }
00162 default: {
00163 return false ;
00164 }
00165 }
00166
00167
00168 return false;
00169 }
00170
00175 bool compp::Comm::PollRcv ( int usecs ) {
00176
00177 if ( this->fd == -1 ) {
00178 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00179 }
00180
00181 fd_set rdset, errset;
00182 struct timeval tv, *tvp;
00183
00184 FD_ZERO( & rdset );
00185 FD_ZERO( & errset );
00186
00187 if ( usecs >= 0 ) {
00188 memset (&tv, 0, sizeof(tv));
00189 tv.tv_sec = usecs / 1000000;
00190 tv.tv_usec = usecs % 1000000;
00191 tvp = &tv;
00192 } else
00193 tvp = NULL;
00194
00195 int sfd = this->fd;
00196
00197 FD_SET( sfd, &rdset);
00198 FD_SET( sfd, &errset);
00199
00200
00201 switch ( syspp::Call::Select ( sfd + 1, &rdset, NULL, &errset, tvp ) ) {
00202 case -1: {
00203 throw syspp::ComException ( "compp::Comm::PollRcv. Select failed" );
00204 break;
00205 }
00206 case 0: {
00207 return false;
00208 }
00209 case 1: {
00210 if ( FD_ISSET( sfd, &rdset ) )
00211 return true;
00212 else {
00213 return false;
00214 }
00215 }
00216 default: {
00217 return false;
00218 }
00219 }
00220
00221
00222 return false;
00223 }
00224
00225
00229 bool compp::Comm::SetOptBlocking () {
00230
00231 #if 0
00232 u_long iMode = 0;
00233 syspp::Call::Ioctlsocket(this->fd, FIONBIO, &iMode);
00234 return true;
00235 #else
00236 int i;
00237
00238 i = syspp::Call::Fcntl ( this->fd, F_GETFL, 0L ) ;
00239
00240 if ( -1 == i ) {
00241 throw syspp::ComException ( "compp::Comm::SetOptBlocking (): Could not call fcntl()");
00242 }
00243
00244 i ^= O_NONBLOCK;
00245
00246 i = syspp::Call::Fcntl ( this->fd, F_SETFL, i );
00247
00248 if ( -1 == i ) {
00249 throw syspp::ComException ( "compp::Comm::SetOptBlocking Cannot Set Option Blocking");
00250 }
00251
00252 return true;
00253 #endif
00254 }
00255
00259 bool compp::Comm::SetOptNonBlocking ( ) {
00260
00261 int i;
00262
00263 i = syspp::Call::Fcntl ( this->fd, F_GETFL, 0L ) ;
00264
00265 if ( -1 == i ) {
00266 throw syspp::ComException ( "compp::Comm::SetOptNonBlocking Could not call fcntl(GETFL)");
00267 }
00268
00269 i |= O_NONBLOCK;
00270 i = syspp::Call::Fcntl ( this->fd, F_SETFL, i );
00271
00272 if ( -1 == i ) {
00273 throw syspp::ComException ( "compp::Comm::SetOptNonBlocking Could not call fcntl(SETFL)");
00274 }
00275
00276 return true;
00277 }
00278
00283 bool compp::Comm::IsEOF () const{
00284 if ( this->fd == -1 || FdClosed == true || IsOpen == false ) {
00285 return true;
00286 }
00287
00288 return false;
00289 }
00290
00291
00297 int compp::Comm::Write ( const char *buf, const int count ) {
00298
00299
00300 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00301 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00302 }
00303
00304 int ret;
00305
00306 ret = syspp::Call::Send ( this->fd, (void *) buf, count, 0 );
00307
00308 if ( ret == -1 && errno == EBADF )
00309 FdClosed=true;
00310
00311 return ret;
00312 }
00313
00314
00319 int compp::Comm::Write ( const std::string & buf ) {
00320
00321 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00322 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00323 }
00324
00325 int ret;
00326
00327 ret = Write ( (char *) buf.data(), buf.length()) ;
00328 return ret;
00329 }
00330
00331
00338 int compp::Comm::Writen ( const char *buf, int count ) {
00339
00340 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00341 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00342 }
00343
00344 int i= 0, len = count;
00345 int remain=0;
00346 char *c = (char*) buf;
00347 syspp::int64 from =0, to = 0, now = 0;
00348
00349 if ( len == 0 )
00350 return 0;
00351
00352 if ( WriteTimeout > 0 ) {
00353 from = syspp::Tools::Time64();
00354 now = from;
00355 to = from + WriteTimeout;
00356 }
00357
00358 for ( remain = len ; remain > 0; ) {
00359
00360 if ( PollSnd ( to - now ) == false ) {
00361 errno = EAGAIN;
00362 return -1;
00363 }
00364
00365 i = Write ( c, remain);
00366
00367 if ( i == -1 && errno == EBADF ) {
00368 FdClosed=true;
00369 }
00370
00371 if ( i <= 0 ) {
00372 if ( errno != EAGAIN )
00373 throw syspp::ComException ( "Error writing to Comm Channel" );
00374 if ( i == 0 )
00375 errno = 0;
00376 return i;
00377
00378 } else {
00379 remain -= i;
00380
00381 if ( remain == 0 )
00382 return len;
00383
00384 if ( remain > 0)
00385 c += i;
00386 }
00387
00388 if ( WriteTimeout > 0 ) {
00389
00390 now = syspp::Tools::Time64();
00391
00392 if ( now > to ) {
00393 errno = EAGAIN;
00394 return (len - remain);
00395 }
00396 }
00397 }
00398
00399 return len;
00400 }
00401
00406 int compp::Comm::Writen ( const std::string & buf ) {
00407
00408
00409
00410
00411
00412
00413
00414 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00415 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00416 }
00417
00418 return Writen ( buf.c_str(), buf.length() ) ;
00419 }
00420
00421
00422 int compp::Comm::operator<< ( const std::string & buf ) {
00423
00424 return Writen (buf );
00425
00426 }
00427
00433 int compp::Comm::Read ( char *buf, int count ) {
00434
00435 if ( count <= 0 )
00436 return count;
00437
00438 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00439 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00440 }
00441
00442 int ret;
00443
00444 if ( ReadAhead == false ) {
00445
00446 ret = syspp::Call::Recv ( this->fd, (void *) buf, count,0 );
00447 if ( ret == 0 )
00448 FdClosed=true;
00449 return ret ;
00450 }
00451
00452
00453
00454 if ( this->ReadAheadBufferSize == 0 ) {
00455 if ( (unsigned int ) count > ReadAheadBufferTotalSize ) {
00456 ret = syspp::Call::Recv ( this->fd, (void *) buf, count, 0 );
00457
00458 if ( ret == 0 )
00459 FdClosed=true;
00460 return ret;
00461 }
00462
00463
00464 this->ReadAheadBufferSize = syspp::Call::Recv( this->fd, (void *) this->ReadAheadBuffer, ReadAheadBufferTotalSize , 0);
00465
00466
00467 this->ReadAheadBufferIndex = 0;
00468 if ( this->ReadAheadBufferSize <= 0) {
00469 FdClosed=true;
00470 return this->ReadAheadBufferSize;
00471 }
00472
00473 }
00474
00475
00476 if ( ( unsigned int ) count > ( this->ReadAheadBufferSize - this->ReadAheadBufferIndex) ) {
00477 count = this->ReadAheadBufferSize- this->ReadAheadBufferIndex;
00478 }
00479
00480 memcpy ( (void*) buf, (void*) & (this->ReadAheadBuffer[this->ReadAheadBufferIndex]), count );
00481 ret = count ;
00482 this->ReadAheadBufferIndex += count;
00483 if ( this->ReadAheadBufferIndex == this->ReadAheadBufferSize ) {
00484 this->ReadAheadBufferIndex = 0;
00485 this->ReadAheadBufferSize = 0;
00486 }
00487
00488 return ret;
00489 }
00490
00491
00496 int compp::Comm::Read ( std::string &buf, int count ) {
00497
00498 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00499 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00500 }
00501
00502 buf = "";
00503 char c[1000] ;
00504 int ret;
00505 int i;
00506
00507 for ( i = 0; i < count; ) {
00508 if ( ( ret = Read (c, count-i )) == -1 ) {
00509 throw syspp::ComException ( " Read Error ");
00510 }
00511 if ( ret == 0 ) {
00512 FdClosed=true;
00513 return i;
00514 }
00515 buf.append ( c, ret );
00516 i += ret;
00517 if ( ret != ( count-i ))
00518 return i;
00519 }
00520 return i;
00521 }
00522
00523
00528 bool compp::Comm::ReadLn ( char * buf, int count ) {
00529
00530 if ( dynamic_cast<compp::SocketUdp*>(this) !=NULL ) {
00531 throw syspp::ComException ( "compp::Comm::ReadLn does not make sense on UDP Socket");
00532 }
00533
00534 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00535 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00536 }
00537
00538 int i, j;
00539 char c;
00540
00541 std::memset ( buf, (int)0, count);
00542 for ( i=0; i<count; ++i ) {
00543 if ( PollRcv ( ReadTimeout )) {
00544 j = Read( &c, 1 );
00545 if ( j == 0)
00546 return false;
00547
00548 if ( c == this->LineDelimiter ) {
00549 buf[i] = '\0';
00550 return true;
00551 }
00552
00553 buf[i] = c;
00554 } else
00555 return false;
00556 }
00557
00558 return false;
00559 }
00560
00566 bool compp::Comm::ReadLn ( std::string & s ) {
00567
00568 s ="";
00569 if ( dynamic_cast<compp::SocketUdp*>(this) !=NULL ) {
00570 throw syspp::ComException ( "compp::Comm::ReadLN does not make sense on UDP Socket");
00571 }
00572
00573 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00574 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00575 }
00576
00577 int i, j;
00578 char c;
00579
00580 for ( i=0; i < this->MaxNumBytesToRead; ++i ) {
00581 if ( PollRcv ( this->ReadTimeout )) {
00582 j = Read( &c, 1 );
00583 if ( j == 0)
00584 return false;
00585
00586 if ( c == this->LineDelimiter ) {
00587 return true;
00588 }
00589
00590 s += c;
00591 } else
00592 return false;
00593 }
00594 return false;
00595 }
00596
00603 int compp::Comm::Readn ( char *buf, int count ) {
00604
00605 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00606 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00607 }
00608
00609 int i=0, bytesdone =0, len = count, timeleft;
00610 int remain;
00611 char *c = (char*) buf;
00612 syspp::int64 from=0, to=0, now =0 ;
00613
00614
00615 if ( ReadTimeout > 0 ) {
00616 from = syspp::Tools::Time64() ;
00617 to = from + ReadTimeout;
00618 now = syspp::Tools::Time64() ;
00619 }
00620
00621
00622 for ( remain = len ; remain > 0; ) {
00623 errno = 0;
00624
00625 if ( ReadTimeout > 0 ) {
00626 timeleft = to - now;
00627
00628 if ( ( false == this->ReadAhead ) ||
00629 ( true == this->ReadAhead &&( this->ReadAheadBufferSize == 0 || this->ReadAheadBufferIndex >= this->ReadAheadBufferSize ) ) ) {
00630
00631 if ( PollRcv ( timeleft ) == false ) {
00632 errno = EAGAIN;
00633 return -1;
00634 }
00635 }
00636 }
00637
00638 i = Read ( c, remain);
00639 if ( i == 0 ) {
00640 errno = 0;
00641 if ( count > 0 )
00642 FdClosed=true;
00643 return bytesdone;
00644 }
00645 if ( i < 0 ) {
00646 if ( errno != EAGAIN ) {
00647 throw syspp::ComException ( "Read Error ");
00648 } else {
00649 errno = 0;
00650 }
00651 } else {
00652 remain -= i;
00653 if ( remain == 0 ) {
00654 errno = 0;
00655 return len;
00656 }
00657
00658 if ( remain > 0) {
00659 c += i;
00660 bytesdone+=i;
00661 }
00662 }
00663
00664 if ( ReadTimeout > 0 ) {
00665
00666 now = syspp::Tools::Time64();
00667 if ( now > to) {
00668 errno = EAGAIN;
00669 return (len - remain);
00670 }
00671 }
00672 }
00673
00674 return len;
00675 }
00676
00685 int compp::Comm::Readn ( std::string &buf, int count ) {
00686
00687 if ( this->fd == -1 || !IsOpen || FdClosed ) {
00688 throw syspp::ComException ( "compp::Comm::Ressource not opened");
00689 }
00690
00691 int i;
00692 std::auto_ptr<char> c (new char[count+1]);
00693
00694 try {
00695 i = this->Readn ( (char*)c.get(), count ) ;
00696 buf= "";
00697 buf.append ( c.get(), i );
00698 } catch ( syspp::ComException e) {
00699 throw syspp::ComException( e.what());
00700 }
00701 return i;
00702
00703 }
00704
00710 void compp::Comm::SetMaxNumBytesToRead ( int num ) {
00711 MaxNumBytesToRead = num;
00712 }
00713
00717 void compp::Comm::SetWriteTimeout( int tmout ) {
00718 WriteTimeout = tmout ;
00719 }
00723 int compp::Comm::GetWriteTimeout( ) const {
00724 return WriteTimeout ;
00725 }
00729 void compp::Comm::SetReadTimeout( int tmout ) {
00730 ReadTimeout = tmout ;
00731 }
00735 int compp::Comm::GetReadTimeout() const {
00736 return ReadTimeout ;
00737 }
00738
00742 int compp::Comm::GetFd () const {
00743 return fd;
00744 }
00745
00749 void compp::Comm::SetLineDelimiter ( char c ) {
00750 this->LineDelimiter = c;
00751 }
00755 void compp::Comm::SetReadAhead ( bool yn ) {
00756
00757 if ( dynamic_cast<compp::SocketUdp*>(this) !=NULL ) {
00758 this->ReadAhead = false;
00759 return;
00760 }
00761
00762 if ( true == this->ReadAhead && NULL != ReadAheadBuffer) {
00763 delete [] ReadAheadBuffer;
00764 ReadAheadBuffer = NULL;
00765 }
00766
00767 this->ReadAhead = yn;
00768
00769 if ( true == ReadAhead ) {
00770 if ( ReadAheadBufferTotalSize > 0 ) {
00771 ReadAheadBuffer = new char [ReadAheadBufferTotalSize];
00772 } else {
00773 this->ReadAhead = false;
00774 ReadAheadBuffer = NULL;
00775 }
00776 } else {
00777 if ( NULL != ReadAheadBuffer ) {
00778 delete ReadAheadBuffer;
00779 }
00780 }
00781 }