Package coprs :: Package logic :: Module coprs_logic
[hide private]
[frames | no frames]

Source Code for Module coprs.logic.coprs_logic

  1  import os 
  2  import time 
  3  import datetime 
  4   
  5  from sqlalchemy import and_ 
  6  from sqlalchemy.sql import func 
  7  from sqlalchemy import asc, desc 
  8  from sqlalchemy.event import listens_for 
  9  from sqlalchemy.orm.attributes import NEVER_SET 
 10  from sqlalchemy.orm.exc import NoResultFound 
 11  from sqlalchemy.orm.attributes import get_history 
 12   
 13  from copr_common.enums import ActionTypeEnum, BackendResultEnum 
 14  from coprs import db 
 15  from coprs import exceptions 
 16  from coprs import helpers 
 17  from coprs import models 
 18  from coprs.exceptions import MalformedArgumentException, BadRequest 
 19  from coprs.logic import users_logic 
 20  from coprs.whoosheers import CoprWhoosheer 
 21  from coprs.helpers import fix_protocol_for_backend 
 22   
 23  from coprs.logic.actions_logic import ActionsLogic 
 24  from coprs.logic.users_logic import UsersLogic 
class CoprsLogic(object):
    """
    Query builders and mutation helpers for Copr projects.

    Every method is a classmethod.  Most of them return an un-executed
    SQLAlchemy query so that the caller can refine it further; only some
    of the mutating helpers take the acting user as an argument.
    """

    @classmethod
    def get_all(cls):
        """Return a query over all coprs that are not marked as deleted."""
        # SQL expression, not a Python comparison - "== False" is intended.
        not_deleted = models.Copr.deleted == False  # noqa: E712
        return (
            db.session.query(models.Copr)
            .join(models.Copr.user)
            .options(db.contains_eager(models.Copr.user))
            .filter(not_deleted)
        )

    @classmethod
    def get_by_id(cls, copr_id):
        """Narrow ``get_all()`` to the copr with the given id."""
        return cls.get_all().filter(models.Copr.id == copr_id)

    @classmethod
    def attach_build(cls, query):
        """Outer-join builds onto ``query``, newest submission first."""
        with_builds = query.outerjoin(models.Copr.builds)
        with_builds = with_builds.options(db.contains_eager(models.Copr.builds))
        return with_builds.order_by(models.Build.submitted_on.desc())

    @classmethod
    def attach_mock_chroots(cls, query):
        """
        Outer-join mock chroots onto ``query``, ordered by os_release,
        os_version and arch (all ascending).
        """
        with_chroots = query.outerjoin(*models.Copr.mock_chroots.attr)
        with_chroots = with_chroots.options(
            db.contains_eager(*models.Copr.mock_chroots.attr))
        return with_chroots.order_by(
            models.MockChroot.os_release.asc(),
            models.MockChroot.os_version.asc(),
            models.MockChroot.arch.asc(),
        )

    @classmethod
    def get_multiple_by_username(cls, username, **kwargs):
        """
        Return non-deleted coprs whose user has the given username.

        Recognized keyword arguments: ``with_builds`` and
        ``with_mock_chroots`` (both default to False) additionally join
        builds / mock chroots onto the query.
        """
        query = cls.get_all().filter(models.User.username == username)

        if kwargs.get("with_builds", False):
            query = cls.attach_build(query)
        if kwargs.get("with_mock_chroots", False):
            query = cls.attach_mock_chroots(query)
        return query

    @classmethod
    def get_multiple_by_group_id(cls, group_id, **kwargs):
        """
        Return non-deleted coprs belonging to the given group id.

        Accepts the same ``with_builds`` / ``with_mock_chroots`` keyword
        arguments as ``get_multiple_by_username``.
        """
        query = cls.get_all().filter(models.Copr.group_id == group_id)

        if kwargs.get("with_builds", False):
            query = cls.attach_build(query)
        if kwargs.get("with_mock_chroots", False):
            query = cls.attach_mock_chroots(query)
        return query

    @classmethod
    def get(cls, username, coprname, **kwargs):
        """Return the query for ``username``'s coprs named ``coprname``."""
        by_user = cls.get_multiple_by_username(username, **kwargs)
        return by_user.filter(models.Copr.name == coprname)

    @classmethod
    def get_by_group_id(cls, group_id, coprname, **kwargs):
        """Return the query for the group's coprs named ``coprname``."""
        by_group = cls.get_multiple_by_group_id(group_id, **kwargs)
        return by_group.filter(models.Copr.name == coprname)

    @classmethod
    def get_multiple(cls, include_deleted=False, include_unlisted_on_hp=True):
        """
        Return a query over coprs joined with User and outer-joined with
        Group, optionally dropping deleted and/or unlisted projects.
        """
        query = (
            db.session.query(models.Copr)
            .join(models.Copr.user)
            .outerjoin(models.Group)
            .options(db.contains_eager(models.Copr.user))
        )
        if not include_deleted:
            query = query.filter(models.Copr.deleted.is_(False))
        if not include_unlisted_on_hp:
            query = query.filter(models.Copr.unlisted_on_hp.is_(False))
        return query

    @classmethod
    def set_query_order(cls, query, desc=False):
        """Order ``query`` by copr id, descending when ``desc`` is truthy."""
        ordering = models.Copr.id.desc() if desc else models.Copr.id.asc()
        return query.order_by(ordering)

    @classmethod
    def get_multiple_owned_by_username(cls, username, include_unlisted_on_hp=True):
        """Return ``get_multiple()`` restricted to the given username."""
        owned = cls.get_multiple(include_unlisted_on_hp=include_unlisted_on_hp)
        return owned.filter(models.User.username == username)

    @classmethod
    def filter_by_name(cls, query, name):
        """Keep only coprs with exactly this name."""
        return query.filter(models.Copr.name == name)

    @classmethod
    def filter_by_user_name(cls, query, username):
        """Keep only coprs of this user; ``query`` must already join User."""
        return query.filter(models.User.username == username)

    @classmethod
    def filter_by_group_name(cls, query, group_name):
        """Keep only coprs of this group; ``query`` must already join Group."""
        return query.filter(models.Group.name == group_name)

    @classmethod
    def filter_without_group_projects(cls, query):
        """Keep only coprs that have no group_id."""
        return query.filter(models.Copr.group_id.is_(None))

    @classmethod
    def filter_without_ids(cls, query, ids):
        """Drop coprs whose id is listed in ``ids``."""
        return query.filter(models.Copr.id.notin_(ids))

    @classmethod
    def join_builds(cls, query):
        """Same join/ordering as ``attach_build``."""
        return cls.attach_build(query)

    @classmethod
    def join_mock_chroots(cls, query):
        """Same join/ordering as ``attach_mock_chroots``."""
        return cls.attach_mock_chroots(query)

    @classmethod
    def get_playground(cls):
        """Return non-deleted coprs with the playground flag set."""
        return cls.get_all().filter(models.Copr.playground == True)  # noqa: E712

    @classmethod
    def set_playground(cls, user, copr):
        """
        Add ``copr`` to the session on behalf of an admin.

        Raises InsufficientRightsException when ``user`` is not an admin.
        """
        if not user.admin:
            raise exceptions.InsufficientRightsException(
                "User is not a system admin")
        db.session.add(copr)

    @classmethod
    def get_multiple_fulltext(cls, search_string):
        """
        Search non-deleted coprs.

        A string containing "/" is treated as "owner/project" (or
        "@group/project" when it starts with "@") and matched with
        case-insensitive LIKE; anything else goes to the whooshee
        fulltext search.
        """
        query = (models.Copr.query.order_by(desc(models.Copr.created_on))
                 .join(models.User)
                 .filter(models.Copr.deleted == False))  # noqa: E712

        if "/" not in search_string:
            # plain fulltext search
            return query.whooshee_search(
                search_string, whoosheer=CoprWhoosheer, order_by_relevance=100)

        # search by full name: only the first two "/"-separated parts count
        parts = search_string.split("/")
        project = "%{}%".format(parts[1])

        if search_string[0] == '@':
            # @group/project
            group_name = "%{}%".format(parts[0][1:])
            query = query.filter(and_(models.Group.name.ilike(group_name),
                                      models.Copr.name.ilike(project),
                                      models.Group.id == models.Copr.group_id))
            name_length = func.length(models.Group.name)+func.length(models.Copr.name)
        else:
            # user/project
            user_name = "%{}%".format(parts[0])
            query = query.filter(and_(models.User.username.ilike(user_name),
                                      models.Copr.name.ilike(project),
                                      models.User.id == models.Copr.user_id))
            name_length = func.length(models.User.username)+func.length(models.Copr.name)

        # shorter full names sort first
        return query.order_by(asc(name_length))

    @classmethod
    def add(cls, user, name, selected_chroots, repos=None, description=None,
            instructions=None, check_for_duplicates=False, group=None, persistent=False,
            auto_prune=True, use_bootstrap_container=False, follow_fedora_branching=False, **kwargs):
        """
        Create a new copr together with its main CoprDir and chroots.

        Raises NonAdminCannotCreatePersistentProject /
        NonAdminCannotDisableAutoPrunning when a non-admin asks for a
        persistent project or disables auto_prune, and whatever ``new()``
        raises for duplicates.  Extra keyword arguments are passed to the
        ``models.Copr`` constructor.  Returns the new copr after flushing
        the session.
        """
        if not user.admin:
            if persistent:
                raise exceptions.NonAdminCannotCreatePersistentProject()
            if not auto_prune:
                raise exceptions.NonAdminCannotDisableAutoPrunning()

        # form validation checks for duplicates
        cls.new(user, name, group, check_for_duplicates=check_for_duplicates)

        project = models.Copr(name=name,
                              repos=repos or u"",
                              user=user,
                              description=description or u"",
                              instructions=instructions or u"",
                              created_on=int(time.time()),
                              persistent=persistent,
                              auto_prune=auto_prune,
                              use_bootstrap_container=use_bootstrap_container,
                              follow_fedora_branching=follow_fedora_branching,
                              **kwargs)

        if group is not None:
            UsersLogic.raise_if_not_in_group(user, group)
            project.group = group

        main_dir = models.CoprDir(main=True, name=name, copr=project)
        db.session.add(main_dir)
        db.session.add(project)

        CoprChrootsLogic.new_from_names(project, selected_chroots)

        db.session.flush()
        ActionsLogic.send_create_gpg_key(project)

        return project

    @classmethod
    def new(cls, user, copr_name, group=None, check_for_duplicates=True):
        """
        Raise DuplicateException when a non-deleted copr of that name
        already exists for the user (no group given) or for the group.
        Does nothing when ``check_for_duplicates`` is falsy.
        """
        if not check_for_duplicates:
            return

        if group is None:
            if cls.exists_for_user(user, copr_name).all():
                raise exceptions.DuplicateException(
                    "Copr: '{0}/{1}' already exists".format(user.name, copr_name))
        elif group:
            if cls.exists_for_group(group, copr_name).all():
                db.session.rollback()
                raise exceptions.DuplicateException(
                    "Copr: '@{0}/{1}' already exists".format(group.name, copr_name))

    @classmethod
    def update(cls, user, copr):
        """
        Validate a pending change of ``copr`` and add it to the session.

        Raises MalformedArgumentException when the name was changed,
        NonAdminCannotDisableAutoPrunning when a non-admin disabled
        auto_prune, and whatever ``raise_if_cant_update_copr`` raises.
        """
        # get_history has to be consulted before any other request,
        # otherwise the recorded changes would be forgotten
        name_changed = get_history(copr, "name").has_changes()
        if name_changed:
            raise MalformedArgumentException("Change name of the project is forbidden")

        users_logic.UsersLogic.raise_if_cant_update_copr(
            user, copr, "Only owners and admins may update their projects.")

        if not (user.admin or copr.auto_prune):
            raise exceptions.NonAdminCannotDisableAutoPrunning()

        db.session.add(copr)

    @classmethod
    def delete_unsafe(cls, user, copr):
        """
        Delete the copr without terminating ongoing builds.

        Checks permissions and pending blocking actions first, then sends
        the delete action, deletes the copr's dirs and flags the copr as
        deleted.  Returns the copr.
        """
        cls.raise_if_cant_delete(user, copr)
        # TODO: do we want to dump the information somewhere, so that we can
        # search it in future?
        cls.raise_if_unfinished_blocking_action(
            copr, "Can't delete this project,"
                  " another operation is in progress: {action}")

        ActionsLogic.send_delete_copr(copr)
        CoprDirsLogic.delete_all_by_copr(copr)

        copr.deleted = True
        return copr

    @classmethod
    def exists_for_user(cls, user, coprname, incl_deleted=False):
        """
        Return a query for the user's non-group coprs named ``coprname``,
        newest first; deleted ones are included only on request.
        """
        matches = (models.Copr.query
                   .order_by(desc(models.Copr.created_on))
                   .filter(models.Copr.name == coprname)
                   .filter(models.Copr.user_id == user.id))
        if not incl_deleted:
            matches = matches.filter(models.Copr.deleted == False)  # noqa: E712
        return cls.filter_without_group_projects(matches)

    @classmethod
    def exists_for_group(cls, group, coprname, incl_deleted=False):
        """
        Return a query for the group's coprs named ``coprname``, newest
        first; deleted ones are included only on request.
        """
        matches = (models.Copr.query
                   .order_by(desc(models.Copr.created_on))
                   .filter(models.Copr.name == coprname)
                   .filter(models.Copr.group_id == group.id))
        if incl_deleted:
            return matches
        return matches.filter(models.Copr.deleted == False)  # noqa: E712

    @classmethod
    def unfinished_blocking_actions_for(cls, copr):
        """Return a query for waiting "delete" actions on this copr."""
        blocking_types = [ActionTypeEnum("delete")]
        waiting = BackendResultEnum("waiting")
        return (models.Action.query
                .filter(models.Action.object_type == "copr")
                .filter(models.Action.object_id == copr.id)
                .filter(models.Action.result == waiting)
                .filter(models.Action.action_type.in_(blocking_types)))

    @classmethod
    def get_yum_repos(cls, copr, empty=False):
        """
        Map "<os_release>-<os_version>-<arch>" to the repo URL for every
        active chroot of ``copr``.

        The mapping is empty when the copr has no build, unless ``empty``
        is truthy.
        """
        any_build = models.Build.query.filter(models.Build.copr_id == copr.id).first()
        if not (any_build or empty):
            return {}

        repos = {}
        for chroot in copr.active_chroots:
            release = "{chroot.os_release}-{chroot.os_version}-{chroot.arch}".format(
                chroot=chroot)
            repos[release] = fix_protocol_for_backend(
                os.path.join(copr.repo_url, release + '/'))
        return repos

    @classmethod
    def raise_if_unfinished_blocking_action(cls, copr, message):
        """
        Raise ActionInProgressException (carrying ``message`` and the
        first pending action) if the copr has an unfinished blocking
        action; return None otherwise.
        """
        pending = cls.unfinished_blocking_actions_for(copr).all()
        if not pending:
            return
        raise exceptions.ActionInProgressException(message, pending[0])

    @classmethod
    def raise_if_cant_delete(cls, user, copr):
        """
        Raise InsufficientRightsException if ``user`` may not delete
        ``copr``; return None otherwise.

        Admins always may; for group coprs the decision is delegated to
        ``UsersLogic.raise_if_not_in_group``; otherwise only the copr's
        user may.
        """
        if user.admin:
            return

        if copr.group:
            return UsersLogic.raise_if_not_in_group(user, copr.group)

        if user != copr.user:
            raise exceptions.InsufficientRightsException(
                "Only owners may delete their projects.")
386
class CoprPermissionsLogic(object):
    """Helpers for reading and changing CoprPermission records."""

    @classmethod
    def get(cls, copr, searched_user):
        """Return a query for ``searched_user``'s permission on ``copr``."""
        return (models.CoprPermission.query
                .filter(models.CoprPermission.copr == copr)
                .filter(models.CoprPermission.user == searched_user))

    @classmethod
    def get_for_copr(cls, copr):
        """Return a query for all permissions attached to ``copr``."""
        return models.CoprPermission.query.filter(
            models.CoprPermission.copr == copr)

    @classmethod
    def get_admins_for_copr(cls, copr):
        """
        Return a list with the copr's user first, followed by every user
        whose copr_admin permission is "approved".
        """
        admins = [copr.user]
        for perm in cls.get_for_copr(copr):
            if perm.copr_admin == helpers.PermissionEnum("approved"):
                admins.append(perm.user)
        return admins

    @classmethod
    def new(cls, copr_permission):
        """Add the permission object to the session."""
        db.session.add(copr_permission)

    @classmethod
    def update_permissions(cls, user, copr, copr_permission,
                           new_builder, new_admin):
        """
        Bulk-update builder/admin values of the permission row matching
        ``copr`` and ``copr_permission.user_id``, after checking that
        ``user`` may update the copr.
        """
        users_logic.UsersLogic.raise_if_cant_update_copr(
            user, copr, "Only owners and admins may update"
                        " their projects permissions.")

        new_values = {"copr_builder": new_builder,
                      "copr_admin": new_admin}
        (models.CoprPermission.query
         .filter(models.CoprPermission.copr_id == copr.id)
         .filter(models.CoprPermission.user_id == copr_permission.user_id)
         .update(new_values))

    @classmethod
    def update_permissions_by_applier(cls, user, copr, copr_permission, new_builder, new_admin):
        """
        Apply the values a user requested for themselves.

        Without an existing permission a new one is created.  Otherwise
        each value is overwritten unless the new value is truthy and the
        stored one is already "approved".
        """
        if not copr_permission:
            cls.new(models.CoprPermission(
                user=user,
                copr=copr,
                copr_builder=new_builder,
                copr_admin=new_admin))
            return

        # preserve approved permissions if set
        if not new_builder or copr_permission.copr_builder != helpers.PermissionEnum("approved"):
            copr_permission.copr_builder = new_builder

        if not new_admin or copr_permission.copr_admin != helpers.PermissionEnum("approved"):
            copr_permission.copr_admin = new_admin

    @classmethod
    def delete(cls, copr_permission):
        """Delete the permission object through the session."""
        db.session.delete(copr_permission)

    @classmethod
    def validate_permission(cls, user, copr, permission, state):
        """
        Raise BadRequest for an unknown permission name, an unknown state
        name, or when ``user`` is the copr's owner (by user_id).
        """
        known_permissions = ['admin', 'builder']
        if permission not in known_permissions:
            raise BadRequest(
                "invalid permission '{0}', allowed {1}".format(
                    permission, '|'.join(known_permissions)))

        known_states = helpers.PermissionEnum.vals.keys()
        if state not in known_states:
            raise BadRequest(
                "invalid '{0}' permission state '{1}', "
                "use {2}".format(permission, state, '|'.join(known_states)))

        if user.id == copr.user_id:
            raise BadRequest("user '{0}' is owner of the '{1}' "
                             "project".format(user.name, copr.full_name))

    @classmethod
    def set_permissions(cls, request_user, copr, user, permission, state):
        """
        Set ``user``'s ``permission`` on ``copr`` to ``state`` on behalf
        of ``request_user``.

        Returns ``(old_state, new_state)`` when the state changed and
        None otherwise.
        """
        users_logic.UsersLogic.raise_if_cant_update_copr(
            request_user, copr,
            "only owners and admins may update their projects permissions.")

        cls.validate_permission(user, copr, permission, state)

        record = db.session.merge(
            models.CoprPermission(user_id=user.id, copr_id=copr.id))
        previous = record.get_permission(permission)

        requested = helpers.PermissionEnum(state)
        record.set_permission(permission, requested)
        db.session.merge(record)

        if previous == requested:
            return None
        return (previous, requested)

    @classmethod
    def request_permission(cls, copr, user, permission, req_bool):
        """
        Record that ``user`` requests (True) or withdraws (False) the
        given permission.

        Raises BadRequest when ``req_bool`` is neither True nor False,
        when validation fails, or when an already approved permission is
        requested again.  Returns ``(old_state, new_state)`` on change,
        None otherwise.
        """
        approved = helpers.PermissionEnum('approved')

        # identity checks on purpose: only real booleans are accepted
        if req_bool is True:
            state = 'request'
        elif req_bool is False:
            state = 'nothing'
        else:
            raise BadRequest("invalid '{0}' permission request '{1}', "
                             "expected True or False".format(permission,
                                                             req_bool))

        cls.validate_permission(user, copr, permission, state)
        record = db.session.merge(
            models.CoprPermission(user_id=user.id, copr_id=copr.id))
        previous = record.get_permission(permission)
        if previous == approved and state == 'request':
            raise BadRequest("You already are '{0}' in '{1}'".format(
                permission, copr.full_name))

        requested = helpers.PermissionEnum(state)
        record.set_permission(permission, requested)

        return (previous, requested) if previous != requested else None
516
class CoprDirsLogic(object):
    """Helpers for looking up, creating and deleting CoprDir records."""

    @classmethod
    def get_or_create(cls, copr, dirname, main=False):
        """
        Return the copr's dir named ``dirname``, creating it when missing.

        On creation ``ActionsLogic.send_createrepo`` is called for the new
        dirname and the new CoprDir is added to the session.
        """
        existing = cls.get_by_copr(copr, dirname).first()
        if existing:
            return existing

        created = models.CoprDir(name=dirname, copr=copr, main=main)
        ActionsLogic.send_createrepo(copr, dirnames=[dirname])
        db.session.add(created)
        return created

    @classmethod
    def get_by_copr(cls, copr, dirname):
        """Return a query for dirs of ``copr`` named ``dirname``."""
        dirs = db.session.query(models.CoprDir).join(models.Copr)
        return (dirs
                .filter(models.Copr.id == copr.id)
                .filter(models.CoprDir.name == dirname))

    @classmethod
    def get_by_ownername(cls, ownername, dirname):
        """Return a query for dirs matching both dirname and ownername."""
        dirs = db.session.query(models.CoprDir)
        return (dirs
                .filter(models.CoprDir.name == dirname)
                .filter(models.CoprDir.ownername == ownername))

    @classmethod
    def delete(cls, copr_dir):
        """Delete one CoprDir through the session."""
        db.session.delete(copr_dir)

    @classmethod
    def delete_all_by_copr(cls, copr):
        """Delete every dir listed in ``copr.dirs`` through the session."""
        for doomed_dir in copr.dirs:
            db.session.delete(doomed_dir)
@listens_for(models.Copr.auto_createrepo, 'set')
def on_auto_createrepo_change(target_copr, value_acr, old_value_acr, initiator):
    """
    Attribute 'set' listener for ``Copr.auto_createrepo``.

    Calls ``ActionsLogic.send_createrepo`` when the flag goes from a falsy
    old value to a truthy new one; a first assignment (old value is
    NEVER_SET) is ignored.
    """
    newly_created = old_value_acr == NEVER_SET
    if newly_created:
        # created new copr, not interesting
        return

    re_enabled = value_acr and not old_value_acr
    if re_enabled:
        ActionsLogic.send_createrepo(target_copr)
566
class BranchesLogic(object):
    """Helpers for DistGitBranch records."""

    @classmethod
    def get_or_create(cls, name, session=db.session):
        """
        Return the DistGitBranch called ``name``.

        When the lookup in ``session`` finds nothing, a new branch with
        that name is added to ``session`` and returned.
        """
        existing = session.query(models.DistGitBranch).filter_by(name=name).first()
        if existing:
            return existing

        created = models.DistGitBranch()
        created.name = name
        session.add(created)
        return created
579
580 581 -class CoprChrootsLogic(object):
@classmethod
def get_multiple(cls, include_deleted=False):
    """
    Return a query over CoprChroot joined with Copr.

    Chroots of coprs flagged as deleted are left out unless
    ``include_deleted`` is truthy.
    """
    chroots = models.CoprChroot.query.join(models.Copr)
    if include_deleted:
        return chroots
    return chroots.filter(models.Copr.deleted.is_(False))
@classmethod
def mock_chroots_from_names(cls, names):
    """
    Return the MockChroot objects whose ``name`` is contained in ``names``.

    All MockChroot rows are loaded and filtered in Python.
    """
    return [chroot for chroot in models.MockChroot.query.all()
            if chroot.name in names]
@classmethod
def get_by_name(cls, copr, chroot_name, active_only=True):
    """
    Return a query for the CoprChroot of ``copr`` in the named mock chroot.

    The mock chroot is resolved first with ``.one()``, so an unknown or
    ambiguous ``chroot_name`` raises before any query is returned.
    """
    mock_chroot = MockChrootsLogic.get_from_name(
        chroot_name, active_only=active_only).one()
    copr_chroots = models.CoprChroot.query.join(models.MockChroot)
    return (copr_chroots
            .filter(models.CoprChroot.copr_id == copr.id)
            .filter(models.MockChroot.id == mock_chroot.id))
@classmethod
def get_by_name_safe(cls, copr, chroot_name):
    """
    Like ``get_by_name(...).one()`` but return None instead of raising
    NoResultFound.

    :rtype: models.CoprChroot
    """
    try:
        found = cls.get_by_name(copr, chroot_name).one()
    except NoResultFound:
        return None
    return found
@classmethod
def new(cls, mock_chroot):
    """Add the given object to the database session."""
    session = db.session
    session.add(mock_chroot)
623 624 @classmethod
625 - def new_from_names(cls, copr, names):
@classmethod
def create_chroot(cls, user, copr, mock_chroot, buildroot_pkgs=None, repos=None, comps=None, comps_name=None,
                  with_opts="", without_opts="",
                  delete_after=None, delete_notify=None, module_toggle=""):
    """
    Create a CoprChroot for ``copr``/``mock_chroot`` and fill in its
    settings via ``_update_chroot``.

    ``buildroot_pkgs`` and ``repos`` default to empty strings when None.
    The permission check is delegated to
    ``UsersLogic.raise_if_cant_update_copr``.

    :type user: models.User
    :type mock_chroot: models.MockChroot
    """
    buildroot_pkgs = "" if buildroot_pkgs is None else buildroot_pkgs
    repos = "" if repos is None else repos

    UsersLogic.raise_if_cant_update_copr(
        user, copr,
        "Only owners and admins may update their projects.")

    new_chroot = models.CoprChroot(copr=copr, mock_chroot=mock_chroot)
    cls._update_chroot(buildroot_pkgs, repos, comps, comps_name, new_chroot,
                       with_opts, without_opts, delete_after, delete_notify, module_toggle)
    return new_chroot
@classmethod
def update_chroot(cls, user, copr_chroot, buildroot_pkgs=None, repos=None, comps=None, comps_name=None,
                  with_opts="", without_opts="", delete_after=None, delete_notify=None, module_toggle=""):
    """
    Update settings of an existing chroot via ``_update_chroot`` after
    checking that ``user`` may update the chroot's copr.  Returns the
    chroot.

    :type user: models.User
    :type copr_chroot: models.CoprChroot
    """
    owning_copr = copr_chroot.copr
    UsersLogic.raise_if_cant_update_copr(
        user, owning_copr,
        "Only owners and admins may update their projects.")

    cls._update_chroot(
        buildroot_pkgs, repos, comps, comps_name, copr_chroot,
        with_opts, without_opts, delete_after, delete_notify, module_toggle)
    return copr_chroot
@classmethod
def _update_chroot(cls, buildroot_pkgs, repos, comps, comps_name,
                   copr_chroot, with_opts, without_opts, delete_after, delete_notify, module_toggle):
    """
    Copy every argument that is not None onto ``copr_chroot`` and add the
    chroot to the session.

    Newlines in ``repos`` are replaced by spaces.  When ``comps_name`` is
    given, the comps are updated and ``ActionsLogic.send_update_comps``
    is called.
    """
    def set_if_given(attribute, value):
        # None means "leave the stored value alone"
        if value is not None:
            setattr(copr_chroot, attribute, value)

    set_if_given("buildroot_pkgs", buildroot_pkgs)

    if repos is not None:
        copr_chroot.repos = repos.replace("\n", " ")

    set_if_given("with_opts", with_opts)
    set_if_given("without_opts", without_opts)

    if comps_name is not None:
        copr_chroot.update_comps(comps)
        copr_chroot.comps_name = comps_name
        ActionsLogic.send_update_comps(copr_chroot)

    set_if_given("delete_after", delete_after)
    set_if_given("delete_notify", delete_notify)
    set_if_given("module_toggle", module_toggle)

    db.session.add(copr_chroot)
@classmethod
def update_from_names(cls, user, copr, names):
    """
    Make the copr's mock chroots match ``names``.

    Chroots named in ``names`` but not yet attached get a new CoprChroot
    (followed by one ``ActionsLogic.send_createrepo`` call if anything
    was added).  Attached chroots that are not named are removed, except
    inactive ones, which are kept.
    """
    UsersLogic.raise_if_cant_update_copr(
        user, copr,
        "Only owners and admins may update their projects.")

    attached = copr.mock_chroots
    wanted = cls.mock_chroots_from_names(names)

    # add non-existing
    added_any = False
    for candidate in wanted:
        if candidate in attached:
            continue
        db.session.add(
            models.CoprChroot(copr=copr, mock_chroot=candidate))
        added_any = True

    if added_any:
        ActionsLogic.send_createrepo(copr)

    # delete no more present; collected first because removing while
    # iterating would change `attached` and break the iteration
    obsolete = [chroot for chroot in attached
                if chroot not in wanted and chroot.is_active]

    for chroot in obsolete:
        copr.mock_chroots.remove(chroot)
@classmethod
def remove_comps(cls, user, copr_chroot):
    """
    Clear ``comps_name`` and ``comps_zlib`` of the chroot, call
    ``ActionsLogic.send_update_comps`` and add the chroot to the session.

    The permission check is delegated to
    ``UsersLogic.raise_if_cant_update_copr``.
    """
    UsersLogic.raise_if_cant_update_copr(
        user, copr_chroot.copr,
        "Only owners and admins may update their projects.")

    for attribute in ("comps_name", "comps_zlib"):
        setattr(copr_chroot, attribute, None)

    ActionsLogic.send_update_comps(copr_chroot)
    db.session.add(copr_chroot)
@classmethod
def remove_copr_chroot(cls, user, copr_chroot):
    """
    Delete the chroot through the session after checking that ``user``
    may update the chroot's copr.

    :param models.CoprChroot copr_chroot:
    """
    owning_copr = copr_chroot.copr
    UsersLogic.raise_if_cant_update_copr(
        user, owning_copr,
        "Only owners and admins may update their projects.")

    db.session.delete(copr_chroot)
@classmethod
def filter_outdated(cls, query):
    """
    Keep chroots whose ``delete_after`` is at or after the current local
    time (``datetime.datetime.now()``).
    """
    now = datetime.datetime.now()
    return query.filter(models.CoprChroot.delete_after >= now)
@classmethod
def filter_outdated_to_be_deleted(cls, query):
    """
    Keep chroots whose ``delete_after`` lies before the current local
    time (``datetime.datetime.now()``).
    """
    now = datetime.datetime.now()
    return query.filter(models.CoprChroot.delete_after < now)
760
761 762 -class MockChrootsLogic(object):
763 @classmethod
764 - def get(cls, os_release, os_version, arch, active_only=False, noarch=False):
@classmethod
def get_from_name(cls, chroot_name, active_only=False, noarch=False):
    """
    Look up a mock chroot by its textual name.

    ``chroot_name`` is parsed by ``tuple_from_name`` (e.g.
    fedora-rawhide-x86_64; the architecture may be left out with
    noarch=True) and the three parts are handed to ``cls.get``, whose
    result is returned.
    """
    os_release, os_version, arch = cls.tuple_from_name(chroot_name, noarch=noarch)
    return cls.get(os_release, os_version, arch,
                   active_only=active_only, noarch=noarch)
@classmethod
def get_multiple(cls, active_only=False):
    """Return a query over mock chroots, optionally only the active ones."""
    chroots = models.MockChroot.query
    if not active_only:
        return chroots
    return chroots.filter(models.MockChroot.is_active == True)  # noqa: E712
@classmethod
def add(cls, name):
    """
    Create a MockChroot from an os-version-arch name and add it to the
    session.

    Raises DuplicateException when a chroot with these three parts is
    already found by ``cls.get``.  Returns the new chroot.
    """
    parts = cls.tuple_from_name(name)
    already_there = cls.get(*parts).first()
    if already_there:
        raise exceptions.DuplicateException(
            "Mock chroot with this name already exists.")

    created = models.MockChroot(os_release=parts[0],
                                os_version=parts[1],
                                arch=parts[2])
    cls.new(created)
    return created
@classmethod
def active_names(cls):
    """Return the names of all active mock chroots as a list."""
    active = cls.get_multiple(active_only=True).all()
    return [chroot.name for chroot in active]
810 811 @classmethod
813 return [(ch.name, ch.comment) for ch in cls.get_multiple(active_only=True).all()]
@classmethod
def new(cls, mock_chroot):
    """Add the mock chroot to the database session."""
    session = db.session
    session.add(mock_chroot)
@classmethod
def edit_by_name(cls, name, is_active):
    """
    Set ``is_active`` on the mock chroot identified by ``name``.

    Raises NotFoundException when no such chroot is found.  Returns the
    updated chroot.
    """
    parts = cls.tuple_from_name(name)
    target = cls.get(*parts).first()
    if not target:
        raise exceptions.NotFoundException(
            "Mock chroot with this name doesn't exist.")

    target.is_active = is_active
    cls.update(target)
    return target
@classmethod
def update(cls, mock_chroot):
    """Add the (modified) mock chroot to the database session."""
    session = db.session
    session.add(mock_chroot)
@classmethod
def delete_by_name(cls, name):
    """
    Delete the mock chroot identified by ``name``.

    Raises NotFoundException when no such chroot is found.
    """
    parts = cls.tuple_from_name(name)
    target = cls.get(*parts).first()
    if target:
        cls.delete(target)
        return
    raise exceptions.NotFoundException(
        "Mock chroot with this name doesn't exist.")
@classmethod
def delete(cls, mock_chroot):
    """Delete the mock chroot through the database session."""
    session = db.session
    session.delete(mock_chroot)
@classmethod
def tuple_from_name(cls, name, noarch=False):
    """
    Split a chroot name into a 3-tuple.

    Without ``noarch`` the name is split on its last two dashes, e.g.
    fedora-rawhide-x86_64 -> ("fedora", "rawhide", "x86_64").  With
    ``noarch=True`` it is split on the last dash only and None is
    appended as the architecture, e.g. fedora-29 -> ("fedora", "29", None).

    Raises MalformedArgumentException when the name has too few dashes.
    """
    if noarch:
        parts = name.rsplit("-", 1)
        valid = len(parts) in (2, 3)
    else:
        parts = name.rsplit("-", 2)
        valid = len(parts) == 3

    if not valid:
        raise MalformedArgumentException("Chroot identification is not valid")

    if noarch and len(parts) == 2:
        # architecture was left out
        parts.append(None)

    return tuple(parts)
@classmethod
def prunerepo_finished(cls, chroots_pruned):
    """
    Set ``final_prunerepo_done`` on every named chroot that is not
    active, then commit the session.

    Each name is resolved with ``.one()``, so an unknown name raises.
    Returns True.
    """
    for pruned_name in chroots_pruned:
        mock_chroot = cls.get_from_name(pruned_name).one()
        if mock_chroot.is_active:
            continue
        mock_chroot.final_prunerepo_done = True

    db.session.commit()
    return True
883 884 @classmethod
886 query = models.MockChroot.query 887 chroots = {} 888 for chroot in query: 889 if chroot.is_active or not chroot.final_prunerepo_done: 890 chroots[chroot.name] = chroot.is_active 891 892 return chroots
893
894 895 -class PinnedCoprsLogic(object):
@classmethod
def get_all(cls):
    """Return a query over all PinnedCoprs rows ordered by position."""
    pins = db.session.query(models.PinnedCoprs)
    return pins.order_by(models.PinnedCoprs.position)
@classmethod
def get_by_id(cls, pin_id):
    """Narrow ``get_all()`` to the pin with the given id."""
    matches_id = models.PinnedCoprs.id == pin_id
    return cls.get_all().filter(matches_id)
@classmethod
def get_by_owner(cls, owner):
    """
    Dispatch to ``get_by_group_id`` when ``owner`` is a models.Group and
    to ``get_by_user_id`` otherwise, passing ``owner.id``.
    """
    lookup = cls.get_by_group_id if isinstance(owner, models.Group) else cls.get_by_user_id
    return lookup(owner.id)
910 911 @classmethod
912 - def get_by_user_id(cls, user_id):
914 915 @classmethod
916 - def get_by_group_id(cls, group_id):
@classmethod
def add(cls, owner, copr_id, position):
    """
    Add a PinnedCoprs row for ``copr_id`` at ``position`` to the session.

    ``owner.id`` is stored as group_id when ``owner`` is a models.Group
    and as user_id otherwise.
    """
    owner_column = "group_id" if isinstance(owner, models.Group) else "user_id"
    pin = models.PinnedCoprs(
        copr_id=copr_id, position=position, **{owner_column: owner.id})
    db.session.add(pin)
@classmethod
def delete_by_owner(cls, owner):
    """
    Bulk-delete all pins of ``owner`` (matched on group_id for a
    models.Group, on user_id otherwise) and return the result of the
    query's ``delete()``.
    """
    if isinstance(owner, models.Group):
        owned_by = models.PinnedCoprs.group_id == owner.id
    else:
        owned_by = models.PinnedCoprs.user_id == owner.id
    return db.session.query(models.PinnedCoprs).filter(owned_by).delete()
@classmethod
def delete_by_copr(cls, copr):
    """
    Bulk-delete all pins that reference ``copr`` and return the result of
    the query's ``delete()``.
    """
    pins = db.session.query(models.PinnedCoprs)
    pins_of_copr = pins.filter(models.PinnedCoprs.copr_id == copr.id)
    return pins_of_copr.delete()
938