Package coprs :: Module helpers
[hide private]
[frames] | no frames]

Source Code for Module coprs.helpers

  1  import math 
  2  import random 
  3  import string 
  4   
  5  from six import with_metaclass 
  6  from six.moves.urllib.parse import urljoin 
  7   
  8  import flask 
  9  from flask import url_for 
 10  from dateutil import parser as dt_parser 
 11  from netaddr import IPAddress, IPNetwork 
 12  from redis import StrictRedis 
 13  from sqlalchemy.types import TypeDecorator, VARCHAR 
 14  import json 
 15   
 16  from coprs import constants 
 17  from coprs import app 
18 19 20 -def generate_api_token(size=30):
21 """ Generate a random string used as token to access the API 22 remotely. 23 24 :kwarg: size, the size of the token to generate, defaults to 30 25 chars. 26 :return: a string, the API token for the user. 27 """ 28 return ''.join(random.choice(string.ascii_lowercase) for x in range(size))
29 30 31 REPO_DL_STAT_FMT = "repo_dl_stat::{copr_user}@{copr_project_name}:{copr_name_release}" 32 CHROOT_REPO_MD_DL_STAT_FMT = "chroot_repo_metadata_dl_stat:hset::{copr_user}@{copr_project_name}:{copr_chroot}" 33 CHROOT_RPMS_DL_STAT_FMT = "chroot_rpms_dl_stat:hset::{copr_user}@{copr_project_name}:{copr_chroot}" 34 PROJECT_RPMS_DL_STAT_FMT = "project_rpms_dl_stat:hset::{copr_user}@{copr_project_name}"
35 36 37 -class CounterStatType(object):
38 REPO_DL = "repo_dl"
39
40 41 -class EnumType(type):
42
43 - def __call__(self, attr):
44 if isinstance(attr, int): 45 for k, v in self.vals.items(): 46 if v == attr: 47 return k 48 raise KeyError("num {0} is not mapped".format(attr)) 49 else: 50 return self.vals[attr]
51
52 53 -class PermissionEnum(with_metaclass(EnumType, object)):
54 vals = {"nothing": 0, "request": 1, "approved": 2} 55 56 @classmethod
57 - def choices_list(cls, without=-1):
58 return [(n, k) for k, n in cls.vals.items() if n != without]
59
60 61 -class ActionTypeEnum(with_metaclass(EnumType, object)):
62 vals = { 63 "delete": 0, 64 "rename": 1, 65 "legal-flag": 2, 66 "createrepo": 3, 67 "update_comps": 4, 68 "gen_gpg_key": 5, 69 "rawhide_to_release": 6, 70 "fork": 7, 71 "update_module_md": 8 72 }
73
74 75 -class BackendResultEnum(with_metaclass(EnumType, object)):
76 vals = {"waiting": 0, "success": 1, "failure": 2}
77
78 79 -class RoleEnum(with_metaclass(EnumType, object)):
80 vals = {"user": 0, "admin": 1}
81
82 83 -class StatusEnum(with_metaclass(EnumType, object)):
84 vals = {"failed": 0, 85 "succeeded": 1, 86 "canceled": 2, 87 "running": 3, 88 "pending": 4, 89 "skipped": 5, # if there was this package built already 90 "starting": 6, # build picked by worker but no VM initialized 91 "importing": 7} # SRPM is being imported to dist-git
92
93 94 -class BuildSourceEnum(with_metaclass(EnumType, object)):
95 vals = {"unset": 0, 96 "srpm_link": 1, # url 97 "srpm_upload": 2, # pkg, tmp 98 "git_and_tito": 3, # git_url, git_dir, git_branch, tito_test 99 "mock_scm": 4, # scm_type, scm_url, spec, scm_branch 100 "pypi": 5, # package_name, version, python_versions 101 "rubygems": 6, # gem_name 102 }
103
104 105 # The same enum is also in distgit's helpers.py 106 -class FailTypeEnum(with_metaclass(EnumType, object)):
107 vals = {"unset": 0, 108 # General errors mixed with errors for SRPM URL/upload: 109 "unknown_error": 1, 110 "build_error": 2, 111 "srpm_import_failed": 3, 112 "srpm_download_failed": 4, 113 "srpm_query_failed": 5, 114 "import_timeout_exceeded": 6, 115 # Git and Tito errors: 116 "tito_general_error": 30, 117 "git_clone_failed": 31, 118 "git_wrong_directory": 32, 119 "git_checkout_error": 33, 120 "srpm_build_error": 34, 121 }
122
123 124 -class JSONEncodedDict(TypeDecorator):
125 """Represents an immutable structure as a json-encoded string. 126 127 Usage:: 128 129 JSONEncodedDict(255) 130 131 """ 132 133 impl = VARCHAR 134
135 - def process_bind_param(self, value, dialect):
136 if value is not None: 137 value = json.dumps(value) 138 139 return value
140
141 - def process_result_value(self, value, dialect):
142 if value is not None: 143 value = json.loads(value) 144 return value
145
146 -class Paginator(object):
147
148 - def __init__(self, query, total_count, page=1, 149 per_page_override=None, urls_count_override=None, 150 additional_params=None):
151 152 self.query = query 153 self.total_count = total_count 154 self.page = page 155 self.per_page = per_page_override or constants.ITEMS_PER_PAGE 156 self.urls_count = urls_count_override or constants.PAGES_URLS_COUNT 157 self.additional_params = additional_params or dict() 158 159 self._sliced_query = None
160
161 - def page_slice(self, page):
162 return (self.per_page * (page - 1), 163 self.per_page * page)
164 165 @property
166 - def sliced_query(self):
167 if not self._sliced_query: 168 self._sliced_query = self.query[slice(*self.page_slice(self.page))] 169 return self._sliced_query
170 171 @property
172 - def pages(self):
173 return int(math.ceil(self.total_count / float(self.per_page)))
174
175 - def border_url(self, request, start):
176 if start: 177 if self.page - 1 > self.urls_count / 2: 178 return self.url_for_other_page(request, 1), 1 179 else: 180 if self.page < self.pages - self.urls_count / 2: 181 return self.url_for_other_page(request, self.pages), self.pages 182 183 return None
184
185 - def get_urls(self, request):
186 left_border = self.page - self.urls_count / 2 187 left_border = 1 if left_border < 1 else left_border 188 right_border = self.page + self.urls_count / 2 189 right_border = self.pages if right_border > self.pages else right_border 190 191 return [(self.url_for_other_page(request, i), i) 192 for i in range(left_border, right_border + 1)]
193
194 - def url_for_other_page(self, request, page):
195 args = request.view_args.copy() 196 args["page"] = page 197 args.update(self.additional_params) 198 return flask.url_for(request.endpoint, **args)
199
200 201 -def chroot_to_branch(chroot):
202 """ 203 Get a git branch name from chroot. Follow the fedora naming standard. 204 """ 205 os, version, arch = chroot.split("-") 206 if os == "fedora": 207 if version == "rawhide": 208 return "master" 209 os = "f" 210 elif os == "epel" and int(version) <= 6: 211 os = "el" 212 return "{}{}".format(os, version)
213
214 215 -def branch_to_os_version(branch):
216 os = None 217 version = None 218 if branch == "master": 219 os = "fedora" 220 version = "rawhide" 221 elif branch[0] == "f": 222 os = "fedora" 223 version = branch[1:] 224 elif branch[:4] == "epel" or branch[:2] == "el": 225 os = "epel" 226 version = branch[-1:] 227 elif branch[:6] == "custom": 228 os = "custom" 229 version = branch[-1:] 230 elif branch[:6] == "mageia": 231 os = "mageia" 232 version = branch[6:] 233 return os, version
234
235 236 -def splitFilename(filename):
237 """ 238 Pass in a standard style rpm fullname 239 240 Return a name, version, release, epoch, arch, e.g.:: 241 foo-1.0-1.i386.rpm returns foo, 1.0, 1, i386 242 1:bar-9-123a.ia64.rpm returns bar, 9, 123a, 1, ia64 243 """ 244 245 if filename[-4:] == '.rpm': 246 filename = filename[:-4] 247 248 archIndex = filename.rfind('.') 249 arch = filename[archIndex+1:] 250 251 relIndex = filename[:archIndex].rfind('-') 252 rel = filename[relIndex+1:archIndex] 253 254 verIndex = filename[:relIndex].rfind('-') 255 ver = filename[verIndex+1:relIndex] 256 257 epochIndex = filename.find(':') 258 if epochIndex == -1: 259 epoch = '' 260 else: 261 epoch = filename[:epochIndex] 262 263 name = filename[epochIndex + 1:verIndex] 264 return name, ver, rel, epoch, arch
265
266 267 -def parse_package_name(pkg):
268 """ 269 Parse package name from possibly incomplete nvra string. 270 """ 271 272 if pkg.count(".") >= 3 and pkg.count("-") >= 2: 273 return splitFilename(pkg)[0] 274 275 # doesn"t seem like valid pkg string, try to guess package name 276 result = "" 277 pkg = pkg.replace(".rpm", "").replace(".src", "") 278 279 for delim in ["-", "."]: 280 if delim in pkg: 281 parts = pkg.split(delim) 282 for part in parts: 283 if any(map(lambda x: x.isdigit(), part)): 284 return result[:-1] 285 286 result += part + "-" 287 288 return result[:-1] 289 290 return pkg
291
292 293 -def generate_repo_url(mock_chroot, url):
294 """ Generates url with build results for .repo file. 295 No checks if copr or mock_chroot exists. 296 """ 297 if mock_chroot.os_release == "fedora": 298 if mock_chroot.os_version != "rawhide": 299 mock_chroot.os_version = "$releasever" 300 301 url = urljoin( 302 url, "{0}-{1}-{2}/".format(mock_chroot.os_release, 303 mock_chroot.os_version, "$basearch")) 304 305 return url
306
307 308 -def fix_protocol_for_backend(url):
309 """ 310 Ensure that url either has http or https protocol according to the 311 option in app config "ENFORCE_PROTOCOL_FOR_BACKEND_URL" 312 """ 313 if app.config["ENFORCE_PROTOCOL_FOR_BACKEND_URL"] == "https": 314 return url.replace("http://", "https://") 315 elif app.config["ENFORCE_PROTOCOL_FOR_BACKEND_URL"] == "http": 316 return url.replace("https://", "http://") 317 else: 318 return url
319
320 321 -def fix_protocol_for_frontend(url):
322 """ 323 Ensure that url either has http or https protocol according to the 324 option in app config "ENFORCE_PROTOCOL_FOR_FRONTEND_URL" 325 """ 326 if app.config["ENFORCE_PROTOCOL_FOR_FRONTEND_URL"] == "https": 327 return url.replace("http://", "https://") 328 elif app.config["ENFORCE_PROTOCOL_FOR_FRONTEND_URL"] == "http": 329 return url.replace("https://", "http://") 330 else: 331 return url
332
333 334 -class Serializer(object):
335
336 - def to_dict(self, options=None):
337 """ 338 Usage: 339 340 SQLAlchObject.to_dict() => returns a flat dict of the object 341 SQLAlchObject.to_dict({"foo": {}}) => returns a dict of the object 342 and will include a flat dict of object foo inside of that 343 SQLAlchObject.to_dict({"foo": {"bar": {}}, "spam": {}}) => returns 344 a dict of the object, which will include dict of foo 345 (which will include dict of bar) and dict of spam. 346 347 Options can also contain two special values: __columns_only__ 348 and __columns_except__ 349 350 If present, the first makes only specified fiels appear, 351 the second removes specified fields. Both of these fields 352 must be either strings (only works for one field) or lists 353 (for one and more fields). 354 355 SQLAlchObject.to_dict({"foo": {"__columns_except__": ["id"]}, 356 "__columns_only__": "name"}) => 357 358 The SQLAlchObject will only put its "name" into the resulting dict, 359 while "foo" all of its fields except "id". 360 361 Options can also specify whether to include foo_id when displaying 362 related foo object (__included_ids__, defaults to True). 363 This doesn"t apply when __columns_only__ is specified. 364 """ 365 366 result = {} 367 if options is None: 368 options = {} 369 columns = self.serializable_attributes 370 371 if "__columns_only__" in options: 372 columns = options["__columns_only__"] 373 else: 374 columns = set(columns) 375 if "__columns_except__" in options: 376 columns_except = options["__columns_except__"] 377 if not isinstance(options["__columns_except__"], list): 378 columns_except = [options["__columns_except__"]] 379 380 columns -= set(columns_except) 381 382 if ("__included_ids__" in options and 383 options["__included_ids__"] is False): 384 385 related_objs_ids = [ 386 r + "_id" for r, _ in options.items() 387 if not r.startswith("__")] 388 389 columns -= set(related_objs_ids) 390 391 columns = list(columns) 392 393 for column in columns: 394 result[column] = getattr(self, column) 395 396 for related, values in options.items(): 397 if hasattr(self, related): 398 result[related] = getattr(self, related).to_dict(values) 399 return result
400 401 @property
402 - def serializable_attributes(self):
403 return map(lambda x: x.name, self.__table__.columns)
404
405 406 -class RedisConnectionProvider(object):
407 - def __init__(self, config):
408 self.host = config.get("REDIS_HOST", "127.0.0.1") 409 self.port = int(config.get("REDIS_PORT", "6379"))
410
411 - def get_connection(self):
412 return StrictRedis(host=self.host, port=self.port)
413
414 415 -def get_redis_connection():
416 """ 417 Creates connection to redis, now we use default instance at localhost, no config needed 418 """ 419 return StrictRedis()
420
421 422 -def dt_to_unixtime(dt):
423 """ 424 Converts datetime to unixtime 425 :param dt: DateTime instance 426 :rtype: float 427 """ 428 return float(dt.strftime('%s'))
429
430 431 -def string_dt_to_unixtime(dt_string):
432 """ 433 Converts datetime to unixtime from string 434 :param dt_string: datetime string 435 :rtype: str 436 """ 437 return dt_to_unixtime(dt_parser.parse(dt_string))
438
439 440 -def is_ip_from_builder_net(ip):
441 """ 442 Checks is ip is owned by the builders network 443 :param str ip: IPv4 address 444 :return bool: True 445 """ 446 ip_addr = IPAddress(ip) 447 for subnet in app.config.get("BUILDER_IPS", ["127.0.0.1/24"]): 448 if ip_addr in IPNetwork(subnet): 449 return True 450 451 return False
452
453 454 -def str2bool(v):
455 if v is None: 456 return False 457 return v.lower() in ("yes", "true", "t", "1")
458
459 460 -def copr_url(view, copr, **kwargs):
461 """ 462 Examine given copr and generate proper URL for the `view` 463 464 Values of `username/group_name` and `coprname` are automatically passed as the first two URL parameters, 465 and therefore you should *not* pass them manually. 466 467 Usage: 468 copr_url("coprs_ns.foo", copr) 469 copr_url("coprs_ns.foo", copr, arg1='bar', arg2='baz) 470 """ 471 if copr.is_a_group_project: 472 return url_for(view, group_name=copr.group.name, coprname=copr.name, **kwargs) 473 return url_for(view, username=copr.user.name, coprname=copr.name, **kwargs)
474
475 476 -def url_for_copr_view(view, group_view, copr, **kwargs):
477 if copr.is_a_group_project: 478 return url_for(group_view, group_name=copr.group.name, coprname=copr.name, **kwargs) 479 else: 480 return url_for(view, username=copr.user.name, coprname=copr.name, **kwargs)
481 482 483 from sqlalchemy.engine.default import DefaultDialect 484 from sqlalchemy.sql.sqltypes import String, DateTime, NullType 485 486 # python2/3 compatible. 487 PY3 = str is not bytes 488 text = str if PY3 else unicode 489 int_type = int if PY3 else (int, long) 490 str_type = str if PY3 else (str, unicode)
491 492 493 -class StringLiteral(String):
494 """Teach SA how to literalize various things."""
495 - def literal_processor(self, dialect):
496 super_processor = super(StringLiteral, self).literal_processor(dialect) 497 498 def process(value): 499 if isinstance(value, int_type): 500 return text(value) 501 if not isinstance(value, str_type): 502 value = text(value) 503 result = super_processor(value) 504 if isinstance(result, bytes): 505 result = result.decode(dialect.encoding) 506 return result
507 return process
508
509 510 -class LiteralDialect(DefaultDialect):
511 colspecs = { 512 # prevent various encoding explosions 513 String: StringLiteral, 514 # teach SA about how to literalize a datetime 515 DateTime: StringLiteral, 516 # don't format py2 long integers to NULL 517 NullType: StringLiteral, 518 }
519
520 521 -def literal_query(statement):
522 """NOTE: This is entirely insecure. DO NOT execute the resulting strings.""" 523 import sqlalchemy.orm 524 if isinstance(statement, sqlalchemy.orm.Query): 525 statement = statement.statement 526 return statement.compile( 527 dialect=LiteralDialect(), 528 compile_kwargs={'literal_binds': True}, 529 ).string
530
531 532 -def stream_template(template_name, **context):
533 app.update_template_context(context) 534 t = app.jinja_env.get_template(template_name) 535 rv = t.stream(context) 536 rv.enable_buffering(2) 537 return rv
538