import errno
import os
import tempfile
import time
import stat

from twisted.internet import defer, threads, protocol, reactor, utils

from flumotion.common import log, common, python, format, errors

from flumotion.component.misc.httpserver import fileprovider

LOG_CATEGORY = "cache-manager"

DEFAULT_CACHE_SIZE = 1000 * 1024 * 1024
DEFAULT_CACHE_DIR = "/tmp/httpserver"
DEFAULT_CLEANUP_ENABLED = True
DEFAULT_CLEANUP_HIGH_WATERMARK = 1.0
DEFAULT_CLEANUP_LOW_WATERMARK = 0.6
ID_CACHE_MAX_SIZE = 1024
TEMP_FILE_POSTFIX = ".tmp"
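
# Orientation note (not part of the original module): the cache is allowed to
# grow up to cacheSize * high watermark (1000 MB * 1.0 with these defaults)
# before a cleanup pass is triggered, and a cleanup pass deletes files until
# usage falls to cacheSize * low watermark (600 MB here). Both fractions are
# clamped to [0.0, 1.0] in CacheManager.__init__ below.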


class CacheManager(log.Loggable):

    logCategory = LOG_CATEGORY

    def __init__(self, stats,
                 cacheDir=None,
                 cacheSize=None,
                 cleanupEnabled=None,
                 cleanupHighWatermark=None,
                 cleanupLowWatermark=None,
                 cacheRealm=None):

        if cacheDir is None:
            cacheDir = DEFAULT_CACHE_DIR
        if cacheSize is None:
            cacheSize = DEFAULT_CACHE_SIZE
        if cleanupEnabled is None:
            cleanupEnabled = DEFAULT_CLEANUP_ENABLED
        if cleanupHighWatermark is None:
            cleanupHighWatermark = DEFAULT_CLEANUP_HIGH_WATERMARK
        if cleanupLowWatermark is None:
            cleanupLowWatermark = DEFAULT_CLEANUP_LOW_WATERMARK

        self.stats = stats
        self._cacheDir = cacheDir
        self._cacheSize = cacheSize
        self._cleanupEnabled = cleanupEnabled
        highWatermark = max(0.0, min(1.0, float(cleanupHighWatermark)))
        lowWatermark = max(0.0, min(1.0, float(cleanupLowWatermark)))

        self._cachePrefix = (cacheRealm and (cacheRealm + ":")) or ""

        # Small in-memory cache of path -> identifier, bounded by
        # ID_CACHE_MAX_SIZE (see getIdentifier).
        self._identifiers = {}

        self.info("Cache Manager initialized")
        self.debug("Cache directory: '%s'", self._cacheDir)
        self.debug("Cache size: %d bytes", self._cacheSize)
        self.debug("Cache cleanup enabled: %s", self._cleanupEnabled)

        common.ensureDir(self._cacheDir, "cache")

        self._cacheUsage = None
        self._cacheUsageLastUpdate = None
        self._lastCacheTime = None

        # Cleanup thresholds, in bytes: cleanup is triggered above the high
        # watermark and removes files until usage drops below the low one.
        self._cacheMaxUsage = self._cacheSize * highWatermark
        self._cacheMinUsage = self._cacheSize * lowWatermark

    def setUp(self):
        """
        Initialize the cache manager.

        @return: a deferred
        @raise: OSError or FlumotionError
        """
        return self.updateCacheUsage()

    def getIdentifier(self, path):
        """
        The returned identifier is a hex-encoded SHA1 digest of the path
        (prefixed with the cache realm, if any).
        Identifiers are cached in a dictionary indexed by path, with a
        maximum number of entries given by the constant ID_CACHE_MAX_SIZE.

        @return: an identifier for path.
        """
        ident = self._identifiers.get(path, None)
        if ident is None:
            hash = python.sha1()
            hash.update(self._cachePrefix + path)
            ident = hash.digest().encode("hex").strip('\n')
            # Prevent the identifier cache from growing endlessly
            if len(self._identifiers) >= ID_CACHE_MAX_SIZE:
                self._identifiers.clear()
            self._identifiers[path] = ident
        return ident
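
    # Illustration (not from the original source): with no realm configured,
    # the identifier of "/a/b.ogg" is just the hex SHA1 of that string,
    # equivalent to python.sha1("/a/b.ogg").hexdigest(). With a realm, the
    # hashed string is "<realm>:/a/b.ogg", so different realms sharing one
    # cache directory map the same path to different cache file names.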

    def getCachePath(self, path):
        """
        @return: the cached file path for a path.
        """
        ident = self.getIdentifier(path)
        return os.path.join(self._cacheDir, ident)

    def getTempPath(self, path):
        """
        @return: a temporary file path for a path.

        Do not use this function; it is kept for compatibility only.
        Use newTempFile() instead.
        """
        ident = self.getIdentifier(path)
        return os.path.join(self._cacheDir, ident + TEMP_FILE_POSTFIX)

    def updateCacheUsageStatistics(self):
        # NOTE: body reconstructed (the original was not preserved): push the
        # current usage estimation to the statistics object through the
        # onEstimateCacheUsage() hook that the stats classes implement.
        self.stats.onEstimateCacheUsage(self._cacheUsage, self._cacheSize)
151 """
152 @return: a defered with the cache usage in bytes.
153 @raise: OSError or FlumotionError
154 """
155
156
157
158 try:
159 cacheTime = os.path.getmtime(self._cacheDir)
160 except OSError, e:
161 return defer.fail(e)
162
163 if ((self._cacheUsage is None) or (self._lastCacheTime < cacheTime)):
164 self._lastCacheTime = cacheTime
165 self.log('Getting disk usage for path %r', self._cacheDir)
166 d = utils.getProcessOutput('du', ['-bs', self._cacheDir])
167 d.addCallback(lambda o: int(o.split('\t', 1)[0]))
168 d.addCallback(self._updateCacheUsage)
169 return d
170 else:
171 return defer.succeed(self._cacheUsage)
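
    # Note (not from the original source): `du -bs <dir>` prints the usage in
    # bytes, a tab, then the path (e.g. "1048576\t/tmp/httpserver"), which is
    # why the first callback keeps only the text before the first tab and
    # converts it to an int.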

    def _rmfiles(self, files):
        try:
            for path in files:
                os.remove(path)
        except OSError, e:
            if e.errno != errno.ENOENT:
                # A vanished file is expected; anything else is only logged
                self.warning("Error cleaning cached file: %s", str(e))

    def _updateCacheUsage(self, usage):
        self._cacheUsage = usage
        self._cacheUsageLastUpdate = time.time()
        return usage

    def _cleanUp(self):
        # NOTE: the original name of this method was not preserved; _cleanUp
        # is an assumption. It deletes the least recently used files until
        # the usage estimation drops below the low watermark.
        self.stats.onCleanup()

        try:
            listdir = os.listdir(self._cacheDir)
        except OSError, e:
            return defer.fail(e)

        files = []
        for f in listdir:
            f = os.path.join(self._cacheDir, f)
            try:
                files.append((f, os.stat(f)))
            except OSError, e:
                if e.errno == errno.ENOENT:
                    # The file disappeared in between; ignore it
                    pass
                else:
                    return defer.fail(e)

        usage = sum([d[1].st_size for d in files])

        # Sort by access time, oldest first, and select files for removal
        # until the estimated usage falls below the low watermark.
        files.sort(key=lambda d: d[1].st_atime)
        rmlist = []
        for path, info in files:
            usage -= info.st_size
            rmlist.append(path)
            if usage <= self._cacheMinUsage:
                self.debug('cleaned up, cache use is now %sbytes',
                           format.formatStorage(usage))
                break
        d = threads.deferToThread(self._rmfiles, rmlist)
        d.addBoth(self._setCacheUsage, usage)
        return d
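
    # Worked example (illustrative only): with the module defaults, a cleanup
    # pass walks the files ordered by atime and keeps selecting victims until
    # the running estimate is at or below _cacheMinUsage (0.6 * 1000 MB, i.e.
    # 600 MB). The actual deletion happens in a thread via _rmfiles(), and
    # the resulting estimate is then recorded through _setCacheUsage().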

    def _setCacheUsage(self, result, usage):
        # NOTE: reconstructed; the original body of this helper was not
        # preserved. Adopt the usage estimated by the cleanup pass and
        # refresh the statistics.
        self._cacheUsage = usage
        self._cacheUsageLastUpdate = time.time()
        self.updateCacheUsageStatistics()
        return result

    def _allocateCacheSpace(self, usage, size):
        # NOTE: reconstructed; the original body of this helper was not
        # preserved. Account for the reservation and hand back an allocation
        # tag, running a cleanup pass first when the high watermark would be
        # exceeded (or failing the allocation if cleanup is disabled).
        if usage + size < self._cacheMaxUsage:
            self._cacheUsage += size
            self.updateCacheUsageStatistics()
            return (self._cacheUsageLastUpdate, size)

        if not self._cleanupEnabled:
            return None

        def allocate(_):
            self._cacheUsage += size
            self.updateCacheUsageStatistics()
            return (self._cacheUsageLastUpdate, size)

        d = self._cleanUp()
        d.addCallback(allocate)
        return d
260 """
261 Try to reserve cache space.
262
263 If there is not enough space and the cache cleanup is enabled,
264 it will delete files from the cache starting with the ones
265 with oldest access time until the cache usage drops below
266 the fraction specified by the property cleanup-low-threshold.
267
268 Returns a 'tag' that should be used to 'free' the cache space
269 using releaseCacheSpace.
270 This tag is needed to better estimate the cache usage,
271 if the cache usage has been updated since cache space
272 has been allocated, freeing up the space should not change
273 the cache usage estimation.
274
275 @param size: size to reserve, in bytes
276 @type size: int
277
278 @return: an allocation tag or None if the allocation failed.
279 @rtype: defer to tuple
280 """
281 d = self.updateCacheUsage()
282 d.addCallback(self._allocateCacheSpace, size)
283 return d
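
    # Usage sketch (mirrors the allocate/release round trip in main() below,
    # not an excerpt from the original code):
    #     d = cachemgr.allocateCacheSpace(1024)
    #     d.addCallback(lambda tag: tag and cachemgr.releaseCacheSpace(tag))
    # A successful reservation must eventually be released, either explicitly
    # or by closing an uncompleted TempFile created against it.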

    def releaseCacheSpace(self, tag):
        """
        Low-level function to release reserved cache space.
        """
        lastUpdate, size = tag
        if lastUpdate == self._cacheUsageLastUpdate:
            self._cacheUsage -= size
            self.updateCacheUsageStatistics()

    def newTempFile(self, path, size, mtime=None):
        # NOTE: reconstructed; the original definition was not preserved.
        # Reserve cache space first, then build the TempFile in a callback.
        d = self.allocateCacheSpace(size)
        d.addCallback(self._cbGotSpace, path, size, mtime)
        return d

    def _cbGotSpace(self, tag, path, size, mtime):
        # (helper name assumed; only its body survived in the source)
        if tag is None:
            return None

        try:
            return TempFile(self, path, tag, size, mtime)
        except OSError, e:
            return None

    def openCacheFile(self, path):
        # NOTE: reconstructed; the original definition was not preserved.
        # Open the cached copy of path read-only, failing the deferred on a
        # cache miss (the cached file does not exist).
        try:
            return defer.succeed(CachedFile(self, path))
        except (OSError, IOError), e:
            return defer.fail(e)


class CachedFile(object):
    """
    Read only.

    See cachedprovider.py
    @raise: OSError
    """

    def __init__(self, cachemgr, resPath):
        # NOTE: reconstructed constructor; the original was not preserved.
        # Open the cached copy of resPath read-only.
        self.cachemgr = cachemgr
        self.name = cachemgr.getCachePath(resPath)
        self.file = open(self.name, "rb")
        self.stat = os.fstat(self.file.fileno())
343 """
344 Delete the cached file from filesystem, unless the current
345 file is more recent. However, this is not done atomically...
346 """
347 try:
348 s = os.stat(self.name)
349 if (s[stat.ST_MTIME] > self.stat[stat.ST_MTIME]):
350 return
351 os.unlink(self.name)
352 except OSError, e:
353 pass

    def __getattr__(self, name):
        # Delegate attribute lookups to the wrapped file object, caching
        # non-integer results on the wrapper so later lookups bypass
        # __getattr__.
        file = self.__dict__['file']
        a = getattr(file, name)
        if type(a) != type(0):
            setattr(self, name, a)
        return a


class TempFile(CachedFile):
    """
    See cachedprovider.py
    """
    # NOTE: the CachedFile base is inferred from complete() below; the
    # original class statement was not preserved.

    def __init__(self, cachemgr, resPath, tag, size, mtime=None):
        """
        @raise: OSError
        """
        self.tag = tag
        self.cachemgr = cachemgr
        self._completed = False
        self._finishPath = cachemgr.getCachePath(resPath)
        self.mtime = mtime
        self.file = None
        self.size = size

        fd, tempPath = tempfile.mkstemp(TEMP_FILE_POSTFIX,
                                        LOG_CATEGORY, cachemgr._cacheDir)
        cachemgr.log("Created temporary file '%s' [fd %d]",
                     tempPath, fd)
        self.file = os.fdopen(fd, "w+b")
        cachemgr.log("Truncating temporary file to size %d", size)
        self.file.truncate(size)
        self.stat = os.fstat(self.file.fileno())
        self.name = tempPath

    def __getattr__(self, name):
        # Same delegation trick as CachedFile.__getattr__: forward lookups
        # to the wrapped file object and cache non-integer results.
        file = self.__dict__['file']
        a = getattr(file, name)
        if type(a) != type(0):
            setattr(self, name, a)
        return a

    def setModificationTime(self, mtime=None):
        """
        Set file modification time.
        """
        if mtime:
            self.mtime = mtime
        try:
            if self.mtime:
                mtime = self.mtime
                atime = int(time.time())
                self.cachemgr.log("Setting cache file "
                                  "modification time to %d", mtime)
                os.utime(self.name, (atime, mtime))
        except OSError, e:
            if e.errno == errno.ENOENT:
                self.cachemgr.releaseCacheSpace(self.tag)
            else:
                self.cachemgr.warning(
                    "Failed to update modification time of temporary "
                    "file: %s", log.getExceptionMessage(e))
418
420 """
421 @raise: OSError
422 """
423 if self.cachemgr is None:
424 return
425
426 try:
427 if not self._completed:
428 self.cachemgr.log("Temporary file canceled '%s' [fd %d]",
429 self.name, self.fileno())
430 self.cachemgr.releaseCacheSpace(self.tag)
431 os.unlink(self.name)
432 except OSError, e:
433 pass
434
435 self.file.close()
436 self.setModificationTime()
437 self.file = None
438 self.cachemgr = None

    def write(self, data):
        """
        @raise: OSError
        @raise: IOError if writing more than the allocated size
        """
        if self.file.tell() + len(data) > self.size:
            raise IOError("Cache size overrun (%d > %d)" %
                          (self.file.tell() + len(data), self.size))
        return self.file.write(data)
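
    # Note (not from the original source): the file was truncated to `size`
    # in __init__ and allocateCacheSpace() already accounted for those bytes,
    # so write() refuses to grow the file past the reservation rather than
    # silently invalidating the cache usage estimation.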

    def complete(self, checkSize=False):
        """
        Make the temporary file available as a cached file.
        Does NOT close the file; afterward the file can be used
        as a normal CachedFile instance.
        Does not raise exceptions on rename errors.

        @raise: IOError if checkSize and tell() != size
        """
        # (the checkSize default is an assumption; the original definition
        #  line was not preserved)
        if self.cachemgr is None:
            return
        if self._completed:
            return
        self._completed = True

        _, size = self.tag
        if checkSize and self.tell() != size:
            raise IOError("Did not reach end of file")

        self.cachemgr.log("Temporary file completed '%s' [fd %d]",
                          self.name, self.fileno())
        try:
            if self.mtime is not None:
                mtime = os.path.getmtime(self._finishPath)
                if mtime > self.mtime:
                    self.cachemgr.log("Did not complete(), "
                                      "a more recent version exists already")
                    os.unlink(self.name)
                    self.name = self._finishPath
                    return
        except OSError, e:
            pass

        try:
            os.rename(self.name, self._finishPath)
        except OSError, e:
            if e.errno == errno.ENOENT:
                self.cachemgr.releaseCacheSpace(self.tag)
            self.cachemgr.warning(
                "Failed to rename file '%s': %s" %
                (self.name, str(e)))
            return

        self.setModificationTime()

        self.name = self._finishPath
        self.cachemgr.log("Temporary file renamed to '%s' [fd %d]",
                          self._finishPath, self.fileno())


def main(argv=None):

    import random

    CACHE_SIZE = 1 * 1024 * 1024
    MAX_CLEANUPS = 512

    class DummyStats:

        def __init__(self):
            self.oncleanup = 0

        def info(self, *args):
            pass

        def onEstimateCacheUsage(self, usage, size):
            pass

        def onCleanup(self):
            self.oncleanup += 1
            print "OnCleanup"

    def makeTemp(tag, size, m, name):
        t = TempFile(m, name, tag, size)
        return t

    def completeAndClose(t):
        try:
            t.complete()
            t.close()
        except:
            print "Got a complete exception"

    def fillTestCache(manager):
        i = 0
        while manager.stats.oncleanup < MAX_CLEANUPS:
            i += 1
            filesize = 4096 * random.randint(1, 30)
            d = manager.newTempFile(str(i), filesize)
            d.addCallback(completeAndClose)

    def releaseCacheSpace(tag, m):
        print "gotCacheSpace: ", tag
        m.releaseCacheSpace(tag)

    def checkUsage(usage, m, check):
        if not check(m._cacheUsage):
            print "Cache overrun!!! %d/%d" % (m._cacheUsage, m._cacheSize)

    def openCacheAndClose(_, m, name):
        d = m.openCacheFile(name)
        d.addCallback(lambda f: f.close())
        return d

    def checkMiss(result):
        if result == "cacheMiss":
            return
        raise errors.FlumotionError("an error")

    def runTests():

        d = m.allocateCacheSpace(1024)
        d.addCallback(releaseCacheSpace, m)
        d.addCallback(checkUsage, m, lambda u: u == 0)

        d = m.allocateCacheSpace(CACHE_SIZE / 2)
        d.addCallback(makeTemp, CACHE_SIZE / 2, m, "test")
        d.addCallback(lambda t: t.close())
        d.addCallback(checkUsage, m, lambda u: u == 0)

        d = m.allocateCacheSpace(CACHE_SIZE / 2)
        d.addCallback(makeTemp, CACHE_SIZE / 2, m, "test2")
        d.addCallback(completeAndClose)
        d.addCallback(checkUsage, m, lambda u: u > 0)

        m2 = CacheManager(DummyStats(), cachedir, CACHE_SIZE, True, 0.5, 0.3)
        d = m2.newTempFile("test3", 12000)
        d.addCallback(completeAndClose)
        d.addCallback(openCacheAndClose, m, "test3")

        d = openCacheAndClose(None, m, "test4_do_not_exists")
        d.addErrback(lambda _: "cacheMiss")
        d.addCallback(checkMiss)

        threads.deferToThread(fillTestCache, m)
        threads.deferToThread(fillTestCache, m)
        threads.deferToThread(fillTestCache, m)

        m.updateCacheUsage().addCallback(checkUsage, m,
                                         lambda u: u < CACHE_SIZE * 1.10)

    cachedir = os.environ['HOME'] + "/tmp/cache"
    m = CacheManager(DummyStats(), cachedir, CACHE_SIZE, True, 0.0, 0.0)
    d = m.setUp()
    d.addCallback(lambda x: runTests())

    reactor.callLater(3, reactor.stop)
    reactor.run()
    return 0


if __name__ == '__main__':
    import sys
    status = main()
    sys.exit(status)