8 #define CRYPTOPP_TRACE_NETWORK 0 12 #ifdef HIGHRES_TIMER_AVAILABLE 14 lword LimitedBandwidth::ComputeCurrentTransceiveLimit()
16 if (!m_maxBytesPerSecond)
19 const double curTime = GetCurTimeAndCleanUp();
20 CRYPTOPP_UNUSED(curTime);
23 for (OpQueue::size_type i=0; i!=m_ops.size(); ++i)
24 total += m_ops[i].second;
28 double LimitedBandwidth::TimeToNextTransceive()
30 if (!m_maxBytesPerSecond)
33 if (!m_nextTransceiveTime)
34 ComputeNextTransceiveTime();
39 void LimitedBandwidth::NoteTransceive(lword size)
41 if (m_maxBytesPerSecond)
43 double curTime = GetCurTimeAndCleanUp();
44 m_ops.push_back(std::make_pair(curTime, size));
45 m_nextTransceiveTime = 0;
49 void LimitedBandwidth::ComputeNextTransceiveTime()
51 double curTime = GetCurTimeAndCleanUp();
53 for (
unsigned int i=0; i!=m_ops.size(); ++i)
54 total += m_ops[i].second;
55 m_nextTransceiveTime =
56 (total < m_maxBytesPerSecond) ? curTime : m_ops.front().first + 1000;
59 double LimitedBandwidth::GetCurTimeAndCleanUp()
61 if (!m_maxBytesPerSecond)
64 double curTime = m_timer.ElapsedTimeAsDouble();
65 while (m_ops.size() && (m_ops.front().first + 1000 < curTime))
72 double nextTransceiveTime = TimeToNextTransceive();
73 if (nextTransceiveTime)
74 container.ScheduleEvent(nextTransceiveTime,
CallStack(
"LimitedBandwidth::GetWaitObjects()", &callStack));
80 lword& byteCount,
bool blockingOutput,
81 unsigned long maxTime,
bool checkDelimiter, byte delimiter)
83 m_blockedBySpeedLimit =
false;
85 if (!GetMaxBytesPerSecond())
87 size_t ret = DoPump(byteCount, blockingOutput, maxTime, checkDelimiter, delimiter);
88 m_doPumpBlocked = (ret != 0);
93 unsigned long timeToGo = maxTime;
94 Timer timer(Timer::MILLISECONDS, forever);
95 lword maxSize = byteCount;
102 lword curMaxSize =
UnsignedMin(ComputeCurrentTransceiveLimit(), maxSize - byteCount);
104 if (curMaxSize || m_doPumpBlocked)
107 size_t ret = DoPump(curMaxSize, blockingOutput, timeToGo, checkDelimiter, delimiter);
108 m_doPumpBlocked = (ret != 0);
111 NoteTransceive(curMaxSize);
112 byteCount += curMaxSize;
118 if (maxSize != ULONG_MAX && byteCount >= maxSize)
128 double waitTime = TimeToNextTransceive();
129 if (!forever && waitTime > timeToGo)
131 m_blockedBySpeedLimit =
true;
136 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NonblockingSource::GeneralPump2() - speed limit", 0));
137 container.Wait((
unsigned long)waitTime);
143 size_t NonblockingSource::PumpMessages2(
unsigned int &messageCount,
bool blocking)
145 if (messageCount == 0)
152 byteCount = LWORD_MAX;
153 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
154 }
while(byteCount == LWORD_MAX);
156 if (!m_messageEndSent && SourceExhausted())
158 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(),
true));
159 m_messageEndSent =
true;
167 m_blockedBySpeedLimit =
false;
169 size_t curBufSize = GetCurrentBufferSize();
170 if (curBufSize <= targetSize && (targetSize || !EofPending()))
173 if (!GetMaxBytesPerSecond())
174 return DoFlush(maxTime, targetSize);
177 unsigned long timeToGo = maxTime;
178 Timer timer(Timer::MILLISECONDS, forever);
179 lword totalFlushed = 0;
185 size_t flushSize =
UnsignedMin(curBufSize - targetSize, ComputeCurrentTransceiveLimit());
186 if (flushSize || EofPending())
189 size_t ret = (size_t)DoFlush(timeToGo, curBufSize - flushSize);
198 if (curBufSize <= targetSize && (targetSize || !EofPending()))
208 double waitTime = TimeToNextTransceive();
209 if (!forever && waitTime > timeToGo)
211 m_blockedBySpeedLimit =
true;
216 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NonblockingSink::TimedFlush() - speed limit", 0));
217 container.Wait((
unsigned long)waitTime);
226 return hardFlush && (!!GetCurrentBufferSize() || EofPending());
233 , m_putSize(0), m_dataBegin(0), m_dataEnd(0)
234 , m_waitingForResult(
false), m_outputBlocked(
false)
241 + GetReceiver().GetMaxWaitObjectCount()
242 + AttachedTransformation()->GetMaxWaitObjectCount();
247 if (BlockedBySpeedLimit())
248 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - speed limit", &callStack));
249 else if (!m_outputBlocked)
251 if (m_dataBegin == m_dataEnd)
252 AccessReceiver().GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - no data", &callStack));
254 container.SetNoWait(
CallStack(
"NetworkSource::GetWaitObjects() - have data", &callStack));
257 AttachedTransformation()->GetWaitObjects(container,
CallStack(
"NetworkSource::GetWaitObjects() - attachment", &callStack));
260 size_t NetworkSource::DoPump(lword &byteCount,
bool blockingOutput,
unsigned long maxTime,
bool checkDelimiter, byte delimiter)
264 lword maxSize = byteCount;
267 Timer timer(Timer::MILLISECONDS, forever);
275 if (m_dataBegin == m_dataEnd)
277 if (receiver.EofReceived())
280 if (m_waitingForResult)
282 if (receiver.MustWaitForResult() &&
284 CallStack(
"NetworkSource::DoPump() - wait receive result", 0)))
287 unsigned int recvResult = receiver.GetReceiveResult();
288 #if CRYPTOPP_TRACE_NETWORK 289 OutputDebugString((
IntToString((
unsigned int)
this) +
": Received " +
IntToString(recvResult) +
" bytes\n").c_str());
291 m_dataEnd += recvResult;
292 m_waitingForResult =
false;
294 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
299 m_dataEnd = m_dataBegin = 0;
301 if (receiver.MustWaitToReceive())
304 CallStack(
"NetworkSource::DoPump() - wait receive", 0)))
307 receiver.
Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
308 m_waitingForResult =
true;
313 m_waitingForResult =
true;
316 #if CRYPTOPP_TRACE_NETWORK 317 OutputDebugString((
IntToString((
unsigned int)
this) +
": Receiving " +
IntToString(m_buf.size()-m_dataEnd) +
" bytes\n").c_str());
319 while (receiver.
Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
321 unsigned int recvResult = receiver.GetReceiveResult();
322 #if CRYPTOPP_TRACE_NETWORK 323 OutputDebugString((
IntToString((
unsigned int)
this) +
": Received " +
IntToString(recvResult) +
" bytes\n").c_str());
325 m_dataEnd += recvResult;
326 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
328 m_waitingForResult =
false;
337 m_putSize =
UnsignedMin(m_dataEnd - m_dataBegin, maxSize - byteCount);
340 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
343 size_t result = t->
PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
347 CallStack(
"NetworkSource::DoPump() - wait attachment", 0)))
351 m_outputBlocked =
true;
355 m_outputBlocked =
false;
357 byteCount += m_putSize;
358 m_dataBegin += m_putSize;
359 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
361 if (maxSize != ULONG_MAX && byteCount == maxSize)
366 if (maxTime > 0 && timer.ElapsedTime() > maxTime)
376 NetworkSink::NetworkSink(
unsigned int maxBufferSize,
unsigned int autoFlushBound)
377 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
378 , m_needSendResult(
false), m_wasBlocked(
false), m_eofState(EOF_NONE)
379 , m_buffer(
STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
380 , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
381 , m_currentSpeed(0), m_maxObservedSpeed(0)
387 if (m_speedTimer.ElapsedTime() > 1000)
389 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
390 m_maxObservedSpeed =
STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
391 m_byteCountSinceLastTimerReset = 0;
392 m_speedTimer.StartTimer();
395 return m_currentSpeed;
400 lword m = GetMaxBytesPerSecond();
401 return m ?
STDMIN(m_maxObservedSpeed,
float(CRYPTOPP_VC6_INT64 m)) : m_maxObservedSpeed;
411 if (BlockedBySpeedLimit())
412 LimitedBandwidth::GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - speed limit", &callStack));
413 else if (m_wasBlocked)
414 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - was blocked", &callStack));
415 else if (!m_buffer.IsEmpty())
416 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - buffer not empty", &callStack));
417 else if (EofPending())
418 AccessSender().GetWaitObjects(container,
CallStack(
"NetworkSink::GetWaitObjects() - EOF pending", &callStack));
423 if (m_eofState == EOF_DONE)
425 if (length || messageEnd)
431 if (m_eofState > EOF_NONE)
437 assert(length >= m_skipBytes);
438 inString += m_skipBytes;
439 length -= m_skipBytes;
442 m_buffer.Put(inString, length);
444 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
447 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
451 if (m_buffer.CurrentSize() > targetSize)
455 m_skipBytes += length;
456 size_t blockedBytes =
UnsignedMin(length, m_buffer.CurrentSize() - targetSize);
457 return STDMAX<size_t>(blockedBytes, 1);
460 m_wasBlocked =
false;
466 m_eofState = EOF_PENDING_SEND;
470 if (m_eofState != EOF_DONE)
477 lword NetworkSink::DoFlush(
unsigned long maxTime,
size_t targetSize)
482 Timer timer(Timer::MILLISECONDS, forever);
483 unsigned int totalFlushSize = 0;
487 if (m_buffer.CurrentSize() <= targetSize)
490 if (m_needSendResult)
492 if (sender.MustWaitForResult() &&
494 CallStack(
"NetworkSink::DoFlush() - wait send result", 0)))
497 unsigned int sendResult = sender.GetSendResult();
498 #if CRYPTOPP_TRACE_NETWORK 499 OutputDebugString((
IntToString((
unsigned int)
this) +
": Sent " +
IntToString(sendResult) +
" bytes\n").c_str());
501 m_buffer.Skip(sendResult);
502 totalFlushSize += sendResult;
503 m_needSendResult =
false;
505 if (!m_buffer.AnyRetrievable())
509 unsigned long timeOut = maxTime ?
SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
510 if (sender.MustWaitToSend() && !sender.
Wait(timeOut,
CallStack(
"NetworkSink::DoFlush() - wait send", 0)))
513 size_t contiguousSize = 0;
514 const byte *block = m_buffer.Spy(contiguousSize);
516 #if CRYPTOPP_TRACE_NETWORK 517 OutputDebugString((
IntToString((
unsigned int)
this) +
": Sending " +
IntToString(contiguousSize) +
" bytes\n").c_str());
519 sender.Send(block, contiguousSize);
520 m_needSendResult =
true;
522 if (maxTime > 0 && timeOut == 0)
526 m_byteCountSinceLastTimerReset += totalFlushSize;
527 ComputeCurrentSpeed();
529 if (m_buffer.IsEmpty() && !m_needSendResult)
531 if (m_eofState == EOF_PENDING_SEND)
534 m_eofState = sender.MustWaitForEof() ? EOF_PENDING_DELIVERY : EOF_DONE;
537 while (m_eofState == EOF_PENDING_DELIVERY)
539 unsigned long timeOut = maxTime ?
SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
540 if (!sender.
Wait(timeOut,
CallStack(
"NetworkSink::DoFlush() - wait EOF", 0)))
543 if (sender.EofSent())
544 m_eofState = EOF_DONE;
548 return totalFlushSize;
551 #endif // #ifdef HIGHRES_TIMER_AVAILABLE Base class for all exceptions thrown by Crypto++.
container of wait objects
float GetMaxObservedSpeed() const
get the maximum observed speed of this sink in bytes per second
lword TimedFlush(unsigned long maxTime, size_t targetSize=0)
flush to device for no more than maxTime milliseconds
Some other error occurred not belong to any of the above categories.
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
Retrieves waitable objects.
size_t Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
Input multiple bytes for processing.
unsigned int GetMaxWaitObjectCount() const
Retrieves the maximum number of waitable objects.
size_t GeneralPump2(lword &byteCount, bool blockingOutput=true, unsigned long maxTime=INFINITE_TIME, bool checkDelimiter=false, byte delimiter='\n')
pump up to maxSize bytes using at most maxTime milliseconds
float ComputeCurrentSpeed()
compute the current speed of this sink in bytes per second
T1 SaturatingSubtract(const T1 &a, const T2 &b)
Performs a saturating subtract clamped at 0.
const T1 UnsignedMin(const T1 &a, const T2 &b)
Safe comparison of values that could be neagtive and incorrectly promoted.
const T & STDMIN(const T &a, const T &b)
Replacement function for std::min.
const unsigned long INFINITE_TIME
Represents infinite time.
unsigned int GetMaxWaitObjectCount() const
bool Wait(unsigned long milliseconds, CallStack const &callStack)
wait on this object
std::string IntToString(T value, unsigned int base=10)
Converts a value to a string.
void GetWaitObjects(WaitObjectContainer &container, CallStack const &callStack)
Retrieves waitable objects.
const T & STDMAX(const T &a, const T &b)
Replacement function for std::max.
unsigned int GetMaxWaitObjectCount() const
Retrieves the maximum number of waitable objects.
virtual bool Receive(byte *buf, size_t bufLen)=0
receive data from network source, returns whether result is immediately available ...
Crypto++ library namespace.
bool IsolatedFlush(bool hardFlush, bool blocking)
Flushes data buffered by this object, without signal propagation.
a Source class that can pump from a device for a specified amount of time.