00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "std/support.H"
00010 #include "net.H"
00011 #include "pack.H"
00012 #include "stream.H"
00013 static const int DSTREAM_READ_AHEAD_FACTOR = 4;
00014
00015
00016
00017
00018
00019
00020 STDdstream::STDdstream():
00021 _iostream(0),
00022 _istream(0),
00023 _ostream(0),
00024 _indent(0),
00025 _fail(STD_FALSE),
00026 _block(STD_TRUE)
00027 {
00028 }
00029
00030 STDdstream::STDdstream(iostream* s):
00031 _iostream(s),
00032 _istream(0),
00033 _ostream(0),
00034 _indent(0),
00035 _fail(STD_FALSE),
00036 _block(STD_TRUE)
00037 {
00038 }
00039
00040 STDdstream::STDdstream(istream* s):
00041 _iostream(0),
00042 _istream(s),
00043 _ostream(0),
00044 _indent(0),
00045 _fail(STD_FALSE),
00046 _block(STD_TRUE)
00047 {
00048 }
00049
00050 STDdstream::STDdstream(ostream* s):
00051 _iostream(0),
00052 _istream(0),
00053 _ostream(s),
00054 _indent(0),
00055 _fail(STD_FALSE),
00056 _block(STD_TRUE)
00057 {
00058 }
00059
00060
00061
00062
00063
00064 char
00065 STDdstream::peekahead()
00066 {
00067 char c;
00068 if (istr())
00069 {
00070
00071
00072
00073
00074 bool was_good = !fail();
00075
00076 (*this) >> c;
00077
00078 if (fail() && eof() && was_good)
00079 {
00080 _fail = STD_FALSE;
00081 }
00082
00083 istr()->putback(c);
00084 }
00085 else
00086 {
00087 read(&c, sizeof(char), 0);
00088 }
00089 return c;
00090 }
00091
00092
00093
00094
00095
00096
00097 void
00098 STDdstream::write (
00099 const char *const data,
00100 int count
00101 )
00102 {
00103 _out_queue.put (data, count);
00104
00105
00106 if (_block)
00107 flush();
00108
00109
00110 if (!_block)
00111 _fail = STD_FALSE;
00112 }
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122 void
00123 STDdstream::read (
00124 UGAptr data,
00125 int count,
00126 int pop
00127 )
00128 {
00129 UGAptr buffer = NULL;
00130 size_t recv_request, recv_result;
00131
00132 while ((int)_in_queue.count () < count)
00133 {
00134 if (!buffer)
00135 {
00136 recv_request = count * DSTREAM_READ_AHEAD_FACTOR;
00137 buffer = (UGAptr)malloc(recv_request);
00138 }
00139
00140 recv_result = recv (buffer, recv_request);
00141
00142 if (recv_result)
00143 _in_queue.put (buffer, recv_result);
00144
00145 if (!_block)
00146 break;
00147 }
00148 if (buffer)
00149 free(buffer);
00150
00151 if ((int)_in_queue.count () >= count)
00152 {
00153 if (pop)
00154 _in_queue.get (data, count);
00155 else _in_queue.peek(data, count);
00156
00157 _fail = STD_FALSE;
00158 }
00159 else {
00160 cerr << "Failed to read message" << endl;
00161 _fail = STD_TRUE;
00162 }
00163 }
00164
00165
00166
00167
00168
00169
00170
00171
00172 void
00173 STDdstream::flush (void)
00174 {
00175 UGAptr buffer = NULL;
00176 size_t send_request, send_result;
00177
00178 while (_out_queue.count ())
00179 {
00180 if (!buffer)
00181 {
00182 send_request = _out_queue.count ();
00183 buffer = (UGAptr)malloc(send_request);
00184 }
00185
00186 _out_queue.peek (buffer, send_request);
00187 send_result = send (buffer, send_request);
00188
00189 if (send_result)
00190 _out_queue.get (buffer, send_result);
00191
00192 if (!_block)
00193 break;
00194 }
00195 if (buffer)
00196 free(buffer);
00197
00198 _fail = (_out_queue.count () > 0) ? STD_TRUE : STD_FALSE;
00199 }
00200
00201
00202
00203
00204 void
00205 STDdstream::read_close_delim()
00206 {
00207 if (istr()) {
00208 char brace;
00209 (*this) >> brace;
00210 } else {
00211 read_delim();
00212 }
00213 }
00214
00215 void
00216 STDdstream::read_open_delim()
00217 {
00218 if (istr()) {
00219 char brace;
00220 (*this) >> brace;
00221 } else {
00222 read_delim();
00223 }
00224 }
00225
00226 void
00227 STDdstream::write_delim(char c)
00228 {
00229 if (ostr())
00230 (*this) << c;
00231 else
00232 write(&c, sizeof(char));
00233 }
00234
00235 char
00236 STDdstream::read_delim()
00237 {
00238 char delim;
00239 read(&delim, sizeof(char));
00240 return delim;
00241 }
00242
00243 str_ptr
00244 STDdstream::get_string_with_spaces()
00245 {
00246 if (ascii())
00247 {
00248 const int bufsize = 1024;
00249 char buf[bufsize];
00250 int i = 0;
00251 char ch = ' ';
00252 int done = 0;
00253 while (!done)
00254 {
00255 istr()->get(ch);
00256
00257 done = (ch == '}') || (ch == '{') || (ch == '\n');
00258 if (done)
00259 {
00260
00261 istr()->putback(ch);
00262
00263 int j;
00264 for (j = i-1; j >= 0 && (buf[j] == ' ' || buf[j]=='\t'); j--)
00265 {
00266 istr()->putback(buf[j]);
00267 }
00268 i = j + 1;
00269 }
00270 else
00271 {
00272 done = (i > bufsize-1) || !istr()->good();
00273 if (!done) buf[i++] = ch;
00274 }
00275 }
00276 buf[i] = '\0';
00277
00278 int start = 0;
00279 while (buf[start] == ' ' || buf[start] == '\t')
00280 {
00281 start++;
00282 }
00283 return str_ptr(buf + start);
00284 }
00285 str_ptr the_string;
00286 (*this) >> the_string;
00287 return the_string;
00288 }
00289
00290
00291 static char packbuf_space[sizeof(double)];
00292 static char *packbuf;
00293 static int packcount;
00294
00295 #define INIT_PACK (packcount = 0, packbuf = packbuf_space)
00296 #define DONE_PACK ds.write_delim(' '); ds.write(packbuf_space, packcount);
00297
00298 #define INIT_UNPACK packbuf = packbuf_space; \
00299 ds.read_delim(); ds.read(packbuf, sizeof(data))
00300
00301
00302
00303
00304 STDdstream &
00305 operator >> (STDdstream &ds, char * &data)
00306 {
00307 if (ds.ascii())
00308 {
00309 *ds.istr() >> data;
00310 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00311 }
00312 else {
00313 ds.read_delim();
00314
00315 int len;
00316 ds >> len;
00317 ds.read(data, len * sizeof(char));
00318 data[len] = '\0';
00319 }
00320 return ds;
00321 }
00322
00323 STDdstream &
00324 operator << (STDdstream &ds, const char * const data)
00325 {
00326 if (ds.ascii())
00327 {
00328 *ds.ostr() << data;
00329 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00330 } else {
00331 ds.write_delim(' ');
00332
00333 ds << strlen(data);
00334 ds.write(data, strlen(data) * sizeof(char));
00335 }
00336 return ds;
00337 }
00338
00339
00340
00341 STDdstream &
00342 operator >> (STDdstream &ds, str_ptr &data)
00343 {
00344 int len;
00345 const int buflen = 4096;
00346 char buff[buflen];
00347 char *usebuff = buff;
00348
00349 if (ds.ascii())
00350 {
00351 *ds.istr() >> usebuff;
00352 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00353 data = str_ptr(usebuff);
00354 } else {
00355 ds.read_delim();
00356
00357 ds >> len;
00358 if (len + 1 > buflen) {
00359 usebuff = new char[len + 1];
00360 }
00361 ds.read(usebuff, len * sizeof(char));
00362 usebuff[len] = '\0';
00363
00364 data = str_ptr(usebuff);
00365
00366 if (len + 1 > buflen)
00367 delete [] usebuff;
00368 }
00369
00370 return ds;
00371 }
00372
00373 STDdstream &
00374 operator << (STDdstream &ds, str_ptr data)
00375 {
00376 if (ds.ascii())
00377 {
00378 *ds.ostr() << **data << " ";
00379 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00380 }
00381 else {
00382 ds.write_delim(' ');
00383
00384 ds << strlen(**data);
00385 ds.write(**data, strlen(**data) * sizeof(char));
00386 }
00387 return ds;
00388 }
00389
00390
00391
00392
00393
00394 STDdstream &
00395 operator >> (STDdstream &ds, short &data)
00396 {
00397 if (ds.ascii())
00398 {
00399 *ds.istr() >> data;
00400 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00401 }
00402 else {
00403 INIT_UNPACK;
00404 UGA_UNPACK_WORD (data, packbuf, packcount, short);
00405 }
00406 return ds;
00407 }
00408 STDdstream &
00409 operator << (STDdstream &ds, short data)
00410 {
00411 if (ds.ascii())
00412 {
00413 *ds.ostr() << data << " ";
00414 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00415 }
00416 else {
00417 INIT_PACK;
00418 UGA_PACK_WORD (data, packbuf, packcount);
00419 DONE_PACK;
00420 }
00421 return ds;
00422 }
00423
00424
00425
00426
00427 STDdstream &
00428 operator >> (STDdstream &ds, int &data)
00429 {
00430 if (ds.ascii())
00431 {
00432 *ds.istr() >> data;
00433 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00434 }
00435 else {
00436 INIT_UNPACK;
00437 UGA_UNPACK_WORD (data, packbuf, packcount, int);
00438 }
00439 return ds;
00440 }
00441 STDdstream &
00442 operator << (STDdstream &ds, int data)
00443 {
00444 if (ds.ascii())
00445 {
00446 *ds.ostr() << data << " ";
00447 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00448 }
00449 else {
00450 INIT_PACK;
00451 UGA_PACK_WORD (data, packbuf, packcount);
00452 DONE_PACK;
00453 }
00454 return ds;
00455 }
00456
00457
00458
00459
00460 STDdstream &
00461 operator >> (STDdstream &ds, long &data)
00462 {
00463 if (ds.ascii())
00464 {
00465 *ds.istr() >> data;
00466 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00467 }
00468 else {
00469 INIT_UNPACK;
00470 UGA_UNPACK_WORD (data, packbuf, packcount, long);
00471 }
00472 return ds;
00473 }
00474 STDdstream &
00475 operator << (STDdstream &ds, long data)
00476 {
00477 if (ds.ascii())
00478 {
00479 *ds.ostr() << data << " ";
00480 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00481 }
00482 else {
00483 INIT_PACK;
00484 UGA_PACK_WORD (data, packbuf, packcount);
00485 DONE_PACK;
00486 }
00487 return ds;
00488 }
00489
00490
00491
00492
00493 STDdstream &
00494 operator >> (STDdstream &ds, unsigned short &data)
00495 {
00496 if (ds.ascii())
00497 {
00498 *ds.istr() >> data;
00499 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00500 }
00501 else {
00502 INIT_UNPACK;
00503 UGA_UNPACK_WORD (data, packbuf, packcount, unsigned short);
00504 }
00505 return ds;
00506 }
00507 STDdstream &
00508 operator << (STDdstream &ds, unsigned short data)
00509 {
00510 if (ds.ascii())
00511 {
00512 *ds.ostr() << data << " ";
00513 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00514 }
00515 else {
00516 INIT_PACK;
00517 UGA_PACK_WORD (data, packbuf, packcount);
00518 DONE_PACK;
00519 }
00520 return ds;
00521 }
00522
00523
00524
00525
00526 STDdstream &
00527 operator >> (STDdstream &ds, unsigned int &data)
00528 {
00529 if (ds.ascii())
00530 {
00531 *ds.istr() >> data;
00532 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00533 }
00534 else {
00535 INIT_UNPACK;
00536 UGA_UNPACK_WORD (data, packbuf, packcount, unsigned int);
00537 }
00538 return ds;
00539 }
00540 STDdstream &
00541 operator << (STDdstream &ds, unsigned int data)
00542 {
00543 if (ds.ascii())
00544 {
00545 *ds.ostr() << data << " ";
00546 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00547 }
00548 else {
00549 INIT_PACK;
00550 UGA_PACK_WORD (data, packbuf, packcount);
00551 DONE_PACK;
00552 }
00553 return ds;
00554 }
00555
00556
00557
00558
00559 STDdstream &
00560 operator >> (STDdstream &ds, unsigned long &data)
00561 {
00562 if (ds.ascii())
00563 {
00564 *ds.istr() >> data;
00565 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00566 }
00567 else {
00568 INIT_UNPACK;
00569 UGA_UNPACK_WORD (data, packbuf, packcount, unsigned long);
00570 }
00571 return ds;
00572 }
00573 STDdstream &
00574 operator << (STDdstream &ds, unsigned long data)
00575 {
00576 if (ds.ascii())
00577 {
00578 *ds.ostr() << data << " ";
00579 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00580 }
00581 else {
00582 INIT_PACK;
00583 UGA_PACK_WORD (data, packbuf, packcount);
00584 DONE_PACK;
00585 }
00586 return ds;
00587 }
00588
00589
00590
00591
00592 STDdstream &
00593 operator >> (STDdstream &ds, float &temp)
00594 {
00595 if (ds.ascii())
00596 {
00597 *ds.istr() >> temp;
00598 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00599 }
00600 else {
00601
00602 double data;
00603 INIT_UNPACK;
00604 UGA_UNPACK_DOUBLE (data, packbuf, packcount);
00605 temp = (float)data;
00606 }
00607 return ds;
00608 }
00609 STDdstream &
00610 operator << (STDdstream &ds, float data)
00611 {
00612 if (ds.ascii())
00613 {
00614 *ds.ostr() << data << " ";
00615 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00616 }
00617 else {
00618 double temp = data;
00619 INIT_PACK;
00620 UGA_PACK_DOUBLE (temp, packbuf, packcount);
00621 DONE_PACK;
00622 }
00623 return ds;
00624 }
00625
00626
00627
00628
00629 STDdstream &
00630 operator >> (STDdstream &ds, double &data)
00631 {
00632 if (ds.ascii())
00633 {
00634 *ds.istr() >> data;
00635 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00636 }
00637 else {
00638 INIT_UNPACK;
00639 UGA_UNPACK_DOUBLE (data, packbuf, packcount);
00640 }
00641 return ds;
00642 }
00643 STDdstream &
00644 operator << (STDdstream &ds, double data)
00645 {
00646 if (ds.ascii())
00647 {
00648 *ds.ostr() << data << " ";
00649 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00650 }
00651 else {
00652 INIT_PACK;
00653 UGA_PACK_DOUBLE (data, packbuf, packcount);
00654 DONE_PACK;
00655 }
00656 return ds;
00657 }
00658
00659
00660
00661
00662 STDdstream &
00663 operator >> (STDdstream &ds, char &data)
00664 {
00665 if (ds.ascii())
00666 {
00667 *ds.istr() >> data;
00668 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00669 }
00670 else {
00671 ds.read_delim();
00672
00673 ds.read (&data, sizeof(char));
00674 }
00675 return ds;
00676 }
00677 STDdstream &
00678 operator << (STDdstream &ds, char data)
00679 {
00680 if (ds.ascii())
00681 {
00682 *ds.ostr() << data << " ";
00683 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00684 }
00685 else {
00686 ds.write_delim(' ');
00687
00688 ds.write (&data, sizeof(char));
00689 }
00690 return ds;
00691 }
00692
00693
00694
00695
00696 STDdstream &
00697 operator >> (STDdstream &ds, unsigned char &data)
00698 {
00699 if (ds.ascii())
00700 {
00701 *ds.istr() >> data;
00702 ds._fail = ((ds.istr()->fail())?(STD_TRUE):(STD_FALSE));
00703 }
00704 else {
00705 ds.read_delim();
00706
00707 ds.read ((UGAptr)&data, sizeof(unsigned char));
00708 }
00709 return ds;
00710 }
00711 STDdstream &
00712 operator << (STDdstream &ds, unsigned char data)
00713 {
00714 if (ds.ascii())
00715 {
00716 *ds.ostr() << data << " ";
00717 ds._fail = ((ds.ostr()->fail())?(STD_TRUE):(STD_FALSE));
00718 }
00719 else {
00720 ds.write_delim(' ');
00721
00722 ds.write ((UGAptr)&data, sizeof(unsigned char));
00723 }
00724 return ds;
00725 }
00726
00727
00728 void
00729 STDdstream::ws(char *x)
00730 {
00731 if (ascii())
00732 (*this) << x;
00733 }
00734
00735