Package SimPy :: Module Simulation
[hide private]
[frames] | no frames]

Source Code for Module SimPy.Simulation

   1  #!/usr/bin/env python 
   2  from SimPy.Lister import * 
   3  import heapq as hq 
   4  import types 
   5  import sys 
   6  import new 
   7  import random 
   8  import inspect 
   9   
  10  # $Revision: 1.1.1.78 $ $Date: 2008/03/03 13:49:59 $ kgm 
  11  """Simulation 1.9.1 Implements SimPy Processes, Resources, Buffers, and the backbone simulation  
  12  scheduling by coroutine calls. Provides data collection through classes  
  13  Monitor and Tally. 
  14  Based on generators (Python 2.3 and later) 
  15   
  16  LICENSE: 
  17  Copyright (C) 2002,2005,2006,2007  Klaus G. Muller, Tony Vignaux 
  18  mailto: kgmuller@xs4all.nl and Tony.Vignaux@vuw.ac.nz 
  19   
  20      This library is free software; you can redistribute it and/or 
  21      modify it under the terms of the GNU Lesser General Public 
  22      License as published by the Free Software Foundation; either 
  23      version 2.1 of the License, or (at your option) any later version. 
  24   
  25      This library is distributed in the hope that it will be useful, 
  26      but WITHOUT ANY WARRANTY; without even the implied warranty of 
  27      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
  28      Lesser General Public License for more details. 
  29   
  30      You should have received a copy of the GNU Lesser General Public 
  31      License along with this library; if not, write to the Free Software 
  32      Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
  33  END OF LICENSE 
  34   
  35  **Change history:** 
  36   
  37      Started out as SiPy 0.9 
  38       
  39      5/9/2002: SiPy 0.9.1 
  40       
  41          - Addition of '_cancel' method in class Process and supporting '_unpost' method in  
  42            class __Evlist. 
  43           
  44          - Removal of redundant 'Action' method in class Process. 
  45           
  46      12/9/2002: 
  47       
  48          - Addition of resource class 
  49           
  50          - Addition of "_request" and "_release" coroutine calls 
  51           
  52      15/9/2002: moved into SimPy package 
  53       
  54      16/9/2002: 
  55          - Resource attributes fully implemented (resources can now have more 
  56            than 1 shareable resource units) 
  57           
  58      17/9/2002: 
  59       
  60          - corrected removal from waitQ (Vignaux) 
  61           
  62      17/9/2002: 
  63       
  64          - added test for queue discipline in "test_demo()". Must be FIFO 
  65           
  66      26/9/02: Version 0.2.0 
  67       
  68          - cleaned up code; more consistent naming 
  69           
  70          - prefixed all Simulation-private variable names with "_". 
  71           
  72          - prefixed all class-private variable names with "__". 
  73           
  74          - made normal exit quiet (but return message from scheduler() 
  75           
  76      28/9/02: 
  77       
  78          - included stopSimulation() 
  79           
  80      15/10/02: Simulation version 0.3 
  81       
  82          - Version printout now only if __TESTING 
  83           
  84          - "_stop" initialized to True by module load, and set to False in  
  85        initialize() 
  86           
  87          - Introduced 'simulate(until=0)' instead of 'scheduler(till=0)'.  
  88        Left 'scheduler()' in for backward compatibility, but marked 
  89        as deprecated. 
  90           
  91          - Added attribute "name" to class Process; default=="a_process" 
  92           
  93          - Changed Resource constructor to  
  94        'def __init__(self,capacity=1,name="a_resource",unitName="units"'. 
  95           
  96      13/11/02: Simulation version 0.6 
  97       
  98          - Major changes to class Resource: 
  99           
 100              - Added two queue types for resources, FIFO (default) and PriorityQ 
 101               
 102              - Changed constructor to allow selection of queue type. 
 103               
 104              - Introduced preemption of resources (to be used with PriorityQ 
 105                queue type) 
 106               
 107              - Changed constructor of class Resource to allow selection of preemption 
 108               
 109              - Changes to class Process to support preemption of service 
 110               
 111              - Cleaned up 'simulate' by replacing series of if-statements by dispatch table. 
 112   
 113      19/11/02: Simulation version 0.6.1 
 114          - Changed priority schemes so that higher values of Process  
 115            attribute "priority" represent higher priority. 
 116   
 117      20/11/02: Simulation version 0.7 
 118          - Major change of priority approach: 
 119   
 120              - Priority set by "yield request,self,res,priority" 
 121   
 122              - Priority of a Process instance associated with a specific  
 123                resource 
 124   
 125      25/11/02: Simulation version 0.7.1 
 126   
 127          - Code cleanup and optimization 
 128   
 129          - Made process attributes remainService and preempted private  
 130           (_remainService and _preempted) 
 131   
 132      11/12/2002: First process interrupt implementation 
 133   
 134          - Addition of user methods 'interrupt' and 'resume' 
 135   
 136          - Significant code cleanup to maintain process state 
 137   
 138      20/12/2002: Changes to "interrupt"; addition of boolean methods to show 
 139                       process states 
 140   
 141      16/3/2003: Changed hold (allowing posting events past _endtime) 
 142       
 143      18/3/2003: Changed _nextev to prevent _t going past _endtime 
 144   
 145      23/3/2003: Introduced new interrupt construct; deleted 'resume' method 
 146       
 147      25/3/2003: Expanded interrupt construct: 
 148       
 149          - Made 'interrupt' a method  of Process 
 150   
 151          - Added 'interruptCause' as an attribute of an interrupted process 
 152   
 153          - Changed definition of 'active' to  
 154           'self._nextTime <> None and not self._inInterrupt' 
 155   
 156          - Cleaned up test_interrupt function 
 157   
 158      30/3/2003: Modification of 'simulate': 
 159   
 160          - error message if 'initialize' not called (fatal) 
 161   
 162          - error message if no process scheduled (warning) 
 163   
 164          - Ensured that upon exit from 'simulate', now() == _endtime is  
 165            always valid 
 166   
 167      2/04/2003: 
 168   
 169          - Modification of 'simulate': leave _endtime alone (undid change 
 170            of 30 Mar 03) 
 171   
 172          - faster '_unpost' 
 173   
 174      3/04/2003: Made 'priority' private ('_priority') 
 175   
 176      4/04/2003: Catch activation of non-generator error 
 177   
 178      5/04/2003: Added 'interruptReset()' function to Process. 
 179   
 180      7/04/2003: Changed '_unpost' to ensure that process has 
 181                     _nextTime == None (is passive) afterwards. 
 182   
 183      8/04/2003: Changed _hold to allow for 'yield hold,self'  
 184                     (equiv to 'yield hold,self,0') 
 185   
 186      10/04/2003: Changed 'cancel' syntax to 'Process().cancel(victim)' 
 187   
 188      12/5/2003: Changed eventlist handling from dictionary to bisect 
 189       
 190      9/6/2003: - Changed eventlist handling from pure dictionary to bisect- 
 191                  sorted "timestamps" list of keys, resulting in greatly  
 192                  improved performance for models with large 
 193                  numbers of event notices with differing event times. 
 194                  ========================================================= 
 195                  This great change was suggested by Prof. Simon Frost.  
 196                  Thank you, Simon! This version 1.3 is dedicated to you! 
 197                  ========================================================= 
 198                - Added import of Lister which supports well-structured  
 199                  printing of all attributes of Process and Resource instances. 
 200   
 201      Oct 2003: Added monitored Resource instances (Monitors for activeQ and waitQ) 
 202   
 203      13 Dec 2003: Merged in Monitor and Histogram 
 204   
 205      27 Feb 2004: Repaired bug in activeQ monitor of class Resource. Now actMon 
 206                   correctly records departures from activeQ. 
 207                    
 208      19 May 2004: Added erroneously omitted Histogram class. 
 209   
 210      5 Sep 2004: Added SimEvents synchronization constructs 
 211       
 212      17 Sep 2004: Added waituntil synchronization construct 
 213       
 214      01 Dec 2004: SimPy version 1.5 
 215                   Changes in this module: Repaired SimEvents bug re proc.eventsFired 
 216                    
 217      12 Jan 2005: SimPy version 1.5.1 
 218                   Changes in this module: Monitor objects now have a default name 
 219                                           'a_Monitor' 
 220                                            
 221      29 Mar 2005: Start SimPy 1.6: compound "yield request" statements 
 222       
 223      05 Jun 2005: Fixed bug in _request method -- waitMon did not work properly in 
 224                   preemption case 
 225                    
 226      09 Jun 2005: Added test in 'activate' to see whether 'initialize()' was called first. 
 227       
 228      23 Aug 2005: - Added Tally data collection class 
 229                   - Adjusted Resource to work with Tally 
 230                   - Redid function allEventNotices() (returns prettyprinted string with event 
 231                     times and names of process instances 
 232                   - Added function allEventTimes (returns event times of all scheduled events) 
 233                    
 234      16 Mar 2006: - Added Store and Level classes 
 235                   - Added 'yield get' and 'yield put' 
 236                    
 237      10 May 2006: - Repaired bug in Store._get method 
 238                   - Repaired Level to allow initialBuffered have float value 
 239                   - Added type test for Level get parameter 'nrToGet' 
 240                    
 241      06 Jun 2006: - To improve pretty-printed output of 'Level' objects, changed attribute 
 242                     _nrBuffered to nrBuffered (synonym for amount property) 
 243                   - To improve pretty-printed output of 'Store' objects, added attribute 
 244                     buffered (which refers to _theBuffer) 
 245                      
 246      25 Aug 2006: - Start of version 1.8 
 247                   - made 'version' public 
 248                   - corrected condQ initialization bug 
 249                    
 250      30 Sep 2006: - Introduced checks to ensure capacity of a Buffer > 0 
 251                   - Removed from __future__ import (so Python 2.3 or later needed) 
 252                   
 253      15 Oct 2006: - Added code to register all Monitors and all Tallies in variables 
 254                     'allMonitors' and 'allTallies' 
 255                   - Added function 'startCollection' to activate Monitors and Tallies at a 
 256                     specified time (e.g. after warmup period) 
 257                   - Moved all test/demo programs to after 'if __name__=="__main__":'. 
 258                   
 259      17 Oct 2006: - Added compound 'put' and 'get' statements for Level and Store. 
 260       
 261      18 Oct 2006: - Repaired bug: self.eventsFired now gets set after an event fires 
 262                     in a compound yield get/put with a waitevent clause (reneging case). 
 263                      
 264      21 Oct 2006: - Introduced Store 'yield get' with a filter function. 
 265                   
 266      22 Oct 2006: - Repaired bug in prettyprinting of Store objects (the buffer  
 267                     content==._theBuffer was not shown) by changing ._theBuffer  
 268                     to .theBuffer. 
 269                   
 270      04 Dec 2006: - Added printHistogram method to Tally and Monitor (generates 
 271                     table-form histogram) 
 272                       
 273      07 Dec 2006: - Changed the __str__ method of Histogram to print a table  
 274                     (like printHistogram). 
 275       
 276      18 Dec 2006: - Added trace printing of Buffers' "unitName" for yield get and put. 
 277       
 278      09 Jun 2007: - Cleaned out all uses of "object" to prevent name clash. 
 279       
 280      18 Nov 2007: - Start of 1.9 development 
 281                   - Added 'start' method (alternative to activate) to Process 
 282                    
 283      22 Nov 2007: - Major change to event list handling to speed up larger models: 
 284                      * Drop dictionary 
 285                      * Replace bisect by heapq 
 286                      * Mark cancelled event notices in unpost and skip them in 
 287                        nextev (great idea of Tony Vignaux)) 
 288                         
 289      4 Dec 2007: - Added twVariance calculation for both Monitor and Tally (gav) 
 290       
 291      5 Dec 2007: - Changed name back to timeVariance (gav) 
 292       
 293      1 Mar 2008: - Start of 1.9.1 bugfix release 
 294                  - Delete circular reference in Process instances when event  
 295                    notice has been processed (caused much circular garbage) 
 296                  - Added capability for multiple preempts of a process 
 297       
 298  """ 
 299   
 300  __TESTING=False 
 301  version=__version__="1.9.1 $Revision: 1.1.1.78 $ $Date: 2008/03/03 13:49:59 $" 
 302  if __TESTING:  
 303      print "SimPy.Simulation %s" %__version__, 
 304      if __debug__: 
 305          print "__debug__ on" 
 306      else: 
 307          print 
 308   
 309  # yield keywords 
 310  hold=1 
 311  passivate=2 
 312  request=3 
 313  release=4 
 314  waitevent=5 
 315  queueevent=6 
 316  waituntil=7 
 317  get=8 
 318  put=9 
 319   
 320  _endtime=0 
 321  _t=0 
 322  _e=None 
 323  _stop=True 
 324  _wustep=False #controls per event stepping for waituntil construct; not user API 
 325  try: 
 326    True, False 
 327  except NameError: 
 328    True, False = (1 == 1), (0 == 1) 
 329  condQ=[] 
 330  allMonitors=[] 
 331  allTallies=[] 
 332   
333 -def initialize():
334 global _e,_t,_stop,condQ,allMonitors,allTallies 335 _e=__Evlist() 336 _t=0 337 _stop=False 338 condQ=[] 339 allMonitors=[] 340 allTallies=[]
341
342 -def now():
343 return _t
344
345 -def stopSimulation():
346 """Application function to stop simulation run""" 347 global _stop 348 _stop=True
349
350 -def _startWUStepping():
351 """Application function to start stepping through simulation for waituntil construct.""" 352 global _wustep 353 _wustep=True
354
355 -def _stopWUStepping():
356 """Application function to stop stepping through simulation.""" 357 global _wustep 358 _wustep=False
359
360 -class Simerror(Exception):
361 - def __init__(self,value):
362 self.value=value
363
364 - def __str__(self):
365 return `self.value`
366
367 -class FatalSimerror(Simerror):
368 - def __init__(self,value):
369 Simerror.__init__(self,value) 370 self.value=value
371
372 -class Process(Lister):
373 """Superclass of classes which may use generator functions"""
374 - def __init__(self,name="a_process"):
375 #the reference to this Process instances single process (==generator) 376 self._nextpoint=None 377 self.name=name 378 self._nextTime=None #next activation time 379 self._remainService=0 380 self._preempted=0 381 self._priority={} 382 self._getpriority={} 383 self._putpriority={} 384 self._terminated= False 385 self._inInterrupt= False 386 self.eventsFired=[] #which events process waited/queued for occurred
387
388 - def active(self):
389 return self._nextTime <> None and not self._inInterrupt
390
391 - def passive(self):
392 return self._nextTime is None and not self._terminated
393
394 - def terminated(self):
395 return self._terminated
396
397 - def interrupted(self):
398 return self._inInterrupt and not self._terminated
399
400 - def queuing(self,resource):
401 return self in resource.waitQ
402
403 - def cancel(self,victim):
404 """Application function to cancel all event notices for this Process 405 instance;(should be all event notices for the _generator_).""" 406 _e._unpost(whom=victim)
407
408 - def start(self,pem=None,at="undefined",delay="undefined",prior=False):
409 """Activates PEM of this Process. 410 p.start(p.pemname([args])[,{at= t |delay=period}][,prior=False]) or 411 p.start([p.ACTIONS()][,{at= t |delay=period}][,prior=False]) (ACTIONS 412 parameter optional) 413 """ 414 if pem is None: 415 try: 416 pem=self.ACTIONS() 417 except AttributeError: 418 raise FatalSimerror\ 419 ("Fatal SimPy error: no generator function to activate") 420 else: 421 pass 422 if _e is None: 423 raise FatalSimerror\ 424 ("Fatal SimPy error: simulation is not initialized"\ 425 "(call initialize() first)") 426 if not (type(pem) == types.GeneratorType): 427 raise FatalSimerror("Fatal SimPy error: activating function which"+ 428 " is not a generator (contains no 'yield')") 429 if not self._terminated and not self._nextTime: 430 #store generator reference in object; needed for reactivation 431 self._nextpoint=pem 432 if at=="undefined": 433 at=_t 434 if delay=="undefined": 435 zeit=max(_t,at) 436 else: 437 zeit=max(_t,_t+delay) 438 _e._post(what=self,at=zeit,prior=prior)
439
440 - def _hold(self,a):
441 if len(a[0]) == 3: 442 delay=abs(a[0][2]) 443 else: 444 delay=0 445 who=a[1] 446 self.interruptLeft=delay 447 self._inInterrupt=False 448 self.interruptCause=None 449 _e._post(what=who,at=_t+delay)
450
451 - def _passivate(self,a):
452 a[0][1]._nextTime=None
453
454 - def interrupt(self,victim):
455 """Application function to interrupt active processes""" 456 # can't interrupt terminated/passive/interrupted process 457 if victim.active(): 458 victim.interruptCause=self # self causes interrupt 459 left=victim._nextTime-_t 460 victim.interruptLeft=left # time left in current 'hold' 461 victim._inInterrupt=True 462 reactivate(victim) 463 return left 464 else: #victim not active -- can't interrupt 465 return None
466
467 - def interruptReset(self):
468 """ 469 Application function for an interrupt victim to get out of 470 'interrupted' state. 471 """ 472 self._inInterrupt= False
473
474 - def acquired(self,res):
475 """Multi-functional test for reneging for 'request' and 'get': 476 (1)If res of type Resource: 477 Tests whether resource res was acquired when proces reactivated. 478 If yes, the parallel wakeup process is killed. 479 If not, process is removed from res.waitQ (reneging). 480 (2)If res of type Store: 481 Tests whether item(s) gotten from Store res. 482 If yes, the parallel wakeup process is killed. 483 If no, process is removed from res.getQ 484 (3)If res of type Level: 485 Tests whether units gotten from Level res. 486 If yes, the parallel wakeup process is killed. 487 If no, process is removed from res.getQ. 488 """ 489 if isinstance(res,Resource): 490 test=self in res.activeQ 491 if test: 492 self.cancel(self._holder) 493 else: 494 res.waitQ.remove(self) 495 if res.monitored: 496 res.waitMon.observe(len(res.waitQ),t=now()) 497 return test 498 elif isinstance(res,Store): 499 test=len(self.got) 500 if test: 501 self.cancel(self._holder) 502 else: 503 res.getQ.remove(self) 504 if res.monitored: 505 res.getQMon.observe(len(res.getQ),t=now()) 506 return test 507 elif isinstance(res,Level): 508 test=not (self.got is None) 509 if test: 510 self.cancel(self._holder) 511 else: 512 res.getQ.remove(self) 513 if res.monitored: 514 res.getQMon.observe(len(res.getQ),t=now()) 515 return test
516
517 - def stored(self,buffer):
518 """Test for reneging for 'yield put . . .' compound statement (Level and 519 Store. Returns True if not reneged. 520 If self not in buffer.putQ, kill wakeup process, else take self out of 521 buffer.putQ (reneged)""" 522 test=self in buffer.putQ 523 if test: #reneged 524 buffer.putQ.remove(self) 525 if buffer.monitored: 526 buffer.putQMon.observe(len(buffer.putQ),t=now()) 527 else: 528 self.cancel(self._holder) 529 return not test
530
531 -def allEventNotices():
532 """Returns string with eventlist as; 533 t1: processname,processname2 534 t2: processname4,processname5, . . . 535 . . . . 536 """ 537 ret="" 538 tempList=[] 539 tempList[:]=_e.timestamps 540 tempList.sort() 541 # return only event notices which are not cancelled 542 tempList=[[x[0],x[2].name] for x in tempList if not x[3]] 543 tprev=-1 544 for t in tempList: 545 # if new time, new line 546 if t[0]==tprev: 547 # continue line 548 ret+=",%s"%t[1] 549 else: 550 # new time 551 if tprev==-1: 552 ret="%s: %s"%(t[0],t[1]) 553 else: 554 ret+="\n%s: %s"%(t[0],t[1]) 555 tprev=t[0] 556 return ret+"\n"
557
558 -def allEventTimes():
559 """Returns list of all times for which events are scheduled. 560 """ 561 r=[] 562 r[:]=_e.timestamps 563 r.sort() 564 # return only event times of not cancelled event notices 565 r1=[x[0] for x in r if not r[3]] 566 tprev=-1 567 ret=[] 568 for t in r1: 569 if t==tprev: 570 #skip time, already in list 571 pass 572 else: 573 ret.append(t) 574 tprev=t 575 return ret
576
577 -class __Evlist(object):
578 """Defines event list and operations on it"""
579 - def __init__(self):
580 # always sorted list of events (sorted by time, priority) 581 # make heapq 582 self.timestamps = [] 583 self.sortpr=0
584
585 - def _post(self, what, at, prior=False):
586 """Post an event notice for process what for time at""" 587 # event notices are Process instances 588 if at < _t: 589 raise Simerror("Attempt to schedule event in the past") 590 what._nextTime = at 591 self.sortpr-=1 592 if prior: 593 # before all other event notices at this time 594 # heappush with highest priority value so far (negative of 595 # monotonely decreasing number) 596 # store event notice in process instance 597 what._rec=[at,self.sortpr,what,False] 598 # make event list refer to it 599 hq.heappush(self.timestamps,what._rec) 600 else: 601 # heappush with lowest priority 602 # store event notice in process instance 603 what._rec=[at,-self.sortpr,what,False] 604 # make event list refer to it 605 hq.heappush(self.timestamps,what._rec)
606
607 - def _unpost(self, whom):
608 """ 609 Mark event notice for whom as cancelled if whom is a suspended process 610 """ 611 if whom._nextTime is not None: # check if whom was actually active 612 whom._rec[3]=True ## Mark as cancelled 613 whom._nextTime=None
614
615 - def _nextev(self):
616 """Retrieve next event from event list""" 617 global _t, _stop 618 noActiveNotice=True 619 ## Find next event notice which is not marked cancelled 620 while noActiveNotice: 621 if self.timestamps: 622 ## ignore priority value 623 (_tnotice, p,nextEvent,cancelled) = hq.heappop(self.timestamps) 624 noActiveNotice=cancelled 625 else: 626 raise Simerror("No more events at time %s" % _t) 627 nextEvent._rec=None 628 _t=_tnotice 629 if _t > _endtime: 630 _t = _endtime 631 _stop = True 632 return (None,) 633 try: 634 resultTuple = nextEvent._nextpoint.next() 635 except StopIteration: 636 nextEvent._nextpoint = None 637 nextEvent._terminated = True 638 nextEvent._nextTime = None 639 resultTuple = None 640 return (resultTuple, nextEvent)
641
642 - def _isEmpty(self):
643 return not self.timestamps
644
645 - def _allEventNotices(self):
646 """Returns string with eventlist as 647 t1: [procname,procname2] 648 t2: [procname4,procname5, . . . ] 649 . . . . 650 """ 651 ret="" 652 for t in self.timestamps: 653 ret+="%s:%s\n"%(t[1]._nextTime, t[1].name) 654 return ret[:-1]
655
656 - def _allEventTimes(self):
657 """Returns list of all times for which events are scheduled. 658 """ 659 return self.timestamps
660
661 -def activate(obj,process,at="undefined",delay="undefined",prior=False):
662 """Application function to activate passive process.""" 663 if _e is None: 664 raise FatalSimerror\ 665 ("Fatal error: simulation is not initialized (call initialize() first)") 666 if not (type(process) == types.GeneratorType): 667 raise FatalSimerror("Activating function which"+ 668 " is not a generator (contains no 'yield')") 669 if not obj._terminated and not obj._nextTime: 670 #store generator reference in object; needed for reactivation 671 obj._nextpoint=process 672 if at=="undefined": 673 at=_t 674 if delay=="undefined": 675 zeit=max(_t,at) 676 else: 677 zeit=max(_t,_t+delay) 678 _e._post(obj,at=zeit,prior=prior)
679
680 -def reactivate(obj,at="undefined",delay="undefined",prior=False):
681 """Application function to reactivate a process which is active, 682 suspended or passive.""" 683 # Object may be active, suspended or passive 684 if not obj._terminated: 685 a=Process("SimPysystem") 686 a.cancel(obj) 687 # object now passive 688 if at=="undefined": 689 at=_t 690 if delay=="undefined": 691 zeit=max(_t,at) 692 else: 693 zeit=max(_t,_t+delay) 694 _e._post(obj,at=zeit,prior=prior)
695
696 -class Histogram(list):
697 """ A histogram gathering and sampling class""" 698
699 - def __init__(self,name = '',low=0.0,high=100.0,nbins=10):
700 list.__init__(self) 701 self.name = name 702 self.low = float(low) 703 self.high = float(high) 704 self.nbins = nbins 705 self.binsize=(self.high-self.low)/nbins 706 self._nrObs=0 707 self._sum=0 708 self[:] =[[low+(i-1)*self.binsize,0] for i in range(self.nbins+2)]
709
710 - def addIn(self,y):
711 """ add a value into the correct bin""" 712 self._nrObs+=1 713 self._sum+=y 714 b = int((y-self.low+self.binsize)/self.binsize) 715 if b < 0: b = 0 716 if b > self.nbins+1: b = self.nbins+1 717 assert 0 <= b <=self.nbins+1,'Histogram.addIn: b out of range: %s'%b 718 self[b][1]+=1
719
720 - def __str__(self):
721 histo=self 722 ylab="value" 723 nrObs=self._nrObs 724 width=len(str(nrObs)) 725 res=[] 726 res.append("<Histogram %s:"%self.name) 727 res.append("\nNumber of observations: %s"%nrObs) 728 if nrObs: 729 su=self._sum 730 cum=histo[0][1] 731 fmt="%s" 732 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 733 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 734 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 735 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 736 l1width=len(("%s <= "%fmt)%histo[1][0]) 737 res.append(line1\ 738 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 739 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 740 ) 741 for i in range(1,len(histo)-1): 742 cum+=histo[i][1] 743 res.append(line\ 744 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 745 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 746 ) 747 cum+=histo[-1][1] 748 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 749 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 750 lnwidth=len(("<%s"%fmt)%histo[1][0]) 751 res.append(linen\ 752 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 753 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 754 ) 755 res.append("\n>") 756 return " ".join(res)
757
758 -def startCollection(when=0.0,monitors=None,tallies=None):
759 """Starts data collection of all designated Monitor and Tally objects 760 (default=all) at time 'when'. 761 """ 762 class Starter(Process): 763 def collect(self,monitors,tallies): 764 for m in monitors: 765 print m.name 766 m.reset() 767 for t in tallies: 768 t.reset() 769 yield hold,self
770 if monitors is None: 771 monitors=allMonitors 772 if tallies is None: 773 tallies=allTallies 774 s=Starter() 775 activate(s,s.collect(monitors=monitors,tallies=tallies),at=when) 776
777 -class Monitor(list):
778 """ Monitored variables 779 780 A Class for monitored variables, that is, variables that allow one 781 to gather simple statistics. A Monitor is a subclass of list and 782 list operations can be performed on it. An object is established 783 using m= Monitor(name = '..'). It can be given a 784 unique name for use in debugging and in tracing and ylab and tlab 785 strings for labelling graphs. 786 """
787 - def __init__(self,name='a_Monitor',ylab='y',tlab='t'):
788 list.__init__(self) 789 self.startTime = 0.0 790 self.name = name 791 self.ylab = ylab 792 self.tlab = tlab 793 allMonitors.append(self)
794
795 - def setHistogram(self,name = '',low=0.0,high=100.0,nbins=10):
796 """Sets histogram parameters. 797 Must be called before call to getHistogram""" 798 if name=='': 799 histname=self.name 800 else: 801 histname=name 802 self.histo=Histogram(name=histname,low=low,high=high,nbins=nbins)
803
804 - def observe(self,y,t=None):
805 """record y and t""" 806 if t is None: t = now() 807 self.append([t,y])
808
809 - def tally(self,y):
810 """ deprecated: tally for backward compatibility""" 811 self.observe(y,0)
812
813 - def accum(self,y,t=None):
814 """ deprecated: accum for backward compatibility""" 815 self.observe(y,t)
816
817 - def reset(self,t=None):
818 """reset the sums and counts for the monitored variable """ 819 self[:]=[] 820 if t is None: t = now() 821 self.startTime = t
822
823 - def tseries(self):
824 """ the series of measured times""" 825 return list(zip(*self)[0])
826
827 - def yseries(self):
828 """ the series of measured values""" 829 return list(zip(*self)[1])
830
831 - def count(self):
832 """ deprecated: the number of observations made """ 833 return self.__len__()
834
835 - def total(self):
836 """ the sum of the y""" 837 if self.__len__()==0: return 0 838 else: 839 sum = 0.0 840 for i in range(self.__len__()): 841 sum += self[i][1] 842 return sum # replace by sum() later
843
844 - def mean(self):
845 """ the simple average of the monitored variable""" 846 try: return 1.0*self.total()/self.__len__() 847 except: print 'SimPy: No observations for mean'
848
849 - def var(self):
850 """ the sample variance of the monitored variable """ 851 n = len(self) 852 tot = self.total() 853 ssq=0.0 854 ##yy = self.yseries() 855 for i in range(self.__len__()): 856 ssq += self[i][1]**2 # replace by sum() eventually 857 try: return (ssq - float(tot*tot)/n)/n 858 except: print 'SimPy: No observations for sample variance'
859
860 - def timeAverage(self,t=None):
861 """ the time-weighted average of the monitored variable. 862 863 If t is used it is assumed to be the current time, 864 otherwise t = now() 865 """ 866 N = self.__len__() 867 if N == 0: 868 print 'SimPy: No observations for timeAverage' 869 return None 870 871 if t is None: t = now() 872 sum = 0.0 873 tlast = self.startTime 874 #print 'DEBUG: timave ',t,tlast 875 ylast = 0.0 876 for i in range(N): 877 ti,yi = self[i] 878 sum += ylast*(ti-tlast) 879 tlast = ti 880 ylast = yi 881 sum += ylast*(t-tlast) 882 T = t - self.startTime 883 if T == 0: 884 print 'SimPy: No elapsed time for timeAverage' 885 return None 886 #print 'DEBUG: timave ',sum,t,T 887 return sum/float(T)
888
889 - def timeVariance(self,t=None):
890 """ the time-weighted Variance of the monitored variable. 891 892 If t is used it is assumed to be the current time, 893 otherwise t = now() 894 """ 895 N = self.__len__() 896 if N == 0: 897 print 'SimPy: No observations for timeVariance' 898 return None 899 if t is None: t = now() 900 sm = 0.0 901 ssq = 0.0 902 tlast = self.startTime 903 # print 'DEBUG: 1 twVar ',t,tlast 904 ylast = 0.0 905 for i in range(N): 906 ti,yi = self[i] 907 sm += ylast*(ti-tlast) 908 ssq += ylast*ylast*(ti-tlast) 909 tlast = ti 910 ylast = yi 911 sm += ylast*(t-tlast) 912 ssq += ylast*ylast*(t-tlast) 913 T = t - self.startTime 914 if T == 0: 915 print 'SimPy: No elapsed time for timeVariance' 916 return None 917 mn = sm/float(T) 918 # print 'DEBUG: 2 twVar ',ssq,t,T 919 return ssq/float(T) - mn*mn
920 921
922 - def histogram(self,low=0.0,high=100.0,nbins=10):
923 """ A histogram of the monitored y data values. 924 """ 925 h = Histogram(name=self.name,low=low,high=high,nbins=nbins) 926 ys = self.yseries() 927 for y in ys: h.addIn(y) 928 return h
929
930 - def getHistogram(self):
931 """Returns a histogram based on the parameters provided in 932 preceding call to setHistogram. 933 """ 934 ys = self.yseries() 935 h=self.histo 936 for y in ys: h.addIn(y) 937 return h
938
939 - def printHistogram(self,fmt="%s"):
940 """Returns formatted frequency distribution table string from Monitor. 941 Precondition: setHistogram must have been called. 942 fmt==format of bin range values 943 """ 944 try: 945 histo=self.getHistogram() 946 except: 947 raise FatalSimerror("histogramTable: call setHistogram first"\ 948 " for Monitor %s"%self.name) 949 ylab=self.ylab 950 nrObs=self.count() 951 width=len(str(nrObs)) 952 res=[] 953 res.append("\nHistogram for %s:"%histo.name) 954 res.append("\nNumber of observations: %s"%nrObs) 955 su=sum(self.yseries()) 956 cum=histo[0][1] 957 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 958 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 959 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 960 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 961 l1width=len(("%s <= "%fmt)%histo[1][0]) 962 res.append(line1\ 963 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 964 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 965 ) 966 for i in range(1,len(histo)-1): 967 cum+=histo[i][1] 968 res.append(line\ 969 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 970 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 971 ) 972 cum+=histo[-1][1] 973 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 974 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 975 lnwidth=len(("<%s"%fmt)%histo[1][0]) 976 res.append(linen\ 977 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 978 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 979 ) 980 return " ".join(res)
981
982 -class Tally:
983 - def __init__(self, name="a_Tally", ylab="y",tlab="t"):
984 self.name = name 985 self.ylab = ylab 986 self.tlab = tlab 987 self.reset() 988 self.startTime = 0.0 989 self.histo = None 990 self.sum = 0.0 991 self._sum_of_squares = 0 992 self._integral = 0.0 # time-weighted sum 993 self._integral2 = 0.0 # time-weighted sum of squares 994 allTallies.append(self)
995
996 - def setHistogram(self,name = '',low=0.0,high=100.0,nbins=10):
997 """Sets histogram parameters. 998 Must be called to prior to observations initiate data collection 999 for histogram. 1000 """ 1001 if name=='': 1002 hname=self.name 1003 else: 1004 hname=name 1005 self.histo=Histogram(name=hname,low=low,high=high,nbins=nbins)
1006
1007 - def observe(self, y, t=None):
1008 if t is None: 1009 t = now() 1010 self._integral += (t - self._last_timestamp) * self._last_observation 1011 yy = self._last_observation* self._last_observation 1012 self._integral2 += (t - self._last_timestamp) * yy 1013 self._last_timestamp = t 1014 self._last_observation = y 1015 self._total += y 1016 self._count += 1 1017 self._sum += y 1018 self._sum_of_squares += y * y 1019 if self.histo: 1020 self.histo.addIn(y)
1021
1022 - def reset(self, t=None):
1023 if t is None: 1024 t = now() 1025 self.startTime = t 1026 self._last_timestamp = t 1027 self._last_observation = 0.0 1028 self._count = 0 1029 self._total = 0.0 1030 self._integral = 0.0 1031 self._integral2 = 0.0 1032 self._sum = 0.0 1033 self._sum_of_squares = 0.0
1034
1035 - def count(self):
1036 return self._count
1037
1038 - def total(self):
1039 return self._total
1040
1041 - def mean(self):
1042 return 1.0 * self._total / self._count
1043
1044 - def timeAverage(self,t=None):
1045 if t is None: 1046 t=now() 1047 integ=self._integral+(t - self._last_timestamp) * self._last_observation 1048 if (t > self.startTime): 1049 return 1.0 * integ/(t - self.startTime) 1050 else: 1051 print 'SimPy: No elapsed time for timeAverage' 1052 return None
1053
1054 - def var(self):
1055 return 1.0 * (self._sum_of_squares - (1.0 * (self._sum * self._sum)\ 1056 / self._count)) / (self._count)
1057
1058 - def timeVariance(self,t=None):
1059 """ the time-weighted Variance of the Tallied variable. 1060 1061 If t is used it is assumed to be the current time, 1062 otherwise t = now() 1063 """ 1064 if t is None: 1065 t=now() 1066 twAve = self.timeAverage(t) 1067 #print 'Tally timeVariance DEBUG: twave:', twAve 1068 last = self._last_observation 1069 twinteg2=self._integral2+(t - self._last_timestamp) * last * last 1070 #print 'Tally timeVariance DEBUG:tinteg2:', twinteg2 1071 if (t > self.startTime): 1072 return 1.0 * twinteg2/(t - self.startTime) - twAve*twAve 1073 else: 1074 print 'SimPy: No elapsed time for timeVariance' 1075 return None
1076 1077 1078
1079 - def __len__(self):
1080 return self._count
1081
1082 - def __eq__(self, l):
1083 return len(l) == self._count
1084
1085 - def getHistogram(self):
1086 return self.histo
1087
1088 - def printHistogram(self,fmt="%s"):
1089 """Returns formatted frequency distribution table string from Tally. 1090 Precondition: setHistogram must have been called. 1091 fmt==format of bin range values 1092 """ 1093 try: 1094 histo=self.getHistogram() 1095 except: 1096 raise FatalSimerror("histogramTable: call setHistogram first"\ 1097 " for Tally %s"%self.name) 1098 ylab=self.ylab 1099 nrObs=self.count() 1100 width=len(str(nrObs)) 1101 res=[] 1102 res.append("\nHistogram for %s:"%histo.name) 1103 res.append("\nNumber of observations: %s"%nrObs) 1104 su=self.total() 1105 cum=histo[0][1] 1106 line="\n%s <= %s < %s: %s (cum: %s/%s%s)"\ 1107 %(fmt,"%s",fmt,"%s","%s","%5.1f","%s") 1108 line1="\n%s%s < %s: %s (cum: %s/%s%s)"\ 1109 %("%s","%s",fmt,"%s","%s","%5.1f","%s") 1110 l1width=len(("%s <= "%fmt)%histo[1][0]) 1111 res.append(line1\ 1112 %(" "*l1width,ylab,histo[1][0],str(histo[0][1]).rjust(width),\ 1113 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 1114 ) 1115 for i in range(1,len(histo)-1): 1116 cum+=histo[i][1] 1117 res.append(line\ 1118 %(histo[i][0],ylab,histo[i+1][0],str(histo[i][1]).rjust(width),\ 1119 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 1120 ) 1121 cum+=histo[-1][1] 1122 linen="\n%s <= %s %s : %s (cum: %s/%s%s)"\ 1123 %(fmt,"%s","%s","%s","%s","%5.1f","%s") 1124 lnwidth=len(("<%s"%fmt)%histo[1][0]) 1125 res.append(linen\ 1126 %(histo[-1][0],ylab," "*lnwidth,str(histo[-1][1]).rjust(width),\ 1127 str(cum).rjust(width),(float(cum)/nrObs)*100,"%") 1128 ) 1129 return " ".join(res)
1130
1131 -class Queue(list):
1132 - def __init__(self,res,moni):
1133 if not moni is None: #moni==[]: 1134 self.monit=True # True if a type of Monitor/Tally attached 1135 else: 1136 self.monit=False 1137 self.moni=moni # The Monitor/Tally 1138 self.resource=res # the resource/buffer this queue belongs to
1139
1140 - def enter(self,obj):
1141 pass
1142
1143 - def leave(self):
1144 pass
1145
1146 - def takeout(self,obj):
1147 self.remove(obj) 1148 if self.monit: 1149 self.moni.observe(len(self),t=now())
1150
1151 -class FIFO(Queue):
1152 - def __init__(self,res,moni):
1153 Queue.__init__(self,res,moni)
1154
1155 - def enter(self,obj):
1156 self.append(obj) 1157 if self.monit: 1158 self.moni.observe(len(self),t=now())
1159
1160 - def enterGet(self,obj):
1161 self.enter(obj)
1162
1163 - def enterPut(self,obj):
1164 self.enter(obj)
1165
1166 - def leave(self):
1167 a= self.pop(0) 1168 if self.monit: 1169 self.moni.observe(len(self),t=now()) 1170 return a
1171
1172 -class PriorityQ(FIFO):
1173 """Queue is always ordered according to priority. 1174 Higher value of priority attribute == higher priority. 1175 """
1176 - def __init__(self,res,moni):
1177 FIFO.__init__(self,res,moni)
1178
1179 - def enter(self,obj):
1180 """Handles request queue for Resource""" 1181 if len(self): 1182 ix=self.resource 1183 if self[-1]._priority[ix] >= obj._priority[ix]: 1184 self.append(obj) 1185 else: 1186 z=0 1187 while self[z]._priority[ix] >= obj._priority[ix]: 1188 z += 1 1189 self.insert(z,obj) 1190 else: 1191 self.append(obj) 1192 if self.monit: 1193 self.moni.observe(len(self),t=now())
1194
1195 - def enterGet(self,obj):
1196 """Handles getQ in Buffer""" 1197 if len(self): 1198 ix=self.resource 1199 #print "priority:",[x._priority[ix] for x in self] 1200 if self[-1]._getpriority[ix] >= obj._getpriority[ix]: 1201 self.append(obj) 1202 else: 1203 z=0 1204 while self[z]._getpriority[ix] >= obj._getpriority[ix]: 1205 z += 1 1206 self.insert(z,obj) 1207 else: 1208 self.append(obj) 1209 if self.monit: 1210 self.moni.observe(len(self),t=now())
1211
1212 - def enterPut(self,obj):
1213 """Handles putQ in Buffer""" 1214 if len(self): 1215 ix=self.resource 1216 #print "priority:",[x._priority[ix] for x in self] 1217 if self[-1]._putpriority[ix] >= obj._putpriority[ix]: 1218 self.append(obj) 1219 else: 1220 z=0 1221 while self[z]._putpriority[ix] >= obj._putpriority[ix]: 1222 z += 1 1223 self.insert(z,obj) 1224 else: 1225 self.append(obj) 1226 if self.monit: 1227 self.moni.observe(len(self),t=now())
1228
1229 -class Resource(Lister):
1230 """Models shared, limited capacity resources with queuing; 1231 FIFO is default queuing discipline. 1232 """ 1233
1234 - def __init__(self,capacity=1,name="a_resource",unitName="units", 1235 qType=FIFO,preemptable=0,monitored=False,monitorType=Monitor):
1236 """ 1237 monitorType={Monitor(default)|Tally} 1238 """ 1239 self.name=name # resource name 1240 self.capacity=capacity # resource units in this resource 1241 self.unitName=unitName # type name of resource units 1242 self.n=capacity # uncommitted resource units 1243 self.monitored=monitored 1244 1245 if self.monitored: # Monitor waitQ, activeQ 1246 self.actMon=monitorType(name="Active Queue Monitor %s"%self.name, 1247 ylab="nr in queue",tlab="time") 1248 monact=self.actMon 1249 self.waitMon=monitorType(name="Wait Queue Monitor %s"%self.name, 1250 ylab="nr in queue",tlab="time") 1251 monwait=self.waitMon 1252 else: 1253 monwait=None 1254 monact=None 1255 self.waitQ=qType(self,monwait) 1256 self.preemptable=preemptable 1257 self.activeQ=qType(self,monact) 1258 self.priority_default=0
1259
1260 - def _request(self,arg):
1261 """Process request event for this resource""" 1262 obj=arg[1] 1263 if len(arg[0]) == 4: # yield request,self,resource,priority 1264 obj._priority[self]=arg[0][3] 1265 else: # yield request,self,resource 1266 obj._priority[self]=self.priority_default 1267 if self.preemptable and self.n == 0: # No free resource 1268 # test for preemption condition 1269 preempt=obj._priority[self] > self.activeQ[-1]._priority[self] 1270 # If yes: 1271 if preempt: 1272 z=self.activeQ[-1] 1273 # Keep track of preempt level 1274 z._preempted+=1 1275 # suspend lowest priority process being served 1276 # record remaining service time at first preempt only 1277 if z._preempted==1: 1278 z._remainService = z._nextTime - _t 1279 # cancel only at first preempt 1280 Process().cancel(z) 1281 # remove from activeQ 1282 self.activeQ.remove(z) 1283 # put into front of waitQ 1284 self.waitQ.insert(0,z) 1285 # if self is monitored, update waitQ monitor 1286 if self.monitored: 1287 self.waitMon.observe(len(self.waitQ),now()) 1288 # passivate re-queued process 1289 z._nextTime=None 1290 # assign resource unit to preemptor 1291 self.activeQ.enter(obj) 1292 # post event notice for preempting process 1293 _e._post(obj,at=_t,prior=1) 1294 else: 1295 self.waitQ.enter(obj) 1296 # passivate queuing process 1297 obj._nextTime=None 1298 else: # treat non-preemption case 1299 if self.n == 0: 1300 self.waitQ.enter(obj) 1301 # passivate queuing process 1302 obj._nextTime=None 1303 else: 1304 self.n -= 1 1305 self.activeQ.enter(obj) 1306 _e._post(obj,at=_t,prior=1)
1307
1308 - def _release(self,arg):
1309 """Process release request for this resource""" 1310 self.n += 1 1311 self.activeQ.remove(arg[1]) 1312 if self.monitored: 1313 self.actMon.observe(len(self.activeQ),t=now()) 1314 #reactivate first waiting requestor if any; assign Resource to it 1315 if self.waitQ: 1316 obj=self.waitQ.leave() 1317 self.n -= 1 #assign 1 resource unit to object 1318 self.activeQ.enter(obj) 1319 # if resource preemptable: 1320 if self.preemptable: 1321 # if object had been preempted: 1322 if obj._preempted: 1323 # keep track of preempt level 1324 obj._preempted-=1 1325 # reactivate object delay= remaining service time 1326 # but only, if all other preempts are over 1327 if obj._preempted==0: 1328 reactivate(obj,delay=obj._remainService,prior=1) 1329 # else reactivate right away 1330 else: 1331 reactivate(obj,delay=0,prior=1) 1332 # else: 1333 else: 1334 reactivate(obj,delay=0,prior=1) 1335 _e._post(arg[1],at=_t,prior=1)
1336
1337 -class Buffer(Lister):
1338 """Abstract class for buffers 1339 Blocks a process when a put would cause buffer overflow or a get would cause 1340 buffer underflow. 1341 Default queuing discipline for blocked processes is FIFO.""" 1342 1343 priorityDefault=0
1344 - def __init__(self,name=None,capacity="unbounded",unitName="units", 1345 putQType=FIFO,getQType=FIFO, 1346 monitored=False,monitorType=Monitor,initialBuffered=None):
1347 if capacity=="unbounded": capacity=sys.maxint 1348 self.capacity=capacity 1349 self.name=name 1350 self.putQType=putQType 1351 self.getQType=getQType 1352 self.monitored=monitored 1353 self.initialBuffered=initialBuffered 1354 self.unitName=unitName 1355 if self.monitored: 1356 ## monitor for Producer processes' queue 1357 self.putQMon=monitorType(name="Producer Queue Monitor %s"%self.name, 1358 ylab="nr in queue",tlab="time") 1359 ## monitor for Consumer processes' queue 1360 self.getQMon=monitorType(name="Consumer Queue Monitor %s"%self.name, 1361 ylab="nr in queue",tlab="time") 1362 ## monitor for nr items in buffer 1363 self.bufferMon=monitorType(name="Buffer Monitor %s"%self.name, 1364 ylab="nr in buffer",tlab="time") 1365 else: 1366 self.putQMon=None 1367 self.getQMon=None 1368 self.bufferMon=None 1369 self.putQ=self.putQType(res=self,moni=self.putQMon) 1370 self.getQ=self.getQType(res=self,moni=self.getQMon) 1371 if self.monitored: 1372 self.putQMon.observe(y=len(self.putQ),t=now()) 1373 self.getQMon.observe(y=len(self.getQ),t=now()) 1374 self._putpriority={} 1375 self._getpriority={} 1376 1377 def _put(self): 1378 pass
1379 def _get(self): 1380 pass
1381
1382 -class Level(Buffer):
1383 """Models buffers for processes putting/getting un-distinguishable items. 1384 """
1385 - def getamount(self):
1386 return self.nrBuffered
1387
1388 - def gettheBuffer(self):
1389 return self.nrBuffered
1390 1391 theBuffer=property(gettheBuffer) 1392
1393 - def __init__(self,**pars):
1394 Buffer.__init__(self,**pars) 1395 if self.name is None: 1396 self.name="a_level" ## default name 1397 1398 if (type(self.capacity)!=type(1.0) and\ 1399 type(self.capacity)!=type(1)) or\ 1400 self.capacity<0: 1401 raise FatalSimerror\ 1402 ("Level: capacity parameter not a positive number: %s"\ 1403 %self.initialBuffered) 1404 1405 if type(self.initialBuffered)==type(1.0) or\ 1406 type(self.initialBuffered)==type(1): 1407 if self.initialBuffered>self.capacity: 1408 raise FatalSimerror("initialBuffered exceeds capacity") 1409 if self.initialBuffered>=0: 1410 self.nrBuffered=self.initialBuffered ## nr items initially in buffer 1411 ## buffer is just a counter (int type) 1412 else: 1413 raise FatalSimerror\ 1414 ("initialBuffered param of Level negative: %s"\ 1415 %self.initialBuffered) 1416 elif self.initialBuffered is None: 1417 self.initialBuffered=0 1418 self.nrBuffered=0 1419 else: 1420 raise FatalSimerror\ 1421 ("Level: wrong type of initialBuffered (parameter=%s)"\ 1422 %self.initialBuffered) 1423 if self.monitored: 1424 self.bufferMon.observe(y=self.amount,t=now())
1425 amount=property(getamount) 1426
1427 - def _put(self,arg):
1428 """Handles put requests for Level instances""" 1429 obj=arg[1] 1430 if len(arg[0]) == 5: # yield put,self,buff,whattoput,priority 1431 obj._putpriority[self]=arg[0][4] 1432 whatToPut=arg[0][3] 1433 elif len(arg[0]) == 4: # yield get,self,buff,whattoput 1434 obj._putpriority[self]=Buffer.priorityDefault #default 1435 whatToPut=arg[0][3] 1436 else: # yield get,self,buff 1437 obj._putpriority[self]=Buffer.priorityDefault #default 1438 whatToPut=1 1439 if type(whatToPut)!=type(1) and type(whatToPut)!=type(1.0): 1440 raise FatalSimerror("Level: put parameter not a number") 1441 if not whatToPut>=0.0: 1442 raise FatalSimerror("Level: put parameter not positive number") 1443 whatToPutNr=whatToPut 1444 if whatToPutNr+self.amount>self.capacity: 1445 obj._nextTime=None #passivate put requestor 1446 obj._whatToPut=whatToPutNr 1447 self.putQ.enterPut(obj) #and queue, with size of put 1448 else: 1449 self.nrBuffered+=whatToPutNr 1450 if self.monitored: 1451 self.bufferMon.observe(y=self.amount,t=now()) 1452 # service any getters waiting 1453 # service in queue-order; do not serve second in queue before first 1454 # has been served 1455 while len(self.getQ) and self.amount>0: 1456 proc=self.getQ[0] 1457 if proc._nrToGet<=self.amount: 1458 proc.got=proc._nrToGet 1459 self.nrBuffered-=proc.got 1460 if self.monitored: 1461 self.bufferMon.observe(y=self.amount,t=now()) 1462 self.getQ.takeout(proc) # get requestor's record out of queue 1463 _e._post(proc,at=_t) # continue a blocked get requestor 1464 else: 1465 break 1466 _e._post(obj,at=_t,prior=1) # continue the put requestor
1467
1468 - def _get(self,arg):
1469 """Handles get requests for Level instances""" 1470 obj=arg[1] 1471 obj.got=None 1472 if len(arg[0]) == 5: # yield get,self,buff,whattoget,priority 1473 obj._getpriority[self]=arg[0][4] 1474 nrToGet=arg[0][3] 1475 elif len(arg[0]) == 4: # yield get,self,buff,whattoget 1476 obj._getpriority[self]=Buffer.priorityDefault #default 1477 nrToGet=arg[0][3] 1478 else: # yield get,self,buff 1479 obj._getpriority[self]=Buffer.priorityDefault 1480 nrToGet=1 1481 if type(nrToGet)!=type(1.0) and type(nrToGet)!=type(1): 1482 raise FatalSimerror\ 1483 ("Level: get parameter not a number: %s"%nrToGet) 1484 if nrToGet<0: 1485 raise FatalSimerror\ 1486 ("Level: get parameter not positive number: %s"%nrToGet) 1487 if self.amount < nrToGet: 1488 obj._nrToGet=nrToGet 1489 self.getQ.enterGet(obj) 1490 # passivate queuing process 1491 obj._nextTime=None 1492 else: 1493 obj.got=nrToGet 1494 self.nrBuffered-=nrToGet 1495 if self.monitored: 1496 self.bufferMon.observe(y=self.amount,t=now()) 1497 _e._post(obj,at=_t,prior=1) 1498 # reactivate any put requestors for which space is now available 1499 # service in queue-order; do not serve second in queue before first 1500 # has been served 1501 while len(self.putQ): #test for queued producers 1502 proc=self.putQ[0] 1503 if proc._whatToPut+self.amount<=self.capacity: 1504 self.nrBuffered+=proc._whatToPut 1505 if self.monitored: 1506 self.bufferMon.observe(y=self.amount,t=now()) 1507 self.putQ.takeout(proc)#requestor's record out of queue 1508 _e._post(proc,at=_t) # continue a blocked put requestor 1509 else: 1510 break
1511
1512 -class Store(Buffer):
1513 """Models buffers for processes coupled by putting/getting distinguishable 1514 items. 1515 Blocks a process when a put would cause buffer overflow or a get would cause 1516 buffer underflow. 1517 Default queuing discipline for blocked processes is priority FIFO. 1518 """
1519 - def getnrBuffered(self):
1520 return len(self.theBuffer)
1521 nrBuffered=property(getnrBuffered) 1522
1523 - def getbuffered(self):
1524 return self.theBuffer
1525 buffered=property(getbuffered) 1526
1527 - def __init__(self,**pars):
1528 Buffer.__init__(self,**pars) 1529 self.theBuffer=[] 1530 if self.name is None: 1531 self.name="a_store" ## default name 1532 if type(self.capacity)!=type(1) or self.capacity<=0: 1533 raise FatalSimerror\ 1534 ("Store: capacity parameter not a positive integer > 0: %s"\ 1535 %self.initialBuffered) 1536 if type(self.initialBuffered)==type([]): 1537 if len(self.initialBuffered)>self.capacity: 1538 raise FatalSimerror("initialBuffered exceeds capacity") 1539 else: 1540 self.theBuffer[:]=self.initialBuffered##buffer==list of objects 1541 elif self.initialBuffered is None: 1542 self.theBuffer=[] 1543 else: 1544 raise FatalSimerror\ 1545 ("Store: initialBuffered not a list") 1546 if self.monitored: 1547 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1548 self._sort=None
1549 1550 1551
1552 - def addSort(self,sortFunc):
1553 """Adds buffer sorting to this instance of Store. It maintains 1554 theBuffer sorted by the sortAttr attribute of the objects in the 1555 buffer. 1556 The user-provided 'sortFunc' must look like this: 1557 1558 def mySort(self,par): 1559 tmplist=[(x.sortAttr,x) for x in par] 1560 tmplist.sort() 1561 return [x for (key,x) in tmplist] 1562 1563 """ 1564 1565 self._sort=new.instancemethod(sortFunc,self,self.__class__) 1566 self.theBuffer=self._sort(self.theBuffer)
1567
1568 - def _put(self,arg):
1569 """Handles put requests for Store instances""" 1570 obj=arg[1] 1571 if len(arg[0]) == 5: # yield put,self,buff,whattoput,priority 1572 obj._putpriority[self]=arg[0][4] 1573 whatToPut=arg[0][3] 1574 elif len(arg[0]) == 4: # yield put,self,buff,whattoput 1575 obj._putpriority[self]=Buffer.priorityDefault #default 1576 whatToPut=arg[0][3] 1577 else: # error, whattoput missing 1578 raise FatalSimerror("Item to put missing in yield put stmt") 1579 if type(whatToPut)!=type([]): 1580 raise FatalSimerror("put parameter is not a list") 1581 whatToPutNr=len(whatToPut) 1582 if whatToPutNr+self.nrBuffered>self.capacity: 1583 obj._nextTime=None #passivate put requestor 1584 obj._whatToPut=whatToPut 1585 self.putQ.enterPut(obj) #and queue, with items to put 1586 else: 1587 self.theBuffer.extend(whatToPut) 1588 if not(self._sort is None): 1589 self.theBuffer=self._sort(self.theBuffer) 1590 if self.monitored: 1591 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1592 1593 # service any waiting getters 1594 # service in queue order: do not serve second in queue before first 1595 # has been served 1596 while self.nrBuffered>0 and len(self.getQ): 1597 proc=self.getQ[0] 1598 if inspect.isfunction(proc._nrToGet): 1599 movCand=proc._nrToGet(self.theBuffer) #predicate parameter 1600 if movCand: 1601 proc.got=movCand[:] 1602 for i in movCand: 1603 self.theBuffer.remove(i) 1604 self.getQ.takeout(proc) 1605 if self.monitored: 1606 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1607 _e._post(what=proc,at=_t) # continue a blocked get requestor 1608 else: 1609 break 1610 else: #numerical parameter 1611 if proc._nrToGet<=self.nrBuffered: 1612 nrToGet=proc._nrToGet 1613 proc.got=[] 1614 proc.got[:]=self.theBuffer[0:nrToGet] 1615 self.theBuffer[:]=self.theBuffer[nrToGet:] 1616 if self.monitored: 1617 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1618 # take this get requestor's record out of queue: 1619 self.getQ.takeout(proc) 1620 _e._post(what=proc,at=_t) # continue a blocked get requestor 1621 else: 1622 break 1623 1624 _e._post(what=obj,at=_t,prior=1) # continue the put requestor
1625
1626 - def _get(self,arg):
1627 """Handles get requests""" 1628 filtfunc=None 1629 obj=arg[1] 1630 obj.got=[] # the list of items retrieved by 'get' 1631 if len(arg[0]) == 5: # yield get,self,buff,whattoget,priority 1632 obj._getpriority[self]=arg[0][4] 1633 if inspect.isfunction(arg[0][3]): 1634 filtfunc=arg[0][3] 1635 else: 1636 nrToGet=arg[0][3] 1637 elif len(arg[0]) == 4: # yield get,self,buff,whattoget 1638 obj._getpriority[self]=Buffer.priorityDefault #default 1639 if inspect.isfunction(arg[0][3]): 1640 filtfunc=arg[0][3] 1641 else: 1642 nrToGet=arg[0][3] 1643 else: # yield get,self,buff 1644 obj._getpriority[self]=Buffer.priorityDefault 1645 nrToGet=1 1646 if not filtfunc: #number specifies nr items to get 1647 if nrToGet<0: 1648 raise FatalSimerror\ 1649 ("Store: get parameter not positive number: %s"%nrToGet) 1650 if self.nrBuffered < nrToGet: 1651 obj._nrToGet=nrToGet 1652 self.getQ.enterGet(obj) 1653 # passivate/block queuing 'get' process 1654 obj._nextTime=None 1655 else: 1656 for i in range(nrToGet): 1657 obj.got.append(self.theBuffer.pop(0)) # move items from 1658 # buffer to requesting process 1659 if self.monitored: 1660 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1661 _e._post(obj,at=_t,prior=1) 1662 # reactivate any put requestors for which space is now available 1663 # serve in queue order: do not serve second in queue before first 1664 # has been served 1665 while len(self.putQ): 1666 proc=self.putQ[0] 1667 if len(proc._whatToPut)+self.nrBuffered<=self.capacity: 1668 for i in proc._whatToPut: 1669 self.theBuffer.append(i) #move items to buffer 1670 if not(self._sort is None): 1671 self.theBuffer=self._sort(self.theBuffer) 1672 if self.monitored: 1673 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1674 self.putQ.takeout(proc) # dequeue requestor's record 1675 _e._post(proc,at=_t) # continue a blocked put requestor 1676 else: 1677 break 1678 else: # items to get determined by filtfunc 1679 movCand=filtfunc(self.theBuffer) 1680 if movCand: # get succeded 1681 _e._post(obj,at=_t,prior=1) 1682 obj.got=movCand[:] 1683 for item in movCand: 1684 self.theBuffer.remove(item) 1685 if self.monitored: 1686 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1687 # reactivate any put requestors for which space is now available 1688 # serve in queue order: do not serve second in queue before first 1689 # has been served 1690 while len(self.putQ): 1691 proc=self.putQ[0] 1692 if len(proc._whatToPut)+self.nrBuffered<=self.capacity: 1693 for i in proc._whatToPut: 1694 self.theBuffer.append(i) #move items to buffer 1695 if not(self._sort is None): 1696 self.theBuffer=self._sort(self.theBuffer) 1697 if self.monitored: 1698 self.bufferMon.observe(y=self.nrBuffered,t=now()) 1699 self.putQ.takeout(proc) # dequeue requestor's record 1700 _e._post(proc,at=_t) # continue a blocked put requestor 1701 else: 1702 break 1703 else: # get did not succeed, block 1704 obj._nrToGet=filtfunc 1705 self.getQ.enterGet(obj) 1706 # passivate/block queuing 'get' process 1707 obj._nextTime=None
1708
1709 -class SimEvent(Lister):
1710 """Supports one-shot signalling between processes. All processes waiting for an event to occur 1711 get activated when its occurrence is signalled. From the processes queuing for an event, only 1712 the first gets activated. 1713 """
1714 - def __init__(self,name="a_SimEvent"):
1715 self.name=name 1716 self.waits=[] 1717 self.queues=[] 1718 self.occurred=False 1719 self.signalparam=None
1720
1721 - def signal(self,param=None):
1722 """Produces a signal to self; 1723 Fires this event (makes it occur). 1724 Reactivates ALL processes waiting for this event. (Cleanup waits lists 1725 of other events if wait was for an event-group (OR).) 1726 Reactivates the first process for which event(s) it is queuing for 1727 have fired. (Cleanup queues of other events if wait was for an event-group (OR).) 1728 """ 1729 self.signalparam=param 1730 if not self.waits and not self.queues: 1731 self.occurred=True 1732 else: 1733 #reactivate all waiting processes 1734 for p in self.waits: 1735 p[0].eventsFired.append(self) 1736 reactivate(p[0],prior=True) 1737 #delete waits entries for this process in other events 1738 for ev in p[1]: 1739 if ev!=self: 1740 if ev.occurred: 1741 p[0].eventsFired.append(ev) 1742 for iev in ev.waits: 1743 if iev[0]==p[0]: 1744 ev.waits.remove(iev) 1745 break 1746 self.waits=[] 1747 if self.queues: 1748 proc=self.queues.pop(0)[0] 1749 proc.eventsFired.append(self) 1750 reactivate(proc)
1751
1752 - def _wait(self,par):
1753 """Consumes a signal if it has occurred, otherwise process 'proc' 1754 waits for this event. 1755 """ 1756 proc=par[0][1] #the process issuing the yield waitevent command 1757 proc.eventsFired=[] 1758 if not self.occurred: 1759 self.waits.append([proc,[self]]) 1760 proc._nextTime=None #passivate calling process 1761 else: 1762 proc.eventsFired.append(self) 1763 self.occurred=False 1764 _e._post(proc,at=_t,prior=1)
1765
1766 - def _waitOR(self,par):
1767 """Handles waiting for an OR of events in a tuple/list. 1768 """ 1769 proc=par[0][1] 1770 evlist=par[0][2] 1771 proc.eventsFired=[] 1772 anyoccur=False 1773 for ev in evlist: 1774 if ev.occurred: 1775 anyoccur=True 1776 proc.eventsFired.append(ev) 1777 ev.occurred=False 1778 if anyoccur: #at least one event has fired; continue process 1779 _e._post(proc,at=_t,prior=1) 1780 1781 else: #no event in list has fired, enter process in all 'waits' lists 1782 proc.eventsFired=[] 1783 proc._nextTime=None #passivate calling process 1784 for ev in evlist: 1785 ev.waits.append([proc,evlist])
1786
1787 - def _queue(self,par):
1788 """Consumes a signal if it has occurred, otherwise process 'proc' 1789 queues for this event. 1790 """ 1791 proc=par[0][1] #the process issuing the yield queueevent command 1792 proc.eventsFired=[] 1793 if not self.occurred: 1794 self.queues.append([proc,[self]]) 1795 proc._nextTime=None #passivate calling process 1796 else: 1797 proc.eventsFired.append(self) 1798 self.occurred=False 1799 _e._post(proc,at=_t,prior=1)
1800
1801 - def _queueOR(self,par):
1802 """Handles queueing for an OR of events in a tuple/list. 1803 """ 1804 proc=par[0][1] 1805 evlist=par[0][2] 1806 proc.eventsFired=[] 1807 anyoccur=False 1808 for ev in evlist: 1809 if ev.occurred: 1810 anyoccur=True 1811 proc.eventsFired.append(ev) 1812 ev.occurred=False 1813 if anyoccur: #at least one event has fired; continue process 1814 _e._post(proc,at=_t,prior=1) 1815 1816 else: #no event in list has fired, enter process in all 'waits' lists 1817 proc.eventsFired=[] 1818 proc._nextTime=None #passivate calling process 1819 for ev in evlist: 1820 ev.queues.append([proc,evlist])
1821 1822 ## begin waituntil functionality
1823 -def _test():
1824 """ 1825 Gets called by simulate after every event, as long as there are processes 1826 waiting in condQ for a condition to be satisfied. 1827 Tests the conditions for all waiting processes. Where condition satisfied, 1828 reactivates that process immediately and removes it from queue. 1829 """ 1830 global condQ 1831 rList=[] 1832 for el in condQ: 1833 if el.cond(): 1834 rList.append(el) 1835 reactivate(el) 1836 for i in rList: 1837 condQ.remove(i) 1838 1839 if not condQ: 1840 _stopWUStepping()
1841
1842 -def _waitUntilFunc(proc,cond):
1843 global condQ 1844 """ 1845 Puts a process 'proc' waiting for a condition into a waiting queue. 1846 'cond' is a predicate function which returns True if the condition is 1847 satisfied. 1848 """ 1849 if not cond(): 1850 condQ.append(proc) 1851 proc.cond=cond 1852 _startWUStepping() #signal 'simulate' that a process is waiting 1853 # passivate calling process 1854 proc._nextTime=None 1855 else: 1856 #schedule continuation of calling process 1857 _e._post(proc,at=_t,prior=1)
1858 1859 1860 ##end waituntil functionality 1861
1862 -def scheduler(till=0):
1863 """Schedules Processes/semi-coroutines until time 'till'. 1864 Deprecated since version 0.5. 1865 """ 1866 simulate(until=till)
1867
1868 -def holdfunc(a):
1869 a[0][1]._hold(a)
1870
1871 -def requestfunc(a):
1872 """Handles 'yield request,self,res' and 'yield (request,self,res),(<code>,self,par)'. 1873 <code> can be 'hold' or 'waitevent'. 1874 """ 1875 if type(a[0][0])==tuple: 1876 ## Compound yield request statement 1877 ## first tuple in ((request,self,res),(xx,self,yy)) 1878 b=a[0][0] 1879 ## b[2]==res (the resource requested) 1880 ##process the first part of the compound yield statement 1881 ##a[1] is the Process instance 1882 b[2]._request(arg=(b,a[1])) 1883 ##deal with add-on condition to command 1884 ##Trigger processes for reneging 1885 class _Holder(Process): 1886 """Provides timeout process""" 1887 def trigger(self,delay): 1888 yield hold,self,delay 1889 if not proc in b[2].activeQ: 1890 reactivate(proc)
1891 1892 class _EventWait(Process): 1893 """Provides event waiting process""" 1894 def trigger(self,event): 1895 yield waitevent,self,event 1896 if not proc in b[2].activeQ: 1897 a[1].eventsFired=self.eventsFired 1898 reactivate(proc) 1899 1900 #activate it 1901 proc=a[0][0][1] # the process to be woken up 1902 actCode=a[0][1][0] 1903 if actCode==hold: 1904 proc._holder=_Holder(name="RENEGE-hold for %s"%proc.name) 1905 ## the timeout delay 1906 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1907 elif actCode==waituntil: 1908 raise FatalSimerror("Illegal code for reneging: waituntil") 1909 elif actCode==waitevent: 1910 proc._holder=_EventWait(name="RENEGE-waitevent for %s"%proc.name) 1911 ## the event 1912 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1913 elif actCode==queueevent: 1914 raise FatalSimerror("Illegal code for reneging: queueevent") 1915 else: 1916 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1917 else: 1918 ## Simple yield request command 1919 a[0][2]._request(a) 1920
1921 -def releasefunc(a):
1922 a[0][2]._release(a)
1923
1924 -def passivatefunc(a):
1925 a[0][1]._passivate(a)
1926
1927 -def waitevfunc(a):
1928 #if waiting for one event only (not a tuple or list) 1929 evtpar=a[0][2] 1930 if isinstance(evtpar,SimEvent): 1931 a[0][2]._wait(a) 1932 # else, if waiting for an OR of events (list/tuple): 1933 else: #it should be a list/tuple of events 1934 # call _waitOR for first event 1935 evtpar[0]._waitOR(a)
1936
1937 -def queueevfunc(a):
1938 #if queueing for one event only (not a tuple or list) 1939 evtpar=a[0][2] 1940 if isinstance(evtpar,SimEvent): 1941 a[0][2]._queue(a) 1942 #else, if queueing for an OR of events (list/tuple): 1943 else: #it should be a list/tuple of events 1944 # call _queueOR for first event 1945 evtpar[0]._queueOR(a)
1946
1947 -def waituntilfunc(par):
1948 _waitUntilFunc(par[0][1],par[0][2])
1949
1950 -def getfunc(a):
1951 """Handles 'yield get,self,buffer,what,priority' and 1952 'yield (get,self,buffer,what,priority),(<code>,self,par)'. 1953 <code> can be 'hold' or 'waitevent'. 1954 """ 1955 if type(a[0][0])==tuple: 1956 ## Compound yield request statement 1957 ## first tuple in ((request,self,res),(xx,self,yy)) 1958 b=a[0][0] 1959 ## b[2]==res (the resource requested) 1960 ##process the first part of the compound yield statement 1961 ##a[1] is the Process instance 1962 b[2]._get(arg=(b,a[1])) 1963 ##deal with add-on condition to command 1964 ##Trigger processes for reneging 1965 class _Holder(Process): 1966 """Provides timeout process""" 1967 def trigger(self,delay): 1968 yield hold,self,delay 1969 #if not proc in b[2].activeQ: 1970 if proc in b[2].getQ: 1971 reactivate(proc)
1972 1973 class _EventWait(Process): 1974 """Provides event waiting process""" 1975 def trigger(self,event): 1976 yield waitevent,self,event 1977 if proc in b[2].getQ: 1978 a[1].eventsFired=self.eventsFired 1979 reactivate(proc) 1980 1981 #activate it 1982 proc=a[0][0][1] # the process to be woken up 1983 actCode=a[0][1][0] 1984 if actCode==hold: 1985 proc._holder=_Holder("RENEGE-hold for %s"%proc.name) 1986 ## the timeout delay 1987 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1988 elif actCode==waituntil: 1989 raise FatalSimerror("Illegal code for reneging: waituntil") 1990 elif actCode==waitevent: 1991 proc._holder=_EventWait(proc.name) 1992 ## the event 1993 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 1994 elif actCode==queueevent: 1995 raise FatalSimerror("Illegal code for reneging: queueevent") 1996 else: 1997 raise FatalSimerror("Illegal code for reneging %s"%actCode) 1998 else: 1999 ## Simple yield request command 2000 a[0][2]._get(a) 2001 2002
2003 -def putfunc(a):
2004 """Handles 'yield put' (simple and compound hold/waitevent) 2005 """ 2006 if type(a[0][0])==tuple: 2007 ## Compound yield request statement 2008 ## first tuple in ((request,self,res),(xx,self,yy)) 2009 b=a[0][0] 2010 ## b[2]==res (the resource requested) 2011 ##process the first part of the compound yield statement 2012 ##a[1] is the Process instance 2013 b[2]._put(arg=(b,a[1])) 2014 ##deal with add-on condition to command 2015 ##Trigger processes for reneging 2016 class _Holder(Process): 2017 """Provides timeout process""" 2018 def trigger(self,delay): 2019 yield hold,self,delay 2020 #if not proc in b[2].activeQ: 2021 if proc in b[2].putQ: 2022 reactivate(proc)
2023 2024 class _EventWait(Process): 2025 """Provides event waiting process""" 2026 def trigger(self,event): 2027 yield waitevent,self,event 2028 if proc in b[2].putQ: 2029 a[1].eventsFired=self.eventsFired 2030 reactivate(proc) 2031 2032 #activate it 2033 proc=a[0][0][1] # the process to be woken up 2034 actCode=a[0][1][0] 2035 if actCode==hold: 2036 proc._holder=_Holder("RENEGE-hold for %s"%proc.name) 2037 ## the timeout delay 2038 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 2039 elif actCode==waituntil: 2040 raise FatalSimerror("Illegal code for reneging: waituntil") 2041 elif actCode==waitevent: 2042 proc._holder=_EventWait("RENEGE-waitevent for %s"%proc.name) 2043 ## the event 2044 activate(proc._holder,proc._holder.trigger(a[0][1][2])) 2045 elif actCode==queueevent: 2046 raise FatalSimerror("Illegal code for reneging: queueevent") 2047 else: 2048 raise FatalSimerror("Illegal code for reneging %s"%actCode) 2049 else: 2050 ## Simple yield request command 2051 a[0][2]._put(a) 2052
2053 -def simulate(until=0):
2054 """Schedules Processes/semi-coroutines until time 'until'""" 2055 2056 """Gets called once. Afterwards, co-routines (generators) return by 2057 'yield' with a cargo: 2058 yield hold, self, <delay>: schedules the "self" process for activation 2059 after <delay> time units.If <,delay> missing, 2060 same as "yield hold,self,0" 2061 2062 yield passivate,self : makes the "self" process wait to be re-activated 2063 2064 yield request,self,<Resource>[,<priority>]: request 1 unit from <Resource> 2065 with <priority> pos integer (default=0) 2066 2067 yield release,self,<Resource> : release 1 unit to <Resource> 2068 2069 yield waitevent,self,<SimEvent>|[<Evt1>,<Evt2>,<Evt3), . . . ]: 2070 wait for one or more of several events 2071 2072 2073 yield queueevent,self,<SimEvent>|[<Evt1>,<Evt2>,<Evt3), . . . ]: 2074 queue for one or more of several events 2075 2076 yield waituntil,self,cond : wait for arbitrary condition 2077 2078 yield get,self,<buffer>[,<WhatToGet>[,<priority>]] 2079 get <WhatToGet> items from buffer (default=1); 2080 <WhatToGet> can be a pos integer or a filter function 2081 (Store only) 2082 2083 yield put,self,<buffer>[,<WhatToPut>[,priority]] 2084 put <WhatToPut> items into buffer (default=1); 2085 <WhatToPut> can be a pos integer (Level) or a list of objects 2086 (Store) 2087 2088 EXTENSIONS: 2089 Request with timeout reneging: 2090 yield (request,self,<Resource>),(hold,self,<patience>) : 2091 requests 1 unit from <Resource>. If unit not acquired in time period 2092 <patience>, self leaves waitQ (reneges). 2093 2094 Request with event-based reneging: 2095 yield (request,self,<Resource>),(waitevent,self,<eventlist>): 2096 requests 1 unit from <Resource>. If one of the events in <eventlist> occurs before unit 2097 acquired, self leaves waitQ (reneges). 2098 2099 Get with timeout reneging (for Store and Level): 2100 yield (get,self,<buffer>,nrToGet etc.),(hold,self,<patience>) 2101 requests <nrToGet> items/units from <buffer>. If not acquired <nrToGet> in time period 2102 <patience>, self leaves <buffer>.getQ (reneges). 2103 2104 Get with event-based reneging (for Store and Level): 2105 yield (get,self,<buffer>,nrToGet etc.),(waitevent,self,<eventlist>) 2106 requests <nrToGet> items/units from <buffer>. If not acquired <nrToGet> before one of 2107 the events in <eventlist> occurs, self leaves <buffer>.getQ (reneges). 2108 2109 2110 2111 Event notices get posted in event-list by scheduler after "yield" or by 2112 "activate"/"reactivate" functions. 2113 2114 """ 2115 global _endtime,_e,_stop,_t,_wustep 2116 _stop=False 2117 2118 if _e is None: 2119 raise FatalSimerror("Simulation not initialized") 2120 if _e._isEmpty(): 2121 message="SimPy: No activities scheduled" 2122 return message 2123 2124 _endtime=until 2125 message="SimPy: Normal exit" 2126 dispatch={hold:holdfunc,request:requestfunc,release:releasefunc, 2127 passivate:passivatefunc,waitevent:waitevfunc,queueevent:queueevfunc, 2128 waituntil:waituntilfunc,get:getfunc,put:putfunc} 2129 commandcodes=dispatch.keys() 2130 commandwords={hold:"hold",request:"request",release:"release",passivate:"passivate", 2131 waitevent:"waitevent",queueevent:"queueevent",waituntil:"waituntil", 2132 get:"get",put:"put"} 2133 nextev=_e._nextev ## just a timesaver 2134 while not _stop and _t<=_endtime: 2135 try: 2136 a=nextev() 2137 if not a[0] is None: 2138 ## 'a' is tuple "(<yield command>, <action>)" 2139 if type(a[0][0])==tuple: 2140 ##allowing for yield (request,self,res),(waituntil,self,cond) 2141 command=a[0][0][0] 2142 else: 2143 command = a[0][0] 2144 if __debug__: 2145 if not command in commandcodes: 2146 raise FatalSimerror("Illegal command: yield %s"%command) 2147 dispatch[command](a) 2148 except FatalSimerror,error: 2149 print "SimPy: "+error.value 2150 sys.exit(1) 2151 except Simerror,error: 2152 message="SimPy: "+error.value 2153 _stop = True 2154 if _wustep: 2155 _test() 2156 _stopWUStepping() 2157 _e=None 2158 return message
2159 2160 2161 if __name__ == "__main__": 2162 print "SimPy.Simulation %s" %__version__ 2163 2164 ############# Test/demo functions #############
2165 - def test_demo():
2166 class Aa(Process): 2167 sequIn=[] 2168 sequOut=[] 2169 def __init__(self,holdtime,name): 2170 Process.__init__(self,name) 2171 self.holdtime=holdtime
2172 2173 def life(self,priority): 2174 for i in range(1): 2175 Aa.sequIn.append(self.name) 2176 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2177 len(rrr.activeQ) 2178 print "waitQ: ",[(k.name,k._priority[rrr]) for k in rrr.waitQ] 2179 print "activeQ: ",[(k.name,k._priority[rrr]) \ 2180 for k in rrr.activeQ] 2181 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2182 "Inconsistent resource unit numbers" 2183 print now(),self.name,"requests 1 ", rrr.unitName 2184 yield request,self,rrr,priority 2185 print now(),self.name,"has 1 ",rrr.unitName 2186 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2187 len(rrr.activeQ) 2188 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2189 len(rrr.activeQ) 2190 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2191 "Inconsistent resource unit numbers" 2192 yield hold,self,self.holdtime 2193 print now(),self.name,"gives up 1",rrr.unitName 2194 yield release,self,rrr 2195 Aa.sequOut.append(self.name) 2196 print now(),self.name,"has released 1 ",rrr.unitName 2197 print "waitQ: ",[(k.name,k._priority[rrr]) for k in rrr.waitQ] 2198 print now(),rrr.name,"waitQ:",len(rrr.waitQ),"activeQ:",\ 2199 len(rrr.activeQ) 2200 assert rrr.n+len(rrr.activeQ)==rrr.capacity, \ 2201 "Inconsistent resource unit numbers" 2202 2203 class Observer(Process): 2204 def __init__(self): 2205 Process.__init__(self) 2206 2207 def observe(self,step,processes,res): 2208 while now()<11: 2209 for i in processes: 2210 print " %s %s: act:%s, pass:%s, term: %s,interr:%s, qu:%s"\ 2211 %(now(),i.name,i.active(),i.passive(),i.terminated()\ 2212 ,i.interrupted(),i.queuing(res)) 2213 print 2214 yield hold,self,step 2215 2216 print"\n+++test_demo output" 2217 print "****First case == priority queue, resource service not preemptable" 2218 initialize() 2219 rrr=Resource(5,name="Parking",unitName="space(s)", qType=PriorityQ, 2220 preemptable=0) 2221 procs=[] 2222 for i in range(10): 2223 z=Aa(holdtime=i,name="Car "+str(i)) 2224 procs.append(z) 2225 activate(z,z.life(priority=i)) 2226 o=Observer() 2227 activate(o,o.observe(1,procs,rrr)) 2228 a=simulate(until=10000) 2229 print a 2230 print "Input sequence: ",Aa.sequIn 2231 print "Output sequence: ",Aa.sequOut 2232 2233 print "\n****Second case == priority queue, resource service preemptable" 2234 initialize() 2235 rrr=Resource(5,name="Parking",unitName="space(s)", qType=PriorityQ, 2236 preemptable=1) 2237 procs=[] 2238 for i in range(10): 2239 z=Aa(holdtime=i,name="Car "+str(i)) 2240 procs.append(z) 2241 activate(z,z.life(priority=i)) 2242 o=Observer() 2243 activate(o,o.observe(1,procs,rrr)) 2244 Aa.sequIn=[] 2245 Aa.sequOut=[] 2246 a=simulate(until=10000) 2247 print a 2248 print "Input sequence: ",Aa.sequIn 2249 print "Output sequence: ",Aa.sequOut 2250
2251 - def test_interrupt():
2252 class Bus(Process): 2253 def __init__(self,name): 2254 Process.__init__(self,name)
2255 2256 def operate(self,repairduration=0): 2257 print now(),">> %s starts" %(self.name) 2258 tripleft = 1000 2259 while tripleft > 0: 2260 yield hold,self,tripleft 2261 if self.interrupted(): 2262 print "interrupted by %s" %self.interruptCause.name 2263 print "%s: %s breaks down " %(now(),self.name) 2264 tripleft=self.interruptLeft 2265 self.interruptReset() 2266 print "tripleft ",tripleft 2267 reactivate(br,delay=repairduration) # breakdowns only during operation 2268 yield hold,self,repairduration 2269 print now()," repaired" 2270 else: 2271 break # no breakdown, ergo bus arrived 2272 print now(),"<< %s done" %(self.name) 2273 2274 class Breakdown(Process): 2275 def __init__(self,myBus): 2276 Process.__init__(self,name="Breakdown "+myBus.name) 2277 self.bus=myBus 2278 2279 def breakBus(self,interval): 2280 2281 while True: 2282 yield hold,self,interval 2283 if self.bus.terminated(): break 2284 self.interrupt(self.bus) 2285 2286 print"\n\n+++test_interrupt" 2287 initialize() 2288 b=Bus("Bus 1") 2289 activate(b,b.operate(repairduration=20)) 2290 br=Breakdown(b) 2291 activate(br,br.breakBus(200)) 2292 print simulate(until=4000) 2293
2294 - def testSimEvents():
2295 class Waiter(Process): 2296 def waiting(self,theSignal): 2297 while True: 2298 yield waitevent,self,theSignal 2299 print "%s: process '%s' continued after waiting for %s"%(now(),self.name,theSignal.name) 2300 yield queueevent,self,theSignal 2301 print "%s: process '%s' continued after queueing for %s"%(now(),self.name,theSignal.name)
2302 2303 class ORWaiter(Process): 2304 def waiting(self,signals): 2305 while True: 2306 yield waitevent,self,signals 2307 print now(),"one of %s signals occurred"%[x.name for x in signals] 2308 print "\t%s (fired/param)"%[(x.name,x.signalparam) for x in self.eventsFired] 2309 yield hold,self,1 2310 2311 class Caller(Process): 2312 def calling(self): 2313 while True: 2314 signal1.signal("wake up!") 2315 print "%s: signal 1 has occurred"%now() 2316 yield hold,self,10 2317 signal2.signal("and again") 2318 signal2.signal("sig 2 again") 2319 print "%s: signal1, signal2 have occurred"%now() 2320 yield hold,self,10 2321 print"\n+++testSimEvents output" 2322 initialize() 2323 signal1=SimEvent("signal 1") 2324 signal2=SimEvent("signal 2") 2325 signal1.signal("startup1") 2326 signal2.signal("startup2") 2327 w1=Waiter("waiting for signal 1") 2328 activate(w1,w1.waiting(signal1)) 2329 w2=Waiter("waiting for signal 2") 2330 activate(w2,w2.waiting(signal2)) 2331 w3=Waiter("also waiting for signal 2") 2332 activate(w3,w3.waiting(signal2)) 2333 w4=ORWaiter("waiting for either signal 1 or signal 2") 2334 activate(w4,w4.waiting([signal1,signal2]),prior=True) 2335 c=Caller("Caller") 2336 activate(c,c.calling()) 2337 print simulate(until=100) 2338
2339 - def testwaituntil():
2340 """ 2341 Demo of waitUntil capability. 2342 2343 Scenario: 2344 Three workers require sets of tools to do their jobs. Tools are shared, scarce 2345 resources for which they compete. 2346 """ 2347 class Worker(Process): 2348 def __init__(self,name,heNeeds=[]): 2349 Process.__init__(self,name) 2350 self.heNeeds=heNeeds
2351 def work(self): 2352 2353 def workerNeeds(): 2354 for item in self.heNeeds: 2355 if item.n==0: 2356 return False 2357 return True 2358 2359 while now()<8*60: 2360 yield waituntil,self,workerNeeds 2361 for item in self.heNeeds: 2362 yield request,self,item 2363 print "%s %s has %s and starts job" %(now(),self.name, 2364 [x.name for x in self.heNeeds]) 2365 yield hold,self,random.uniform(10,30) 2366 for item in self.heNeeds: 2367 yield release,self,item 2368 yield hold,self,2 #rest 2369 2370 print "\n+++ nwaituntil demo output" 2371 initialize() 2372 brush=Resource(capacity=1,name="brush") 2373 ladder=Resource(capacity=2,name="ladder") 2374 hammer=Resource(capacity=1,name="hammer") 2375 saw=Resource(capacity=1,name="saw") 2376 painter=Worker("painter",[brush,ladder]) 2377 activate(painter,painter.work()) 2378 roofer=Worker("roofer",[hammer,ladder,ladder]) 2379 activate(roofer,roofer.work()) 2380 treeguy=Worker("treeguy",[saw,ladder]) 2381 activate(treeguy,treeguy.work()) 2382 for who in (painter,roofer,treeguy): 2383 print "%s needs %s for his job" %(who.name,[x.name for x in who.heNeeds]) 2384 print 2385 print simulate(until=9*60) 2386 2387 ## ------------------------------------------------------------- 2388 ## TEST COMPOUND "YIELD REQUEST" COMMANDS 2389 ## ------------------------------------------------------------- 2390 2391 ## ------------------------------------------------------------- 2392 ## TEST "yield (request,self,res),(hold,self,delay)" 2393 ## == timeout renege 2394 ## ------------------------------------------------------------- 2395
2396 - class JobTO(Process):
2397 """ Job class for testing timeout reneging 2398 """
2399 - def __init__(self,server=None,name=""):
2400 Process.__init__(self,name) 2401 self.res=server 2402 self.gotResource=None
2403
2404 - def execute(self,timeout,usetime):
2405 yield (request,self,self.res),(hold,self,timeout) 2406 if self.acquired(self.res): 2407 self.gotResource=True 2408 yield hold,self,usetime 2409 yield release,self,self.res 2410 else: 2411 self.gotResource=False
2412
2413 - def testNoTimeout():
2414 """Test that resource gets acquired without timeout 2415 """ 2416 res=Resource(name="Server",capacity=1) 2417 initialize() 2418 usetime=5 2419 timeout=1000000 2420 j1=JobTO(server=res,name="Job_1") 2421 activate(j1,j1.execute(timeout=timeout,usetime=usetime)) 2422 j2=JobTO(server=res,name="Job_2") 2423 activate(j2,j2.execute(timeout=timeout,usetime=usetime)) 2424 simulate(until=2*usetime) 2425 assert now()==2*usetime,"time not ==2*usetime" 2426 assert j1.gotResource and j2.gotResource,\ 2427 "at least one job failed to get resource" 2428 assert not (res.waitQ or res.activeQ),\ 2429 "job waiting or using resource"
2430
2431 - def testTimeout1():
2432 """Test that timeout occurs when resource busy 2433 """ 2434 res=Resource(name="Server",capacity=1,monitored=True) 2435 initialize() 2436 usetime=5 2437 timeout=3 2438 j1=JobTO(server=res,name="Job_1") 2439 activate(j1,j1.execute(timeout=timeout,usetime=usetime)) 2440 j2=JobTO(server=res,name="Job_2") 2441 activate(j2,j2.execute(timeout=timeout,usetime=usetime)) 2442 simulate(until=2*usetime) 2443 assert(now()==usetime),"time not ==usetime" 2444 assert(j1.gotResource),"Job_1 did not get resource" 2445 assert(not j2.gotResource),"Job_2 did not renege" 2446 assert not (res.waitQ or res.activeQ),\ 2447 "job waiting or using resource"
2448
2449 - def testTimeout2():
2450 """Test that timeout occurs when resource has no capacity free 2451 """ 2452 res=Resource(name="Server",capacity=0) 2453 initialize() 2454 usetime=5 2455 timeout=3 2456 j1=JobTO(server=res,name="Job_1") 2457 activate(j1,j1.execute(timeout=timeout,usetime=usetime)) 2458 j2=JobTO(server=res,name="Job_2") 2459 activate(j2,j2.execute(timeout=timeout,usetime=usetime)) 2460 simulate(until=2*usetime) 2461 assert now()==timeout,"time %s not == timeout"%now() 2462 assert not j1.gotResource,"Job_1 got resource" 2463 assert not j2.gotResource,"Job_2 got resource" 2464 assert not (res.waitQ or res.activeQ),\ 2465 "job waiting or using resource"
2466 2467 ## ------------------------------------------------------------------ 2468 ## TEST "yield (request,self,res),(waitevent,self,event)" 2469 ## == event renege 2470 ## ------------------------------------------------------------------
2471 - class JobEvt(Process):
2472 """ Job class for testing event reneging 2473 """
2474 - def __init__(self,server=None,name=""):
2475 Process.__init__(self,name) 2476 self.res=server 2477 self.gotResource=None
2478
2479 - def execute(self,event,usetime):
2480 yield (request,self,self.res),(waitevent,self,event) 2481 if self.acquired(self.res): 2482 self.gotResource=True 2483 yield hold,self,usetime 2484 yield release,self,self.res 2485 else: 2486 self.gotResource=False
2487
2488 - class JobEvtMulti(Process):
2489 """ Job class for testing event reneging with multi-event lists 2490 """
2491 - def __init__(self,server=None,name=""):
2492 Process.__init__(self,name) 2493 self.res=server 2494 self.gotResource=None
2495
2496 - def execute(self,eventlist,usetime):
2497 yield (request,self,self.res),(waitevent,self,eventlist) 2498 if self.acquired(self.res): 2499 self.gotResource=True 2500 yield hold,self,usetime 2501 yield release,self,self.res 2502 else: 2503 self.gotResource=False
2504
2505 - class FireEvent(Process):
2506 """Fires reneging event 2507 """
2508 - def fire(self,fireDelay,event):
2509 yield hold,self,fireDelay 2510 event.signal()
2511
2512 - def testNoEvent():
2513 """Test that processes acquire resource normally if no event fires 2514 """ 2515 res=Resource(name="Server",capacity=1) 2516 event=SimEvent("Renege_trigger") #never gets fired 2517 initialize() 2518 usetime=5 2519 j1=JobEvt(server=res,name="Job_1") 2520 activate(j1,j1.execute(event=event,usetime=usetime)) 2521 j2=JobEvt(server=res,name="Job_2") 2522 activate(j2,j2.execute(event=event,usetime=usetime)) 2523 simulate(until=2*usetime) 2524 # Both jobs should get server (in sequence) 2525 assert now()==2*usetime,"time not ==2*usetime" 2526 assert j1.gotResource and j2.gotResource,\ 2527 "at least one job failed to get resource" 2528 assert not (res.waitQ or res.activeQ),\ 2529 "job waiting or using resource"
2530
2531 - def testWaitEvent1():
2532 """Test that signalled event leads to renege when resource busy 2533 """ 2534 res=Resource(name="Server",capacity=1) 2535 initialize() 2536 event=SimEvent("Renege_trigger") 2537 usetime=5 2538 eventtime=1 2539 j1=JobEvt(server=res,name="Job_1") 2540 activate(j1,j1.execute(event=event,usetime=usetime)) 2541 j2=JobEvt(server=res,name="Job_2") 2542 activate(j2,j2.execute(event=event,usetime=usetime)) 2543 f=FireEvent(name="FireEvent") 2544 activate(f,f.fire(fireDelay=eventtime,event=event)) 2545 simulate(until=2*usetime) 2546 # Job_1 should get server, Job_2 renege 2547 assert(now()==usetime),"time not ==usetime" 2548 assert(j1.gotResource),"Job_1 did not get resource" 2549 assert(not j2.gotResource),"Job_2 did not renege" 2550 assert not (res.waitQ or res.activeQ),\ 2551 "job waiting or using resource"
2552
2553 - def testWaitEvent2():
2554 """Test that renege-triggering event can be one of an event list 2555 """ 2556 res=Resource(name="Server",capacity=1) 2557 initialize() 2558 event1=SimEvent("Renege_trigger_1") 2559 event2=SimEvent("Renege_trigger_2") 2560 usetime=5 2561 eventtime=1 #for both events 2562 j1=JobEvtMulti(server=res,name="Job_1") 2563 activate(j1,j1.execute(eventlist=[event1,event2],usetime=usetime)) 2564 j2=JobEvtMulti(server=res,name="Job_2") 2565 activate(j2,j2.execute(eventlist=[event1,event2],usetime=usetime)) 2566 f1=FireEvent(name="FireEvent_1") 2567 activate(f1,f1.fire(fireDelay=eventtime,event=event1)) 2568 f2=FireEvent(name="FireEvent_2") 2569 activate(f2,f2.fire(fireDelay=eventtime,event=event2)) 2570 simulate(until=2*usetime) 2571 # Job_1 should get server, Job_2 should renege 2572 assert(now()==usetime),"time not ==usetime" 2573 assert(j1.gotResource),"Job_1 did not get resource" 2574 assert(not j2.gotResource),"Job_2 did not renege" 2575 assert not (res.waitQ or res.activeQ),\ 2576 "job waiting or using resource"
2577 2578 ## Run tests 2579 testNoTimeout() 2580 testTimeout1() 2581 testTimeout2() 2582 testNoEvent() 2583 testWaitEvent1() 2584 testWaitEvent2() 2585 test_demo() 2586 test_interrupt() 2587 testwaituntil() 2588