1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23 import datetime
24
25 from twisted.internet import reactor
26
27 from flumotion.common import log, eventcalendar
28 from flumotion.component.base import watcher
29
30 __version__ = "$Rev: 7587 $"
31
32
34 return max(td.days * 24 * 60 * 60 + td.seconds + td.microseconds / 1e6, 0)
35
36
38 """
39 I provide notifications when events start and end.
40 I use a L{eventcalendar.Calendar} for scheduling.
41
42 @cvar windowSize: how much time to look ahead when scheduling
43 @type windowSize: L{datetime.timedelta}
44 """
45 windowSize = datetime.timedelta(days=1)
46
48 self._delayedCall = None
49 self._subscribeId = 0
50 self._subscribers = {}
51 self._nextStart = 0
52 self._calendar = None
53
54
55
57 """
58 Return the calendar used for scheduling.
59
60 @rtype: L{eventcalendar.Calendar}
61 """
62 return self._calendar
63
65 """
66 Set the given calendar to use for scheduling.
67
68 This function will send start notifications for all new events that
69 should currently be in progress, if they were not registered in
70 the old calendar or if there was no old calendar.
71
72 If the scheduler previously had a calendar, it will send end
73 notifications for all events currently in progress that are not in the
74 new calendar.
75
76 @param calendar: the new calendar to set
77 @type calendar: L{eventcalendar.Calendar}
78 @param when: the time at which to consider the calendar to be set;
79 defaults to now
80 @type when: L{datetime.datetime}
81 """
82 if not self._calendar:
83 self.debug('Setting new calendar %r', calendar)
84 else:
85 self.debug('Replacing existing calendar %r with new %r',
86 self._calendar, calendar)
87
88
89
90 if not when:
91 when = datetime.datetime.now(eventcalendar.UTC)
92
93
94
95 oldInstances = []
96 if self._calendar:
97 oldInstances = self._calendar.getActiveEventInstances(when)
98 oldInstancesContent = [i.event.content for i in oldInstances]
99
100 newInstances = calendar.getActiveEventInstances(when)
101 newInstancesContent = [i.event.content for i in newInstances]
102
103
104
105
106 for instance in oldInstances:
107 if instance.event.content not in newInstancesContent:
108 self.debug(
109 'old active %r for %r not in new calendar, ending',
110 instance, instance.event.content)
111 self._eventInstanceEnded(instance)
112
113 for instance in newInstances:
114 if instance.event.content not in oldInstancesContent:
115 self.debug(
116 'new active %r for %r not in old calendar, starting',
117 instance, instance.event.content)
118 self._eventInstanceStarted(instance)
119
120 self._calendar = calendar
121 self._reschedule()
122
124 """
125 Get all points on this scheduler's event horizon.
126 """
127 if not when:
128 when = datetime.datetime.now(eventcalendar.LOCAL)
129
130 self.debug('getPoints at %s', str(when))
131 earliest = when + self.windowSize
132
133 points = self._calendar.getPoints(when, self.windowSize)
134
135 self.debug('%d points in given windowsize %s',
136 len(points), str(self.windowSize))
137
138 return points
139
141 """
142 Clean up all resources used by this scheduler.
143
144 This cancels all pending scheduling calls.
145 """
146 self._cancelScheduledCalls()
147
148
149
150 - def subscribe(self, eventInstanceStarted, eventInstanceEnded):
151 """
152 Subscribe to event happenings in the scheduler.
153
154 @param eventInstanceStarted: function that will be called when an
155 event instance starts
156 @type eventInstanceStarted: function with signature L{EventInstance}
157 @param eventInstanceEnded: function that will be called when an
158 event instance ends
159 @type eventInstanceEnded: function with signature L{EventInstance}
160
161 @rtype: int
162 @returns: A subscription ID that can later be passed to
163 unsubscribe().
164 """
165 sid = self._subscribeId
166 self._subscribeId += 1
167 self._subscribers[sid] = (eventInstanceStarted, eventInstanceEnded)
168 return sid
169
171 """
172 Unsubscribe from event happenings in the scheduler.
173
174 @type id: int
175 @param id: Subscription ID received from subscribe()
176 """
177 del self._subscribers[id]
178
180 self.debug('notifying %d subscribers of start of instance %r',
181 len(self._subscribers), eventInstance)
182 for started, _ in self._subscribers.values():
183 started(eventInstance)
184
186 self.debug('notifying %d subscribers of end of instance %r',
187 len(self._subscribers), eventInstance)
188 for _, ended in self._subscribers.values():
189 ended(eventInstance)
190
191
192
194
195 start = time.time()
196
197 self.debug("reschedule events")
198 self._cancelScheduledCalls()
199
200 now = datetime.datetime.now(eventcalendar.LOCAL)
201
202 def _getNextPoints():
203
204
205 self.debug('_getNextPoints at %s', str(now))
206 result = []
207
208 points = self.getPoints(now)
209
210 if not points:
211 return result
212
213 earliest = points[0].dt
214 for point in points:
215 if point.dt > earliest:
216 break
217 result.append(point)
218
219 if result:
220 self.debug('%d points at %s, first point is for %r',
221 len(result), str(result[0].dt),
222 result[0].eventInstance.event.content)
223
224 return result
225
226 def _handlePoints(points):
227 for point in points:
228 self.debug(
229 "handle %s event %r in %s at %s",
230 point.which,
231 point.eventInstance.event.content,
232 str(point.dt - now),
233 point.dt)
234 if point.which == 'start':
235 self._eventInstanceStarted(point.eventInstance)
236 elif point.which == 'end':
237 self._eventInstanceEnded(point.eventInstance)
238
239 self._reschedule()
240
241 points = _getNextPoints()
242
243 if points:
244 seconds = _timedeltaToSeconds(points[0].dt - now)
245 self.debug(
246 "schedule next point at %s in %.2f seconds",
247 str(points[0].dt), seconds)
248 dc = reactor.callLater(seconds, _handlePoints, points)
249
250 else:
251 self.debug(
252 "schedule rescheduling in %s", str(self.windowSize / 2))
253 seconds = _timedeltaToSeconds(self.windowSize / 2)
254 dc = reactor.callLater(seconds, self._reschedule)
255 self._nextStart = seconds
256 self._delayedCall = dc
257
258 delta = time.time() - start
259 if delta < 0.5:
260 self.debug('_reschedule took %.3f seconds', delta)
261 else:
262 self.warning('Rescheduling took more than half a second')
263
265 if self._delayedCall:
266 if self._delayedCall.active():
267 self._delayedCall.cancel()
268 self._delayedCall = None
269
270
307
309 """
310 Stop watching the ical file.
311 """
312 if self.watcher:
313 self.watcher.stop()
314
318
322