Package backend :: Module dispatcher

Source Code for Module backend.dispatcher

import os
import sys
import multiprocessing
import time
import Queue
import json
import mockremote
from bunch import Bunch
import errors
import ansible
import ansible.playbook
import ansible.errors
from ansible import callbacks
import requests


class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks):
    '''playbook callbacks - quietly!'''

    def __init__(self, verbose=False):
        self.verbose = verbose

    def on_start(self):
        callbacks.call_callback_module('playbook_on_start')

    def on_notify(self, host, handler):
        callbacks.call_callback_module('playbook_on_notify', host, handler)

    def on_no_hosts_matched(self):
        callbacks.call_callback_module('playbook_on_no_hosts_matched')

    def on_no_hosts_remaining(self):
        callbacks.call_callback_module('playbook_on_no_hosts_remaining')

    def on_task_start(self, name, is_conditional):
        callbacks.call_callback_module('playbook_on_task_start', name, is_conditional)

    def on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None,
                       confirm=False, salt_size=None, salt=None):
        result = None
        print "***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****"
        callbacks.call_callback_module('playbook_on_vars_prompt', varname,
                                       private=private, prompt=prompt,
                                       encrypt=encrypt, confirm=confirm,
                                       salt_size=salt_size, salt=None)
        return result

    def on_setup(self):
        callbacks.call_callback_module('playbook_on_setup')

    def on_import_for_host(self, host, imported_file):
        callbacks.call_callback_module('playbook_on_import_for_host', host, imported_file)

    def on_not_import_for_host(self, host, missing_file):
        callbacks.call_callback_module('playbook_on_not_import_for_host', host, missing_file)

    def on_play_start(self, pattern):
        callbacks.call_callback_module('playbook_on_play_start', pattern)

    def on_stats(self, stats):
        callbacks.call_callback_module('playbook_on_stats', stats)


class WorkerCallback(object):
    def __init__(self, logfile=None):
        self.logfile = logfile

    def log(self, msg):
        if self.logfile:
            now = time.strftime('%F %T')
            try:
                open(self.logfile, 'a').write(str(now) + ': ' + msg + '\n')
            except (IOError, OSError), e:
                print >>sys.stderr, 'Could not write to logfile %s - %s' % (self.logfile, str(e))


class Worker(multiprocessing.Process):
    def __init__(self, opts, jobs, events, worker_num, ip=None, create=True, callback=None):

        # base class initialization
        multiprocessing.Process.__init__(self, name="worker-builder")

        # job management stuff
        self.jobs = jobs
        self.events = events  # event queue for communicating back to dispatcher
        self.worker_num = worker_num
        self.ip = ip
        self.opts = opts
        self.kill_received = False
        self.callback = callback
        self.create = create
        if not self.callback:
            self.logfile = self.opts.worker_logdir + '/worker-%s.log' % self.worker_num
            self.callback = WorkerCallback(logfile=self.logfile)

        if ip:
            self.callback.log('creating worker: %s' % ip)
            self.event('creating worker: %s' % ip)
        else:
            self.callback.log('creating worker: dynamic ip')
            self.event('creating worker: dynamic ip')

    def event(self, what):
        if self.ip:
            who = 'worker-%s-%s' % (self.worker_num, self.ip)
        else:
            who = 'worker-%s' % (self.worker_num)

        self.events.put({'when': time.time(), 'who': who, 'what': what})

    def spawn_instance(self):
        """call the spawn playbook to startup/provision a building instance"""

        self.callback.log('spawning instance begin')
        start = time.time()

        stats = callbacks.AggregateStats()
        playbook_cb = SilentPlaybookCallbacks(verbose=False)
        runner_cb = callbacks.DefaultRunnerCallbacks()
        # fixme - extra_vars to include ip as a var if we need to specify ips
        # also to include info for instance type to handle the memory requirements of builds
        play = ansible.playbook.PlayBook(stats=stats, playbook=self.opts.spawn_playbook,
                                         callbacks=playbook_cb, runner_callbacks=runner_cb,
                                         remote_user='root')

        play.run()
        self.callback.log('spawning instance end')
        self.callback.log('Instance spawn/provision took %s sec' % (time.time() - start))

        if self.ip:
            return self.ip

        for i in play.SETUP_CACHE:
            if i == 'localhost':
                continue
            return i

        # if we get here we're in trouble
        self.callback.log('No IP back from spawn_instance - dumping cache output')
        self.callback.log(str(play.SETUP_CACHE))
        self.callback.log(str(play.stats.summarize('localhost')))
        self.callback.log('Test spawn_instance playbook manually')

        return None

    def terminate_instance(self, ip):
        """call the terminate playbook to destroy the building instance"""
        self.callback.log('terminate instance begin')

        stats = callbacks.AggregateStats()
        playbook_cb = SilentPlaybookCallbacks(verbose=False)
        runner_cb = callbacks.DefaultRunnerCallbacks()
        play = ansible.playbook.PlayBook(host_list=ip + ',', stats=stats,
                                         playbook=self.opts.terminate_playbook,
                                         callbacks=playbook_cb, runner_callbacks=runner_cb,
                                         remote_user='root')

        play.run()
        self.callback.log('terminate instance end')

    def parse_job(self, jobfile):
        # read in the json of the job,
        # break out what we need, and return a bunch of the info we need
        build = json.load(open(jobfile))
        jobdata = Bunch()
        jobdata.pkgs = build['pkgs'].split(' ')
        jobdata.repos = [r for r in build['repos'].split(' ') if r.strip()]
        jobdata.chroots = build['chroots'].split(' ')
        jobdata.memory_reqs = build['memory_reqs']
        jobdata.timeout = build['timeout']
        jobdata.destdir = self.opts.destdir + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
        jobdata.build_id = build['id']
        jobdata.results = self.opts.results_baseurl + '/' + build['copr']['owner']['name'] + '/' + build['copr']['name'] + '/'
        jobdata.copr_id = build['copr']['id']
        jobdata.user_id = build['user_id']
        jobdata.user_name = build['copr']['owner']['name']
        jobdata.copr_name = build['copr']['name']
        return jobdata

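    # Illustrative only: based on the keys parse_job() reads above, a job file
    # might look roughly like the following (all values here are made up):
    # {
    #   "id": 42,
    #   "pkgs": "http://example.com/foo-1.0-1.src.rpm",
    #   "repos": "",
    #   "chroots": "fedora-18-x86_64",
    #   "memory_reqs": 2048,
    #   "timeout": 3600,
    #   "user_id": 1,
    #   "copr": {"id": 7, "name": "myproject", "owner": {"name": "someuser"}}
    # }
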
    # maybe we move this to the callback?
    def post_to_frontend(self, data):
        """send data to frontend"""

        headers = {'content-type': 'application/json'}
        url = '%s/update_builds/' % self.opts.frontend_url
        auth = ('user', self.opts.frontend_auth)

        msg = None
        try:
            r = requests.post(url, data=json.dumps(data), auth=auth,
                              headers=headers)
            if r.status_code != 200:
                msg = 'Failed to submit to frontend: %s: %s' % (r.status_code, r.text)
        except requests.RequestException, e:
            msg = 'Post request failed: %s' % e

        if msg:
            self.callback.log(msg)
            return False

        return True

    # maybe we move this to the callback?
    def mark_started(self, job):

        build = {'id': job.build_id,
                 'started_on': job.started_on,
                 'results': job.results,
                 }
        data = {'builds': [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError, "Could not communicate to front end to submit status info"

    # maybe we move this to the callback?
    def return_results(self, job):
        self.callback.log('%s status %s. Took %s seconds' % (job.build_id, job.status, job.ended_on - job.started_on))
        build = {'id': job.build_id,
                 'ended_on': job.ended_on,
                 'status': job.status,
                 }
        data = {'builds': [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError, "Could not communicate to front end to submit results"

        os.unlink(job.jobfile)

    def run(self):
        # worker should startup and check if it can function
        # for each job it takes from the jobs queue
        # run opts.setup_playbook to create the instance
        # do the build (mockremote)
        # terminate the instance

        while not self.kill_received:
            try:
                jobfile = self.jobs.get()
            except Queue.Empty:
                break

            # parse the job json into our info
            job = self.parse_job(jobfile)

            # FIXME
            # this is our best place to sanity check the job before starting
            # up any longer process

            job.jobfile = jobfile

            # spin up our build instance
            if self.create:
                try:
                    ip = self.spawn_instance()
                    if not ip:
                        raise errors.CoprWorkerError, "No IP found from creating instance"

                except ansible.errors.AnsibleError, e:
                    self.callback.log('failure to setup instance: %s' % e)
                    raise

            status = 1
            job.started_on = time.time()
            self.mark_started(job)

            self.event('build start: user:%s copr:%s build:%s ip:%s pid:%s' % (job.user_name, job.copr_name, job.build_id, ip, self.pid))

            for chroot in job.chroots:
                self.event('chroot start: chroot:%s user:%s copr:%s build:%s ip:%s pid:%s' % (chroot, job.user_name, job.copr_name, job.build_id, ip, self.pid))
                chroot_destdir = job.destdir + '/' + chroot
                # setup our target dir locally
                if not os.path.exists(chroot_destdir):
                    try:
                        os.makedirs(chroot_destdir)
                    except (OSError, IOError), e:
                        msg = "Could not make results dir for job: %s - %s" % (chroot_destdir, str(e))
                        self.callback.log(msg)
                        status = 0
                        continue

                # FIXME
                # need a plugin hook or some mechanism to check random
                # info about the pkgs
                # this should use ansible to download the pkg on the remote system
                # and run a series of checks on the package before we
                # start the build - most importantly license checks.

                self.callback.log('Starting build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id, ip, job.timeout, job.destdir, chroot, str(job.repos)))
                self.callback.log('building pkgs: %s' % ' '.join(job.pkgs))
                try:
                    chroot_repos = list(job.repos)
                    chroot_repos.append(job.results + '/' + chroot)
                    chrootlogfile = chroot_destdir + '/build-%s.log' % job.build_id
                    mr = mockremote.MockRemote(builder=ip, timeout=job.timeout,
                                               destdir=job.destdir, chroot=chroot, cont=True, recurse=True,
                                               repos=chroot_repos,
                                               callback=mockremote.CliLogCallBack(quiet=True, logfn=chrootlogfile))
                    mr.build_pkgs(job.pkgs)
                except mockremote.MockRemoteError, e:
                    # record and break
                    self.callback.log('%s - %s' % (ip, e))
                    status = 0  # failure
                else:
                    # we can't really trace back if we just fail normally
                    # check if any pkgs didn't build
                    if mr.failed:
                        status = 0
                self.callback.log('Finished build: id=%r builder=%r timeout=%r destdir=%r chroot=%r repos=%r' % (job.build_id, ip, job.timeout, job.destdir, chroot, str(job.repos)))
            job.ended_on = time.time()

            job.status = status
            self.return_results(job)
            self.callback.log('worker finished build: %s' % ip)
            self.event('build end: user:%s copr:%s build:%s ip:%s pid:%s status:%s' % (job.user_name, job.copr_name, job.build_id, ip, self.pid, job.status))
            # clean up the instance
            if self.create:
                self.terminate_instance(ip)
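
A minimal sketch of how a dispatcher process might drive these workers. It assumes opts is a Bunch carrying the fields this module reads (worker_logdir, spawn_playbook, terminate_playbook, destdir, results_baseurl, frontend_url, frontend_auth); the concrete paths, URLs, and the job-file name below are purely illustrative and not part of the module.

    import multiprocessing
    from bunch import Bunch
    from backend.dispatcher import Worker

    # illustrative configuration; real values come from the backend config
    opts = Bunch(worker_logdir='/var/log/copr',
                 spawn_playbook='/etc/copr/spawn.yml',
                 terminate_playbook='/etc/copr/terminate.yml',
                 destdir='/var/lib/copr/results',
                 results_baseurl='http://example.com/results',
                 frontend_url='http://example.com/backend',
                 frontend_auth='secret')

    jobs = multiprocessing.Queue()    # job-file paths for workers to consume
    events = multiprocessing.Queue()  # workers report back through this queue

    # start a couple of workers, then feed them job files
    workers = [Worker(opts, jobs, events, worker_num=i) for i in range(2)]
    for w in workers:
        w.start()

    jobs.put('/var/lib/copr/jobs/42.json')  # hypothetical job file

    # drain worker events as they arrive
    while True:
        e = events.get()
        print '%(when)s %(who)s: %(what)s' % e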