import re
import os
import sys
import time
import fcntl
import Queue
import json
import subprocess
import multiprocessing

import ansible
import ansible.utils
from ansible import callbacks
from bunch import Bunch
from setproctitle import setproctitle
from IPy import IP

import errors
import mockremote
from callback import FrontendCallback
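
# fedmsg is optional at import time; events are only published to the bus
# when opts.fedmsg_enabled is set (see Worker.event below)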
try:
    import fedmsg
except ImportError:
    pass


class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks):

    """ playbook callbacks - quietly! """

    def on_start(self):
        callbacks.call_callback_module("playbook_on_start")

    def on_notify(self, host, handler):
        callbacks.call_callback_module("playbook_on_notify", host, handler)

    def on_no_hosts_matched(self):
        callbacks.call_callback_module("playbook_on_no_hosts_matched")

    def on_no_hosts_remaining(self):
        callbacks.call_callback_module("playbook_on_no_hosts_remaining")

    def on_task_start(self, name, is_conditional):
        callbacks.call_callback_module(
            "playbook_on_task_start", name, is_conditional)

    def on_vars_prompt(self, varname,
                       private=True, prompt=None, encrypt=None,
                       confirm=False, salt_size=None, salt=None):

        result = None
        sys.stderr.write(
            "***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****\n")

        callbacks.call_callback_module(
            "playbook_on_vars_prompt", varname, private=private,
            prompt=prompt, encrypt=encrypt, confirm=confirm,
            salt_size=salt_size, salt=None)

        return result

    def on_setup(self):
        callbacks.call_callback_module("playbook_on_setup")

    def on_import_for_host(self, host, imported_file):
        callbacks.call_callback_module(
            "playbook_on_import_for_host", host, imported_file)

    def on_not_import_for_host(self, host, missing_file):
        callbacks.call_callback_module(
            "playbook_on_not_import_for_host", host, missing_file)

    def on_play_start(self, pattern):
        callbacks.call_callback_module("playbook_on_play_start", pattern)

    def on_stats(self, stats):
        callbacks.call_callback_module("playbook_on_stats", stats)
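

# WorkerCallback appends timestamped lines to a per-worker log file, holding
# an exclusive flock() so concurrent worker processes do not interleave writes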
class WorkerCallback(object):

    def __init__(self, logfile=None):
        self.logfile = logfile

    def log(self, msg):
        if self.logfile:
            now = time.strftime("%F %T")
            try:
                with open(self.logfile, 'a') as lf:
                    fcntl.flock(lf, fcntl.LOCK_EX)
                    lf.write(str(now) + ': ' + msg + '\n')
                    fcntl.flock(lf, fcntl.LOCK_UN)
            except (IOError, OSError) as e:
                sys.stderr.write("Could not write to logfile {0} - {1}\n"
                                 .format(self.logfile, str(e)))
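

# Each Worker runs as its own process: it pulls job files off the shared jobs
# queue, spawns/provisions a builder via ansible playbooks, drives the build
# through mockremote and reports progress to the frontend and (optionally)
# fedmsg.
#
# Rough usage sketch (the real dispatcher supplies its own opts and queues):
#   jobs = multiprocessing.Queue()
#   events = multiprocessing.Queue()
#   Worker(opts, jobs, events, worker_num=1).start()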
class Worker(multiprocessing.Process):

    def __init__(self, opts, jobs, events, worker_num,
                 ip=None, create=True, callback=None, lock=None):

        multiprocessing.Process.__init__(self, name="worker-builder")

        self.jobs = jobs
        self.events = events
        self.worker_num = worker_num
        self.ip = ip
        self.opts = opts
        self.kill_received = False
        self.callback = callback
        self.create = create
        self.lock = lock
        self.frontend_callback = FrontendCallback(opts)
        if not self.callback:
            self.logfile = os.path.join(
                self.opts.worker_logdir,
                "worker-{0}.log".format(self.worker_num))
            self.callback = WorkerCallback(logfile=self.logfile)

        if ip:
            self.callback.log("creating worker: {0}".format(ip))
            self.event("worker.create", "creating worker: {ip}", dict(ip=ip))
        else:
            self.callback.log("creating worker: dynamic ip")
            self.event("worker.create", "creating worker: dynamic ip")

    def event(self, topic, template, content=None):
        """ Multi-purpose logging method.

        Logs messages to three different destinations:
            - the log file
            - the internal "events" queue for communicating back to the
              dispatcher
            - the fedmsg bus (messages are posted asynchronously to a
              zmq.PUB socket)

        """

        content = content or {}
        what = template.format(**content)

        if self.ip:
            who = "worker-{0}-{1}".format(self.worker_num, self.ip)
        else:
            who = "worker-{0}".format(self.worker_num)

        self.callback.log("event: who: {0}, what: {1}".format(who, what))
        self.events.put({"when": time.time(), "who": who, "what": what})
        try:
            content["who"] = who
            content["what"] = what
            if self.opts.fedmsg_enabled:
                fedmsg.publish(modname="copr", topic=topic, msg=content)

        except Exception as e:
            self.callback.log("failed to publish message: {0}".format(e))

    def spawn_instance(self, retry=0):
        """call the spawn playbook to startup/provision a building instance"""

        self.callback.log("spawning instance begin")
        start = time.time()
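
        # shell out to ansible-playbook; the spawn playbook is expected to
        # print the new builder's address as "IP=<address>", which is parsed
        # from its output below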
        try:
            result = subprocess.check_output(
                "ansible-playbook -c ssh {0}".format(self.opts.spawn_playbook),
                shell=True)

        except subprocess.CalledProcessError as e:
            result = e.output
            sys.stderr.write("{0}\n".format(result))
            self.callback.log("CalledProcessError: {0}".format(result))
            if retry < 3:
                time.sleep(self.opts.sleeptime)
                return self.spawn_instance(retry + 1)
            else:
                raise

        self.callback.log("Raw output from playbook: {0}".format(result))
        match = re.search(r'IP=([^\{\}"]+)', result, re.MULTILINE)

        if not match:
            return None

        ipaddr = match.group(1)

        self.callback.log("spawning instance end")
        self.callback.log("got instance ip: {0}".format(ipaddr))
        self.callback.log(
            "Instance spawn/provision took {0} sec".format(time.time() - start))

        if self.ip:
            return self.ip
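
        # sanity-check that the parsed value really is an IP address;
        # IPy's IP() raises ValueError for anything else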
        try:
            IP(ipaddr)
            return ipaddr
        except ValueError:
            self.callback.log(
                "No IP back from spawn_instance - dumping cache output")
            self.callback.log(str(result))
            self.callback.log("Test spawn_instance playbook manually")
            return None

    def terminate_instance(self, ip):
        """call the terminate playbook to destroy the building instance"""
        self.callback.log("terminate instance begin")
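
        # run the terminate playbook against just this builder by passing its
        # IP as a one-host inline inventory ("<ip>,")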
        subprocess.check_output(
            "/usr/bin/ansible-playbook -c ssh -i '{0},' {1} ".format(
                ip, self.opts.terminate_playbook),
            shell=True)

        self.callback.log("terminate instance end")
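
    # parse_job turns the frontend's job JSON file into a Bunch carrying the
    # fields mockremote and the build loop expect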
    def parse_job(self, jobfile):
        try:
            build = json.load(open(jobfile))
        except ValueError:
            return None
        jobdata = Bunch()
        jobdata.pkgs = build["pkgs"].split(" ")
        jobdata.repos = [r for r in build["repos"].split(" ") if r.strip()]
        jobdata.chroot = build["chroot"]
        jobdata.buildroot_pkgs = build["buildroot_pkgs"]
        jobdata.memory_reqs = build["memory_reqs"]
        if build["timeout"]:
            jobdata.timeout = build["timeout"]
        else:
            jobdata.timeout = self.opts.timeout
        jobdata.destdir = os.path.normpath(
            os.path.join(self.opts.destdir,
                         build["copr"]["owner"]["name"],
                         build["copr"]["name"]))

        jobdata.build_id = build["id"]
        jobdata.results = os.path.join(
            self.opts.results_baseurl,
            build["copr"]["owner"]["name"],
            build["copr"]["name"] + "/")

        jobdata.copr_id = build["copr"]["id"]
        jobdata.user_id = build["user_id"]
        jobdata.user_name = build["copr"]["owner"]["name"]
        jobdata.copr_name = build["copr"]["name"]
        return jobdata
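
    # post_to_frontend retries a failed POST up to 10 times, sleeping 5
    # seconds between attempts, and returns the last result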
    def post_to_frontend(self, data):
        """send data to frontend"""
        i = 10
        while i > 0:
            result = self.frontend_callback.post_to_frontend(data)
            if not result:
                self.callback.log(self.frontend_callback.msg)
                i -= 1
                time.sleep(5)
            else:
                i = 0
        return result
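
    # mark_started reports to the frontend that this chroot build has started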
    def mark_started(self, job):

        build = {"id": job.build_id,
                 "started_on": job.started_on,
                 "results": job.results,
                 "chroot": job.chroot,
                 "status": 3,
                 }
        data = {"builds": [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError(
                "Could not communicate to front end to submit status info")
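
    # return_results posts the final status for this chroot to the frontend
    # and then deletes the job file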
    def return_results(self, job):

        self.callback.log(
            "{0} status {1}. Took {2} seconds".format(
                job.build_id, job.status, job.ended_on - job.started_on))

        build = {
            "id": job.build_id,
            "ended_on": job.ended_on,
            "status": job.status,
            "chroot": job.chroot,
        }

        data = {"builds": [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError(
                "Could not communicate to front end to submit results")

        os.unlink(job.jobfile)
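
    # run() is the multiprocessing.Process entry point; the loop below reports
    # status back to the frontend as 1 (success) or 0 (failure)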
339 """
340 Worker should startup and check if it can function
341 for each job it takes from the jobs queue
342 run opts.setup_playbook to create the instance
343 do the build (mockremote)
344 terminate the instance.
345 """
346
347 setproctitle("worker {0}".format(self.worker_num))
348 while not self.kill_received:
349 try:
350 jobfile = self.jobs.get()
351 except Queue.Empty:
352 break
353

            job = self.parse_job(jobfile)

            if job is None:
                self.callback.log(
                    'jobfile {0} is mangled, please investigate'.format(
                        jobfile))

                time.sleep(self.opts.sleeptime)
                continue

            job.jobfile = jobfile
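
            # spin up the build instance for this job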
            if self.create:
                try:
                    ip = self.spawn_instance()
                    if not ip:
                        raise errors.CoprWorkerError(
                            "No IP found from creating instance")

                except ansible.errors.AnsibleError as e:
                    self.callback.log(
                        "failure to setup instance: {0}".format(e))

                    raise
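
            # from here on, the spawned instance is torn down by the
            # finally: clause at the bottom of this loop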
            try:
                try:
                    if self.opts.fedmsg_enabled:
                        fedmsg.init(
                            name="relay_inbound",
                            cert_prefix="copr",
                            active=True)

                except Exception as e:
                    self.callback.log(
                        "failed to initialize fedmsg: {0}".format(e))

                status = 1
                job.started_on = time.time()
                self.mark_started(job)

                template = "build start: user:{user} copr:{copr}" \
                    " build:{build} ip:{ip} pid:{pid}"

                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid)
                self.event("build.start", template, content)

                template = "chroot start: chroot:{chroot} user:{user}" \
                    " copr:{copr} build:{build} ip:{ip} pid:{pid}"

                content = dict(chroot=job.chroot, user=job.user_name,
                               copr=job.copr_name, build=job.build_id,
                               ip=ip, pid=self.pid)

                self.event("chroot.start", template, content)

                chroot_destdir = os.path.normpath(
                    job.destdir + '/' + job.chroot)

                if not os.path.exists(chroot_destdir):
                    try:
                        os.makedirs(chroot_destdir)
                    except (OSError, IOError) as e:
                        msg = "Could not make results dir" \
                              " for job: {0} - {1}".format(chroot_destdir,
                                                           str(e))

                        self.callback.log(msg)
                        status = 0

                if status == 1:
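                    # status is still 1 only if the results dir could be
                    # created above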
                    self.callback.log("Starting build: id={0} builder={1}"
                                      " timeout={2} destdir={3}"
                                      " chroot={4} repos={5}".format(
                                          job.build_id, ip,
                                          job.timeout, job.destdir,
                                          job.chroot, str(job.repos)))

                    self.callback.log("building pkgs: {0}".format(
                        ' '.join(job.pkgs)))
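
                    # hand the actual build off to mockremote; per-package
                    # failures show up on mr.failed after build_pkgs() returns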
                    try:
                        chroot_repos = list(job.repos)
                        chroot_repos.append(job.results + '/' + job.chroot)
                        chrootlogfile = "{0}/build-{1}.log".format(
                            chroot_destdir, job.build_id)

                        macros = {
                            "copr_username": job.user_name,
                            "copr_projectname": job.copr_name,
                            "vendor": "Fedora Project COPR ({0}/{1})".format(
                                job.user_name, job.copr_name)
                        }

                        mr = mockremote.MockRemote(
                            builder=ip,
                            timeout=job.timeout,
                            destdir=job.destdir,
                            chroot=job.chroot,
                            cont=True,
                            recurse=True,
                            repos=chroot_repos,
                            macros=macros,
                            lock=self.lock,
                            buildroot_pkgs=job.buildroot_pkgs,
                            callback=mockremote.CliLogCallBack(
                                quiet=True, logfn=chrootlogfile))

                        mr.build_pkgs(job.pkgs)

                    except mockremote.MockRemoteError as e:
                        self.callback.log("{0} - {1}".format(ip, e))
                        status = 0
                    else:
                        if mr.failed:
                            status = 0

                    self.callback.log(
                        "Finished build: id={0} builder={1}"
                        " timeout={2} destdir={3}"
                        " chroot={4} repos={5}".format(
                            job.build_id, ip,
                            job.timeout, job.destdir,
                            job.chroot, str(job.repos)))

                job.ended_on = time.time()

                job.status = status
                self.return_results(job)
                self.callback.log("worker finished build: {0}".format(ip))
                template = "build end: user:{user} copr:{copr} build:{build}" \
                    " ip:{ip} pid:{pid} status:{status}"

                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid,
                               status=job.status)
                self.event("build.end", template, content)

            finally:
                if self.create:
                    self.terminate_instance(ip)