Package coprs :: Package views :: Module misc
[hide private]
[frames] | no frames]

Source Code for Module coprs.views.misc

  1  import os 
  2  import base64 
  3  import datetime 
  4  import functools 
  5  from functools import wraps, partial 
  6   
  7  from netaddr import IPAddress, IPNetwork 
  8  import re 
  9  import flask 
 10  from flask import send_file 
 11   
 12  from urllib.parse import urlparse 
 13  from openid_teams.teams import TeamsRequest 
 14   
 15  from copr_common.enums import RoleEnum 
 16  from coprs import app 
 17  from coprs import db 
 18  from coprs import helpers 
 19  from coprs import models 
 20  from coprs import oid 
 21  from coprs.logic.complex_logic import ComplexLogic 
 22  from coprs.logic.users_logic import UsersLogic 
 23  from coprs.logic.coprs_logic import CoprsLogic 
24 25 26 -def create_user_wrapper(username, email, timezone=None):
27 expiration_date_token = datetime.date.today() + \ 28 datetime.timedelta( 29 days=flask.current_app.config["API_TOKEN_EXPIRATION"]) 30 31 copr64 = base64.b64encode(b"copr") + b"##" 32 user = models.User(username=username, mail=email, 33 timezone=timezone, 34 api_login=copr64.decode("utf-8") + helpers.generate_api_token( 35 app.config["API_TOKEN_LENGTH"] - len(copr64)), 36 api_token=helpers.generate_api_token( 37 app.config["API_TOKEN_LENGTH"]), 38 api_token_expiration=expiration_date_token) 39 return user
40
41 42 -def fed_raw_name(oidname):
43 oidname_parse = urlparse(oidname) 44 if not oidname_parse.netloc: 45 return oidname 46 config_parse = urlparse(app.config["OPENID_PROVIDER_URL"]) 47 return oidname_parse.netloc.replace(".{0}".format(config_parse.netloc), "")
48
49 50 -def krb_straighten_username(krb_remote_user):
51 # Input should look like 'USERNAME@REALM.TLD', strip realm. 52 username = re.sub(r'@.*', '', krb_remote_user) 53 54 # But USERNAME part can consist of USER/DOMAIN.TLD. 55 # TODO: Do we need more clever thing here? 56 username = re.sub('/', '_', username) 57 58 # Based on restrictions for project name: "letters, digits, underscores, 59 # dashes and dots", it is worth limitting the username here, too. 60 # TODO: Store this pattern on one place. 61 return username if re.match(r"^[\w.-]+$", username) else None
62
63 64 @app.before_request 65 -def set_empty_user():
66 flask.g.user = None
67
68 69 @app.before_request 70 -def lookup_current_user():
71 flask.g.user = username = None 72 if "openid" in flask.session: 73 username = fed_raw_name(flask.session["openid"]) 74 elif "krb5_login" in flask.session: 75 username = flask.session["krb5_login"] 76 77 if username: 78 flask.g.user = models.User.query.filter( 79 models.User.username == username).first()
80
81 82 -def page_not_found(message):
83 return flask.render_template("404.html", message=message), 404
84
85 86 -def access_restricted(message):
87 return flask.render_template("403.html", message=message), 403
88
89 90 -def generic_error(message, code=500, title=None):
91 """ 92 :type message: str 93 :type err: CoprHttpException 94 """ 95 return flask.render_template("_error.html", 96 message=message, 97 error_code=code, 98 error_title=title), code
99 100 101 server_error_handler = partial(generic_error, code=500, title="Internal Server Error") 102 bad_request_handler = partial(generic_error, code=400, title="Bad Request") 103 conflict_request_handler = partial(generic_error, code=409, title="Conflict") 104 105 misc = flask.Blueprint("misc", __name__) 106 107 108 @misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
109 -def krb5_login(name):
110 """ 111 Handle the Kerberos authentication. 112 113 Note that if we are able to get here, either the user is authenticated 114 correctly, or apache is mis-configured and it does not perform KRB 115 authentication at all. Note also, even if that can be considered ugly, we 116 are reusing oid's get_next_url feature with kerberos login. 117 """ 118 119 # Already logged in? 120 if flask.g.user is not None: 121 return flask.redirect(oid.get_next_url()) 122 123 krb_config = app.config['KRB5_LOGIN'] 124 125 found = None 126 for key in krb_config.keys(): 127 if krb_config[key]['URI'] == name: 128 found = key 129 break 130 131 if not found: 132 # no KRB5_LOGIN.<name> configured in copr.conf 133 return flask.render_template("404.html"), 404 134 135 if app.config["DEBUG"] and 'TEST_REMOTE_USER' in os.environ: 136 # For local testing (without krb5 keytab and other configuration) 137 flask.request.environ['REMOTE_USER'] = os.environ['TEST_REMOTE_USER'] 138 139 if 'REMOTE_USER' not in flask.request.environ: 140 nocred = "Kerberos authentication failed (no credentials provided)" 141 return flask.render_template("403.html", message=nocred), 403 142 143 krb_username = flask.request.environ['REMOTE_USER'] 144 app.logger.debug("krb5 login attempt: " + krb_username) 145 username = krb_straighten_username(krb_username) 146 if not username: 147 message = "invalid krb5 username: " + krb_username 148 return flask.render_template("403.html", message=message), 403 149 150 krb_login = ( 151 models.Krb5Login.query 152 .filter(models.Krb5Login.config_name == key) 153 .filter(models.Krb5Login.primary == username) 154 .first() 155 ) 156 if krb_login: 157 flask.g.user = krb_login.user 158 flask.session['krb5_login'] = krb_login.user.name 159 flask.flash(u"Welcome, {0}".format(flask.g.user.name), "success") 160 return flask.redirect(oid.get_next_url()) 161 162 # We need to create row in 'krb5_login' table 163 user = models.User.query.filter(models.User.username == username).first() 164 if not user: 165 # Even the item in 'user' table does not exist, create _now_ 166 email = username + "@" + krb_config[key]['email_domain'] 167 user = create_user_wrapper(username, email) 168 db.session.add(user) 169 170 krb_login = models.Krb5Login(user=user, primary=username, config_name=key) 171 db.session.add(krb_login) 172 db.session.commit() 173 174 flask.flash(u"Welcome, {0}".format(user.name), "success") 175 flask.g.user = user 176 flask.session['krb5_login'] = user.name 177 return flask.redirect(oid.get_next_url())
178
179 180 @misc.route("/login/", methods=["GET"]) 181 @oid.loginhandler 182 -def login():
183 if not app.config['FAS_LOGIN']: 184 if app.config['KRB5_LOGIN']: 185 return krb5_login_redirect(next=oid.get_next_url()) 186 flask.flash("No auth method available", "error") 187 return flask.redirect(flask.url_for("coprs_ns.coprs_show")) 188 189 if flask.g.user is not None: 190 return flask.redirect(oid.get_next_url()) 191 else: 192 # a bit of magic 193 team_req = TeamsRequest(["_FAS_ALL_GROUPS_"]) 194 return oid.try_login(app.config["OPENID_PROVIDER_URL"], 195 ask_for=["email", "timezone"], 196 extensions=[team_req])
197
198 199 @oid.after_login 200 -def create_or_login(resp):
201 flask.session["openid"] = resp.identity_url 202 fasusername = fed_raw_name(resp.identity_url) 203 204 # kidding me.. or not 205 if fasusername and ( 206 ( 207 app.config["USE_ALLOWED_USERS"] and 208 fasusername in app.config["ALLOWED_USERS"] 209 ) or not app.config["USE_ALLOWED_USERS"]): 210 211 username = fed_raw_name(resp.identity_url) 212 user = models.User.query.filter( 213 models.User.username == username).first() 214 if not user: # create if not created already 215 user = create_user_wrapper(username, resp.email, resp.timezone) 216 else: 217 user.mail = resp.email 218 user.timezone = resp.timezone 219 if "lp" in resp.extensions: 220 team_resp = resp.extensions['lp'] # name space for the teams extension 221 user.openid_groups = {"fas_groups": team_resp.teams} 222 223 db.session.add(user) 224 db.session.commit() 225 flask.flash(u"Welcome, {0}".format(user.name), "success") 226 flask.g.user = user 227 228 if flask.request.url_root == oid.get_next_url(): 229 return flask.redirect(flask.url_for("coprs_ns.coprs_by_user", 230 username=user.name)) 231 return flask.redirect(oid.get_next_url()) 232 else: 233 flask.flash("User '{0}' is not allowed".format(fasusername)) 234 return flask.redirect(oid.get_next_url())
235
236 237 @misc.route("/logout/") 238 -def logout():
239 flask.session.pop("openid", None) 240 flask.session.pop("krb5_login", None) 241 flask.flash(u"You were signed out") 242 return flask.redirect(oid.get_next_url())
243
244 245 -def api_login_required(f):
246 @functools.wraps(f) 247 def decorated_function(*args, **kwargs): 248 token = None 249 apt_login = None 250 if "Authorization" in flask.request.headers: 251 base64string = flask.request.headers["Authorization"] 252 base64string = base64string.split()[1].strip() 253 userstring = base64.b64decode(base64string) 254 (apt_login, token) = userstring.decode("utf-8").split(":") 255 token_auth = False 256 if token and apt_login: 257 user = UsersLogic.get_by_api_login(apt_login).first() 258 if (user and user.api_token == token and 259 user.api_token_expiration >= datetime.date.today()): 260 261 if user.proxy and "username" in flask.request.form: 262 user = UsersLogic.get(flask.request.form["username"]).first() 263 264 token_auth = True 265 flask.g.user = user 266 if not token_auth: 267 url = 'https://' + app.config["PUBLIC_COPR_HOSTNAME"] 268 url = helpers.fix_protocol_for_frontend(url) 269 270 output = { 271 "output": "notok", 272 "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(url), 273 } 274 jsonout = flask.jsonify(output) 275 jsonout.status_code = 401 276 return jsonout 277 return f(*args, **kwargs)
278 return decorated_function 279
280 281 -def krb5_login_redirect(next=None):
282 krbc = app.config['KRB5_LOGIN'] 283 for key in krbc: 284 # Pick the first one for now. 285 return flask.redirect(flask.url_for("misc.krb5_login", 286 name=krbc[key]['URI'], 287 next=next)) 288 flask.flash("Unable to pick krb5 login page", "error") 289 return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
290
291 292 -def login_required(role=RoleEnum("user")):
293 def view_wrapper(f): 294 @functools.wraps(f) 295 def decorated_function(*args, **kwargs): 296 if flask.g.user is None: 297 return flask.redirect(flask.url_for("misc.login", 298 next=flask.request.url)) 299 300 if role == RoleEnum("admin") and not flask.g.user.admin: 301 flask.flash("You are not allowed to access admin section.") 302 return flask.redirect(flask.url_for("coprs_ns.coprs_show")) 303 304 return f(*args, **kwargs)
305 return decorated_function 306 # hack: if login_required is used without params, the "role" parameter 307 # is in fact the decorated function, so we need to return 308 # the wrapped function, not the wrapper 309 # proper solution would be to use login_required() with parentheses 310 # everywhere, even if they"re empty - TODO 311 if callable(role): 312 return view_wrapper(role) 313 else: 314 return view_wrapper 315
316 317 # backend authentication 318 -def backend_authenticated(f):
319 @functools.wraps(f) 320 def decorated_function(*args, **kwargs): 321 auth = flask.request.authorization 322 if not auth or auth.password != app.config["BACKEND_PASSWORD"]: 323 return "You have to provide the correct password\n", 401 324 325 return f(*args, **kwargs)
326 return decorated_function 327
328 329 -def intranet_required(f):
330 @functools.wraps(f) 331 def decorated_function(*args, **kwargs): 332 ip_addr = IPAddress(flask.request.remote_addr) 333 accept_ranges = set(app.config.get("INTRANET_IPS", [])) 334 accept_ranges.add("127.0.0.1") # always accept from localhost 335 if not any(ip_addr in IPNetwork(addr_or_net) for addr_or_net in accept_ranges): 336 return ("Stats can be update only from intranet hosts, " 337 "not {}, check config\n".format(flask.request.remote_addr)), 403 338 339 return f(*args, **kwargs)
340 return decorated_function 341
342 343 -def req_with_copr(f):
344 @wraps(f) 345 def wrapper(**kwargs): 346 coprname = kwargs.pop("coprname") 347 if "group_name" in kwargs: 348 group_name = kwargs.pop("group_name") 349 copr = ComplexLogic.get_group_copr_safe(group_name, coprname, with_mock_chroots=True) 350 else: 351 username = kwargs.pop("username") 352 copr = ComplexLogic.get_copr_safe(username, coprname, with_mock_chroots=True) 353 return f(copr, **kwargs)
354 return wrapper 355
356 357 -def req_with_copr_dir(f):
358 @wraps(f) 359 def wrapper(**kwargs): 360 if "group_name" in kwargs: 361 ownername = '@' + kwargs.pop("group_name") 362 else: 363 ownername = kwargs.pop("username") 364 copr_dirname = kwargs.pop("copr_dirname") 365 copr_dir = ComplexLogic.get_copr_dir_safe(ownername, copr_dirname) 366 return f(copr_dir, **kwargs)
367 return wrapper 368
369 370 -def send_build_icon(build):
371 if not build: 372 return send_file("static/status_images/unknown.png", 373 mimetype='image/png') 374 375 if build.state in ["importing", "pending", "starting", "running"]: 376 # The icon is about to change very soon, disable caches: 377 # https://help.github.com/articles/about-anonymized-image-urls/ 378 response = send_file("static/status_images/in_progress.png", 379 mimetype='image/png') 380 response.headers['Cache-Control'] = 'no-cache' 381 return response 382 383 if build.state in ["succeeded", "skipped"]: 384 return send_file("static/status_images/succeeded.png", 385 mimetype='image/png') 386 387 if build.state == "failed": 388 return send_file("static/status_images/failed.png", 389 mimetype='image/png') 390 391 return send_file("static/status_images/unknown.png", 392 mimetype='image/png')
393