Trees | Indices | Help |
---|
|
1 import os 2 import time 3 import datetime 4 import flask 5 6 from sqlalchemy import and_ 7 from sqlalchemy.sql import func 8 from sqlalchemy import asc, desc 9 from sqlalchemy.event import listens_for 10 from sqlalchemy.orm.attributes import NEVER_SET 11 from sqlalchemy.orm.exc import NoResultFound 12 from sqlalchemy.orm.attributes import get_history 13 14 from copr_common.enums import ActionTypeEnum, BackendResultEnum, ActionPriorityEnum 15 from coprs import db 16 from coprs import exceptions 17 from coprs import helpers 18 from coprs import models 19 from coprs import logic 20 from coprs.exceptions import MalformedArgumentException, BadRequest 21 from coprs.logic import users_logic 22 from coprs.whoosheers import CoprWhoosheer 23 from coprs.helpers import fix_protocol_for_backend 24 25 from coprs.logic.actions_logic import ActionsLogic 26 from coprs.logic.users_logic import UsersLogic30 """ 31 Used for manipulating Coprs. 32 33 All methods accept user object as a first argument, 34 as this may be needed in future. 35 """ 36 37 @classmethod39439 """ Return all coprs without those which are deleted. """ 40 query = (db.session.query(models.Copr) 41 .join(models.Copr.user) 42 .options(db.contains_eager(models.Copr.user)) 43 .filter(models.Copr.deleted == False)) 44 return query45 46 @classmethod 49 50 @classmethod52 query = (query.outerjoin(models.Copr.builds) 53 .options(db.contains_eager(models.Copr.builds)) 54 .order_by(models.Build.submitted_on.desc())) 55 return query56 57 @classmethod59 query = (query.outerjoin(*models.Copr.mock_chroots.attr) 60 .options(db.contains_eager(*models.Copr.mock_chroots.attr)) 61 .order_by(models.MockChroot.os_release.asc()) 62 .order_by(models.MockChroot.os_version.asc()) 63 .order_by(models.MockChroot.arch.asc())) 64 return query65 66 @classmethod68 with_builds = kwargs.get("with_builds", False) 69 with_mock_chroots = kwargs.get("with_mock_chroots", False) 70 71 query = ( 72 cls.get_all() 73 .filter(models.User.username == username) 74 ) 75 76 if with_builds: 77 query = cls.attach_build(query) 78 79 if with_mock_chroots: 80 query = cls.attach_mock_chroots(query) 81 82 return query83 84 @classmethod86 with_builds = kwargs.get("with_builds", False) 87 with_mock_chroots = kwargs.get("with_mock_chroots", False) 88 89 query = ( 90 cls.get_all() 91 .filter(models.Copr.group_id == group_id) 92 ) 93 94 if with_builds: 95 query = cls.attach_build(query) 96 97 if with_mock_chroots: 98 query = cls.attach_mock_chroots(query) 99 100 return query101 102 @classmethod104 query = cls.get_multiple_by_username(username, **kwargs) 105 query = query.filter(models.Copr.name == coprname) 106 return query107 108 @classmethod110 query = cls.get_multiple_by_group_id(group_id, **kwargs) 111 query = query.filter(models.Copr.name == coprname) 112 return query113 114 @classmethod116 query = ( 117 db.session.query(models.Copr) 118 .join(models.Copr.user) 119 .outerjoin(models.Group) 120 .options(db.contains_eager(models.Copr.user)) 121 ) 122 123 if not include_deleted: 124 query = query.filter(models.Copr.deleted.is_(False)) 125 126 if not include_unlisted_on_hp: 127 query = query.filter(models.Copr.unlisted_on_hp.is_(False)) 128 129 return query130 131 @classmethod133 if desc: 134 query = query.order_by(models.Copr.id.desc()) 135 else: 136 query = query.order_by(models.Copr.id.asc()) 137 return query138 139 # user_relation="owned", username=username, with_mock_chroots=False 140 @classmethod142 query = cls.get_multiple(include_unlisted_on_hp=include_unlisted_on_hp) 143 return query.filter(models.User.username == username)144 145 @classmethod 148 149 @classmethod151 # should be already joined with the User table 152 return query.filter(models.User.username == username)153 154 @classmethod156 # should be already joined with the Group table 157 return query.filter(models.Group.name == group_name)158 159 @classmethod 162 163 @classmethod 166 167 @classmethod169 return (query.outerjoin(models.Copr.builds) 170 .options(db.contains_eager(models.Copr.builds)) 171 .order_by(models.Build.submitted_on.desc()))172 173 @classmethod175 return (query.outerjoin(*models.Copr.mock_chroots.attr) 176 .options(db.contains_eager(*models.Copr.mock_chroots.attr)) 177 .order_by(models.MockChroot.os_release.asc()) 178 .order_by(models.MockChroot.os_version.asc()) 179 .order_by(models.MockChroot.arch.asc()))180 181 @classmethod 184 185 @classmethod187 if user.admin: 188 db.session.add(copr) 189 pass 190 else: 191 raise exceptions.InsufficientRightsException( 192 "User is not a system admin")193 194 @classmethod196 query = (models.Copr.query.order_by(desc(models.Copr.created_on)) 197 .join(models.User) 198 .filter(models.Copr.deleted == False)) 199 if "/" in search_string: # copr search by its full name 200 if search_string[0] == '@': # searching for @group/project 201 group_name = "%{}%".format(search_string.split("/")[0][1:]) 202 project = "%{}%".format(search_string.split("/")[1]) 203 query = query.filter(and_(models.Group.name.ilike(group_name), 204 models.Copr.name.ilike(project), 205 models.Group.id == models.Copr.group_id)) 206 query = query.order_by(asc(func.length(models.Group.name)+func.length(models.Copr.name))) 207 else: # searching for user/project 208 user_name = "%{}%".format(search_string.split("/")[0]) 209 project = "%{}%".format(search_string.split("/")[1]) 210 query = query.filter(and_(models.User.username.ilike(user_name), 211 models.Copr.name.ilike(project), 212 models.User.id == models.Copr.user_id)) 213 query = query.order_by(asc(func.length(models.User.username)+func.length(models.Copr.name))) 214 else: # fulltext search 215 query = query.whooshee_search(search_string, whoosheer=CoprWhoosheer, order_by_relevance=100) 216 return query217 218 @classmethod219 - def add(cls, user, name, selected_chroots, repos=None, description=None, 220 instructions=None, check_for_duplicates=False, group=None, persistent=False, 221 auto_prune=True, bootstrap=None, follow_fedora_branching=False, 222 **kwargs):223 224 if not flask.g.user.admin and flask.g.user != user: 225 msg = ("You were authorized as '{0}' user without permissions to access " 226 "projects of user '{1}'".format(flask.g.user.name, user.name)) 227 raise exceptions.AccessRestricted(msg) 228 229 if not flask.g.user.admin and persistent: 230 raise exceptions.NonAdminCannotCreatePersistentProject() 231 232 if not flask.g.user.admin and not auto_prune: 233 raise exceptions.NonAdminCannotDisableAutoPrunning() 234 235 # form validation checks for duplicates 236 cls.new(user, name, group, check_for_duplicates=check_for_duplicates) 237 238 copr = models.Copr(name=name, 239 repos=repos or u"", 240 user=user, 241 description=description or u"", 242 instructions=instructions or u"", 243 created_on=int(time.time()), 244 persistent=persistent, 245 auto_prune=auto_prune, 246 bootstrap=bootstrap, 247 follow_fedora_branching=follow_fedora_branching, 248 **kwargs) 249 250 251 if group is not None: 252 UsersLogic.raise_if_not_in_group(user, group) 253 copr.group = group 254 255 copr_dir = models.CoprDir( 256 main=True, 257 name=name, 258 copr=copr) 259 260 db.session.add(copr_dir) 261 db.session.add(copr) 262 263 CoprChrootsLogic.new_from_names( 264 copr, selected_chroots) 265 266 db.session.flush() 267 ActionsLogic.send_create_gpg_key(copr) 268 269 return copr270 271 @classmethod273 if check_for_duplicates: 274 if group is None and cls.exists_for_user(user, copr_name).all(): 275 raise exceptions.DuplicateException( 276 "Copr: '{0}/{1}' already exists".format(user.name, copr_name)) 277 elif group: 278 if cls.exists_for_group(group, copr_name).all(): 279 db.session.rollback() 280 raise exceptions.DuplicateException( 281 "Copr: '@{0}/{1}' already exists".format(group.name, copr_name))282 283 @classmethod285 # we should call get_history before other requests, otherwise 286 # the changes would be forgotten 287 if get_history(copr, "name").has_changes(): 288 raise MalformedArgumentException("Change name of the project is forbidden") 289 290 users_logic.UsersLogic.raise_if_cant_update_copr( 291 user, copr, "Only owners and admins may update their projects.") 292 293 if not user.admin and not copr.auto_prune: 294 raise exceptions.NonAdminCannotDisableAutoPrunning() 295 296 db.session.add(copr)297 298 @classmethod300 """ 301 Deletes copr without termination of ongoing builds. 302 """ 303 cls.raise_if_cant_delete(user, copr) 304 # TODO: do we want to dump the information somewhere, so that we can 305 # search it in future? 306 cls.raise_if_unfinished_blocking_action( 307 copr, "Can't delete this project," 308 " another operation is in progress: {action}") 309 310 ActionsLogic.send_delete_copr(copr) 311 CoprDirsLogic.delete_all_by_copr(copr) 312 313 copr.deleted = True 314 return copr315 316 @classmethod318 existing = (models.Copr.query 319 .order_by(desc(models.Copr.created_on)) 320 .filter(models.Copr.name == coprname) 321 .filter(models.Copr.user_id == user.id)) 322 323 if not incl_deleted: 324 existing = existing.filter(models.Copr.deleted == False) 325 326 return cls.filter_without_group_projects(existing)327 328 @classmethod330 existing = (models.Copr.query 331 .order_by(desc(models.Copr.created_on)) 332 .filter(models.Copr.name == coprname) 333 .filter(models.Copr.group_id == group.id)) 334 335 if not incl_deleted: 336 existing = existing.filter(models.Copr.deleted == False) 337 338 return existing339 340 @classmethod342 blocking_actions = [ActionTypeEnum("delete")] 343 344 actions = (models.Action.query 345 .filter(models.Action.object_type == "copr") 346 .filter(models.Action.object_id == copr.id) 347 .filter(models.Action.result == 348 BackendResultEnum("waiting")) 349 .filter(models.Action.action_type.in_(blocking_actions))) 350 351 return actions352 353 @classmethod355 repos = {} 356 release_tmpl = "{chroot.os_release}-{chroot.os_version}-{chroot.arch}" 357 build = models.Build.query.filter(models.Build.copr_id == copr.id).first() 358 if build or empty: 359 for chroot in copr.active_chroots: 360 release = release_tmpl.format(chroot=chroot) 361 repos[release] = fix_protocol_for_backend( 362 os.path.join(copr.repo_url, release + '/')) 363 return repos364 365 @classmethod367 """ 368 Raise ActionInProgressException if given copr has an unfinished 369 action. Return None otherwise. 370 """ 371 372 unfinished_actions = cls.unfinished_blocking_actions_for(copr).all() 373 if unfinished_actions: 374 raise exceptions.ActionInProgressException( 375 message, unfinished_actions[0])376 377 @classmethod379 """ 380 Raise InsufficientRightsException if given copr cant be deleted 381 by given user. Return None otherwise. 382 """ 383 if user.admin: 384 return 385 386 if copr.group: 387 return UsersLogic.raise_if_not_in_group(user, copr.group) 388 389 if user == copr.user: 390 return 391 392 raise exceptions.InsufficientRightsException( 393 "Only owners may delete their projects.")397 @classmethod524399 query = (models.CoprPermission.query 400 .filter(models.CoprPermission.copr == copr) 401 .filter(models.CoprPermission.user == searched_user)) 402 403 return query404 405 @classmethod407 query = models.CoprPermission.query.filter( 408 models.CoprPermission.copr == copr) 409 410 return query411 412 @classmethod414 permissions = cls.get_for_copr(copr) 415 return [copr.user] + [p.user for p in permissions if p.copr_admin == helpers.PermissionEnum("approved")]416 417 @classmethod 420 421 @classmethod424 425 users_logic.UsersLogic.raise_if_cant_update_copr( 426 user, copr, "Only owners and admins may update" 427 " their projects permissions.") 428 429 (models.CoprPermission.query 430 .filter(models.CoprPermission.copr_id == copr.id) 431 .filter(models.CoprPermission.user_id == copr_permission.user_id) 432 .update({"copr_builder": new_builder, 433 "copr_admin": new_admin}))434 435 @classmethod437 if copr_permission: 438 # preserve approved permissions if set 439 if (not new_builder or 440 copr_permission.copr_builder != helpers.PermissionEnum("approved")): 441 442 copr_permission.copr_builder = new_builder 443 444 if (not new_admin or 445 copr_permission.copr_admin != helpers.PermissionEnum("approved")): 446 447 copr_permission.copr_admin = new_admin 448 else: 449 perm = models.CoprPermission( 450 user=user, 451 copr=copr, 452 copr_builder=new_builder, 453 copr_admin=new_admin) 454 455 cls.new(perm)456 457 @classmethod 460 461 @classmethod463 allowed = ['admin', 'builder'] 464 if permission not in allowed: 465 raise BadRequest( 466 "invalid permission '{0}', allowed {1}".format(permission, 467 '|'.join(allowed))) 468 469 allowed = helpers.PermissionEnum.vals.keys() 470 if state not in allowed: 471 raise BadRequest( 472 "invalid '{0}' permission state '{1}', " 473 "use {2}".format(permission, state, '|'.join(allowed))) 474 475 if user.id == copr.user_id: 476 raise BadRequest("user '{0}' is owner of the '{1}' " 477 "project".format(user.name, copr.full_name))478 479 @classmethod481 users_logic.UsersLogic.raise_if_cant_update_copr( 482 request_user, copr, 483 "only owners and admins may update their projects permissions.") 484 485 cls.validate_permission(user, copr, permission, state) 486 487 perm_o = models.CoprPermission(user_id=user.id, copr_id=copr.id) 488 perm_o = db.session.merge(perm_o) 489 old_state = perm_o.get_permission(permission) 490 491 new_state = helpers.PermissionEnum(state) 492 perm_o.set_permission(permission, new_state) 493 db.session.merge(perm_o) 494 495 return (old_state, new_state) if old_state != new_state else None496 497 @classmethod499 approved = helpers.PermissionEnum('approved') 500 state = None 501 if req_bool is True: 502 state = 'request' 503 elif req_bool is False: 504 state = 'nothing' 505 else: 506 raise BadRequest("invalid '{0}' permission request '{1}', " 507 "expected True or False".format(permission, 508 req_bool)) 509 510 cls.validate_permission(user, copr, permission, state) 511 perm_o = models.CoprPermission(user_id=user.id, copr_id=copr.id) 512 perm_o = db.session.merge(perm_o) 513 old_state = perm_o.get_permission(permission) 514 if old_state == approved and state == 'request': 515 raise BadRequest("You already are '{0}' in '{1}'".format( 516 permission, copr.full_name)) 517 518 new_state = helpers.PermissionEnum(state) 519 perm_o.set_permission(permission, new_state) 520 521 if old_state != new_state: 522 return (old_state, new_state) 523 return None527 @classmethod563 564 565 @listens_for(models.Copr.auto_createrepo, 'set')529 copr_dir = cls.get_by_copr(copr, dirname).first() 530 531 if copr_dir: 532 return copr_dir 533 534 copr_dir = models.CoprDir( 535 name=dirname, copr=copr, main=main) 536 537 ActionsLogic.send_createrepo(copr, dirnames=[dirname]) 538 539 db.session.add(copr_dir) 540 return copr_dir541 542 @classmethod544 return (db.session.query(models.CoprDir) 545 .join(models.Copr) 546 .filter(models.Copr.id==copr.id) 547 .filter(models.CoprDir.name==dirname))548 549 @classmethod551 return (db.session.query(models.CoprDir) 552 .filter(models.CoprDir.name==dirname) 553 .filter(models.CoprDir.ownername==ownername))554 555 @classmethod 558 559 @classmethod567 """ Emit createrepo action when auto_createrepo re-enabled""" 568 if old_value_acr == NEVER_SET: 569 # created new copr, not interesting 570 return 571 if not old_value_acr and value_acr: 572 # re-enabled 573 ActionsLogic.send_createrepo(target_copr)574 589592 @classmethod830594 query = models.CoprChroot.query.join(models.Copr) 595 if not include_deleted: 596 query = query.filter(models.Copr.deleted.is_(False)) 597 return query598 599 @classmethod601 """ 602 Return a list of MockChroot objects (not a query object!) which are 603 named by one of the ``names`` list. 604 """ 605 # TODO: this should be moved to MockChrootsLogic 606 db_chroots = models.MockChroot.query.all() 607 mock_chroots = [] 608 for ch in db_chroots: 609 if ch.name in names: 610 mock_chroots.append(ch) 611 612 return mock_chroots613 614 @classmethod616 """ 617 Query CoprChroot(s) in Copr with MockChroot.id 618 """ 619 return ( 620 models.CoprChroot.query 621 .filter(models.CoprChroot.copr_id == copr.id) 622 .filter(models.CoprChroot.mock_chroot_id == mock_chroot_id) 623 )624 625 @classmethod627 mc = MockChrootsLogic.get_from_name(chroot_name, active_only=active_only).one() 628 return cls.get_by_mock_chroot_id(copr, mc.id)629 630 @classmethod632 """ 633 :rtype: models.CoprChroot 634 """ 635 try: 636 return cls.get_by_name(copr, chroot_name).one() 637 except NoResultFound: 638 return None639 640 @classmethod 643 644 @classmethod646 for mock_chroot in cls.mock_chroots_from_names(names): 647 db.session.add( 648 models.CoprChroot(copr=copr, mock_chroot=mock_chroot)) 649 650 action = ActionsLogic.send_createrepo(copr) 651 action.priority = ActionPriorityEnum("highest")652 653 @classmethod654 - def create_chroot(cls, user, copr, mock_chroot, buildroot_pkgs=None, repos=None, comps=None, comps_name=None, 655 with_opts="", without_opts="", 656 delete_after=None, delete_notify=None, module_toggle="", 657 bootstrap=None, bootstrap_image=None):658 """ 659 :type user: models.User 660 :type mock_chroot: models.MockChroot 661 """ 662 if buildroot_pkgs is None: 663 buildroot_pkgs = "" 664 if repos is None: 665 repos = "" 666 UsersLogic.raise_if_cant_update_copr( 667 user, copr, 668 "Only owners and admins may update their projects.") 669 670 chroot = models.CoprChroot(copr=copr, mock_chroot=mock_chroot) 671 cls._update_chroot(buildroot_pkgs, repos, comps, comps_name, chroot, 672 with_opts, without_opts, delete_after, delete_notify, 673 module_toggle, bootstrap, bootstrap_image) 674 675 # reassign old build_chroots, if the chroot is re-created 676 get_old = logic.builds_logic.BuildChrootsLogic.by_copr_and_mock_chroot 677 for old_bch in get_old(copr, mock_chroot): 678 old_bch.copr_chroot = chroot 679 680 return chroot681 682 @classmethod683 - def update_chroot(cls, user, copr_chroot, buildroot_pkgs=None, repos=None, comps=None, comps_name=None, 684 with_opts="", without_opts="", delete_after=None, delete_notify=None, module_toggle="", 685 bootstrap=None, bootstrap_image=None):686 """ 687 :type user: models.User 688 :type copr_chroot: models.CoprChroot 689 """ 690 UsersLogic.raise_if_cant_update_copr( 691 user, copr_chroot.copr, 692 "Only owners and admins may update their projects.") 693 694 cls._update_chroot(buildroot_pkgs, repos, comps, comps_name, 695 copr_chroot, with_opts, without_opts, delete_after, delete_notify, module_toggle, 696 bootstrap, bootstrap_image) 697 return copr_chroot698 699 @classmethod700 - def _update_chroot(cls, buildroot_pkgs, repos, comps, comps_name, 701 copr_chroot, with_opts, without_opts, delete_after, delete_notify, module_toggle, 702 bootstrap, bootstrap_image):703 if buildroot_pkgs is not None: 704 copr_chroot.buildroot_pkgs = buildroot_pkgs 705 706 if repos is not None: 707 copr_chroot.repos = repos.replace("\n", " ") 708 709 if with_opts is not None: 710 copr_chroot.with_opts = with_opts 711 712 if without_opts is not None: 713 copr_chroot.without_opts = without_opts 714 715 if comps_name is not None: 716 copr_chroot.update_comps(comps) 717 copr_chroot.comps_name = comps_name 718 ActionsLogic.send_update_comps(copr_chroot) 719 720 if delete_after is not None: 721 copr_chroot.delete_after = delete_after 722 723 if delete_notify is not None: 724 copr_chroot.delete_notify = delete_notify 725 726 if module_toggle is not None: 727 copr_chroot.module_toggle = module_toggle 728 729 if bootstrap is not None: 730 copr_chroot.bootstrap = bootstrap 731 732 if bootstrap_image is not None: 733 # By CLI/API we can set custom_image, and keep bootstrap unset. In 734 # such case set also bootstrap to correct value. 735 if not bootstrap: 736 copr_chroot.bootstrap = 'custom_image' 737 copr_chroot.bootstrap_image = bootstrap_image 738 739 db.session.add(copr_chroot)740 741 @classmethod743 """ 744 Update list of CoprChroots assigned to ``copr`` from chroot ``names`` 745 array. The chroots not present in ``names`` are disabled. 746 747 :param user: The user who does the change. 748 :type user: models.User 749 """ 750 751 UsersLogic.raise_if_cant_update_copr( 752 user, copr, 753 "Only owners and admins may update their projects.") 754 755 current_copr_chroots = copr.copr_chroots 756 chroot_map = {cch.mock_chroot: cch for cch in current_copr_chroots} 757 new_mock_chroots = cls.mock_chroots_from_names(names) 758 759 # add non-existing 760 run_createrepo = False 761 for mock_chroot in new_mock_chroots: 762 if mock_chroot not in chroot_map: 763 db.session.add(CoprChrootsLogic.create_chroot( 764 user=user, 765 copr=copr, 766 mock_chroot=mock_chroot, 767 )) 768 run_createrepo = True 769 770 if run_createrepo: 771 ActionsLogic.send_createrepo(copr) 772 773 to_remove = [] 774 for mock_chroot in chroot_map: 775 if mock_chroot in new_mock_chroots: 776 continue 777 if not mock_chroot.is_active: 778 # we don't remove EOLed variants here 779 continue 780 # can't delete here, it would change current_chroots and break 781 # iteration 782 to_remove.append(mock_chroot) 783 784 running_builds = set() 785 for mc in to_remove: 786 for bch in chroot_map[mc].build_chroots: 787 if not bch.finished: 788 running_builds.add(bch.build_id) 789 continue 790 copr.mock_chroots.remove(mc) 791 792 # reject the request when some build_chroots are not yet finished 793 if running_builds: 794 raise exceptions.ConflictingRequest( 795 "Can't drop chroot from project, related " 796 "{} still in progress".format( 797 helpers.pluralize("build", list(running_builds), 798 be_suffix=True)))799 800 801 @classmethod803 UsersLogic.raise_if_cant_update_copr( 804 user, copr_chroot.copr, 805 "Only owners and admins may update their projects.") 806 807 copr_chroot.comps_name = None 808 copr_chroot.comps_zlib = None 809 ActionsLogic.send_update_comps(copr_chroot) 810 db.session.add(copr_chroot)811 812 @classmethod814 """ 815 :param models.CoprChroot chroot: 816 """ 817 UsersLogic.raise_if_cant_update_copr( 818 user, copr_chroot.copr, 819 "Only owners and admins may update their projects.") 820 821 db.session.delete(copr_chroot)822 823 @classmethod 826 827 @classmethod833 """ 834 Class for logic regarding upvoting and downvoting projects 835 """ 836 837 @classmethod866839 query = db.session.query(models.CoprScore) 840 query = query.filter(models.CoprScore.copr_id == copr.id) 841 query = query.filter(models.CoprScore.user_id == user.id) 842 return query843 844 @classmethod 847 848 @classmethod 851 852 @classmethod854 """ 855 Low-level function for giving score to projects. The `value` should be 856 a negative number for downvoting or a positive number for upvoting. 857 """ 858 score = models.CoprScore(copr_id=copr.id, user_id=flask.g.user.id, 859 score=(1 if value > 0 else -1)) 860 db.session.add(score) 861 return score862 863 @classmethod869 @classmethod999871 if noarch and not arch: 872 return (models.MockChroot.query 873 .filter(models.MockChroot.os_release == os_release, 874 models.MockChroot.os_version == os_version)) 875 876 return (models.MockChroot.query 877 .filter(models.MockChroot.os_release == os_release, 878 models.MockChroot.os_version == os_version, 879 models.MockChroot.arch == arch))880 881 @classmethod883 """ 884 chroot_name should be os-version-architecture, e.g. fedora-rawhide-x86_64 885 the architecture could be optional with noarch=True 886 887 Return MockChroot object for textual representation of chroot 888 """ 889 890 name_tuple = cls.tuple_from_name(chroot_name, noarch=noarch) 891 return cls.get(name_tuple[0], name_tuple[1], name_tuple[2], 892 active_only=active_only, noarch=noarch)893 894 @classmethod896 query = models.MockChroot.query 897 if active_only: 898 query = query.filter(models.MockChroot.is_active == True) 899 return query900 901 @classmethod903 name_tuple = cls.tuple_from_name(name) 904 if cls.get(*name_tuple).first(): 905 raise exceptions.DuplicateException( 906 "Mock chroot with this name already exists.") 907 new_chroot = models.MockChroot(os_release=name_tuple[0], 908 os_version=name_tuple[1], 909 arch=name_tuple[2]) 910 cls.new(new_chroot) 911 return new_chroot912 913 @classmethod 916 917 @classmethod 920 921 @classmethod 924 925 @classmethod927 name_tuple = cls.tuple_from_name(name) 928 mock_chroot = cls.get(*name_tuple).first() 929 if not mock_chroot: 930 raise exceptions.NotFoundException( 931 "Mock chroot with this name doesn't exist.") 932 933 mock_chroot.is_active = is_active 934 cls.update(mock_chroot) 935 return mock_chroot936 937 @classmethod 940 941 @classmethod943 name_tuple = cls.tuple_from_name(name) 944 mock_chroot = cls.get(*name_tuple).first() 945 if not mock_chroot: 946 raise exceptions.NotFoundException( 947 "Mock chroot with this name doesn't exist.") 948 949 cls.delete(mock_chroot)950 951 @classmethod 954 955 @classmethod957 """ 958 input should be os-version-architecture, e.g. fedora-rawhide-x86_64 959 960 the architecture could be optional with noarch=True 961 962 returns ("os", "version", "arch") or ("os", "version", None) 963 """ 964 split_name = name.rsplit("-", 1) if noarch else name.rsplit("-", 2) 965 966 valid = False 967 if noarch and len(split_name) in [2, 3]: 968 valid = True 969 if not noarch and len(split_name) == 3: 970 valid = True 971 972 if not valid: 973 raise MalformedArgumentException("Chroot identification is not valid") 974 975 if noarch and len(split_name) == 2: 976 split_name.append(None) 977 978 return tuple(split_name)979 980 @classmethod982 for chroot_name in chroots_pruned: 983 chroot = cls.get_from_name(chroot_name).one() 984 if not chroot.is_active: 985 chroot.final_prunerepo_done = True 986 987 db.session.commit() 988 return True989 990 @classmethod1002 1003 @classmethod 1006 1007 @classmethod 1010 1011 @classmethod10441013 if isinstance(owner, models.Group): 1014 return cls.get_by_group_id(owner.id) 1015 return cls.get_by_user_id(owner.id)1016 1017 @classmethod 1020 1021 @classmethod 1024 1025 @classmethod1027 kwargs = dict(copr_id=copr_id, position=position) 1028 kwargs["group_id" if isinstance(owner, models.Group) else "user_id"] = owner.id 1029 pin = models.PinnedCoprs(**kwargs) 1030 db.session.add(pin)1031 1032 @classmethod1034 query = db.session.query(models.PinnedCoprs) 1035 if isinstance(owner, models.Group): 1036 return query.filter(models.PinnedCoprs.group_id == owner.id).delete() 1037 return query.filter(models.PinnedCoprs.user_id == owner.id).delete()1038 1039 @classmethod1041 return (db.session.query(models.PinnedCoprs) 1042 .filter(models.PinnedCoprs.copr_id == copr.id) 1043 .delete())
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 | http://epydoc.sourceforge.net |