00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "std/config.H"
00010
00011
00012 #ifdef macosx
00013 #include <sys/ioctl.h>
00014 #endif
00015 #include <cstdlib>
00016 #include <cstdio>
00017 #include <cstring>
00018 #include <cassert>
00019 #include <cctype>
00020 #include <cerrno>
00021
00022 #include "std/fstream.H"
00023
00024 #ifdef WIN32
00025 #define signal(x,y)
00026 #else
00027 #include <csignal>
00028 #endif
00029
00030 #include "std/support.H"
00031 #include "std/time.H"
00032 #include "net.H"
00033 #include "pack.H"
00034
00035
00036 #include <sys/stat.h>
00037 #include <fcntl.h>
00038
00039
00040
00041 #if defined(linux) || defined(_AIX)
00042 #include <sys/ioctl.h>
00043 #elif !defined(WIN32)
00044 #include <sys/filio.h>
00045 #else
00046
00047 #endif
00048
00049
00050
00051 #ifndef WIN32
00052 #include <netinet/tcp.h>
00053 #endif
00054
00055 struct sockaddr_in;
00056 struct hostent;
00057
00058 #ifdef sun
00059 extern "C" int gethostname(char *, int);
00060 #endif
00061
00062
00063 #if !defined(_AIX) && !defined(_SOCKLEN_T) && !defined(linux)
00064 typedef int socklen_t;
00065 #endif
00066
00067 #ifdef WIN32
00068
00069
00070
00071
00072
00073
00074 ssize_t
00075 write_win32(int fildes, const void *buf, size_t nbyte)
00076 {
00077 DWORD val=0;
00078 if (GetFileType((HANDLE)fildes) == FILE_TYPE_DISK)
00079 {
00080 if (!WriteFile((HANDLE)fildes, buf, nbyte, &val, NULL))
00081 {
00082
00083
00084 LPVOID lpMsgBuf;
00085 FormatMessage(
00086 FORMAT_MESSAGE_ALLOCATE_BUFFER |
00087 FORMAT_MESSAGE_FROM_SYSTEM |
00088 FORMAT_MESSAGE_IGNORE_INSERTS,
00089 NULL,
00090 GetLastError(),
00091 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
00092 (LPTSTR) &lpMsgBuf,
00093 0,
00094 NULL
00095 );
00096
00097 cerr << "write_win32() - Error! Message: " << (LPCTSTR)lpMsgBuf << "\n";
00098
00099 LocalFree( lpMsgBuf );
00100 }
00101 }
00102 else
00103 {
00104 OVERLAPPED overlap;
00105 overlap.hEvent = (HANDLE)NULL;
00106 if (!WriteFile((HANDLE)fildes, buf, nbyte, &val, &overlap))
00107 {
00108 if (!GetOverlappedResult((HANDLE)fildes, &overlap, &val, TRUE))
00109 {
00110 LPVOID lpMsgBuf;
00111 FormatMessage(
00112 FORMAT_MESSAGE_ALLOCATE_BUFFER |
00113 FORMAT_MESSAGE_FROM_SYSTEM |
00114 FORMAT_MESSAGE_IGNORE_INSERTS,
00115 NULL,
00116 GetLastError(),
00117 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
00118 (LPTSTR) &lpMsgBuf,
00119 0,
00120 NULL
00121 );
00122
00123 cerr << "write_win32() - Error! Message: " << (LPCTSTR)lpMsgBuf << "\n";
00124
00125 LocalFree( lpMsgBuf );
00126
00127 }
00128 }
00129 }
00130 return val;
00131 }
00132
00133 ssize_t
00134 read_win32(int fildes, void *buf, size_t nbyte)
00135 {
00136 DWORD val=0;
00137
00138 DWORD filetype = GetFileType((HANDLE) fildes);
00139
00140 if (fildes == fileno(stdin))
00141 {
00142 ReadConsole(GetStdHandle(STD_INPUT_HANDLE), buf, nbyte, &val, NULL);
00143 }
00144 else if (filetype == FILE_TYPE_DISK)
00145 {
00146 ReadFile((HANDLE)fildes, buf, nbyte, &val, NULL);
00147 }
00148 else if (filetype == FILE_TYPE_CHAR)
00149 {
00150 ReadConsole(GetStdHandle(STD_INPUT_HANDLE), buf, nbyte, &val, NULL);
00151 }
00152 else
00153 {
00154 OVERLAPPED overlap;
00155 overlap.hEvent = (HANDLE)NULL;
00156 if (ReadFile((HANDLE) fildes, buf, nbyte, &val, &overlap)==FALSE)
00157 {
00158 if (!GetOverlappedResult((HANDLE) fildes, &overlap, &val, TRUE))
00159 {
00160 const DWORD error = GetLastError();
00161 cerr << "read_win32, error is: " << error << " - "
00162 << " - " << fildes << endl;
00163 }
00164 }
00165 }
00166 return val;
00167 }
00168 #endif
00169
00170 int
00171 num_bytes_to_read(int fildes)
00172 {
00173 #ifdef WIN32
00174
00175
00176 unsigned long winnum;
00177 int retval = ioctlsocket(fildes, FIONREAD, &winnum);
00178 if (retval)
00179 {
00180 const int error = WSAGetLastError();
00181 if (error == WSAENOTSOCK)
00182 {
00183 HANDLE hndl = (HANDLE) _get_osfhandle(fildes);
00184 DWORD filetype = GetFileType(hndl);
00185 if (filetype == FILE_TYPE_CHAR)
00186 {
00187 DWORD numevents;
00188 if (GetNumberOfConsoleInputEvents(hndl, &numevents))
00189 {
00190 INPUT_RECORD *irec = new INPUT_RECORD[numevents];
00191 DWORD numread;
00192 PeekConsoleInput(hndl, irec, numevents, &numread);
00193 winnum = 0;
00194 static bool PRINT_ERRS = Config::get_var_bool("PRINT_ERRS",false,true);
00195 if (PRINT_ERRS) cerr << "num_bytes_to_read - # Events=" << numevents << "\n";
00196 for (int i = 0; i < (int)numread; i++)
00197 {
00198 if (PRINT_ERRS)
00199 {
00200 if (irec[i].EventType == KEY_EVENT)
00201 {
00202 cerr << "num_bytes_to_read - KEY_EVENT\n";
00203 cerr << " " <<
00204 "bKeyDown=";
00205 if (irec[i].Event.KeyEvent.bKeyDown) cerr << "DOWN\n";
00206 else cerr << "UP\n";
00207 cerr << " " <<
00208 "wRepeatCount=" <<
00209 (irec[i].Event.KeyEvent.wRepeatCount) <<
00210 "\n";
00211 cerr << " " <<
00212 "wVirtualKeyCode=" <<
00213 (irec[i].Event.KeyEvent.wVirtualKeyCode) <<
00214 "\n";
00215 cerr << " " <<
00216 "wVirtualScanCode=" <<
00217 (irec[i].Event.KeyEvent.wVirtualScanCode) <<
00218 "\n";
00219 cerr << " " <<
00220 "uChar=" <<
00221 (irec[i].Event.KeyEvent.uChar.AsciiChar) <<
00222 "\n";
00223 cerr << " " <<
00224 "(int)uChar=" <<
00225 int((irec[i].Event.KeyEvent.uChar.AsciiChar)) <<
00226 "\n";
00227 cerr << " " <<
00228 "dwControlKeyState=" <<
00229 (irec[i].Event.KeyEvent.dwControlKeyState) <<
00230 "\n";
00231 }
00232 else if (irec[i].EventType == MOUSE_EVENT)
00233 cerr << "num_bytes_to_read - MOUSE_EVENT\n";
00234 else if (irec[i].EventType == WINDOW_BUFFER_SIZE_EVENT)
00235 cerr << "num_bytes_to_read - WINDOW_BUFFER_SIZE_EVENT\n";
00236 else if (irec[i].EventType == MENU_EVENT)
00237 cerr << "num_bytes_to_read - MENU_EVENT\n";
00238 else if (irec[i].EventType == FOCUS_EVENT)
00239 cerr << "num_bytes_to_read - FOCUS_EVENT\n";
00240 else
00241 cerr << "num_bytes_to_read - Unknown event!!!!\n";
00242 }
00243 if (irec[i].EventType == KEY_EVENT &&
00244
00245 irec[i].Event.KeyEvent.bKeyDown &&
00246
00247
00248
00249
00250
00251
00252 int((irec[i].Event.KeyEvent.uChar.AsciiChar)) &&
00253 (int((irec[i].Event.KeyEvent.uChar.AsciiChar)) != 27))
00254 {
00255 winnum += irec[i].Event.KeyEvent.wRepeatCount;
00256 }
00257 }
00258 delete [] irec;
00259 if (PRINT_ERRS&&(winnum))
00260 cerr << "num_bytes_to_read - Num=" << winnum << endl;
00261 return winnum;
00262 }
00263 return 0;
00264 }
00265
00266
00267 cerr << "Returning 1" << endl;
00268 return 1;
00269 }
00270 else
00271 {
00272 cerr <<"::num_bytes_to_read() - ioctlsocket() returned "
00273 << retval << ", error:" << error << endl;
00274 WSASetLastError(0);
00275 return -1;
00276 }
00277 }
00278 return (int) winnum;
00279 #else
00280 int num = 0;
00281 int retval = ioctl(fildes, FIONREAD, &num);
00282 if (retval < 0) {
00283 return -1;
00284 }
00285 return num;
00286 #endif
00287 }
00288
00289
00290 static char *
00291 get_host_print_name(
00292 int port,
00293 const char *hname = 0
00294 )
00295 {
00296 static char nbuff[255];
00297 static char buff[255];
00298 if (!hname) {
00299 gethostname(buff, 255);
00300 hname = buff;
00301 }
00302
00303 struct hostent *entry = gethostbyname(hname);
00304 sprintf(nbuff, "%s(%d)", entry ? entry->h_name : hname, port);
00305
00306 return nbuff;
00307 }
00308
00309 static int NET_exception = 0;
00310
00311
00312 extern "C" {
00313 static
00314 void
00315 net_exception_handler(int)
00316 {
00317 NET_exception = 1;
00318 signal(SIGPIPE, net_exception_handler);
00319 }
00320 }
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334 #define CNetHost const NetHost
00335 class NetHost {
00336 protected:
00337 unsigned long addr_;
00338 str_ptr name_;
00339 int port_;
00340
00341 public:
00342
00343 NetHost (const char *hostname);
00344 NetHost (struct sockaddr_in *addr);
00345 NetHost (CNetHost &rhs) : addr_(rhs.addr_),name_(rhs.name_),port_(rhs.port_) { }
00346 NetHost& operator= (CNetHost &rhs) { addr_ = rhs.addr_; name_ = rhs.name_;
00347 port_ = rhs.port_; return *this; }
00348
00349 int port(void) const { return port_; }
00350 str_ptr name(void) const { return name_; }
00351 void get_address(struct sockaddr_in *addr) const {
00352
00353
00354 memset(addr, 0, sizeof(sockaddr_in));
00355 addr->sin_family = AF_INET;
00356 addr->sin_addr.s_addr = addr_;
00357 }
00358 };
00359
00360 NetHost::NetHost(
00361 const char *hostname
00362 )
00363 {
00364 struct hostent *entry;
00365
00366 assert(hostname != NULL);
00367
00368 if (isdigit(hostname[0])) {
00369 unsigned long netAddr = inet_addr(hostname);
00370 entry = gethostbyaddr((const char*) &netAddr, sizeof(netAddr), AF_INET);
00371 if (entry) {
00372 name_ = str_ptr(entry->h_name);
00373 } else name_ = hostname;
00374 addr_ = netAddr;
00375 port_ = -1;
00376 } else {
00377 entry = gethostbyname(hostname);
00378
00379 if (entry == NULL) {
00380 cerr << "NetHost: Could not resolve hostname!" << endl;
00381 exit(1);
00382 }
00383
00384 name_ = str_ptr(entry->h_name);
00385 addr_ = *(unsigned long*)(entry->h_addr_list[0]);
00386 port_ = -1;
00387 }
00388
00389 }
00390
00391 NetHost::NetHost(
00392 struct sockaddr_in *addr
00393 )
00394 {
00395 assert(addr != NULL);
00396
00397 struct hostent *entry;
00398
00399 entry = gethostbyaddr((const char*) &addr->sin_addr,
00400 sizeof(addr->sin_addr),
00401 addr->sin_family);
00402
00403 if (entry == NULL) {
00404 perror("NetHost(sockaddr): gethostbyaddr");
00405 exit(1);
00406 }
00407
00408 port_ = ntohs(addr->sin_port);
00409 name_ = str_ptr(entry->h_name);
00410 addr_ = *(unsigned long*)(entry->h_addr_list[0]);
00411 }
00412
00413
00414
00415
00416 void
00417 NetStream::set_port(
00418 int p
00419 )
00420 {
00421 port_ = p;
00422 print_name_ = get_host_print_name(port(), **name());
00423 }
00424
00425 NetStream::NetStream(
00426 int port,
00427 const char *name
00428 ) : name_(name), port_(port), msgSize_(-1),
00429 processing_(0), print_name_(get_host_print_name(port,name))
00430 {
00431 NetHost host(name);
00432 struct sockaddr_in serv_addr;
00433
00434 host.get_address(&serv_addr);
00435 serv_addr.sin_port = htons((short) port);
00436
00437 if ((_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
00438 _die("socket");
00439
00440 else if (connect(_fd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
00441
00442 _die("connect");
00443
00444 else
00445 no_linger(_fd);
00446
00447 block(STD_FALSE);
00448
00449 if (_fd != -1) {
00450 set_blocking(false);
00451 no_tcp_delay();
00452 }
00453
00454 }
00455
00456 NetStream::NetStream(
00457 int fd,
00458 struct sockaddr_in *client,
00459 bool should_block
00460 ): name_(""), port_(-1), msgSize_(-1), processing_(0)
00461 {
00462 _fd = fd;
00463 if (!should_block) set_blocking(false);
00464
00465 if (client == 0)
00466 {
00467 name_ = str_ptr(fd);
00468 port_ = 0;
00469 print_name_ = str_ptr("file descriptor ") + name_;
00470
00471 }
00472 else
00473 {
00474 NetHost host(client);
00475 name_ = host.name();
00476 port_ = -1;
00477 print_name_ = str_ptr(get_host_print_name(port_, **name_));
00478
00479 no_linger(_fd);
00480 if (_fd != -1)
00481 {
00482
00483 no_tcp_delay();
00484 }
00485 }
00486
00487 block(STD_FALSE);
00488 }
00489
00490 NetStream::NetStream(
00491 Cstr_ptr &name,
00492 NetStream::StreamFlags flags) :
00493 name_(name),
00494 port_(0),
00495 msgSize_(-1),
00496 processing_(0),
00497 print_name_(name)
00498 {
00499 #ifdef WIN32
00500 int readable = flags & StreamFlags::read;
00501 int writeable = flags & StreamFlags::write;
00502 int do_ascii = flags & StreamFlags::ascii;
00503 #else
00504 int readable = flags & read;
00505 int writeable = flags & write;
00506 int do_ascii = flags & ascii;
00507 #endif
00508
00509 #if (defined(__GNUC__) && (__GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ >= 97)))
00510 if (do_ascii) {
00511 fstream* fs = 0;
00512 if (readable && writeable)
00513 {
00514
00515
00516
00517 cerr << "NetStream::NetStream() - Stream is readable AND writeable." << endl
00518 << "Warning: truncating stream for writing." << endl;
00519 fs = new fstream(**name, fstream::out | fstream::trunc | fstream::in);
00520 }
00521 else if (writeable)
00522 fs = new fstream(**name, fstream::out | fstream::trunc);
00523 else if (readable)
00524 fs = new fstream(**name, fstream::in);
00525
00526 if(fs && !fs->is_open())
00527 {
00528 delete fs;
00529 fs = 0;
00530 }
00531 _iostream = fs;
00532
00533
00534 _fd = -1;
00535 }
00536 else
00537 #endif
00538 {
00539 #ifndef WIN32
00540 _fd = open(**name,
00541 writeable ? O_WRONLY | O_CREAT | O_TRUNC : O_RDONLY, 0666);
00542 set_blocking(false);
00543 if (_fd == -1)
00544 {
00545 err_ret("NetStream::NetStream() - File: '%s'", **name);
00546 }
00547 #else
00548 if (writeable)
00549 {
00550 _fd = (int)CreateFile(**name, GENERIC_WRITE, FILE_SHARE_READ, NULL,
00551 CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
00552 }
00553 else
00554 {
00555
00556
00557
00558
00559 _fd = (int)CreateFile(**name, GENERIC_READ, FILE_SHARE_READ, NULL,
00560 OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL);
00561 }
00562 if (_fd == (int)INVALID_HANDLE_VALUE)
00563 {
00564 err_mesg(ERR_LEV_WARN | ERR_INCL_ERRNO, "NetStream::NetStream() - Failed opening: '%s'", **name);
00565 }
00566 else
00567 {
00568 if (do_ascii)
00569 {
00570 int old_fd = _fd;
00571 _fd = _open_osfhandle (_fd, (_O_APPEND) |
00572 ((!writeable)?(_O_RDONLY): (0)) |
00573 ((do_ascii)?(_O_TEXT ): (0)));
00574
00575 err_adv(Config::get_var_bool("PRINT_ERRS",false,true),
00576 "NetStream::NetStream() - Was OS Handle: %d --> Now using C fd: %d", old_fd, _fd);
00577
00578 if (_fd == -1)
00579 err_msg("NetStream::NetStream() - Failed _open_osfhandle: '%s'", **name);
00580 }
00581 }
00582 #endif
00583 }
00584
00585 #if (defined(__GNUC__) && (__GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ >= 97)))
00586 #elif (defined(_MSC_VER) && (_MSC_VER >= 1300))
00587 #else
00588 if (do_ascii)
00589 {
00590 fstream *fs = new fstream;
00591 fs->attach(_fd);
00592 if (fs->is_open()) _iostream = fs;
00593 }
00594 #endif
00595
00596 block(STD_FALSE);
00597 }
00598
00599
00600 NetStream::~NetStream()
00601 {
00602
00603 if (_fd >= 0)
00604 {
00605 if (port_ > 0)
00606 {
00607
00608 cerr << "NetStream : returning file descriptor :" << _fd << endl;
00609 }
00610
00611
00612 set_blocking(true);
00613 #ifndef WIN32
00614 close(_fd);
00615 #else
00616
00617
00618
00619
00620
00621 int ret;
00622 if ((_fd == fileno(stdin)) || (_iostream))
00623 {
00624 ret = _close(_fd);
00625 assert(ret==0);
00626 }
00627 else
00628 {
00629 ret = CloseHandle((HANDLE)_fd);
00630 assert(ret!=0);
00631 }
00632 #endif
00633 }
00634 }
00635
00636 void
00637 NetStream::remove_me()
00638 {
00639 network_->remove_stream(this);
00640 }
00641
00642 void
00643 NetStream::no_tcp_delay(
00644 )
00645 {
00646
00647
00648
00649
00650
00651
00652 if (!Config::get_var_bool("DO_TCP_DELAY",false,true)) {
00653 int on=1;
00654 if (setsockopt(_fd, IPPROTO_TCP, TCP_NODELAY, (char *)&on, sizeof(on))) {
00655 cerr << "NetStream::no_tcp_delay- setsockopt(TCP_NODELAY) on " <<
00656 print_name() << " (" << _fd<< ")";
00657 perror("");
00658 }
00659 }
00660 }
00661
00662 void
00663 NetStream::_die(
00664 const char *msg
00665 )
00666 {
00667 if (!Config::get_var_bool("NO_CONNECT_ERRS",false,true)) {
00668 cerr << "NetStream(" << name_ << ":" << port_ << "): " << msg << ": ";
00669 perror(NULL);
00670 }
00671 _fd = -1;
00672 }
00673
00674 ssize_t
00675 NetStream::read_from_net(
00676 void *buf,
00677 size_t nbytes
00678 ) const
00679 {
00680 char *tmpbuf = (char*) buf;
00681 int numread = 0;
00682 double stime = 0;
00683
00684 while (nbytes) {
00685 #ifdef WIN32
00686 int readb = read_win32(_fd, tmpbuf, nbytes);
00687 #else
00688 int readb = ::read(_fd, tmpbuf, nbytes);
00689 #endif
00690
00691 if (errno == EAGAIN) {
00692 if (Config::get_var_bool("PRINT_ERRS",false,true))
00693 cerr << " bytes read from network (EAGAIN) = " << numread << endl;
00694 return numread + (readb == -1 ? 0:readb);
00695 }
00696
00697 if (readb < 0) {
00698 perror("NetStream::read_from_net : Warning - ");
00699 return -1;
00700 }
00701
00702 if (readb == 0 && nbytes > 0)
00703 {
00704 #ifdef WIN32
00705
00706
00707
00708
00709
00710 return numread + (readb == -1 ? 0:readb);
00711 #else
00712 if (stime == 0)
00713 stime = the_time();
00714
00715 if (the_time() - stime > 1 || errno != EAGAIN) {
00716 if (port_ > 0) {
00717
00718 cerr << "NetStream::read_from_net - read error: peer reset"
00719 << endl;
00720 }
00721 return -1;
00722 }
00723 #endif
00724 }
00725
00726 nbytes -= readb;
00727 tmpbuf += readb;
00728 numread+= readb;
00729 }
00730 if (Config::get_var_bool("PRINT_ERRS",false,true))
00731 cerr << " bytes read from network = " << numread << endl;
00732 return numread;
00733 }
00734
00735 void
00736 NetStream::set_blocking(bool val) const
00737 {
00738 #ifdef WIN32
00739
00740 if (Config::get_var_bool("PRINT_ERRS",false,true))
00741 cerr << "NetStream::set_blocking - not supported" << endl;
00742 #else
00743 int flags;
00744 if((flags = fcntl(_fd, F_GETFL, 0))<0) {
00745 err_ret("NetStream::set_blocking: fcntl(..,F_GETFL)");
00746 return;
00747 }
00748 if (val) {
00749 flags &= ~O_NDELAY;
00750 } else {
00751 flags |= O_NDELAY;
00752 }
00753 if (fcntl(_fd, F_SETFL, flags)<0) {
00754 err_ret("NetStream::set_blocking: fcntl(..,F_GETFL)");
00755 return;
00756 }
00757 #endif
00758 }
00759
00760 ssize_t
00761 NetStream::write_to_net(
00762 const void *buf,
00763 size_t nbytes
00764 ) const
00765 {
00766 set_blocking(true);
00767 #ifdef WIN32
00768 ssize_t bytes_written = write_win32(_fd, buf, nbytes);
00769 #else
00770 ssize_t bytes_written = ::write(_fd, buf, nbytes);
00771 #endif
00772 set_blocking(false);
00773
00774 if (bytes_written < 0 || NET_exception) {
00775 perror("NetStream::write_to_net: Warning: ");
00776 NET_exception = 0;
00777 } else if (bytes_written < (ssize_t)nbytes)
00778 cerr << "Couldn't flush the buffer. Some data wasn't written. (nbytes="
00779 << nbytes << " written=" << bytes_written << ")\n";
00780 return bytes_written;
00781 }
00782
00783 void
00784 NetStream::no_linger(int fd)
00785 {
00786 int reuse = 1;
00787 struct linger ling;
00788
00789 ling.l_onoff = 0;
00790 ling.l_linger = 0;
00791
00792 if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&ling, sizeof(ling))) {
00793 perror("NetStream::no_linger. setsockopt - SO_LINGER :");
00794 }
00795 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse))) {
00796 perror("NetStream::no_linger. setsockopt - SO_REUSEADDR:");
00797 }
00798 }
00799
00800 int
00801 NetStream::interpret()
00802 {
00803 char buff_[256], *buff = buff_;
00804 NETenum code;
00805 int port;
00806 processing_ = 1;
00807 int ret = 0;
00808
00809 while (_in_queue.count()) {
00810 *this >> code;
00811 switch (code) {
00812 case NETadd_connection: {
00813 *this >> buff >> port;
00814 NetStream *s = new NetStream(port, buff);
00815 if (s->fd() != -1)
00816 network_->add_stream(s);
00817 network_->interpret(code, this);
00818 }
00819 brcase NETquit: {
00820 network_->interpret(code, this);
00821 network_->remove_stream(this);
00822 ret = 1;
00823 }
00824 brcase NETidentify :
00825 *this >> port;
00826 set_port(port);
00827 if (network_->first_) {
00828 cerr << "NetStream accepts server -->" << print_name()<<endl;
00829 for (int i=0; i<network_->nStreams_; i++) {
00830 NetStream *s = network_->streams_[i];
00831 if (s->port() != -1 && s != this) {
00832 *this << NETadd_connection
00833 << s->name()
00834 << s->port()
00835 << NETflush;
00836 }
00837 }
00838 }
00839 brcase NETtext:
00840 *this >> buff;
00841 cerr << "(* " << print_name() << " *) " << buff << endl;
00842 brcase NETbroadcast: {
00843 str_ptr flag;
00844 *this >> flag;
00845 int i;
00846 for (i = 0; i < tags_.num(); i++)
00847 if (flag == tags_[i])
00848 break;
00849 if (i == tags_.num()) {
00850 _in_queue.remove_all();
00851 if (Config::get_var_bool("PRINT_ERRS",false,true))
00852 cerr << "Ignoring broadcast " << flag << endl;
00853 }
00854 }
00855 brcase NETbarrier: network_->_at_barrier++;
00856 brdefault :
00857
00858 if (network_->interpret(code, this)) return 1;
00859 }
00860 }
00861 processing_ = 0;
00862
00863 return ret;
00864 }
00865
00866
00867 void NetStream::flush_data()
00868 {
00869 int count = _out_queue.count();
00870 if (!count) return;
00871 if (Config::get_var_bool("PRINT_ERRS",false,true))
00872 cerr << "NetStream: sending message to net of length " << count << endl;
00873
00874
00875 int packcount = 0;
00876 char packbuf_space[sizeof(int)];
00877 char *packbuf = packbuf_space;
00878 UGA_PACK_WORD(count, packbuf, packcount);
00879
00880 write_to_net(packbuf_space, packcount);
00881 flush();
00882 }
00883
00884
00885 int
00886 NetStream::read_stuff()
00887 {
00888
00889 const unsigned int BUFSIZE= 666666;
00890 char buff[BUFSIZE];
00891 int num_read = 0;
00892
00893 if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: ReadStuff called\n";
00894
00895
00896 if (msgSize_ == -1) {
00897
00898 char packbuf_space[sizeof(int) + 1];
00899 char *packbuf = packbuf_space;
00900 int nread = read_from_net(packbuf, sizeof(int));
00901 if (nread < 0)
00902 return nread;
00903 int count = 0;
00904 UGA_UNPACK_INTEGER(msgSize_, packbuf, count);
00905 if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: msgSize is " << msgSize_ << endl;
00906 }
00907
00908
00909
00910 do {
00911 int nread = read_from_net(buff, BUFSIZE);
00912 if (nread <= 0)
00913 return nread;
00914 else num_read = nread;
00915 if (msgSize_ > (int)BUFSIZE) {
00916
00917 if (num_read != (int)BUFSIZE) return num_read;
00918 _in_queue.put((UGAptr)buff, BUFSIZE);
00919 msgSize_ -= BUFSIZE;
00920 if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: Big message, storing first BUFSIZE bytes (msgSize = " << msgSize_ << endl;
00921 num_read = 0;
00922 }
00923 } while (num_read == 0);
00924
00925
00926 char *tbuf = buff;
00927 if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: processing num_read " << num_read << endl;
00928 if (num_read >= msgSize_) {
00929
00930 while (num_read && num_read >= msgSize_) {
00931 _in_queue.put((UGAptr)tbuf, msgSize_);
00932 num_read -= msgSize_;
00933 tbuf += msgSize_;
00934
00935 if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: processing full_message " << msgSize_ << " (num_read = " << num_read << endl;
00936
00937 if (interpret() != 0)
00938 return 1;
00939
00940
00941 if (num_read > 0) {
00942 int count = 0;
00943 UGA_UNPACK_INTEGER(msgSize_, tbuf, count);
00944 num_read -= count;
00945 if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: next message" << msgSize_ << " (num_read = " << num_read << endl;
00946 } else
00947 msgSize_ = -1;
00948 }
00949 }
00950
00951
00952 _in_queue.put(tbuf, num_read);
00953 msgSize_ -= num_read;
00954 if (Config::get_var_bool("PRINT_ERRS",false,true)) cerr << "NetStream: saved for next time " << num_read << " (msgSize = " << msgSize_ << endl;
00955 return 0;
00956 }
00957
00958 void
00959 NetStream::sample()
00960 {
00961 if (read_stuff() != 0)
00962 network()->remove_stream(this);
00963 }
00964
00965
00966
00967
00968 int Network::NETWORK_SERVER_BACKLOG = 5;
00969
00970 void
00971 Network::connect_to(
00972 NetStream *s
00973 )
00974 {
00975 if (s && s->fd() != -1) {
00976 add_stream(s);
00977 if (Config::get_var_bool("PRINT_ERRS",false,true))
00978 cerr << "Network::connect_to - sending identity to server" << endl;
00979 *s << NETidentify << port_ << NETflush;
00980 }
00981 }
00982
00983 void
00984 Network::barrier()
00985 {
00986 for (int i=0; i<nStreams_; i++)
00987 *streams_[i] << NETbarrier << NETflush;
00988
00989 while (_at_barrier < nStreams_)
00990 _manager->loop(0);
00991 _at_barrier-=nStreams_;
00992 }
00993
00994 int
00995 Network::processing(void) const
00996 {
00997 int yes = 0;
00998 for (int i=0; !yes && i < nStreams_; i++)
00999 yes = streams_[i]->processing();
01000 return yes;
01001 }
01002
01003 NetStream *
01004 Network::wait_for_connect()
01005 {
01006 struct sockaddr_in cli_addr;
01007 socklen_t clilen = sizeof(cli_addr);
01008 int newFd;
01009 NetStream *newStream;
01010
01011 if ((newFd = accept(_fd, (struct sockaddr*) &cli_addr, &clilen)) < 0)
01012 _die("accept");
01013
01014
01015
01016 if (!(newStream = new NetStream(newFd, &cli_addr)))
01017 _die("out of memory");
01018
01019 return newStream;
01020 }
01021
01022 void
01023 Network::remove_stream(
01024 NetStream *s
01025 )
01026 {
01027
01028 notify_net(Network_obs::remove_str, s);
01029
01030 int i=0;
01031 while (i<nStreams_ && streams_[i] != s) ++i;
01032 if (i < nStreams_) {
01033 Unregister(s);
01034 streams_[i] = streams_[--nStreams_];
01035 delete s;
01036 }
01037 }
01038
01039 void
01040 Network::accept_stream(void)
01041 {
01042 NetStream *s = wait_for_connect();
01043 cerr << "Network accept_stream from ---->" << s->name() << endl;
01044 add_stream(s);
01045 add_client(s);
01046
01047
01048 notify_net(Network_obs::accept_str, s);
01049 }
01050
01051 void
01052 Network::add_client(
01053 NetStream *cli
01054 )
01055 {
01056 *cli << NETtext << "Initialize World" << NETflush;
01057 }
01058
01059
01060 void
01061 Network::_die(
01062 const char *msg
01063 )
01064 {
01065 cerr << "Network(" << name_ << "): " << msg << ": ";
01066 perror(NULL);
01067 exit(1);
01068 }
01069
01070
01071 char *
01072 Network::configure(
01073 int port,
01074 int backlog
01075 )
01076 {
01077 struct sockaddr_in serv_addr;
01078 char buff[255];
01079
01080 port_ = port;
01081 gethostname(buff, 255);
01082 name_ = str_ptr(buff);
01083
01084
01085 if ((_fd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
01086 return "socket";
01087
01088
01089 memset(&serv_addr, 0, sizeof(serv_addr));
01090 serv_addr.sin_family = AF_INET;
01091 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
01092 serv_addr.sin_port = htons((short) port);
01093
01094
01095 NetStream::no_linger(_fd);
01096
01097 if (bind(_fd, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
01098 return "bind";
01099
01100 if (port == 0) {
01101 socklen_t foo(sizeof(struct sockaddr_in));
01102 getsockname(_fd, (struct sockaddr *)&serv_addr, &foo);
01103 port_ = int(ntohs(serv_addr.sin_port));
01104 }
01105
01106
01107 if (listen(_fd, backlog) < 0)
01108 return "listen";
01109
01110
01111
01112
01113 Register();
01114
01115 cerr << "Network: server "<<name_<<" on port "<< port_<< endl;
01116
01117 signal(SIGPIPE, net_exception_handler);
01118
01119 return 0;
01120 }
01121
01122
01123 void
01124 Network::start(
01125 int myPort
01126 )
01127 {
01128 char *msg;
01129
01130
01131
01132
01133 first_ = (myPort == 0 ? 0 : 1);
01134
01135 if ((msg = configure(myPort)))
01136 _die(msg);
01137 }
01138
01139 void
01140 Network::flush_data()
01141 {
01142 for (int i=0; i<nStreams_; i++)
01143 streams_[i]->flush_data();
01144 }
01145
01146
01147 STDdstream &
01148 operator >> (
01149 STDdstream &ds,
01150 NETenum &m
01151 )
01152 {
01153 int x;
01154 ds >> x;
01155 m = NETenum(x);
01156 return ds;
01157 }
01158
01159
01160 STDdstream &
01161 operator << (
01162 STDdstream &ds,
01163 NETenum m
01164 )
01165 {
01166 switch (m) {
01167 case NETflush : if (ds.ascii()) {
01168 *ds.ostr() << endl;
01169 ds.ostr()->flush();
01170 }
01171 else ((NetStream &)ds).flush_data();
01172 brdefault : { int x(m);
01173 ds << x;
01174 }
01175 }
01176 return ds;
01177 }
01178
01179