00001 00036 #include <itpp/protocol/tcp.h> 00037 #include <itpp/base/itfile.h> 00038 #include <limits> 00039 #include <ctime> 00040 00041 00042 #ifdef _MSC_VER 00043 #pragma warning(disable:4355) 00044 #endif 00045 00046 namespace itpp { 00047 00048 // -------------------- Default parameters ---------------------------------- 00049 00050 // TCP sender and receiver 00051 00052 #define TCP_HEADERLENGTH 40 00053 00054 // TCP sender 00055 00056 #define TCP_VERSION kReno 00057 #define TCP_SMSS 1460 00058 #define TCP_INITIALCWNDREL 2 // related to MSS 00059 #define TCP_INITIALSSTHRESHREL 1 // related to MaxCWnd 00060 #define TCP_MAXCWNDREL 32 // related to MSS 00061 #define TCP_DUPACKS 3 00062 #define TCP_INITIALRTT 1 00063 const double TCP_STIMERGRAN = 0.2; 00064 const double TCP_SWSATIMERVALUE = 0.2; 00065 #define TCP_MAXBACKOFF 64 00066 //#define TCP_MAXRTO DBL_MAX 00067 #define TCP_MAXRTO std::numeric_limits<double>::max() 00068 #define TCP_IMMEDIATEBACKOFFRESET false 00069 #define TCP_TIMESTAMPS false 00070 #define TCP_KARN true 00071 #define TCP_NAGLE false 00072 #define TCP_GOBACKN true 00073 #define TCP_FLIGHTSIZERECOVERY false 00074 #define TCP_RENOCONSERVATION true 00075 #define TCP_CAREFULSSTHRESHREDUCTION true 00076 #define TCP_IGNOREDUPACKONTORECOVERY true 00077 #define TCP_CAREFULMULFASTRTXAVOIDANCE true 00078 #define TCP_RESTARTAFTERIDLE true 00079 00080 // TCP receiver 00081 00082 #define TCP_RMSS 1460 00083 #define TCP_BUFFERSIZE INT_MAX/4 00084 #define TCP_DELAYEDACK true 00085 const double TCP_ACKDELAYTIME = 0.2; 00086 #define TCP_SENDPERIODICACKS false 00087 //#define TCP_SENDPERIODICACKS true 00088 #define TCP_STRICTPERIODICACKS false 00089 #define TCP_PERIODICACKINTERVAL 1 00090 #define TCP_ACKSCHEDULINGDELAY 0 00091 #define TCP_ACKBUFFERWRITE false 00092 #define TCP_ACKBUFFERREAD true 00093 #define TCP_MAXUSERBLOCKSIZE INT_MAX/4 00094 #define TCP_MINUSERBLOCKSIZE 1 00095 #define TCP_USERBLOCKPROCDELAY 0 00096 00097 // TCP generator 00098 00099 #define TCPGEN_BLOCKSIZE 1460 00100 00101 // TCP applications 00102 00103 #define TCPAPP_MAXNOOFACTIVEAPPS 500 00104 #define TCPAPP_DISTSTATARRAYSIZE 100 00105 #define TCPAPP_DISTSTATMAXGOODPUT 1000 00106 #define TCPAPP_DISTSTATMAXTRANSFERTIME 10000 00107 #define TCPAPP_CONDMEANSTATARRAYSIZE 100 00108 #define TCPAPP_CONDMEANSTATMAXREQSIZE 100000 00109 00110 00111 00112 inline int min(int opd1, int opd2) 00113 { 00114 return (opd1 < opd2)? opd1 : opd2; 00115 } 00116 00117 00118 inline int max(int opd1, int opd2) 00119 { 00120 return (opd1 > opd2)? opd1 : opd2; 00121 } 00122 00123 00124 // round is used to map a double value (e.g. RTO in TTCPSender) to the 00125 // next higher value of a certain granularity (e.g. timer granularity). 00126 inline double round (const double value, const double granularity) 00127 { 00128 return ceil(value / granularity) * granularity; 00129 } 00130 00131 // -------------------- TCP_Segment ---------------------------------------- 00132 00133 TCP_Segment::TCP_Segment() : 00134 seq_begin(), 00135 seq_end() 00136 { 00137 } 00138 00139 TCP_Segment::TCP_Segment(const Sequence_Number &sn_begin, const Sequence_Number &sn_end) : 00140 seq_begin(sn_begin), 00141 seq_end(sn_end) 00142 { 00143 it_assert(seq_begin <= seq_end, "TCP_Segment::TCP_Segment, end byte " + to_str(seq_end.value()) + 00144 " < begin byte " + to_str(seq_begin.value())); 00145 } 00146 00147 00148 TCP_Segment::TCP_Segment(const TCP_Segment &segment) : 00149 seq_begin(segment.seq_begin), 00150 seq_end(segment.seq_end) 00151 { 00152 } 00153 00154 00155 TCP_Segment &TCP_Segment::operator=(const TCP_Segment &segment) 00156 { 00157 this->seq_begin = segment.seq_begin; 00158 this->seq_end = segment.seq_end; 00159 00160 return *this; 00161 } 00162 00163 00164 void TCP_Segment::combine(const TCP_Segment &segment) 00165 { 00166 it_assert(can_be_combined(segment), "TCP_Segment::CombineWith, segments cannot be combined"); 00167 00168 seq_begin = min(seq_begin, segment.seq_begin); 00169 seq_end = max(seq_end, segment.seq_end); 00170 } 00171 00172 00173 std::ostream & operator<<(std::ostream &os, const TCP_Segment &segment) 00174 { 00175 os << "(" << segment.seq_begin << "," << segment.seq_end << ")"; 00176 return os; 00177 } 00178 00179 00180 // -------------------- TCP_Packet ---------------------------------------- 00181 TCP_Packet::TCP_Packet() : 00182 fSegment(), 00183 fACK(), 00184 fWnd(0), 00185 fSessionId(0), 00186 fInfo(0) 00187 { 00188 } 00189 00190 00191 TCP_Packet::TCP_Packet(const TCP_Packet &packet) : 00192 fSegment(packet.fSegment), 00193 fACK(packet.fACK), 00194 fWnd(packet.fWnd), 00195 fSessionId(packet.fSessionId), 00196 fInfo(0) 00197 { 00198 std::cout << "TCP_Packet::TCP_Packet ############" << " "; 00199 00200 if (packet.fInfo != 0) { 00201 std::cout << "TCP_Packet::TCP_Packet rhs.fInfo ###########" << " "; 00202 fInfo = new TDebugInfo(*packet.fInfo); 00203 } 00204 } 00205 00206 00207 TCP_Packet::~TCP_Packet() 00208 { 00209 delete fInfo; 00210 } 00211 00212 00213 TCP_Packet & TCP_Packet::clone() const 00214 { 00215 return *new TCP_Packet(*this); 00216 } 00217 00218 00219 void TCP_Packet::set_info(unsigned ssThresh, unsigned recWnd, unsigned cWnd, 00220 double estRTT, Sequence_Number sndUna, 00221 Sequence_Number sndNxt, bool isRtx) 00222 { 00223 if (fInfo == 0) { 00224 fInfo = new TDebugInfo; 00225 } 00226 00227 fInfo->fSSThresh = ssThresh; 00228 fInfo->fRecWnd = recWnd; 00229 fInfo->fCWnd = cWnd; 00230 fInfo->fRTTEstimate = estRTT; 00231 fInfo->fSndUna = sndUna; 00232 fInfo->fSndNxt = sndNxt; 00233 fInfo->fRtxFlag = isRtx; 00234 } 00235 00236 00237 void TCP_Packet::print_header(std::ostream &out) const 00238 { 00239 std::cout << "Hello!\n"; 00240 00241 std::cout << "Ses = " << get_session_id() << " "; 00242 00243 std::cout << "Segment = " << get_segment() << " " 00244 << "ACK = " << get_ACK() << " " 00245 << "Wnd = " << get_wnd() << " "; 00246 00247 std::cout << "DestPort = " << fDestinationPort << " " 00248 << "SourcePort = " << fSourcePort << " "; 00249 00250 00251 if (fInfo != 0) { 00252 std::cout << "SndSSThresh = " << fInfo->fSSThresh << " "; 00253 std::cout << "RecWnd = " << fInfo->fRecWnd << " "; 00254 std::cout << "SndCWnd = " << fInfo->fCWnd << " "; 00255 std::cout << "RTTEstimate = " << fInfo->fRTTEstimate << " "; 00256 std::cout << "RtxFlag = " << fInfo->fRtxFlag; 00257 } 00258 else 00259 std::cout << "fInfo = " << fInfo << " "; 00260 00261 std::cout << std::endl; 00262 00263 } 00264 00265 00266 00267 std::ostream & operator<<(std::ostream & out, TCP_Packet & msg) 00268 { 00269 msg.print_header(out); 00270 return out; 00271 } 00272 00273 00274 // -------------------- TCP_Sender ---------------------------------------- 00275 TCP_Sender::TCP_Sender(int label) : 00276 fLabel(label), 00277 fTCPVersion(TCP_VERSION), 00278 fMSS(TCP_SMSS), 00279 fTCPIPHeaderLength(TCP_HEADERLENGTH), 00280 fInitialRTT(TCP_INITIALRTT), 00281 fInitialCWnd(0), // default initialization see below 00282 fInitialSSThresh(0), // default initialization see below 00283 fMaxCWnd(0), // default initialization see below 00284 fDupACKThreshold(TCP_DUPACKS), 00285 fTimerGranularity(TCP_STIMERGRAN), 00286 fMaxRTO(TCP_MAXRTO), 00287 fMaxBackoff(TCP_MAXBACKOFF), 00288 fImmediateBackoffReset(TCP_IMMEDIATEBACKOFFRESET), 00289 fKarn(TCP_KARN), 00290 fGoBackN(TCP_GOBACKN), 00291 fFlightSizeRecovery(TCP_FLIGHTSIZERECOVERY), 00292 fRenoConservation(TCP_RENOCONSERVATION), 00293 fCarefulSSThreshReduction(TCP_CAREFULSSTHRESHREDUCTION), 00294 fIgnoreDupACKOnTORecovery(TCP_IGNOREDUPACKONTORECOVERY), 00295 fCarefulMulFastRtxAvoidance(TCP_CAREFULMULFASTRTXAVOIDANCE), 00296 fNagle(TCP_NAGLE), 00297 fSWSATimerValue(TCP_SWSATIMERVALUE), 00298 fRestartAfterIdle(TCP_RESTARTAFTERIDLE), 00299 fDebug(false), 00300 fTrace(false), 00301 fSessionId(0), 00302 fRtxTimer(*this, &TCP_Sender::HandleRtxTimeout), 00303 fSWSATimer(*this, &TCP_Sender::HandleSWSATimeout)/*,*/ 00304 { 00305 00306 // default values and parameter check for MaxCWND, InitCWND, InitSSThresh 00307 if (fMaxCWnd == 0) { 00308 fMaxCWnd = (unsigned)(TCP_MAXCWNDREL * fMSS); 00309 } else if (fMaxCWnd < fMSS) { 00310 // throw (UL_CException("TCP_Sender::TCP_Sender", 00311 // "MaxCWnd must be >= MSS")); 00312 } 00313 00314 if (fInitialCWnd == 0) { 00315 fInitialCWnd = (unsigned)(TCP_INITIALCWNDREL * fMSS); 00316 } else if ((fInitialCWnd < fMSS) || (fInitialCWnd > fMaxCWnd)) { 00317 // throw (UL_CException("TCP_Sender::TCP_Sender", 00318 // "initial CWnd must be >= MSS and <= MaxCWnd")); 00319 } 00320 00321 if ((fInitialSSThresh == 0) && (fMaxCWnd >= 2 * fMSS)) { 00322 fInitialSSThresh = (unsigned)(TCP_INITIALSSTHRESHREL * fMaxCWnd); 00323 } else if ((fInitialSSThresh < 2*fMSS) || (fInitialCWnd > fMaxCWnd)) { 00324 // throw (UL_CException("TCP_Sender::TCP_Sender", 00325 // "initial CWnd must be >= 2*MSS and <= MaxCWnd")); 00326 } 00327 00328 setup(); 00329 00330 InitStatistics(); 00331 00332 00333 tcp_send.set_name("TCP Send"); 00334 tcp_receive_ack.forward(this, &TCP_Sender::ReceiveMessageFromNet); 00335 tcp_receive_ack.set_name("TCP ACK"); 00336 tcp_socket_write.forward(this, &TCP_Sender::HandleUserMessageIndication); 00337 tcp_socket_write.set_name("SocketWrite"); 00338 tcp_release.forward(this, &TCP_Sender::release); 00339 tcp_release.set_name("Release"); 00340 00341 } 00342 00343 00344 TCP_Sender::~TCP_Sender () 00345 { 00346 } 00347 00348 void TCP_Sender::set_debug(const bool enable_debug) 00349 { 00350 fDebug = enable_debug; 00351 tcp_send.set_debug(enable_debug); 00352 } 00353 00354 void TCP_Sender::set_debug(bool enable_debug, bool enable_signal_debug) 00355 { 00356 fDebug = enable_debug; 00357 tcp_send.set_debug(enable_signal_debug); 00358 } 00359 00360 void TCP_Sender::set_trace(const bool enable_trace) 00361 { 00362 fTrace = enable_trace; 00363 } 00364 00365 void TCP_Sender::set_label(int label) 00366 { 00367 fLabel = label; 00368 } 00369 00370 void TCP_Sender::setup() 00371 { 00372 fSndUna = 0; 00373 fSndNxt = 0; 00374 fSndMax = 0; 00375 fMaxRecWnd = 0; 00376 fRecWnd = fMaxCWnd; 00377 fUserNxt = 0; 00378 fCWnd = fInitialCWnd; 00379 fSSThresh = fInitialSSThresh; 00380 fRecoveryDupACK = 0; 00381 fRecoveryTO = 0; 00382 fDupACKCnt = 0; 00383 00384 // timers 00385 fBackoff = 1; 00386 fPendingBackoffReset = false; 00387 fLastSendTime = Event_Queue::now(); 00388 00389 // RTT measurement 00390 fTimUna = 0; 00391 fSRTT = 0; 00392 fRTTVar = 0; 00393 fRTTEstimate = fInitialRTT; 00394 fRTTMPending = false; 00395 fRTTMByte = 0; 00396 00397 CWnd_val.set_size(1000); 00398 CWnd_val.zeros(); 00399 CWnd_time.set_size(1000); 00400 CWnd_time.zeros(); 00401 CWnd_val(0) = fInitialCWnd; 00402 CWnd_time(0) = 0; 00403 CWnd_index=1; 00404 00405 SSThresh_val.set_size(1000); 00406 SSThresh_val.zeros(); 00407 SSThresh_time.set_size(1000); 00408 SSThresh_time.zeros(); 00409 SSThresh_val(0) = fInitialSSThresh; 00410 SSThresh_time(0) = 0; 00411 SSThresh_index=1; 00412 00413 sent_seq_num_val.set_size(1000); 00414 sent_seq_num_val.zeros(); 00415 sent_seq_num_time.set_size(1000); 00416 sent_seq_num_time.zeros(); 00417 sent_seq_num_val(0) = 0; 00418 sent_seq_num_time(0) = 0; 00419 sent_seq_num_index=1; 00420 00421 sender_recv_ack_seq_num_val.set_size(1000); 00422 sender_recv_ack_seq_num_val.zeros(); 00423 sender_recv_ack_seq_num_time.set_size(1000); 00424 sender_recv_ack_seq_num_time.zeros(); 00425 sender_recv_ack_seq_num_val(0) = 0; 00426 sender_recv_ack_seq_num_time(0) = 0; 00427 sender_recv_ack_seq_num_index=1; 00428 00429 RTTEstimate_val.set_size(1000); 00430 RTTEstimate_val.zeros(); 00431 RTTEstimate_time.set_size(1000); 00432 RTTEstimate_time.zeros(); 00433 RTTEstimate_val(0) = fInitialRTT; 00434 RTTEstimate_time(0) = 0; 00435 RTTEstimate_index=1; 00436 00437 RTTsample_val.set_size(1000); 00438 RTTsample_val.zeros(); 00439 RTTsample_time.set_size(1000); 00440 RTTsample_time.zeros(); 00441 RTTsample_val(0) = 0; 00442 RTTsample_time(0) = 0; 00443 RTTsample_index=1; 00444 00445 } 00446 00447 std::string TCP_Sender::GenerateFilename() 00448 { 00449 time_t rawtime; 00450 struct tm *timeinfo; 00451 timeinfo = localtime(&rawtime); 00452 std::ostringstream filename_stream; 00453 filename_stream << "trace_tcp_sender_u" << fLabel 00454 << "_" << 1900+timeinfo->tm_year 00455 << "_" << timeinfo->tm_mon 00456 << "_" << timeinfo->tm_mday 00457 << "__" << timeinfo->tm_hour 00458 << "_" << timeinfo->tm_min 00459 << "_" << timeinfo->tm_sec 00460 << "_.it"; 00461 return filename_stream.str(); 00462 } 00463 00464 00465 void TCP_Sender::release(std::string file) 00466 { 00467 std::string filename; 00468 fSessionId++; 00469 00470 fRtxTimer.Reset(); 00471 fSWSATimer.Reset(); 00472 00473 if (fTrace) { 00474 if (file == "") 00475 filename = GenerateFilename(); 00476 else 00477 filename = file; 00478 00479 save_trace(filename); 00480 } 00481 } 00482 00483 00484 void TCP_Sender::InitStatistics() 00485 { 00486 fNumberOfTimeouts = 0; 00487 fNumberOfIdleTimeouts = 0; 00488 fNumberOfFastRetransmits = 0; 00489 fNumberOfRTTMeasurements = 0; 00490 fNumberOfReceivedACKs = 0; 00491 } 00492 00493 00494 void TCP_Sender::StopTransientPhase() 00495 { 00496 InitStatistics(); 00497 } 00498 00499 00500 void TCP_Sender::HandleUserMessageIndication(itpp::Packet *user_data_p) 00501 { 00502 if (fDebug) { 00503 std::cout << "TCP_Sender::HandleUserMessageIndication" 00504 << " byte_size=" << user_data_p->bit_size()/8 00505 << " ptr=" << user_data_p 00506 << " time=" << Event_Queue::now() << std::endl; 00507 } 00508 00509 SocketWriteQueue.push(user_data_p); 00510 00511 SendNewData(); // will call GetMessage (via GetNextSegmentSize) 00512 // if new data can be sent 00513 } 00514 00515 00516 void TCP_Sender::ReceiveMessageFromNet(itpp::Packet *msg) 00517 { 00518 TCP_Packet & packet = (TCP_Packet &)*msg; 00519 00520 if (fDebug) { 00521 std::cout << "TCP_Sender::ReceiveMessageFromNet" 00522 << " byte_size=" << msg->bit_size()/8 00523 << " ptr=" << msg 00524 << " time=" << Event_Queue::now() << std::endl; 00525 } 00526 00527 if((packet.get_session_id() == fSessionId) && // ACK of current session 00528 (packet.get_ACK() >= fSndUna)) { // ACK is OK 00529 HandleACK(packet); 00530 } 00531 00532 delete &packet; 00533 } 00534 00535 00536 void TCP_Sender::HandleACK(TCP_Packet &msg) 00537 { 00538 it_assert(msg.get_ACK() <= fSndMax, "TCP_Sender::HandleACK, received ACK > SndMax at "); 00539 00540 fNumberOfReceivedACKs++; 00541 00542 if (fTrace) { 00543 TraceACKedSeqNo(msg.get_ACK()); 00544 } 00545 00546 if (fDebug) { 00547 std::cout << "sender " << fLabel << ": " 00548 << "receive ACK: " 00549 << " t = " << Event_Queue::now() << ", " 00550 << msg << std::endl; 00551 } 00552 00553 // update receiver advertised window size 00554 fRecWnd = msg.get_wnd(); 00555 fMaxRecWnd = max(fRecWnd, fMaxRecWnd); 00556 00557 if (msg.get_ACK() == fSndUna) { // duplicate ACK 00558 00559 bool ignoreDupACK = (fSndMax == fSndUna); // no outstanding data 00560 00561 if (fIgnoreDupACKOnTORecovery) { 00562 // don't count dupacks during TO recovery! 00563 if (fCarefulMulFastRtxAvoidance) { // see RFC 2582, Section 5 00564 // like in Solaris 00565 ignoreDupACK = ignoreDupACK || (fSndUna <= fRecoveryTO); 00566 } else { 00567 // like in ns 00568 ignoreDupACK = ignoreDupACK || (fSndUna < fRecoveryTO); 00569 } 00570 } 00571 00572 if (!ignoreDupACK) { 00573 fDupACKCnt++; // count the number of duplicate ACKs 00574 00575 if (fDupACKCnt == fDupACKThreshold) { 00576 // dupack threshold is reached 00577 fNumberOfFastRetransmits++; 00578 00579 fRecoveryDupACK = fSndMax; 00580 00581 ReduceSSThresh(); // halve ssthresh (in most cases) 00582 00583 if ((fTCPVersion == kReno) || (fTCPVersion == kNewReno)) { 00584 fCWnd = fSSThresh; 00585 } else if (fTCPVersion == kTahoe) { 00586 fCWnd = fMSS; 00587 } 00588 00589 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00590 // conservation of packets: 00591 if (fRenoConservation) { 00592 fCWnd += fDupACKThreshold * fMSS; 00593 } 00594 } else if (fTCPVersion == kTahoe) { 00595 if (fGoBackN) { 00596 fSndNxt = fSndUna; // Go-Back-N (like in ns) 00597 } 00598 } 00599 00600 UnaRetransmit(); // initiate retransmission 00601 } else if (fDupACKCnt > fDupACKThreshold) { 00602 if (fTCPVersion == kReno || fTCPVersion == kNewReno) { 00603 // conservation of packets 00604 // CWnd may exceed MaxCWnd during fast recovery, 00605 // however, the result of SendWindow() is always <= MaxCwnd 00606 if (fRenoConservation) { 00607 fCWnd += fMSS; 00608 } 00609 } 00610 } 00611 } 00612 } else { // new ACK 00613 Sequence_Number oldSndUna = fSndUna; // required for NewReno partial ACK 00614 fSndUna = msg.get_ACK(); 00615 fSndNxt = max(fSndNxt, fSndUna); // required in case of "Go-Back-N" 00616 00617 // reset retransmission timer 00618 00619 if ((fSndUna > fTimUna) && fRtxTimer.IsPending()) { 00620 // seq. no. for which rtx timer is running has been received 00621 fRtxTimer.Reset(); 00622 } 00623 00624 // backoff reset 00625 00626 if (fImmediateBackoffReset) { 00627 fBackoff = 1; 00628 } else { 00629 if (fPendingBackoffReset) { 00630 fBackoff = 1; 00631 fPendingBackoffReset = false; 00632 } else if (fBackoff > 1) { 00633 // reset backoff counter only on next new ACK (this is probably 00634 // the way to operate intended by Karn) 00635 fPendingBackoffReset = true; 00636 } 00637 } 00638 00639 // RTT measurement 00640 00641 if ((fSndUna > fRTTMByte) && fRTTMPending) { 00642 UpdateRTTVariables(Event_Queue::now() - fRTTMStartTime); 00643 fRTTMPending = false; 00644 } 00645 00646 // update CWnd and reset dupack counter 00647 00648 if (fDupACKCnt >= fDupACKThreshold) { 00649 // we are in fast recovery 00650 if (fTCPVersion == kNewReno && fSndUna < fRecoveryDupACK) { 00651 // New Reno partial ACK handling 00652 if (fRenoConservation) { 00653 fCWnd = max(fMSS, fCWnd - (fSndUna - oldSndUna) + fMSS); 00654 } 00655 UnaRetransmit(); // start retransmit immediately 00656 } else { 00657 FinishFastRecovery(); 00658 } 00659 } else { 00660 // no fast recovery 00661 fDupACKCnt = 0; 00662 if (fCWnd < fSSThresh) { 00663 // slow start phase 00664 fCWnd = min (fCWnd + fMSS, fMaxCWnd); 00665 } else { 00666 // congestion avoidance phase 00667 fCWnd += max (fMSS * fMSS / fCWnd, 1); // RFC 2581 00668 fCWnd = min (fCWnd, fMaxCWnd); 00669 } 00670 } 00671 } // new ACK 00672 00673 SendNewData(); // try to send new data (even in the case that a retransmit 00674 // had to be performed) 00675 00676 if (fTrace) { 00677 TraceCWnd(); 00678 } 00679 } 00680 00681 00682 void TCP_Sender::SendNewData(bool skipSWSA) 00683 { 00684 unsigned nextSegmentSize; 00685 00686 it_assert(fSndUna <= fSndNxt, "TCP_Sender::SendNewData, SndUna > SndNxt in sender " + to_str(fLabel) + "!"); 00687 00688 if (fRestartAfterIdle) { 00689 IdleCheck(); 00690 } 00691 00692 bool sillyWindowAvoidanceFailed = false; 00693 00694 while (!sillyWindowAvoidanceFailed && 00695 ((nextSegmentSize = GetNextSegmentSize(fSndNxt)) > 0)) 00696 { 00697 // there is new data to send and window is large enough 00698 00699 // SWSA and Nagle (RFC 1122): assume PUSH to be set 00700 unsigned queuedUnsent = fUserNxt - fSndNxt; 00701 unsigned usableWindow = max(0, (fSndUna + SendWindow()) - fSndNxt); 00702 00703 if (((unsigned)min(queuedUnsent, usableWindow) >= fMSS) || 00704 ((!fNagle || (fSndUna == fSndNxt)) && 00705 ((queuedUnsent <= usableWindow) || // Silly W. A. 00706 ((unsigned)min(queuedUnsent, usableWindow) >= fMaxRecWnd / 2) 00707 ) 00708 ) || 00709 skipSWSA 00710 ) { 00711 // Silly Window Syndrome Avoidance (SWSA) and Nagle passed 00712 00713 TCP_Segment nextSegment(fSndNxt, fSndNxt + nextSegmentSize); 00714 TCP_Packet & msg = * new TCP_Packet (); 00715 00716 msg.set_segment(nextSegment); 00717 msg.set_session_id(fSessionId); 00718 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00719 msg.set_source_port(fLabel); // number for simplicity. 00720 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00721 00722 if (fDebug) { 00723 std::cout << "TCP_Sender::SendNewData," 00724 << " nextSegmentSize=" << nextSegmentSize 00725 << " fTCPIPHeaderLength=" << fTCPIPHeaderLength 00726 << " byte_size=" << msg.bit_size()/8 00727 << " ptr=" << &msg 00728 << " time=" << Event_Queue::now() << std::endl; 00729 } 00730 00731 // no RTT measurement for retransmitted segments 00732 // changes on Dec. 13. 2002 (Ga, Bo, Scharf) 00733 00734 if (!fRTTMPending && fSndNxt >= fSndMax) { // ##Bo## 00735 fRTTMStartTime = Event_Queue::now(); 00736 fRTTMByte = nextSegment.begin(); 00737 fRTTMPending = true; 00738 } 00739 00740 fSndNxt += nextSegmentSize; 00741 fSndMax = max(fSndNxt, fSndMax); 00742 00743 // reset SWSA timer if necessary 00744 if (skipSWSA) { 00745 skipSWSA = false; 00746 } else if (fSWSATimer.IsPending()) { 00747 fSWSATimer.Reset(); 00748 } 00749 00750 // set rtx timer if necessary 00751 if (!fRtxTimer.IsPending()) { 00752 SetRtxTimer(); 00753 } 00754 00755 00756 if (fDebug) { 00757 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00758 fSndUna, fSndNxt, false); 00759 std::cout << "sender " << fLabel 00760 << ": send new data: " 00761 << " t = " << Event_Queue::now() << ", " 00762 << msg << std::endl; 00763 } 00764 00765 SendMsg(msg); 00766 } else { 00767 sillyWindowAvoidanceFailed = true; 00768 // set SWSA timer 00769 if (!fSWSATimer.IsPending()) { 00770 fSWSATimer.Set(fSWSATimerValue); 00771 } 00772 } 00773 } 00774 00775 // set timers in case that no new data could have been sent 00776 if (!fRtxTimer.IsPending()) { 00777 if (fSndMax > fSndUna) { // there is outstanding data 00778 if (!fImmediateBackoffReset && fPendingBackoffReset) { 00779 // backoff is reset if no new data could have been sent since last 00780 // (successfull) retransmission; this is useful in case of 00781 // Reno recovery and multiple losses to avoid that in 00782 // the (unavoidable) series of timeouts the timer value 00783 // increases exponentially as this is not the intention 00784 // of the delayed backoff reset in Karn's algorithm 00785 fBackoff = 1; 00786 fPendingBackoffReset = false; 00787 } 00788 SetRtxTimer(); 00789 } 00790 } 00791 } 00792 00793 00794 void TCP_Sender::UnaRetransmit() 00795 { 00796 // resend after timeout or fast retransmit 00797 unsigned nextSegmentSize = GetNextSegmentSize(fSndUna); 00798 00799 if (nextSegmentSize > 0) { 00800 TCP_Segment nextSegment(fSndUna, fSndUna + nextSegmentSize); 00801 TCP_Packet & msg = *new TCP_Packet(); 00802 msg.set_segment(nextSegment); 00803 msg.set_session_id(fSessionId); 00804 msg.set_destination_port(fLabel); // The dest and src port are set to the same 00805 msg.set_source_port(fLabel); // number for simplicity. 00806 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength)); 00807 00808 fSndNxt = max(fSndNxt, fSndUna + nextSegmentSize); 00809 fSndMax = max(fSndNxt, fSndMax); 00810 00811 // The RTT measurement is cancelled if the RTTM byte has a sequence 00812 // number higher or equal than the first retransmitted byte as 00813 // the ACK for the RTTM byte will be delayed by the rtx for at least 00814 // one round 00815 if (fKarn && (nextSegment.begin() <= fRTTMByte) && fRTTMPending) { 00816 fRTTMPending = false; 00817 } 00818 00819 SetRtxTimer(); 00820 00821 if (fDebug) { 00822 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate, 00823 fSndUna, fSndNxt, true); 00824 std::cout << "sender " << fLabel; 00825 if (fDupACKCnt >= fDupACKThreshold) { 00826 std::cout << ": fast rtx: "; 00827 } else { 00828 std::cout << ": TO rtx: "; 00829 } 00830 std::cout << " t = " << Event_Queue::now() << ", " 00831 << msg << std::endl; 00832 } 00833 00834 SendMsg(msg); 00835 } else { 00836 // throw(UL_CException("TCP_Sender::UnaRetransmit", "no bytes to send")); 00837 } 00838 } 00839 00840 00841 void TCP_Sender::FinishFastRecovery() 00842 { 00843 if (fTCPVersion == kTahoe) { 00844 fDupACKCnt = 0; 00845 } else if (fTCPVersion == kReno) { 00846 // Reno fast recovery 00847 fDupACKCnt = 0; 00848 if (fFlightSizeRecovery) { 00849 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00850 } else { 00851 fCWnd = fSSThresh; 00852 } 00853 } else if (fTCPVersion == kNewReno) { 00854 // New Reno fast recovery 00855 // "Set CWnd to ... min (ssthresh, FlightSize + MSS) 00856 // ... or ssthresh" (RFC 2582) 00857 if (fFlightSizeRecovery) { 00858 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh); 00859 } else { 00860 fCWnd = fSSThresh; 00861 } 00862 fDupACKCnt = 0; 00863 } 00864 } 00865 00866 00867 void TCP_Sender::ReduceSSThresh() 00868 { 00869 if (fCarefulSSThreshReduction) { 00870 // If Reno conservation is enabled the amount of 00871 // outstanding data ("flight size") might be rather large 00872 // and even larger than twice the old ssthresh value; 00873 // so this corresponds more to the ns behaviour where always cwnd is 00874 // taken instead of flight size. 00875 fSSThresh = max(2 * fMSS, 00876 min(min(fCWnd, fSndMax - fSndUna), fRecWnd) / 2); 00877 } else { 00878 // use filght size / 2 as recommended in RFC 2581 00879 fSSThresh = max(2 * fMSS, min(fSndMax - fSndUna, fRecWnd) / 2); 00880 } 00881 00882 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleACK, internal error: SndSSThresh is > MaxCWnd"); 00883 00884 if (fTrace) { 00885 TraceSSThresh(); 00886 } 00887 } 00888 00889 00890 void TCP_Sender::SendMsg(TCP_Packet &msg) 00891 { 00892 if (fTrace) { 00893 TraceSentSeqNo(msg.get_segment().end()); 00894 } 00895 00896 if (fRestartAfterIdle) { 00897 fLastSendTime = Event_Queue::now(); // needed for idle detection 00898 } 00899 00900 tcp_send(&msg); 00901 } 00902 00903 00904 void TCP_Sender::IdleCheck() 00905 { 00906 // idle detection according to Jacobson, SIGCOMM, 1988: 00907 // sender is currently idle and nothing has been send since RTO 00908 00909 if (fSndMax == fSndUna && Event_Queue::now() - fLastSendTime > CalcRTOValue()) { 00910 fCWnd = fInitialCWnd; // see RFC2581 00911 00912 fNumberOfIdleTimeouts++; 00913 00914 if (fTrace) { 00915 TraceCWnd(); 00916 } 00917 00918 if (fDebug) { 00919 std::cout << "sender " << fLabel 00920 << ": idle timeout: " 00921 << "t = " << Event_Queue::now() 00922 << ", SndNxt = " << fSndNxt 00923 << ", SndUna = " << fSndUna 00924 << ", Backoff = " << fBackoff 00925 << std::endl; 00926 } 00927 } 00928 } 00929 00930 00931 void TCP_Sender::HandleRtxTimeout(Ttype time) 00932 { 00933 fNumberOfTimeouts++; 00934 00935 // update backoff 00936 fBackoff = min(fMaxBackoff, fBackoff * 2); 00937 if (!fImmediateBackoffReset) { 00938 fPendingBackoffReset = false; 00939 } 00940 00941 if (fDupACKCnt >= fDupACKThreshold) { 00942 FinishFastRecovery(); // reset dup ACK cnt and CWnd 00943 } else if (fDupACKCnt > 0) { 00944 fDupACKCnt = 0; // don't allow dupack action during TO recovery 00945 } 00946 00947 // update CWnd and SSThresh 00948 ReduceSSThresh(); // halve ssthresh (in most cases) 00949 fCWnd = fMSS; // not initial CWnd, see RFC 2581 00950 00951 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleRtxTimeout, internal error: SndSSThresh is > MaxCWnd"); 00952 00953 fRecoveryTO = fSndMax; 00954 00955 if (fGoBackN) { 00956 // go back N is mainly relevant in the case of multiple losses 00957 // which would lead to a series of timeouts without resetting sndnxt 00958 fSndNxt = fSndUna; 00959 } 00960 00961 if (fDebug) { 00962 std::cout << "sender " << fLabel 00963 << ": rtx timeout: " 00964 << "t = " << Event_Queue::now() 00965 << ", SndNxt = " << fSndNxt 00966 << ", SndUna = " << fSndUna 00967 << std::endl; 00968 } 00969 00970 if (fTrace) { 00971 TraceCWnd(); 00972 } 00973 00974 UnaRetransmit(); // initiate retransmission 00975 } 00976 00977 00978 void TCP_Sender::HandleSWSATimeout(Ttype) 00979 { 00980 SendNewData(true); 00981 } 00982 00983 00984 unsigned TCP_Sender::GetNextSegmentSize(const Sequence_Number & begin) 00985 { 00986 // try to get new user messages if available and necessary 00987 while ((fUserNxt < begin + fMSS) && (!SocketWriteQueue.empty())) { 00988 itpp::Packet *packet_p = SocketWriteQueue.front(); 00989 SocketWriteQueue.pop(); 00990 fUserNxt += (unsigned) packet_p->bit_size()/8; 00991 delete packet_p; 00992 } 00993 00994 Sequence_Number end = min(min(fUserNxt, begin + fMSS), 00995 fSndUna + SendWindow()); 00996 00997 if (fDebug) { 00998 std::cout << "TCP_Sender::GetNextSegmentSize," 00999 << " fUserNxt=" << fUserNxt 01000 << " begin_seq_num=" << begin 01001 << " fMSS=" << fMSS 01002 << " fSndUna=" << fSndUna 01003 << " SendWindow()=" << SendWindow() 01004 << " end_seq_num=" << end 01005 << " time=" << Event_Queue::now() << std::endl; 01006 } 01007 01008 return max(0, end - begin); 01009 } 01010 01011 01012 unsigned TCP_Sender::SendWindow() const 01013 { 01014 return min(fRecWnd, min (fMaxCWnd, fCWnd)); 01015 } 01016 01017 01018 double TCP_Sender::CalcRTOValue() const 01019 { 01020 static const double factor = 1 + 1e-8; 01021 // to avoid "simultaneous" TO/receive ACK events in case of const. RTT 01022 01023 double rto = fBackoff * fRTTEstimate * factor; 01024 01025 if (rto > fMaxRTO) { 01026 rto = fMaxRTO; 01027 } 01028 01029 return rto; 01030 } 01031 01032 01033 void TCP_Sender::SetRtxTimer() 01034 { 01035 double rto = CalcRTOValue(); 01036 fRtxTimer.Set(rto); 01037 fTimUna = fSndUna; 01038 if (fDebug) { 01039 std::cout << "sender " << fLabel 01040 << ": set rtx timer: " 01041 << "t = " << Event_Queue::now() 01042 << ", RTO = " << rto 01043 << ", Backoff = " << fBackoff 01044 << ", TimUna = " << fTimUna 01045 << std::endl; 01046 } 01047 } 01048 01049 01050 void TCP_Sender::UpdateRTTVariables(double sampleRTT) 01051 { 01052 if (fSRTT == 0) { 01053 fSRTT = sampleRTT; 01054 fRTTVar = sampleRTT / 2; 01055 } else { 01056 // see, e.g., Comer for the values used as weights 01057 fSRTT = 0.875 * fSRTT + 0.125 * sampleRTT; 01058 fRTTVar = 0.75 * fRTTVar + 0.25 * fabs(sampleRTT - fSRTT); 01059 } 01060 01061 fRTTEstimate = round(fSRTT + 4 * fRTTVar, fTimerGranularity); 01062 01063 if (fTrace) { 01064 TraceRTTVariables(sampleRTT); 01065 } 01066 01067 fNumberOfRTTMeasurements++; 01068 } 01069 01070 01071 void TCP_Sender::TraceRTTVariables(double sampleRTT) 01072 { 01073 if (fDebug) { 01074 std::cout << "sender " << fLabel 01075 << ": RTT update: " 01076 << "t = " << Event_Queue::now() 01077 << ", sample = " << sampleRTT 01078 << ", SRTT = " << fSRTT 01079 << ", RTTVar = " << fRTTVar 01080 << ", RTTEstimate = " << fRTTEstimate 01081 << std::endl; 01082 } 01083 01084 if (RTTsample_index >= RTTsample_time.size()) { 01085 RTTsample_time.set_size(2*RTTsample_time.size(),true); 01086 RTTsample_val.set_size(2*RTTsample_val.size(),true); 01087 } 01088 RTTsample_val(RTTsample_index) = sampleRTT; 01089 RTTsample_time(RTTsample_index) = Event_Queue::now(); 01090 RTTsample_index++; 01091 01092 if (RTTEstimate_index >= RTTEstimate_time.size()) { 01093 RTTEstimate_time.set_size(2*RTTEstimate_time.size(),true); 01094 RTTEstimate_val.set_size(2*RTTEstimate_val.size(),true); 01095 } 01096 RTTEstimate_val(RTTEstimate_index) = fRTTEstimate; 01097 RTTEstimate_time(RTTEstimate_index) = Event_Queue::now(); 01098 RTTEstimate_index++; 01099 } 01100 01101 01102 void TCP_Sender::TraceCWnd() 01103 { 01104 if (fDebug) { 01105 std::cout << "sender " << fLabel 01106 << " t = " << Event_Queue::now() 01107 << " cwnd = " << fCWnd << std::endl; 01108 } 01109 if (CWnd_index >= CWnd_time.size()) { 01110 CWnd_time.set_size(2*CWnd_time.size(),true); 01111 CWnd_val.set_size(2*CWnd_val.size(),true); 01112 } 01113 CWnd_val(CWnd_index) = fCWnd; 01114 CWnd_time(CWnd_index) = Event_Queue::now(); 01115 CWnd_index++; 01116 01117 } 01118 01119 void TCP_Sender::TraceSSThresh() 01120 { 01121 if (fDebug) { 01122 std::cout << "sender " << fLabel 01123 << " t = " << Event_Queue::now() 01124 << " cwnd = " << fSSThresh << std::endl; 01125 } 01126 if (SSThresh_index >= SSThresh_time.size()) { 01127 SSThresh_time.set_size(2*SSThresh_time.size(),true); 01128 SSThresh_val.set_size(2*SSThresh_val.size(),true); 01129 } 01130 SSThresh_val(SSThresh_index) = fSSThresh; 01131 SSThresh_time(SSThresh_index) = Event_Queue::now(); 01132 SSThresh_index++; 01133 01134 } 01135 01136 void TCP_Sender::TraceSentSeqNo(const Sequence_Number sn) 01137 { 01139 if (fDebug) { 01140 std::cout << "sender " << fLabel 01141 << " t = " << Event_Queue::now() 01142 << " sent = " << sn 01143 << std::endl; 01144 } 01145 if (sent_seq_num_index >= sent_seq_num_time.size()) { 01146 sent_seq_num_time.set_size(2*sent_seq_num_time.size(),true); 01147 sent_seq_num_val.set_size(2*sent_seq_num_val.size(),true); 01148 } 01149 sent_seq_num_val(sent_seq_num_index) = sn.value(); 01150 sent_seq_num_time(sent_seq_num_index) = Event_Queue::now(); 01151 sent_seq_num_index++; 01152 } 01153 01154 01155 void TCP_Sender::TraceACKedSeqNo(const Sequence_Number sn) 01156 { 01157 if (fDebug) { 01158 std::cout << "sender " << fLabel 01159 << " t = " << Event_Queue::now() 01160 << " ACK = " << sn 01161 << std::endl; 01162 } 01163 01164 if (sender_recv_ack_seq_num_index >= sender_recv_ack_seq_num_time.size()) { 01165 sender_recv_ack_seq_num_time.set_size(2*sender_recv_ack_seq_num_time.size(),true); 01166 sender_recv_ack_seq_num_val.set_size(2*sender_recv_ack_seq_num_val.size(),true); 01167 } 01168 sender_recv_ack_seq_num_val(sender_recv_ack_seq_num_index) = sn.value(); 01169 sender_recv_ack_seq_num_time(sender_recv_ack_seq_num_index) = Event_Queue::now(); 01170 sender_recv_ack_seq_num_index++; 01171 } 01172 01173 01174 void TCP_Sender::save_trace(std::string filename) { 01175 01176 CWnd_val.set_size(CWnd_index, true); 01177 CWnd_time.set_size(CWnd_index,true); 01178 01179 SSThresh_val.set_size(SSThresh_index, true); 01180 SSThresh_time.set_size(SSThresh_index,true); 01181 01182 sent_seq_num_val.set_size(sent_seq_num_index, true); 01183 sent_seq_num_time.set_size(sent_seq_num_index,true); 01184 01185 sender_recv_ack_seq_num_val.set_size(sender_recv_ack_seq_num_index, true); 01186 sender_recv_ack_seq_num_time.set_size(sender_recv_ack_seq_num_index,true); 01187 01188 RTTEstimate_val.set_size(RTTEstimate_index, true); 01189 RTTEstimate_time.set_size(RTTEstimate_index,true); 01190 01191 RTTsample_val.set_size(RTTsample_index, true); 01192 RTTsample_time.set_size(RTTsample_index,true); 01193 01194 if (fDebug) { 01195 std::cout << "CWnd_val" << CWnd_val << std::endl; 01196 std::cout << "CWnd_time" << CWnd_time << std::endl; 01197 std::cout << "CWnd_index" << CWnd_index << std::endl; 01198 01199 std::cout << "SSThresh_val" << SSThresh_val << std::endl; 01200 std::cout << "SSThresh_time" << SSThresh_time << std::endl; 01201 std::cout << "SSThresh_index" << SSThresh_index << std::endl; 01202 01203 std::cout << "sent_seq_num_val" << sent_seq_num_val << std::endl; 01204 std::cout << "sent_seq_num_time" << sent_seq_num_time << std::endl; 01205 std::cout << "sent_seq_num_index" << sent_seq_num_index << std::endl; 01206 01207 std::cout << "sender_recv_ack_seq_num_val" << sender_recv_ack_seq_num_val << std::endl; 01208 std::cout << "sender_recv_ack_seq_num_time" << sender_recv_ack_seq_num_time << std::endl; 01209 std::cout << "sender_recv_ack_seq_num_index" << sender_recv_ack_seq_num_index << std::endl; 01210 01211 std::cout << "RTTEstimate_val" << RTTEstimate_val << std::endl; 01212 std::cout << "RTTEstimate_time" << RTTEstimate_time << std::endl; 01213 std::cout << "RTTEstimate_index" << RTTEstimate_index << std::endl; 01214 01215 std::cout << "RTTsample_val" << RTTsample_val << std::endl; 01216 std::cout << "RTTsample_time" << RTTsample_time << std::endl; 01217 std::cout << "RTTsample_index" << RTTsample_index << std::endl; 01218 01219 std::cout << "TCP_Sender::saving to file: " << filename << std::endl; 01220 } 01221 01222 it_file ff2; 01223 ff2.open(filename); 01224 01225 ff2 << Name("CWnd_val") << CWnd_val; 01226 ff2 << Name("CWnd_time") << CWnd_time; 01227 ff2 << Name("CWnd_index") << CWnd_index; 01228 01229 ff2 << Name("SSThresh_val") << SSThresh_val; 01230 ff2 << Name("SSThresh_time") << SSThresh_time; 01231 ff2 << Name("SSThresh_index") << SSThresh_index; 01232 01233 ff2 << Name("sent_seq_num_val") << sent_seq_num_val; 01234 ff2 << Name("sent_seq_num_time") << sent_seq_num_time; 01235 ff2 << Name("sent_seq_num_index") << sent_seq_num_index; 01236 01237 ff2 << Name("sender_recv_ack_seq_num_val") << sender_recv_ack_seq_num_val; 01238 ff2 << Name("sender_recv_ack_seq_num_time") << sender_recv_ack_seq_num_time; 01239 ff2 << Name("sender_recv_ack_seq_num_index") << sender_recv_ack_seq_num_index; 01240 01241 ff2 << Name("RTTEstimate_val") << RTTEstimate_val; 01242 ff2 << Name("RTTEstimate_time") << RTTEstimate_time; 01243 ff2 << Name("RTTEstimate_index") << RTTEstimate_index; 01244 01245 ff2 << Name("RTTsample_val") << RTTsample_val; 01246 ff2 << Name("RTTsample_time") << RTTsample_time; 01247 ff2 << Name("RTTsample_index") << RTTsample_index; 01248 01249 ff2.flush(); 01250 ff2.close(); 01251 } 01252 01253 01254 void TCP_Sender::print_item(std::ostream & out, const std::string & keyword) 01255 { 01256 if (keyword == "Label") { 01257 std::cout << fLabel; 01258 } else if (keyword == "CWnd") { 01259 std::cout << fCWnd; 01260 } else if (keyword == "SSThresh") { 01261 std::cout << fSSThresh; 01262 } else if (keyword == "SRTT") { 01263 std::cout << fSRTT; 01264 } else if (keyword == "RTTvar") { 01265 std::cout << fRTTVar; 01266 } else if (keyword == "Backoff") { 01267 std::cout << fBackoff; 01268 } else if (keyword == "RTO") { 01269 std::cout << CalcRTOValue(); 01270 } else if (keyword == "NoOfFastRets") { 01271 std::cout << fNumberOfFastRetransmits; 01272 } else if (keyword == "NoOfRetTOs") { 01273 std::cout << fNumberOfTimeouts; 01274 } else if (keyword == "NoOfIdleTOs") { 01275 std::cout << fNumberOfIdleTimeouts; 01276 } else if (keyword == "NoOfRTTMs") { 01277 std::cout << fNumberOfRTTMeasurements; 01278 } else if (keyword == "NoOfRecACKs") { 01279 std::cout << fNumberOfReceivedACKs; 01280 } else { 01281 } 01282 } 01283 01284 01285 // -------------------- TCP_Receiver_Buffer ---------------------------------------- 01286 TCP_Receiver_Buffer::TCP_Receiver_Buffer() : 01287 fFirstByte() 01288 { 01289 } 01290 01291 01292 TCP_Receiver_Buffer::TCP_Receiver_Buffer(const TCP_Receiver_Buffer & rhs) : 01293 fFirstByte(rhs.fFirstByte), 01294 fBufList(rhs.fBufList) 01295 { 01296 } 01297 01298 01299 void TCP_Receiver_Buffer::reset() 01300 { 01301 fBufList.clear(); 01302 fFirstByte = 0; 01303 } 01304 01305 01306 TCP_Receiver_Buffer::~TCP_Receiver_Buffer() 01307 { 01308 } 01309 01310 01311 void TCP_Receiver_Buffer::write(TCP_Segment newBlock) 01312 { 01313 // error cases 01314 it_assert(newBlock.begin() <= newBlock.end(), "TCP_Receiver_Buffer::Write, no valid segment"); 01315 01316 // cut blocks beginning before fFirstByte 01317 if (newBlock.begin() < fFirstByte) { 01318 if (newBlock.end() > fFirstByte) { 01319 newBlock.set_begin(fFirstByte); 01320 } else { 01321 return; //// TODO: Is this strange? 01322 } 01323 } 01324 01325 if (newBlock.length() == 0) { // empty block, nothing to do 01326 return; 01327 } 01328 01329 if (fBufList.empty() || (newBlock.begin() > fBufList.back().end())) { 01330 // new block is behind last block in buffer 01331 fBufList.push_back(newBlock); 01332 } else { 01333 // skip list entries if beginning of newBlock > end of current one 01334 // (search for correct list position) 01335 std::list<TCP_Segment>::iterator iter; 01336 iter = fBufList.begin(); 01337 while (newBlock.begin() > iter->end()) { 01338 iter++; 01339 it_assert(iter != fBufList.end(), "TCP_Receiver_Buffer::Write, internal error"); 01340 } 01341 01342 TCP_Segment & exBlock = *iter; 01343 01344 if (exBlock.can_be_combined(newBlock)) { 01345 // overlapping or contiguous blocks -> combine 01346 exBlock.combine(newBlock); 01347 01348 // check following blocks 01349 iter++; 01350 while ((iter != fBufList.end()) && 01351 exBlock.can_be_combined(*iter)) { 01352 exBlock.combine(*iter); 01353 iter = fBufList.erase(iter); 01354 } 01355 } else { 01356 // no overlap, newBlock lies between two existing list entries 01357 // new list entry has to be created 01358 01359 fBufList.insert(iter, newBlock); 01360 } 01361 } 01362 01363 it_assert(!fBufList.empty() && fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Write, internal error"); 01364 01365 } 01366 01367 01368 // The amount of data read from the buffer is given as parameter. It has 01369 // to be less than or equal to the size of the first block stored. This 01370 // mean the caller of Read should first check how much data is available 01371 // by calling FirstBlockSize. 01372 void TCP_Receiver_Buffer::read(unsigned noOfBytes) 01373 { 01374 it_assert(first_block_size() > 0, "TCP_Receiver_Buffer::Read, No block to read"); 01375 it_assert(noOfBytes <= first_block_size(), "TCP_Receiver_Buffer::Read, submitted block size not valid"); 01376 01377 01378 if (noOfBytes < first_block_size()) { 01379 fBufList.front().set_begin(fBufList.front().begin() + noOfBytes); 01380 } else { // first block will be read completely 01381 fBufList.pop_front(); 01382 } 01383 fFirstByte += noOfBytes; 01384 01385 it_assert(fBufList.empty() || fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Read, internal error"); 01386 } 01387 01388 01389 // FirstBlockSize returns the size of the first block stored in the 01390 // buffer or 0 if the buffer is empty 01391 unsigned TCP_Receiver_Buffer::first_block_size() const 01392 { 01393 if (!fBufList.empty() && (fBufList.front().begin() == fFirstByte)) { 01394 return fBufList.front().length(); 01395 } else { 01396 return 0; 01397 } 01398 } 01399 01400 01401 std::ostream & TCP_Receiver_Buffer::info(std::ostream &os, int detail) const 01402 { 01403 os << "receiver buffer information" << std::endl 01404 << "number of blocks: " << fBufList.size() << std::endl 01405 << "first byte stored: " << fFirstByte << std::endl 01406 << "last byte stored +1: " << last_byte() << std::endl 01407 << "next byte expected: " << next_expected() << std::endl; 01408 01409 if (detail>0) { 01410 os << "segments in receiver buffer:" << std::endl; 01411 01412 typedef std::list<TCP_Segment>::const_iterator LI; 01413 for (LI i = fBufList.begin(); i != fBufList.end(); ++i) { 01414 const TCP_Segment & block = *i; 01415 os << ". segment: " << block << std::endl; 01416 } 01417 01418 } 01419 01420 return os; 01421 } 01422 01423 01424 // -------------------- TCP_Receiver ---------------------------------------- 01425 TCP_Receiver::TCP_Receiver(int label) : 01426 fReceiverBuffer(), 01427 fLabel(label), 01428 fTCPIPHeaderLength(TCP_HEADERLENGTH), 01429 fMSS(TCP_RMSS), 01430 fBufferSize(TCP_BUFFERSIZE), 01431 fDelayedACK(TCP_DELAYEDACK), 01432 fACKDelayTime(TCP_ACKDELAYTIME), 01433 fSendPeriodicACKs(TCP_SENDPERIODICACKS), 01434 fStrictPeriodicACKs(TCP_STRICTPERIODICACKS), 01435 fPeriodicACKInterval(TCP_PERIODICACKINTERVAL), 01436 fACKSchedulingDelay(TCP_ACKSCHEDULINGDELAY), 01437 fACKOnBufferWrite(TCP_ACKBUFFERWRITE), 01438 fACKOnBufferRead(TCP_ACKBUFFERREAD), 01439 fMaxUserBlockSize(TCP_MAXUSERBLOCKSIZE), 01440 fMinUserBlockSize(TCP_MINUSERBLOCKSIZE), 01441 fUserBlockProcDelay(TCP_USERBLOCKPROCDELAY), 01442 fTrace(false), 01443 fDebug(false), 01444 fSessionId(0), 01445 fDelayedACKTimer(*this, &TCP_Receiver::DelayedACKHandler), 01446 fPeriodicACKTimer(*this, &TCP_Receiver::PeriodicACKHandler), 01447 fACKSchedulingTimer(*this, &TCP_Receiver::SendACKMessage), 01448 fWaitingACKMsg(0), 01449 fUserBlockProcTimer(*this, &TCP_Receiver::HandleEndOfProcessing) 01450 { 01451 fUserMessage = NULL; 01452 01453 01454 if (!fACKOnBufferRead && !fACKOnBufferWrite) { 01455 // throw(UL_CException("TCP_Receiver::TCP_Receiver", 01456 // "ACKs must be sent on buffer read or write or both")); 01457 } 01458 01459 setup(); 01460 01461 tcp_receive.forward(this, &TCP_Receiver::ReceiveMessageFromNet); 01462 tcp_receive.set_name("TCP Receive"); 01463 tcp_send_ack.set_name("TCP send ACK"); 01464 tcp_new_data.set_name("TCP New Data"); 01465 tcp_release.forward(this, &TCP_Receiver::release); 01466 tcp_release.set_name("TCP Release"); 01467 01468 } 01469 01470 01471 TCP_Receiver::~TCP_Receiver () 01472 { 01473 delete fWaitingACKMsg; 01474 delete fUserMessage; 01475 } 01476 01477 01478 void TCP_Receiver::set_debug(const bool enable_debug) 01479 { 01480 fDebug = enable_debug; 01481 tcp_send_ack.set_debug(enable_debug); 01482 tcp_new_data.set_debug(); 01483 } 01484 01485 void TCP_Receiver::set_debug(bool enable_debug, bool enable_signal_debug) 01486 { 01487 fDebug = enable_debug; 01488 tcp_send_ack.set_debug(enable_signal_debug); 01489 tcp_new_data.set_debug(); 01490 } 01491 01492 void TCP_Receiver::set_trace(const bool enable_trace) 01493 { 01494 fTrace = enable_trace; 01495 } 01496 01497 01498 01499 void TCP_Receiver::setup() 01500 { 01501 fAdvRcvWnd = 0; 01502 fAdvRcvNxt = 0; 01503 01504 if (fSendPeriodicACKs) { 01505 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01506 } 01507 01508 fReceiverBuffer.reset(); 01509 01510 received_seq_num_val.set_size(1000); 01511 received_seq_num_val.zeros(); 01512 received_seq_num_time.set_size(1000); 01513 received_seq_num_time.zeros(); 01514 received_seq_num_val(0) = 0; 01515 received_seq_num_time(0) = 0; 01516 received_seq_num_index=1; 01517 } 01518 01519 std::string TCP_Receiver::GenerateFilename() 01520 { 01521 time_t rawtime; 01522 struct tm *timeinfo; 01523 timeinfo = localtime(&rawtime); 01524 std::ostringstream filename_stream; 01525 filename_stream << "trace_tcp_receiver_u" << fLabel 01526 << "_" << 1900+timeinfo->tm_year 01527 << "_" << timeinfo->tm_mon 01528 << "_" << timeinfo->tm_mday 01529 << "__" << timeinfo->tm_hour 01530 << "_" << timeinfo->tm_min 01531 << "_" << timeinfo->tm_sec 01532 << "_.it"; 01533 return filename_stream.str(); 01534 } 01535 01536 void TCP_Receiver::release(std::string file) 01537 { 01538 std::string filename; 01539 fSessionId++; 01540 01541 if (fWaitingACKMsg != 0) { 01542 delete fWaitingACKMsg; 01543 fWaitingACKMsg = 0; 01544 } 01545 if (fUserMessage != 0) { 01546 delete fUserMessage; 01547 fUserMessage = 0; 01548 } 01549 01550 fUserBlockProcTimer.Reset(); 01551 fDelayedACKTimer.Reset(); 01552 fPeriodicACKTimer.Reset(); 01553 fACKSchedulingTimer.Reset(); 01554 01555 if (fTrace) { 01556 if (file == "") 01557 filename = GenerateFilename(); 01558 else 01559 filename = file; 01560 01561 save_trace(filename); 01562 } 01563 } 01564 01565 01566 void TCP_Receiver::ReceiveMessageFromNet(itpp::Packet *msg) 01567 { 01568 TCP_Packet & packet = (TCP_Packet &) *msg; 01569 if (packet.get_destination_port() == fLabel) { 01570 if (packet.get_session_id() == fSessionId) { 01571 ReceiveDataPacket(packet); 01572 } 01573 else { 01574 it_warning("Received a TCP packet with wrong SessionId"); 01575 std::cout << "TCP_Receiver::ReceiveMessageFromNet, " 01576 << "fLabel= " << fLabel 01577 << "fSessionId= " << fSessionId << std::endl; 01578 std::cout << "packet=" << packet 01579 << ", next exp. = " << fReceiverBuffer.next_expected() 01580 << std::endl; 01581 exit(0); 01582 } 01583 } 01584 else { 01585 it_warning("Received a TCP packet with label"); 01586 exit(0); 01587 } 01588 } 01589 01590 01591 void TCP_Receiver::ReceiveDataPacket(TCP_Packet &msg) 01592 { 01593 TCP_Segment segment = msg.get_segment(); 01594 01595 bool isOutOfOrder = (segment.begin() > fReceiverBuffer.next_expected()) || 01596 (segment.end() <= fReceiverBuffer.next_expected()); 01597 01598 if (fDebug) { 01599 std::cout << "TCP_Receiver::ReceiveDataPacket receiver: " << fLabel << ": " 01600 << "receive msg: " 01601 << "t = " << Event_Queue::now() 01602 << ", next exp. = " << fReceiverBuffer.next_expected() 01603 << ", " << msg << std::endl; 01604 } 01605 01606 if (fTrace) { 01607 TraceReceivedSeqNo(segment.end()); 01608 } 01609 01610 it_assert(segment.end() <= fReceiverBuffer.first_byte() + fBufferSize, "TCP_Receiver::ReceiveTCPPacket, packet exceeds window at "); 01611 it_assert(segment.begin() < segment.end(), "TCP_Receiver::ReceiveTCPPacket, silly packet received at "); 01612 01613 fReceiverBuffer.write(segment); 01614 01615 if (isOutOfOrder) { 01616 SendACK(true); // create dupack conditionless 01617 } else { 01618 if (fACKOnBufferWrite) { 01619 SendACK(false); 01620 } 01621 IndicateUserMessage(); 01622 } 01623 01624 delete &msg; 01625 } 01626 01627 01628 void TCP_Receiver::IndicateUserMessage() 01629 { 01630 if (fUserMessage == 0) { 01631 // receive a block 01632 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01633 fMaxUserBlockSize); 01634 01635 if (fDebug) { 01636 std::cout << "TCP_Receiver::IndicateUserMessage " 01637 << "t = " << Event_Queue::now() 01638 << " noOfBytes = " << noOfBytes 01639 << " firstBlock = " << fReceiverBuffer.first_block_size() 01640 << std::endl; 01641 } 01642 01643 if (noOfBytes >= fMinUserBlockSize) { 01644 fUserMessage = new Packet(); 01645 fUserMessage->set_bit_size(8*noOfBytes); 01646 fUserBlockProcTimer.Set(fUserBlockProcDelay); 01647 } 01648 } 01649 } 01650 01651 01652 bool TCP_Receiver::is_user_message_available() 01653 { 01654 if (fUserMessage != 0) { 01655 return true; 01656 } 01657 01658 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(), 01659 fMaxUserBlockSize); 01660 01661 if (noOfBytes >= fMinUserBlockSize) { 01662 fUserMessage = new Packet(); 01663 fUserMessage->set_bit_size(8*noOfBytes); 01664 return true; 01665 } else { 01666 return false; 01667 } 01668 } 01669 01670 01671 itpp::Packet & TCP_Receiver::get_user_message() 01672 { 01673 it_assert(fUserMessage != 0, "TCP_Receiver::GetUserMessage, no message available"); 01674 if (fDebug) { 01675 std::cout << "TCP_Receiver::GetUserMessage " 01676 << "receiver: " << fLabel << ": " 01677 << "read from buffer: " 01678 << "t = " << Event_Queue::now() 01679 << ", user msg length = " << (fUserMessage->bit_size()/8) 01680 << ", first byte = " << fReceiverBuffer.first_byte() 01681 << ", first block size = " << fReceiverBuffer.first_block_size() 01682 << std::endl; 01683 } 01684 01685 fReceiverBuffer.read(fUserMessage->bit_size()/8); 01686 if (fACKOnBufferRead) { 01687 SendACK(false); // send acknowledgement 01688 } 01689 01690 itpp::Packet & msg = *fUserMessage; 01691 fUserMessage = 0; 01692 01693 if (fReceiverBuffer.first_block_size() > 0) { 01694 IndicateUserMessage(); 01695 } 01696 01697 return msg; 01698 } 01699 01700 01701 01702 void TCP_Receiver::HandleEndOfProcessing(Ttype) 01703 { 01704 it_assert(fUserMessage != 0, "TCP_Receiver::HandleEndOfProcessing, no message available"); 01705 01706 01707 tcp_new_data(fLabel); 01708 } 01709 01710 01711 void TCP_Receiver::DelayedACKHandler(Ttype) 01712 { 01713 if (fDebug) { 01714 std::cout << "TCP_Receiver::DelayedACKHandler " 01715 << "receiver " << fLabel 01716 << ": delACK TO: " 01717 << "t = " << Event_Queue::now() << std::endl; 01718 } 01719 01720 SendACK(true); 01721 } 01722 01723 01724 void TCP_Receiver::PeriodicACKHandler(Ttype) 01725 { 01726 if (fDebug) { 01727 std::cout << "TCP_Receiver::PeriodicACKHandler" 01728 << "receiver " << fLabel 01729 << ": periodicACK TO: " 01730 << "t = " << Event_Queue::now() << std::endl; 01731 } 01732 01733 SendACK(true); 01734 } 01735 01736 01737 void TCP_Receiver::SendACK(bool sendConditionless) 01738 { 01739 // sendConditionless is set 01740 // ... if packet was received out of order or 01741 // ... if delayed ACK timer has expired 01742 01743 // Bei eingeschaltetem "delayed ACK" wird ein ACK nur 01744 // gesendet, wenn das Fenster um 2MSS oder 35% der 01745 // maximalen Fenstergroesse verschoben worden ist 01746 // ... oder nach delayed ACK Timeout 01747 // ... oder wenn es das ACK fur ein Out of Order Segment ist 01748 // ... oder (in der Realitat), wenn ich auch was zu senden habe. 01749 01750 if (sendConditionless || !fDelayedACK || 01751 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= (int)(2 * fMSS)) || 01752 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= 01753 (int)(0.35 * fBufferSize))) { 01754 // Remark: RFC2581 recommends to acknowledge every second 01755 // packet conditionless (without setting this as a requirement) 01756 // in order to avoid excessive ack delays when the receiver MSS 01757 // is larger than the sender MSS. In this uni-directional 01758 // implementation, the receiver's MSS is not actively 01759 // used for sending but only for deciding when acknowledgments 01760 // have to be returned. Thus, the best solution to account for 01761 // RFC2581 is to set the receiver's MSS always equal to the 01762 // sender's MSS. 01763 01764 // Receiver Silly Window Syndrome Avoidance: 01765 01766 if (fAdvRcvNxt + fAdvRcvWnd + min(fBufferSize / 2, fMSS) 01767 <= fReceiverBuffer.first_byte() + fBufferSize) { 01768 // Die rechte Grenze des Empfangerfensters wird nur anders angezeigt 01769 // als beim letzten ACK, wenn sie sich seither um mindestens 01770 // min (BufferSize/ 2, MSS) geandert hat. 01771 fAdvRcvWnd = fBufferSize - fReceiverBuffer.first_block_size(); 01772 } else { 01773 fAdvRcvWnd = fAdvRcvNxt + fAdvRcvWnd - fReceiverBuffer.next_expected(); 01774 } 01775 01776 fAdvRcvNxt = fReceiverBuffer.next_expected(); 01777 01778 if (fSendPeriodicACKs && 01779 (!fStrictPeriodicACKs || !fPeriodicACKTimer.IsPending())) { 01780 fPeriodicACKTimer.Set(fPeriodicACKInterval); 01781 } 01782 01783 if (fDelayedACK && fDelayedACKTimer.IsPending()) { 01784 fDelayedACKTimer.Reset(); 01785 } 01786 01787 ScheduleACKMessage(); 01788 } else { 01789 if (!fDelayedACKTimer.IsPending()) { 01790 fDelayedACKTimer.Set(fACKDelayTime); 01791 if (fDebug) { 01792 std::cout << "TCP_Receiver::SendACK" 01793 << "receiver " << fLabel 01794 << ": set delACK timer: " 01795 << "t = " << Event_Queue::now() << std::endl; 01796 } 01797 } 01798 } 01799 } 01800 01801 01802 void TCP_Receiver::ScheduleACKMessage() 01803 { 01804 if (fWaitingACKMsg == 0) { 01805 fWaitingACKMsg = new TCP_Packet; 01806 } 01807 01808 fWaitingACKMsg->set_ACK(fAdvRcvNxt); 01809 fWaitingACKMsg->set_wnd(fAdvRcvWnd); 01810 fWaitingACKMsg->set_session_id(fSessionId); 01811 fWaitingACKMsg->set_destination_port(fLabel); 01812 fWaitingACKMsg->set_source_port(fLabel); 01813 fWaitingACKMsg->set_bit_size(8*fTCPIPHeaderLength); 01814 01815 if (fACKSchedulingDelay > 0) { 01816 if (!fACKSchedulingTimer.IsPending()) { 01817 fACKSchedulingTimer.Set(fACKSchedulingDelay); 01818 } 01819 } else { 01820 SendACKMessage(Event_Queue::now()); 01821 } 01822 } 01823 01824 01825 void TCP_Receiver::SendACKMessage(Ttype) 01826 { 01827 it_assert(fWaitingACKMsg != 0, "TCP_Receiver::SendACKMessage, no ACK message waiting"); 01828 01829 if (fDebug) { 01830 std::cout << "TCP_Receiver::SendACKMessage Ack sent" 01831 << "receiver " << fLabel 01832 << ": send ACK: " 01833 << "t = " << Event_Queue::now() 01834 << ", " << (*fWaitingACKMsg) 01835 << " byte_size=" << fWaitingACKMsg->bit_size()/8 01836 << " ptr=" << fWaitingACKMsg << std::endl; 01837 } 01838 01839 tcp_send_ack(fWaitingACKMsg); 01840 01841 fWaitingACKMsg = 0; 01842 } 01843 01844 01845 void TCP_Receiver::TraceReceivedSeqNo(const Sequence_Number &sn) 01846 { 01847 if (fDebug) { 01848 std::cout << "TCP_Receiver::TraceReceivedSeqNo " 01849 << "receiver " << fLabel 01850 << " t = " << Event_Queue::now() 01851 << " sn = " << sn << std::endl; 01852 } 01853 if (received_seq_num_index >= received_seq_num_time.size()) { 01854 received_seq_num_time.set_size(2*received_seq_num_time.size(),true); 01855 received_seq_num_val.set_size(2*received_seq_num_val.size(),true); 01856 } 01857 received_seq_num_val(received_seq_num_index) = sn.value(); 01858 received_seq_num_time(received_seq_num_index) = Event_Queue::now(); 01859 received_seq_num_index++; 01860 } 01861 01862 01863 void TCP_Receiver::save_trace(std::string filename) { 01864 01865 received_seq_num_val.set_size(received_seq_num_index, true); 01866 received_seq_num_time.set_size(received_seq_num_index,true); 01867 01868 if (fDebug) { 01869 std::cout << "received_seq_num_val" << received_seq_num_val << std::endl; 01870 std::cout << "received_seq_num_time" << received_seq_num_time << std::endl; 01871 std::cout << "received_seq_num_index" << received_seq_num_index << std::endl; 01872 std::cout << "TCP_Receiver::saving to file: " << filename << std::endl; 01873 } 01874 01875 it_file ff2; 01876 ff2.open(filename); 01877 01878 ff2 << Name("received_seq_num_val") << received_seq_num_val; 01879 ff2 << Name("received_seq_num_time") << received_seq_num_time; 01880 ff2 << Name("received_seq_num_index") << received_seq_num_index; 01881 01882 ff2.flush(); 01883 ff2.close(); 01884 } 01885 01886 01887 } //namespace itpp 01888 01889 #ifdef _MSC_VER 01890 #pragma warning(default:4355) 01891 #endif
Generated on Thu Apr 19 14:14:59 2007 for IT++ by Doxygen 1.5.1