Script copr_be_py
[hide private]
[frames] | [no frames]

Source Code for Script script-copr_be_py

  1  #!/usr/bin/python -ttu 
  2   
  3   
  4  from backend import errors 
  5  from backend.dispatcher import Worker 
  6  from backend.actions import Action 
  7  from bunch import Bunch 
  8  from retask.task import Task 
  9  from retask.queue import Queue 
 10  import ConfigParser 
 11  import daemon 
 12  import glob 
 13  import grp 
 14  import json 
 15  import lockfile 
 16  import logging 
 17  import multiprocessing 
 18  import optparse 
 19  import os 
 20  import pwd 
 21  import requests 
 22  import setproctitle 
 23  import signal 
 24  import sys 
 25  import time 
 26   
 27   
28 -def _get_conf(cp, section, option, default):
29 """ 30 To make returning items from config parser less irritating 31 """ 32 33 if cp.has_section(section) and cp.has_option(section, option): 34 return cp.get(section, option) 35 return default
36 37
class CoprJobGrab(multiprocessing.Process):

    """
    Fetch jobs from the Frontend

    - submit them to the jobs queue for workers
    """

    def __init__(self, opts, events, jobs, lock):
        """
        :param opts: Bunch of backend configuration (frontend_url,
            frontend_auth, jobsdir, sleeptime, ...)
        :param events: multiprocessing.Queue used for event logging
        :param jobs: multiprocessing.Queue of job-file paths for workers
        :param lock: multiprocessing.Lock shared with Action processing
        """
        # base class initialization
        multiprocessing.Process.__init__(self, name="jobgrab")

        self.opts = opts
        self.events = events
        self.jobs = jobs
        # extended ids ("<build_id>-<chroot>") already queued, kept so the
        # same job is never written or enqueued twice
        self.added_jobs = []
        self.lock = lock

    def event(self, what):
        """Put a timestamped event on the shared logging queue."""
        self.events.put({"when": time.time(), "who": "jobgrab", "what": what})

    def fetch_jobs(self):
        """
        Poll the frontend for waiting builds and actions.

        New builds are serialized to ``<jobsdir>/<id>-<chroot>.json``;
        actions are executed immediately in this process.
        """
        try:
            r = requests.get(
                "{0}/waiting/".format(self.opts.frontend_url),
                auth=("user", self.opts.frontend_auth))
        except requests.RequestException as e:
            self.event("Error retrieving jobs from {0}: {1}".format(
                self.opts.frontend_url, e))
            return

        try:
            r_json = json.loads(r.content)  # using old requests on el6 :(
        except ValueError as e:
            self.event("Error getting JSON build list from FE {0}"
                       .format(e))
            return

        if "builds" in r_json and r_json["builds"]:
            self.event("{0} jobs returned".format(len(r_json["builds"])))
            count = 0
            for b in r_json["builds"]:
                if "id" in b:
                    extended_id = "{0}-{1}".format(b["id"], b["chroot"])
                    jobfile = os.path.join(
                        self.opts.jobsdir,
                        "{0}.json".format(extended_id))

                    if (not os.path.exists(jobfile) and
                            extended_id not in self.added_jobs):
                        count += 1
                        # FIX: use a context manager so the job file is
                        # closed (and flushed) deterministically instead of
                        # leaking the handle until garbage collection
                        with open(jobfile, 'w') as jf:
                            jf.write(json.dumps(b))
                        self.event("Wrote job: {0}".format(extended_id))
            if count:
                self.event("New jobs: %s" % count)

        if "actions" in r_json and r_json["actions"]:
            self.event("{0} actions returned".format(
                len(r_json["actions"])))

            for action in r_json["actions"]:
                ao = Action(self.opts, self.events, action, self.lock)
                ao.run()

    def run(self):
        """Main loop: poll the frontend, enqueue new job files, sleep."""
        setproctitle.setproctitle("CoprJobGrab")
        abort = False
        try:
            while not abort:
                self.fetch_jobs()
                for f in sorted(glob.glob(
                        os.path.join(self.opts.jobsdir, "*.json"))):

                    n = os.path.basename(f).replace(".json", "")
                    if n not in self.added_jobs:
                        self.jobs.put(f)
                        self.added_jobs.append(n)
                        self.event("adding to work queue id {0}".format(n))
                time.sleep(self.opts.sleeptime)
        except KeyboardInterrupt:
            return
118 119
class CoprLog(multiprocessing.Process):

    """log mechanism where items from the events queue get recorded"""

    def __init__(self, opts, events):
        """
        :param opts: Bunch of backend configuration (logfile, verbose)
        :param events: multiprocessing.Queue this process consumes
        """
        # base class initialization
        multiprocessing.Process.__init__(self, name="logger")

        self.opts = opts
        self.events = events

        logdir = os.path.dirname(self.opts.logfile)
        if not os.path.exists(logdir):
            # 0o spelling is identical in value and valid on py2.6+ and py3
            os.makedirs(logdir, mode=0o750)

        # setup a log file to write to
        logging.basicConfig(filename=self.opts.logfile, level=logging.DEBUG)

    def log(self, event):
        """
        Format one event dict and record it via logging (and stderr when
        verbose).

        :param event: dict with "when" (epoch seconds), "who" and "what"
        """
        when = time.strftime("%F %T", time.gmtime(event["when"]))
        msg = "{0} : {1}: {2}".format(when,
                                      event["who"],
                                      event["what"].strip())

        try:
            if self.opts.verbose:
                sys.stderr.write("{0}\n".format(msg))
                sys.stderr.flush()
            logging.debug(msg)
        except (IOError, OSError) as e:
            # FIX: the original referenced self.logfile, which is never set
            # (AttributeError in the error path); the path is on self.opts
            sys.stderr.write("Could not write to logfile {0} - {1}\n".format(
                self.opts.logfile, e))

    # event format is a dict {when:time, who:[worker|logger|job|main],
    # what:str}
    def run(self):
        """Consume events from the queue forever and record each one."""
        setproctitle.setproctitle("CoprLog")
        abort = False
        try:
            while not abort:
                e = self.events.get()
                if "when" in e and "who" in e and "what" in e:
                    self.log(e)
        except KeyboardInterrupt:
            return
167 168
class CoprBackend(object):

    """
    Core process - starts/stops/initializes workers
    """

    def __init__(self, config_file=None, ext_opts=None):
        """
        :param config_file: path to the backend config file (required)
        :param ext_opts: Bunch of CLI overrides merged on top of the
            parsed configuration by read_conf()
        :raises errors.CoprBackendError: when config_file is not given
        """
        # read in config file
        # put all the config items into a single self.opts bunch
        if not config_file:
            raise errors.CoprBackendError("Must specify config_file")

        self.config_file = config_file
        self.ext_opts = ext_opts  # to stow our cli options for read_conf()
        self.opts = self.read_conf()
        self.lock = multiprocessing.Lock()

        # job is a path to a jobfile on the localfs
        self.jobs = multiprocessing.Queue()
        self.events = multiprocessing.Queue()
        # event format is a dict {when:time, who:[worker|logger|job|main],
        # what:str}

        # create logger
        self._logger = CoprLog(self.opts, self.events)
        self._logger.start()

        self.event("Starting up Job Grabber")
        # create job grabber
        self._jobgrab = CoprJobGrab(self.opts, self.events, self.jobs,
                                    self.lock)
        self._jobgrab.start()
        self.worker_num = 0
        self.abort = False

        if not os.path.exists(self.opts.worker_logdir):
            # 0o spelling is identical in value and valid on py2.6+ and py3
            os.makedirs(self.opts.worker_logdir, mode=0o750)

        self.workers = []

    def event(self, what):
        """Put a timestamped event on the shared logging queue."""
        self.events.put({"when": time.time(), "who": "main", "what": what})

    def read_conf(self):
        "read in config file - return Bunch of config data"
        opts = Bunch()
        cp = ConfigParser.ConfigParser()
        try:
            cp.read(self.config_file)
            opts.results_baseurl = _get_conf(
                cp, "backend", "results_baseurl", "http://copr")
            opts.frontend_url = _get_conf(
                cp, "backend", "frontend_url", "http://coprs/rest/api")
            opts.frontend_auth = _get_conf(
                cp, "backend", "frontend_auth", "PASSWORDHERE")

            opts.architectures = _get_conf(
                cp, "backend", "architectures", "i386,x86_64").split(",")

            # one spawn playbook per architecture
            opts.spawn_playbook = {}
            for arch in opts.architectures:
                opts.spawn_playbook[arch] = _get_conf(
                    cp, "backend", "spawn_playbook-{0}".format(arch),
                    "/srv/copr-work/provision/builderpb-{0}.yml".format(arch))

            opts.terminate_playbook = _get_conf(
                cp, "backend", "terminate_playbook",
                "/srv/copr-work/provision/terminatepb.yml")

            opts.jobsdir = _get_conf(cp, "backend", "jobsdir", None)
            opts.destdir = _get_conf(cp, "backend", "destdir", None)
            opts.exit_on_worker = _get_conf(
                cp, "backend", "exit_on_worker", False)
            opts.fedmsg_enabled = _get_conf(
                cp, "backend", "fedmsg_enabled", False)
            opts.sleeptime = int(_get_conf(cp, "backend", "sleeptime", 10))
            opts.num_workers = int(_get_conf(cp, "backend", "num_workers", 8))
            opts.timeout = int(_get_conf(cp, "builder", "timeout", 1800))
            opts.logfile = _get_conf(
                cp, "backend", "logfile", "/var/log/copr/backend.log")
            opts.verbose = _get_conf(cp, "backend", "verbose", False)
            opts.worker_logdir = _get_conf(
                cp, "backend", "worker_logdir", "/var/log/copr/workers/")
            opts.spawn_vars = _get_conf(cp, "backend", "spawn_vars", None)
            opts.terminate_vars = _get_conf(cp, "backend", "terminate_vars",
                                            None)

            # thoughts for later
            # ssh key for connecting to builders?
            # cloud key stuff?
            #
        except ConfigParser.Error as e:
            raise errors.CoprBackendError(
                "Error parsing config file: {0}: {1}".format(
                    self.config_file, e))

        if not opts.jobsdir or not opts.destdir:
            raise errors.CoprBackendError(
                "Incomplete Config - must specify"
                " jobsdir and destdir in configuration")

        # CLI options override anything from the config file
        if self.ext_opts:
            for v in self.ext_opts:
                setattr(opts, v, self.ext_opts.get(v))
        return opts

    def run(self):
        """Main loop: refresh config, keep the worker pool full, reap dead
        workers."""
        self.abort = False
        while not self.abort:
            # re-read config into opts
            self.opts = self.read_conf()

            if self.jobs.qsize():
                self.event("# jobs in queue: {0}".format(self.jobs.qsize()))
            # this handles starting/growing the number of workers
            if len(self.workers) < self.opts.num_workers:
                self.event("Spinning up more workers for jobs")
                for _ in range(self.opts.num_workers - len(self.workers)):
                    self.worker_num += 1
                    w = Worker(
                        self.opts, self.jobs, self.events, self.worker_num,
                        lock=self.lock)
                    self.workers.append(w)
                    w.start()
                self.event("Finished starting worker processes")
            # FIXME - prune out workers
            # if len(self.workers) > self.opts.num_workers:
            #    killnum = len(self.workers) - self.opts.num_workers
            #    for w in self.workers[:killnum]:
            # insert a poison pill? Kill after something? I dunno.
            # FIXME - if a worker bombs out - we need to check them
            # and startup a new one if it happens
            # check for dead workers and abort
            # FIX: iterate over a snapshot; removing from the list being
            # iterated skips the element after each removal
            for w in list(self.workers):
                if not w.is_alive():
                    self.event("Worker {0} died unexpectedly".format(
                        w.worker_num))
                    if self.opts.exit_on_worker:
                        raise errors.CoprBackendError(
                            "Worker died unexpectedly, exiting")
                    else:
                        self.workers.remove(w)  # it is not working anymore
                        w.terminate()  # kill it with a fire

            time.sleep(self.opts.sleeptime)

    def terminate(self):
        """
        Cleanup backend processes (just workers for now)
        """
        self.abort = True
        # FIX: iterate over a snapshot — removing from the list being
        # iterated made the original skip every other worker
        for w in list(self.workers):
            self.workers.remove(w)
            w.terminate()
324 325
def parse_args(args):
    """
    Parse copr-be command-line options.

    Exits the process (status 1) when the selected config file does not
    exist; otherwise returns a Bunch carrying daemonize, exit_on_worker,
    pidfile and the absolute config_file path.
    """
    parser = optparse.OptionParser("\ncopr-be [options]")

    # (flags, kwargs) registered in order so --help output is unchanged
    option_specs = [
        (("-c", "--config"),
         dict(default="/etc/copr/copr-be.conf", dest="config_file",
              help="config file to use for copr-be run")),
        (("-d", "--daemonize"),
         dict(default=False, dest="daemonize", action="store_true",
              help="daemonize or not")),
        (("-p", "--pidfile"),
         dict(default="/var/run/copr-backend/copr-be.pid", dest="pidfile",
              help="pid file to use for copr-be if daemonized")),
        (("-x", "--exit"),
         dict(default=False, dest="exit_on_worker", action="store_true",
              help="exit on worker failure")),
        (("-v", "--verbose"),
         dict(default=False, dest="verbose", action="store_true",
              help="be more verbose")),
    ]
    for flags, kwargs in option_specs:
        parser.add_option(*flags, **kwargs)

    parsed, args = parser.parse_args(args)

    if not os.path.exists(parsed.config_file):
        sys.stderr.write("No config file found at: {0}\n".format(
            parsed.config_file))
        sys.exit(1)
    parsed.config_file = os.path.abspath(parsed.config_file)

    selected = Bunch()
    for name in ("daemonize", "exit_on_worker", "pidfile", "config_file"):
        setattr(selected, name, getattr(parsed, name))

    return selected
354 355
def main(args):
    """
    Entry point: parse CLI options and run the backend as the "copr"
    user/group, optionally detached as a daemon.

    :param args: argv tail (sys.argv[1:])
    :raises SystemExit: via parse_args() when the config file is missing
    """
    opts = parse_args(args)

    try:
        context = daemon.DaemonContext(
            pidfile=lockfile.FileLock(opts.pidfile),
            gid=grp.getgrnam("copr").gr_gid,
            uid=pwd.getpwnam("copr").pw_uid,
            detach_process=opts.daemonize,
            # FIX: 0o22 is the same value as the old bare 022 literal, but
            # is valid syntax on Python 2.6+ AND Python 3
            umask=0o22,
            stderr=sys.stderr,
            # SIGTERM/SIGHUP both trigger a clean daemon shutdown
            signal_map={
                signal.SIGTERM: "terminate",
                signal.SIGHUP: "terminate",
            },
        )
        with context:
            cbe = CoprBackend(opts.config_file, ext_opts=opts)
            cbe.run()
    except (Exception, KeyboardInterrupt):
        sys.stderr.write("Killing/Dying\n")
        # only terminate workers if the backend got far enough to exist
        if "cbe" in locals():
            cbe.terminate()
        raise
if __name__ == "__main__":
    # Script entry point: run the backend; translate a Ctrl-C that escapes
    # main() into a clean (status 0) exit with a hint that cleanup may be
    # needed.
    try:
        main(sys.argv[1:])
    except KeyboardInterrupt:
        sys.stderr.write("\nUser cancelled, may need cleanup\n")
        sys.exit(0)