Package coprs :: Module helpers
[hide private]
[frames] | no frames]

Source Code for Module coprs.helpers

  1  import math 
  2  import random 
  3  import string 
  4  import html5_parser 
  5   
  6  from os.path import normpath 
  7  from six import with_metaclass 
  8  from six.moves.urllib.parse import urlparse, parse_qs, urlunparse, urlencode 
  9  import re 
 10   
 11  import flask 
 12  import posixpath 
 13  from flask import url_for 
 14  from dateutil import parser as dt_parser 
 15  from netaddr import IPAddress, IPNetwork 
 16  from redis import StrictRedis 
 17  from sqlalchemy.types import TypeDecorator, VARCHAR 
 18  import json 
 19   
 20  from copr_common.enums import EnumType 
 21  from copr_common.rpm import splitFilename 
 22  from coprs import constants 
 23  from coprs import app 
24 25 26 -def generate_api_token(size=30):
27 """ Generate a random string used as token to access the API 28 remotely. 29 30 :kwarg: size, the size of the token to generate, defaults to 30 31 chars. 32 :return: a string, the API token for the user. 33 """ 34 return ''.join(random.choice(string.ascii_lowercase) for x in range(size))
35 36 37 REPO_DL_STAT_FMT = "repo_dl_stat::{copr_user}@{copr_project_name}:{copr_name_release}" 38 CHROOT_REPO_MD_DL_STAT_FMT = "chroot_repo_metadata_dl_stat:hset::{copr_user}@{copr_project_name}:{copr_chroot}" 39 CHROOT_RPMS_DL_STAT_FMT = "chroot_rpms_dl_stat:hset::{copr_user}@{copr_project_name}:{copr_chroot}" 40 PROJECT_RPMS_DL_STAT_FMT = "project_rpms_dl_stat:hset::{copr_user}@{copr_project_name}" 41 42 43 FINISHED_STATUSES = ["succeeded", "forked", "canceled", "skipped", "failed"]
44 45 46 -class CounterStatType(object):
47 REPO_DL = "repo_dl"
48
49 50 -class PermissionEnum(with_metaclass(EnumType, object)):
51 # The text form is part of APIv3! 52 vals = {"nothing": 0, "request": 1, "approved": 2} 53 54 @classmethod
55 - def choices_list(cls, without=-1):
56 return [(n, k) for k, n in cls.vals.items() if n != without]
57
58 59 -class BuildSourceEnum(with_metaclass(EnumType, object)):
60 vals = {"unset": 0, 61 "link": 1, # url 62 "upload": 2, # pkg, tmp, url 63 "pypi": 5, # package_name, version, python_versions 64 "rubygems": 6, # gem_name 65 "scm": 8, # type, clone_url, committish, subdirectory, spec, srpm_build_method 66 "custom": 9, # user-provided script to build sources 67 }
68
69 70 -class JSONEncodedDict(TypeDecorator):
71 """Represents an immutable structure as a json-encoded string. 72 73 Usage:: 74 75 JSONEncodedDict(255) 76 77 """ 78 79 impl = VARCHAR 80
81 - def process_bind_param(self, value, dialect):
82 if value is not None: 83 value = json.dumps(value) 84 85 return value
86
87 - def process_result_value(self, value, dialect):
88 if value is not None: 89 value = json.loads(value) 90 return value
91
92 93 -class Paginator(object):
94 - def __init__(self, query, total_count, page=1, 95 per_page_override=None, urls_count_override=None, 96 additional_params=None):
97 98 self.query = query 99 self.total_count = total_count 100 self.page = page 101 self.per_page = per_page_override or constants.ITEMS_PER_PAGE 102 self.urls_count = urls_count_override or constants.PAGES_URLS_COUNT 103 self.additional_params = additional_params or dict() 104 105 self._sliced_query = None
106
107 - def page_slice(self, page):
108 return (self.per_page * (page - 1), 109 self.per_page * page)
110 111 @property
112 - def sliced_query(self):
113 if not self._sliced_query: 114 self._sliced_query = self.query[slice(*self.page_slice(self.page))] 115 return self._sliced_query
116 117 @property
118 - def pages(self):
119 return int(math.ceil(self.total_count / float(self.per_page)))
120
121 - def border_url(self, request, start):
122 if start: 123 if self.page - 1 > self.urls_count // 2: 124 return self.url_for_other_page(request, 1), 1 125 else: 126 if self.page < self.pages - self.urls_count // 2: 127 return self.url_for_other_page(request, self.pages), self.pages 128 129 return None
130
131 - def get_urls(self, request):
132 left_border = self.page - self.urls_count // 2 133 left_border = 1 if left_border < 1 else left_border 134 right_border = self.page + self.urls_count // 2 135 right_border = self.pages if right_border > self.pages else right_border 136 137 return [(self.url_for_other_page(request, i), i) 138 for i in range(left_border, right_border + 1)]
139
140 - def url_for_other_page(self, request, page):
141 args = request.view_args.copy() 142 args["page"] = page 143 args.update(self.additional_params) 144 return flask.url_for(request.endpoint, **args)
145
146 147 -def chroot_to_branch(chroot):
148 """ 149 Get a git branch name from chroot. Follow the fedora naming standard. 150 """ 151 os, version, arch = chroot.rsplit("-", 2) 152 if os == "fedora": 153 if version == "rawhide": 154 return "master" 155 os = "f" 156 elif os == "epel" and int(version) <= 6: 157 os = "el" 158 elif os == "mageia" and version == "cauldron": 159 os = "cauldron" 160 version = "" 161 elif os == "mageia": 162 os = "mga" 163 return "{}{}".format(os, version)
164
165 166 -def parse_package_name(pkg):
167 """ 168 Parse package name from possibly incomplete nvra string. 169 """ 170 171 if pkg.count(".") >= 3 and pkg.count("-") >= 2: 172 return splitFilename(pkg)[0] 173 174 # doesn"t seem like valid pkg string, try to guess package name 175 result = "" 176 pkg = pkg.replace(".rpm", "").replace(".src", "") 177 178 for delim in ["-", "."]: 179 if delim in pkg: 180 parts = pkg.split(delim) 181 for part in parts: 182 if any(map(lambda x: x.isdigit(), part)): 183 return result[:-1] 184 185 result += part + "-" 186 187 return result[:-1] 188 189 return pkg
190
191 192 -def generate_repo_url(mock_chroot, url, arch=None):
193 """ Generates url with build results for .repo file. 194 No checks if copr or mock_chroot exists. 195 """ 196 os_version = mock_chroot.os_version 197 198 if mock_chroot.os_release == "fedora": 199 if mock_chroot.os_version != "rawhide": 200 os_version = "$releasever" 201 202 if mock_chroot.os_release == "opensuse-leap": 203 os_version = "$releasever" 204 205 if mock_chroot.os_release == "mageia": 206 if mock_chroot.os_version != "cauldron": 207 os_version = "$releasever" 208 209 url = posixpath.join( 210 url, "{0}-{1}-{2}/".format(mock_chroot.os_release, 211 os_version, arch or '$basearch')) 212 213 return url
214
215 216 -def fix_protocol_for_backend(url):
217 """ 218 Ensure that url either has http or https protocol according to the 219 option in app config "ENFORCE_PROTOCOL_FOR_BACKEND_URL" 220 """ 221 if app.config["ENFORCE_PROTOCOL_FOR_BACKEND_URL"] == "https": 222 return url.replace("http://", "https://") 223 elif app.config["ENFORCE_PROTOCOL_FOR_BACKEND_URL"] == "http": 224 return url.replace("https://", "http://") 225 else: 226 return url
227
228 229 -def fix_protocol_for_frontend(url):
230 """ 231 Ensure that url either has http or https protocol according to the 232 option in app config "ENFORCE_PROTOCOL_FOR_FRONTEND_URL" 233 """ 234 if app.config["ENFORCE_PROTOCOL_FOR_FRONTEND_URL"] == "https": 235 return url.replace("http://", "https://") 236 elif app.config["ENFORCE_PROTOCOL_FOR_FRONTEND_URL"] == "http": 237 return url.replace("https://", "http://") 238 else: 239 return url
240
241 242 -class Serializer(object):
243
244 - def to_dict(self, options=None):
245 """ 246 Usage: 247 248 SQLAlchObject.to_dict() => returns a flat dict of the object 249 SQLAlchObject.to_dict({"foo": {}}) => returns a dict of the object 250 and will include a flat dict of object foo inside of that 251 SQLAlchObject.to_dict({"foo": {"bar": {}}, "spam": {}}) => returns 252 a dict of the object, which will include dict of foo 253 (which will include dict of bar) and dict of spam. 254 255 Options can also contain two special values: __columns_only__ 256 and __columns_except__ 257 258 If present, the first makes only specified fields appear, 259 the second removes specified fields. Both of these fields 260 must be either strings (only works for one field) or lists 261 (for one and more fields). 262 263 SQLAlchObject.to_dict({"foo": {"__columns_except__": ["id"]}, 264 "__columns_only__": "name"}) => 265 266 The SQLAlchObject will only put its "name" into the resulting dict, 267 while "foo" all of its fields except "id". 268 269 Options can also specify whether to include foo_id when displaying 270 related foo object (__included_ids__, defaults to True). 271 This doesn"t apply when __columns_only__ is specified. 272 """ 273 274 result = {} 275 if options is None: 276 options = {} 277 columns = self.serializable_attributes 278 279 if "__columns_only__" in options: 280 columns = options["__columns_only__"] 281 else: 282 columns = set(columns) 283 if "__columns_except__" in options: 284 columns_except = options["__columns_except__"] 285 if not isinstance(options["__columns_except__"], list): 286 columns_except = [options["__columns_except__"]] 287 288 columns -= set(columns_except) 289 290 if ("__included_ids__" in options and 291 options["__included_ids__"] is False): 292 293 related_objs_ids = [ 294 r + "_id" for r, _ in options.items() 295 if not r.startswith("__")] 296 297 columns -= set(related_objs_ids) 298 299 columns = list(columns) 300 301 for column in columns: 302 result[column] = getattr(self, column) 303 304 for related, values in options.items(): 305 if hasattr(self, related): 306 result[related] = getattr(self, related).to_dict(values) 307 return result
308 309 @property
310 - def serializable_attributes(self):
311 return map(lambda x: x.name, self.__table__.columns)
312
313 314 -class RedisConnectionProvider(object):
315 - def __init__(self, config, db=0):
316 self.host = config.get("REDIS_HOST", "127.0.0.1") 317 self.port = int(config.get("REDIS_PORT", "6379")) 318 self.db = db
319
320 - def get_connection(self):
321 return StrictRedis(host=self.host, port=self.port, db=self.db)
322
323 324 -def get_redis_connection():
325 """ 326 Creates connection to redis, now we use default instance at localhost, no config needed 327 """ 328 return StrictRedis()
329
330 331 -def str2bool(v):
332 if v is None: 333 return False 334 return v.lower() in ("yes", "true", "t", "1")
335
336 337 -def copr_url(view, copr, **kwargs):
338 """ 339 Examine given copr and generate proper URL for the `view` 340 341 Values of `username/group_name` and `coprname` are automatically passed as the first two URL parameters, 342 and therefore you should *not* pass them manually. 343 344 Usage: 345 copr_url("coprs_ns.foo", copr) 346 copr_url("coprs_ns.foo", copr, arg1='bar', arg2='baz) 347 """ 348 if copr.is_a_group_project: 349 return url_for(view, group_name=copr.group.name, coprname=copr.name, **kwargs) 350 return url_for(view, username=copr.user.name, coprname=copr.name, **kwargs)
351
352 353 -def owner_url(owner):
354 """ 355 For a given `owner` object, which may be either `models.User` or `models.Group`, 356 return an URL to its _profile_ page. 357 """ 358 # We can't check whether owner is instance of `models.Group` because once 359 # we include models from helpers, we get circular imports 360 if hasattr(owner, "at_name"): 361 return url_for("groups_ns.list_projects_by_group", group_name=owner.name) 362 return url_for("coprs_ns.coprs_by_user", username=owner.username)
363
364 365 -def url_for_copr_view(view, group_view, copr, **kwargs):
366 if copr.is_a_group_project: 367 return url_for(group_view, group_name=copr.group.name, coprname=copr.name, **kwargs) 368 else: 369 return url_for(view, username=copr.user.name, coprname=copr.name, **kwargs)
370
371 372 -def url_for_copr_builds(copr):
373 return copr_url("coprs_ns.copr_builds", copr)
374 375 376 from sqlalchemy.engine.default import DefaultDialect 377 from sqlalchemy.sql.sqltypes import String, DateTime, NullType 378 379 # python2/3 compatible. 380 PY3 = str is not bytes 381 text = str if PY3 else unicode 382 int_type = int if PY3 else (int, long) 383 str_type = str if PY3 else (str, unicode)
384 385 386 -class StringLiteral(String):
387 """Teach SA how to literalize various things."""
388 - def literal_processor(self, dialect):
389 super_processor = super(StringLiteral, self).literal_processor(dialect) 390 391 def process(value): 392 if isinstance(value, int_type): 393 return text(value) 394 if not isinstance(value, str_type): 395 value = text(value) 396 result = super_processor(value) 397 if isinstance(result, bytes): 398 result = result.decode(dialect.encoding) 399 return result
400 return process
401
402 403 -class LiteralDialect(DefaultDialect):
404 colspecs = { 405 # prevent various encoding explosions 406 String: StringLiteral, 407 # teach SA about how to literalize a datetime 408 DateTime: StringLiteral, 409 # don't format py2 long integers to NULL 410 NullType: StringLiteral, 411 }
412
413 414 -def literal_query(statement):
415 """NOTE: This is entirely insecure. DO NOT execute the resulting strings. 416 This can be used for debuggin - it is not and should not be used in production 417 code. 418 419 It is useful if you want to debug an sqlalchemy query, i.e. copy the 420 resulting SQL query into psql console and try to tweak it so that it 421 actually works or works faster. 422 """ 423 import sqlalchemy.orm 424 if isinstance(statement, sqlalchemy.orm.Query): 425 statement = statement.statement 426 return statement.compile( 427 dialect=LiteralDialect(), 428 compile_kwargs={'literal_binds': True}, 429 ).string
430
431 432 -def stream_template(template_name, **context):
433 app.update_template_context(context) 434 t = app.jinja_env.get_template(template_name) 435 rv = t.stream(context) 436 rv.enable_buffering(2) 437 return rv
438
439 440 -def generate_repo_name(repo_url):
441 """ based on url, generate repo name """ 442 repo_url = re.sub("[^a-zA-Z0-9]", '_', repo_url) 443 repo_url = re.sub("(__*)", '_', repo_url) 444 repo_url = re.sub("(_*$)|^_*", '', repo_url) 445 return repo_url
446
447 448 -def is_copr_repo(repo_url):
449 return copr_repo_fullname(repo_url) is not None
450
451 452 -def copr_repo_fullname(repo_url):
453 parsed_url = urlparse(repo_url) 454 query = parse_qs(parsed_url.query) 455 if parsed_url.scheme != "copr": 456 return None 457 return parsed_url.netloc + parsed_url.path
458
459 460 -def pre_process_repo_url(chroot, repo_url):
461 """ 462 Expands variables and sanitize repo url to be used for mock config 463 """ 464 parsed_url = urlparse(repo_url) 465 query = parse_qs(parsed_url.query) 466 467 if parsed_url.scheme == "copr": 468 user = parsed_url.netloc 469 prj = parsed_url.path.split("/")[1] 470 repo_url = "/".join([ 471 flask.current_app.config["BACKEND_BASE_URL"], 472 "results", user, prj, chroot 473 ]) + "/" 474 475 elif "priority" in query: 476 query.pop("priority") 477 query_string = urlencode(query, doseq=True) 478 parsed_url = parsed_url._replace(query=query_string) 479 repo_url = urlunparse(parsed_url) 480 481 repo_url = repo_url.replace("$chroot", chroot) 482 repo_url = repo_url.replace("$distname", chroot.rsplit("-", 2)[0]) 483 return repo_url
484
485 486 -def parse_repo_params(repo, supported_keys=None):
487 """ 488 :param repo: str repo from Copr/CoprChroot/Build/... 489 :param supported_keys list of supported optional parameters 490 :return: dict of optional parameters parsed from the repo URL 491 """ 492 supported_keys = supported_keys or ["priority"] 493 params = {} 494 qs = parse_qs(urlparse(repo).query) 495 for k, v in qs.items(): 496 if k in supported_keys: 497 # parse_qs returns values as lists, but we allow setting the param only once, 498 # so we can take just first value from it 499 value = int(v[0]) if v[0].isnumeric() else v[0] 500 params[k] = value 501 return params
502
503 504 -def trim_git_url(url):
505 if not url: 506 return None 507 508 return re.sub(r'(\.git)?/*$', '', url)
509
510 511 -def get_parsed_git_url(url):
512 if not url: 513 return False 514 515 url = trim_git_url(url) 516 return urlparse(url)
517
518 519 -class SubdirMatch(object):
520 - def __init__(self, subdir):
521 if not subdir: 522 self.subdir = '.' 523 else: 524 self.subdir = normpath(subdir).strip('/')
525
526 - def match(self, path):
527 if not path: # shouldn't happen 528 return False 529 530 changed = normpath(path).strip('/') 531 if changed == '.': 532 return False # shouldn't happen! 533 534 if self.subdir == '.': 535 return True 536 537 return changed.startswith(self.subdir + '/')
538
539 540 -def pagure_html_diff_changed(html_string):
541 parsed = html5_parser.parse(str(html_string)) 542 elements = parsed.xpath( 543 "//section[contains(@class, 'commit_diff')]" 544 "//div[contains(@class, 'card-header')]" 545 "//a[contains(@class, 'font-weight-bold')]" 546 "/text()") 547 548 return set([str(x) for x in elements])
549
550 551 -def raw_commit_changes(text):
552 changes = set() 553 for line in text.split('\n'): 554 match = re.search(r'^(\+\+\+|---) [ab]/(.*)$', line) 555 if match: 556 changes.add(str(match.group(2))) 557 match = re.search(r'^diff --git a/(.*) b/(.*)$', line) 558 if match: 559 changes.add(str(match.group(1))) 560 changes.add(str(match.group(2))) 561 print(changes) 562 563 return changes
564