# coding: utf-8
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
import json
import time
from setproctitle import setproctitle
from requests import get, RequestException
from backend.frontend import FrontendClient
from ..actions import Action
from ..constants import JOB_GRAB_TASK_END_PUBSUB
from ..helpers import get_redis_connection, get_redis_logger
from ..exceptions import CoprJobGrabError
from .. import jobgrabcontrol
# TODO: Replace entire model with an asynchronous queue, so that frontend pushes
# tasks and workers listen for them
# praiskup: Please don't. I doubt this would help too much, and I really don't
# think it is worth another rewrite. Reasons (imho):
# a. there still needs to be "one" organizer, aka jobgrabber on the backend
#    VM side -- we do not want to allow Workers to contact frontend directly
#    because of (1) security and (2) process synchronization.
# b. in frontend, we _never_ want to block UI differently than on database,
# so the push to BE can't be done instantly -- and thus there would have
# to be something like buffered "JobPusher" (and that would be most
# probably implemented as poll anyway). Maybe we could use some "pipe"
# approach through infinite (http?) connection, or opened database
# connection, .. but I don't think it does matter too much who will
# control the "pipe".
class CoprJobGrab(object):
    """
    Fetch jobs from the Frontend
      - submit build task to the jobs queue for workers
      - run Action handler for action tasks

    :param Munch opts: backend config
    :param lock: :py:class:`multiprocessing.Lock` global backend lock

    TODO: Not yet fully ready for config reload.
    """

    def __init__(self, opts):
        """ base class initialization """
        self.opts = opts

        # arch name (e.g. "x86_64", "i386") -> build-group id handling it
        self.arch_to_group_id_map = dict()
        # group id -> max number of VM builders a single user may occupy
        self.group_to_usermax = dict()
        # task_id -> task dict of jobs already handed over to workers
        self.added_jobs_dict = dict()

        # Redis connection, pubsub channel and listener thread; created
        # lazily in listen_to_pubsub().
        self.rc = None
        self.channel = None
        self.ps_thread = None

        self.log = get_redis_logger(self.opts, "backend.job_grab", "job_grab")
        self.jg_control = jobgrabcontrol.Channel(self.opts, self.log)
        self.frontend_client = FrontendClient(self.opts, self.log)

    def group(self, arch):
        """
        Return the build-group id responsible for the given architecture.

        :param str arch: architecture name, e.g. "x86_64"
        :raises CoprJobGrabError: when no build group handles ``arch``
        """
        try:
            return self.arch_to_group_id_map[arch]
        except KeyError:
            raise CoprJobGrabError("Unknown architecture {0}".format(arch))

    def listen_to_pubsub(self):
        """
        Listens for job reschedule queries. Spawns self.ps_thread, don't forget to stop it.
        """
        self.rc = get_redis_connection(self.opts)
        self.channel = self.rc.pubsub(ignore_subscribe_messages=True)
        self.channel.subscribe(**{JOB_GRAB_TASK_END_PUBSUB: self.on_pubsub_event})
        self.ps_thread = self.channel.run_in_thread(sleep_time=0.05)
        self.log.info("Subscribed to {} channel".format(JOB_GRAB_TASK_END_PUBSUB))

    def route_build_task(self, task):
        """
        Route build task to the appropriate queue.

        :param task: dict-like object which represent build task

        Utilized **task** keys:
            - ``task_id``
            - ``chroot``
            - ``arch``

        :return int: Count of the successfully routed tasks
        """
        count = 0
        if "task_id" not in task:
            self.log.info("Task missing field `task_id`, raw task: {}".format(task))
        elif task["task_id"] not in self.added_jobs_dict:
            # chroot is "<distro>-<release>-<arch>"; third component is arch
            arch = task["chroot"].split("-")[2]
            group = self.group(arch)

            username = task["project_owner"]
            active_jobs_count = len([t for t_id, t in self.added_jobs_dict.items()
                                     if t["project_owner"] == username])
            # NOTE(review): ">" lets a user reach usermax + 1 active jobs;
            # confirm whether ">=" was intended here.
            if active_jobs_count > self.group_to_usermax[group]:
                self.log.debug("User can not acquire more VM (active builds #{0}), "
                               "don't schedule more tasks".format(active_jobs_count))
                return 0

            msg = "enqueue task for user {0}: id={1}, arch={2}, group={3}, active={4}"
            self.log.debug(msg.format(username, task["task_id"], arch, group, active_jobs_count))
            # Add both to local list and control channel queue.
            self.added_jobs_dict[task["task_id"]] = task
            self.jg_control.add_build(group, task)
            count += 1
        return count

    def process_action(self, action):
        """
        Run action task handler, see :py:class:`~backend.action.Action`

        :param action: dict-like object with action task
        """
        ao = Action(self.opts, action, frontend_client=self.frontend_client)
        ao.run()

    def load_tasks(self):
        """
        Retrieve tasks from frontend and runs appropriate handlers
        """
        try:
            r = get("{0}/backend/waiting/".format(self.opts.frontend_base_url),
                    auth=("user", self.opts.frontend_auth))
        except RequestException as e:
            self.log.exception("Error retrieving jobs from {}: {}"
                               .format(self.opts.frontend_base_url, e))
            return

        try:
            r_json = r.json()
        except ValueError as e:
            self.log.exception("Error getting JSON build list from FE {0}".format(e))
            return

        if r_json.get("builds"):
            self.log.debug("{0} jobs returned".format(len(r_json["builds"])))
            count = 0
            for task in r_json["builds"]:
                try:
                    count += self.route_build_task(task)
                except CoprJobGrabError as err:
                    self.log.exception("Failed to enqueue new job: {} with error: {}".format(task, err))
            if count:
                self.log.info("New build jobs: %s" % count)

        if r_json.get("actions"):
            count = 0
            self.log.info("{0} actions returned".format(len(r_json["actions"])))
            for action in r_json["actions"]:
                start = time.time()
                try:
                    self.process_action(action)
                except Exception as error:
                    self.log.exception("Error during processing action `{}`: {}".format(action, error))
                if time.time() - start > 2 * self.opts.sleeptime:
                    # we are processing actions for too long, stop and fetch
                    # everything again (including new builds)
                    break

    def on_pubsub_event(self, raw):
        """
        Handle one message from the JOB_GRAB_TASK_END_PUBSUB channel.

        Expected payload (JSON in ``raw["data"]``)::

            {"action": ("remove"|"reschedule"), "task_id": ...,
             "build_id": ..., "chroot": ...}

        "remove" simply drops ``task_id`` from self.added_jobs_dict;
        "reschedule" additionally calls frontend to set pending state
        before the removal.
        """
        if raw is None:
            return
        if "type" not in raw or raw["type"] != "message":
            self.log.warning("Missing type or wrong type in pubsub msg: {}, ignored".format(raw))
            return

        try:
            msg = json.loads(raw["data"])

            if "action" not in msg:
                self.log.warning("Missing required field `action`, msg ignored: {}".format(msg))
                return
            action = msg["action"]
            if action not in ["remove", "reschedule"]:
                self.log.warning("Action `{}` not allowed, msg ignored: {} ".format(action, msg))
                return

            if "task_id" not in msg:
                self.log.warning("Missing required field `task_id`, msg ignored: {}".format(msg))
                return
            task_id = msg["task_id"]

            if action == "reschedule" and "build_id" in msg and "chroot" in msg:
                # TODO: dirty dependency to frontend, Job management should be re-done
                self.log.info("Rescheduling task `{}`".format(task_id))
                self.frontend_client.reschedule_build(msg["build_id"], msg["chroot"])

            if task_id not in self.added_jobs_dict:
                self.log.debug("Task `{}` not present in added jobs, msg ignored: {}".format(task_id, msg))
                return

            if action in ["remove", "reschedule"]:
                self.added_jobs_dict.pop(task_id)
                self.log.info("Removed task `{}` from added_jobs".format(task_id))
        except Exception as err:
            self.log.exception("Error receiving message from remove pubsub: raw msg: {}, error: {}"
                               .format(raw, err))

    def log_queue_info(self):
        """Log the current content and size of the added-jobs map."""
        if self.added_jobs_dict:
            self.log.debug("Added jobs after remove and load: {}".format(self.added_jobs_dict))
            self.log.debug("# of executed jobs: {}".format(len(self.added_jobs_dict)))

    def init_internal_structures(self):
        """
        (Re)build arch->group and group->usermax maps from
        ``opts.build_groups`` and forget all previously added jobs.
        """
        self.arch_to_group_id_map = dict()
        self.group_to_usermax = dict()
        for group in self.opts.build_groups:
            group_id = group["id"]
            for arch in group["archs"]:
                self.arch_to_group_id_map[arch] = group_id
                self.log.debug("mapping {0} to {1} group".format(arch, group_id))

            self.log.debug("user might use only {0}VMs for {1} group".format(group["max_vm_per_user"], group_id))
            self.group_to_usermax[group_id] = group["max_vm_per_user"]

        self.added_jobs_dict = dict()

    def handle_control_channel(self):
        """
        Check for the "start" signal from backend; when it arrives,
        reset internal structures and acknowledge over the channel.
        """
        if not self.jg_control.backend_started():
            return
        self.log.info("backend gave us signal to start")
        self.init_internal_structures()
        self.jg_control.remove_all_builds()
        self.jg_control.job_graber_initialized()

    def run(self):
        """
        Starts job grabber process
        """
        setproctitle("CoprJobGrab")
        self.listen_to_pubsub()

        self.log.info("JobGrab started.")
        self.init_internal_structures()
        try:
            while True:
                try:
                    # This effectively delays job_grabbing until backend
                    # gives us signal to start.
                    self.handle_control_channel()
                    self.load_tasks()
                    self.log_queue_info()

                    time.sleep(self.opts.sleeptime)
                except Exception as err:
                    self.log.exception("Job Grab unhandled exception: {}".format(err))
        except KeyboardInterrupt:
            return

    def terminate(self):
        """Stop the pubsub listener thread before shutting down."""
        if self.ps_thread:
            self.ps_thread.stop()
            self.ps_thread.join()
        # Bug fix: the class derives from `object`, which has no terminate(),
        # so the previous unconditional super() call always raised
        # AttributeError. Delegate only when a Process-like base provides it.
        parent = super(CoprJobGrab, self)
        if hasattr(parent, "terminate"):
            parent.terminate()