Trees | Indices | Help |
---|
|
1 import time 2 3 from coprs import db 4 from coprs import exceptions 5 from coprs import helpers 6 from coprs import models 7 from coprs import signals 8 from coprs import whoosheers 9 from coprs.logic import users_logic12 """Used for manipulating Coprs. All methods accept user object as a first argument, as this may be needed in future.""" 13 14 @classmethod19116 with_builds = kwargs.get('with_builds', False) 17 with_mock_chroots = kwargs.get('with_mock_chroots', False) 18 19 query = db.session.query(models.Copr).\ 20 join(models.Copr.owner).\ 21 options(db.contains_eager(models.Copr.owner)).\ 22 filter(models.Copr.name == coprname).\ 23 filter(models.User.openid_name == models.User.openidize_name(username)).\ 24 filter(models.Copr.deleted==False) 25 26 if with_builds: 27 query = query.outerjoin(models.Copr.builds).\ 28 options(db.contains_eager(models.Copr.builds)).\ 29 order_by(models.Build.submitted_on.desc()) 30 if with_mock_chroots: 31 query = query.outerjoin(*models.Copr.mock_chroots.attr).\ 32 options(db.contains_eager(*models.Copr.mock_chroots.attr)).\ 33 order_by(models.MockChroot.os_release.asc()).\ 34 order_by(models.MockChroot.os_version.asc()).\ 35 order_by(models.MockChroot.arch.asc()) 36 37 return query38 39 @classmethod41 user_relation = kwargs.get('user_relation', None) 42 username = kwargs.get('username', None) 43 with_mock_chroots = kwargs.get('with_mock_chroots', None) 44 with_builds = kwargs.get('with_builds', None) 45 incl_deleted = kwargs.get('incl_deleted', None) 46 ids = kwargs.get('ids', None) 47 48 query = db.session.query(models.Copr).\ 49 join(models.Copr.owner).\ 50 options(db.contains_eager(models.Copr.owner)).\ 51 order_by(models.Copr.id.desc()) 52 53 if not incl_deleted: 54 query = query.filter(models.Copr.deleted==False) 55 56 if isinstance(ids, list): # can be an empty list 57 query = query.filter(models.Copr.id.in_(ids)) 58 59 if user_relation == 'owned': 60 query = query.filter(models.User.openid_name == models.User.openidize_name(username)) 61 elif user_relation == 'allowed': 62 aliased_user = db.aliased(models.User) 63 query = query.join(models.CoprPermission, models.Copr.copr_permissions).\ 64 filter(models.CoprPermission.copr_builder == helpers.PermissionEnum('approved')).\ 65 join(aliased_user, models.CoprPermission.user).\ 66 filter(aliased_user.openid_name == models.User.openidize_name(username)) 67 if with_mock_chroots: 68 query = query.outerjoin(*models.Copr.mock_chroots.attr).\ 69 options(db.contains_eager(*models.Copr.mock_chroots.attr)).\ 70 order_by(models.MockChroot.os_release.asc()).\ 71 order_by(models.MockChroot.os_version.asc()).\ 72 order_by(models.MockChroot.arch.asc()) 73 74 if with_builds: 75 query = query.outerjoin(models.Copr.builds).\ 76 options(db.contains_eager(models.Copr.builds)).\ 77 order_by(models.Build.submitted_on.desc()) 78 79 return query80 81 @classmethod83 query = models.Copr.query.join(models.User).\ 84 filter(models.Copr.deleted==False).\ 85 whooshee_search(search_string) 86 return query87 88 @classmethod89 - def add(cls, user, name, repos, selected_chroots, description, 90 instructions, check_for_duplicates=False):91 copr = models.Copr(name=name, 92 repos=repos, 93 owner=user, 94 description=description, 95 instructions=instructions, 96 created_on=int(time.time())) 97 CoprsLogic.new(user, copr, 98 check_for_duplicates=check_for_duplicates) # form validation checks for duplicates 99 CoprChrootsLogic.new_from_names(user, copr, 100 selected_chroots) 101 return copr102 103 @classmethod105 if check_for_duplicates and cls.exists_for_user(user, copr.name).all(): 106 raise exceptions.DuplicateException( 107 'Copr: "{0}" already exists'.format(copr.name)) 108 signals.copr_created.send(cls, copr=copr) 109 db.session.add(copr)110 111 @classmethod113 cls.raise_if_unfinished_blocking_action(user, copr, 114 'Can\'t change this project name, another operation is in progress: {action}') 115 users_logic.UsersLogic.raise_if_cant_update_copr(user, copr, 116 'Only owners and admins may update their projects.') 117 118 existing = cls.exists_for_user(copr.owner, copr.name).first() 119 if existing: 120 if check_for_duplicates and existing.id != copr.id: 121 raise exceptions.DuplicateException('Project: "{0}" already exists'.format(copr.name)) 122 else: # we're renaming 123 # if we fire a models.Copr.query, it will use the modified copr in session 124 # -> workaround this by just getting the name 125 old_copr_name = db.session.query(models.Copr.name).filter(models.Copr.id==copr.id).\ 126 filter(models.Copr.deleted==False).\ 127 first()[0] 128 action = models.Action(action_type=helpers.ActionTypeEnum('rename'), 129 object_type='copr', 130 object_id=copr.id, 131 old_value='{0}/{1}'.format(copr.owner.name, old_copr_name), 132 new_value='{0}/{1}'.format(copr.owner.name, copr.name), 133 created_on=int(time.time())) 134 db.session.add(action) 135 db.session.add(copr)136 137 @classmethod139 cls.raise_if_cant_delete(user, copr) 140 # TODO: do we want to dump the information somewhere, so that we can search it in future? 141 cls.raise_if_unfinished_blocking_action(user, copr, 142 'Can\'t delete this project, another operation is in progress: {action}') 143 action = models.Action(action_type=helpers.ActionTypeEnum('delete'), 144 object_type='copr', 145 object_id=copr.id, 146 old_value='{0}/{1}'.format(copr.owner.name, copr.name), 147 new_value='', 148 created_on=int(time.time())) 149 copr.deleted = True 150 151 db.session.add(action) 152 153 return copr154 155 @classmethod157 existing = models.Copr.query.filter(models.Copr.name==coprname).\ 158 filter(models.Copr.owner_id==user.id) 159 if not incl_deleted: 160 existing = existing.filter(models.Copr.deleted==False) 161 162 return existing163 164 @classmethod166 blocking_actions = [helpers.ActionTypeEnum('rename'), 167 helpers.ActionTypeEnum('delete')] 168 actions = models.Action.query.filter(models.Action.object_type=='copr').\ 169 filter(models.Action.object_id==copr.id).\ 170 filter(models.Action.result==helpers.BackendResultEnum('waiting')).\ 171 filter(models.Action.action_type.in_(blocking_actions)) 172 173 return actions174 175 @classmethod177 """This method raises ActionInProgressException if given copr has an unfinished 178 action. Returns None otherwise. 179 """ 180 unfinished_actions = cls.unfinished_blocking_actions_for(user, copr).all() 181 if unfinished_actions: 182 raise exceptions.ActionInProgressException(message, unfinished_actions[0])183 184 @classmethod186 """This method raises InsufficientRightsException if given copr cant be deleted 187 by given user. Returns None otherwise. 188 """ 189 if not user.admin and user != copr.owner: 190 raise exceptions.InsufficientRightsException('Only owners may delete their projects.')193 @classmethod234195 query = models.CoprPermission.query.filter(models.CoprPermission.copr == copr).\ 196 filter(models.CoprPermission.user == searched_user) 197 198 return query199 200 @classmethod202 query = models.CoprPermission.query.filter(models.CoprPermission.copr == copr) 203 204 return query205 206 @classmethod 209 210 @classmethod212 users_logic.UsersLogic.raise_if_cant_update_copr(user, copr, 213 'Only owners and admins may update their projects permissions.') 214 models.CoprPermission.query.filter(models.CoprPermission.copr_id == copr.id).\ 215 filter(models.CoprPermission.user_id == copr_permission.user_id).\ 216 update({'copr_builder': new_builder, 217 'copr_admin': new_admin})218 219 @classmethod221 if copr_permission: 222 # preserve approved permissions if set 223 if not new_builder or copr_permission.copr_builder != helpers.PermissionEnum('approved'): 224 copr_permission.copr_builder = new_builder 225 if not new_admin or copr_permission.copr_admin != helpers.PermissionEnum('approved'): 226 copr_permission.copr_admin = new_admin 227 else: 228 perm = models.CoprPermission(user = user, copr = copr, copr_builder = new_builder, copr_admin = new_admin) 229 cls.new(user, perm)230 231 @classmethod236 @classmethod281238 db_chroots = models.MockChroot.query.all() 239 mock_chroots = [] 240 for ch in db_chroots: 241 if ch.name in names: 242 mock_chroots.append(ch) 243 244 return mock_chroots245 246 @classmethod 249 250 @classmethod252 for mock_chroot in cls.mock_chroots_from_names(user, names): 253 db.session.add(models.CoprChroot(copr=copr, mock_chroot=mock_chroot))254 255 @classmethod257 current_chroots = copr.mock_chroots 258 copr_chroot = copr.check_copr_chroot(chroot) 259 if copr_chroot: 260 copr_chroot.buildroot_pkgs = buildroot_pkgs 261 db.session.add(copr_chroot)262 263 @classmethod265 current_chroots = copr.mock_chroots 266 new_chroots = cls.mock_chroots_from_names(user, names) 267 # add non-existing 268 for mock_chroot in new_chroots: 269 if mock_chroot not in current_chroots: 270 db.session.add(models.CoprChroot(copr=copr, mock_chroot=mock_chroot)) 271 272 # delete no more present 273 to_remove = [] 274 for mock_chroot in current_chroots: 275 if mock_chroot not in new_chroots: 276 # can't delete here, it would change current_chroots and break iteration 277 to_remove.append(mock_chroot) 278 279 for mc in to_remove: 280 copr.mock_chroots.remove(mc)283 @classmethod360285 return models.MockChroot.query.filter(models.MockChroot.os_release==os_release, 286 models.MockChroot.os_version==os_version, 287 models.MockChroot.arch==arch)288 289 @classmethod291 """ Returns MockChroot object for textual representation of chroot """ 292 name_tuple = cls.tuple_from_name(None, chroot_name) 293 return cls.get(None, name_tuple[0], name_tuple[1], name_tuple[2], active_only = active_only)294 295 @classmethod297 query = models.MockChroot.query 298 if active_only: 299 query = query.filter(models.MockChroot.is_active==True) 300 return query301 302 @classmethod304 name_tuple = cls.tuple_from_name(user, name) 305 if cls.get(user, *name_tuple).first(): 306 raise exceptions.DuplicateException('Mock chroot with this name already exists.') 307 new_chroot = models.MockChroot(os_release=name_tuple[0], 308 os_version=name_tuple[1], 309 arch=name_tuple[2]) 310 cls.new(user, new_chroot) 311 return new_chroot312 313 @classmethod 316 317 @classmethod319 name_tuple = cls.tuple_from_name(user, name) 320 mock_chroot = cls.get(user, *name_tuple).first() 321 if not mock_chroot: 322 raise exceptions.NotFoundException('Mock chroot with this name doesn\'t exist.') 323 324 mock_chroot.is_active = is_active 325 cls.update(user, mock_chroot) 326 return mock_chroot327 328 @classmethod 331 332 333 @classmethod335 name_tuple = cls.tuple_from_name(user, name) 336 mock_chroot = cls.get(user, *name_tuple).first() 337 if not mock_chroot: 338 raise exceptions.NotFoundException('Mock chroot with this name doesn\'t exist.') 339 340 cls.delete(user, mock_chroot)341 342 @classmethod 345 346 @classmethod348 """ valid name can be "fedora-rawhide-x86_64" or even "fedora-rawhide" """ 349 split_name = name.rsplit('-', 1) 350 if len(split_name) < 2: 351 raise exceptions.MalformedArgumentException( 352 'Chroot Name doesn\'t contain dash, can\'t determine chroot architecure.') 353 if '-' in split_name[0]: 354 os_release, os_version = (split_name[0].rsplit('-'))[0:2] 355 else: 356 os_release, os_version = split_name[0], '' 357 358 arch = split_name[1] 359 return (os_release, os_version, arch)
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Tue Jan 28 00:49:31 2014 | http://epydoc.sourceforge.net |