Package coprs :: Package views :: Module misc
[hide private]
[frames] | no frames]

Source Code for Module coprs.views.misc

  1  import os 
  2  import base64 
  3  import datetime 
  4  import functools 
  5  from functools import wraps, partial 
  6   
  7  from netaddr import IPAddress, IPNetwork 
  8  import re 
  9  import flask 
 10  from flask import send_file 
 11   
 12  from urllib.parse import urlparse 
 13  from openid_teams.teams import TeamsRequest 
 14   
 15  from copr_common.enums import RoleEnum 
 16  from coprs import app 
 17  from coprs import db 
 18  from coprs import helpers 
 19  from coprs import models 
 20  from coprs import oid 
 21  from coprs.logic.complex_logic import ComplexLogic 
 22  from coprs.logic.users_logic import UsersLogic 
 23  from coprs.logic.coprs_logic import CoprsLogic 
24 25 26 -def create_user_wrapper(username, email, timezone=None):
27 expiration_date_token = datetime.date.today() + \ 28 datetime.timedelta( 29 days=flask.current_app.config["API_TOKEN_EXPIRATION"]) 30 31 copr64 = base64.b64encode(b"copr") + b"##" 32 user = models.User(username=username, mail=email, 33 timezone=timezone, 34 api_login=copr64.decode("utf-8") + helpers.generate_api_token( 35 app.config["API_TOKEN_LENGTH"] - len(copr64)), 36 api_token=helpers.generate_api_token( 37 app.config["API_TOKEN_LENGTH"]), 38 api_token_expiration=expiration_date_token) 39 return user
40
41 42 -def fed_raw_name(oidname):
43 oidname_parse = urlparse(oidname) 44 if not oidname_parse.netloc: 45 return oidname 46 config_parse = urlparse(app.config["OPENID_PROVIDER_URL"]) 47 return oidname_parse.netloc.replace(".{0}".format(config_parse.netloc), "")
48
49 50 -def krb_straighten_username(krb_remote_user):
51 # Input should look like 'USERNAME@REALM.TLD', strip realm. 52 username = re.sub(r'@.*', '', krb_remote_user) 53 54 # But USERNAME part can consist of USER/DOMAIN.TLD. 55 # TODO: Do we need more clever thing here? 56 username = re.sub('/', '_', username) 57 58 # Based on restrictions for project name: "letters, digits, underscores, 59 # dashes and dots", it is worth limitting the username here, too. 60 # TODO: Store this pattern on one place. 61 return username if re.match(r"^[\w.-]+$", username) else None
62
63 64 @app.before_request 65 -def set_empty_user():
66 flask.g.user = None
67
68 69 @app.before_request 70 -def lookup_current_user():
71 flask.g.user = username = None 72 if "openid" in flask.session: 73 username = fed_raw_name(flask.session["openid"]) 74 elif "krb5_login" in flask.session: 75 username = flask.session["krb5_login"] 76 77 if username: 78 flask.g.user = models.User.query.filter( 79 models.User.username == username).first()
80
81 82 -def page_not_found(message):
83 return flask.render_template("404.html", message=message), 404
84
85 86 -def access_restricted(message):
87 return flask.render_template("403.html", message=message), 403
88
89 90 -def generic_error(message, code=500, title=None):
91 """ 92 :type message: str 93 :type err: CoprHttpException 94 """ 95 return flask.render_template("_error.html", 96 message=message, 97 error_code=code, 98 error_title=title), code
99 100 101 server_error_handler = partial(generic_error, code=500, title="Internal Server Error") 102 bad_request_handler = partial(generic_error, code=400, title="Bad Request") 103 104 misc = flask.Blueprint("misc", __name__) 105 106 107 @misc.route(app.config['KRB5_LOGIN_BASEURI'] + "<name>/", methods=["GET"])
108 -def krb5_login(name):
109 """ 110 Handle the Kerberos authentication. 111 112 Note that if we are able to get here, either the user is authenticated 113 correctly, or apache is mis-configured and it does not perform KRB 114 authentication at all. Note also, even if that can be considered ugly, we 115 are reusing oid's get_next_url feature with kerberos login. 116 """ 117 118 # Already logged in? 119 if flask.g.user is not None: 120 return flask.redirect(oid.get_next_url()) 121 122 krb_config = app.config['KRB5_LOGIN'] 123 124 found = None 125 for key in krb_config.keys(): 126 if krb_config[key]['URI'] == name: 127 found = key 128 break 129 130 if not found: 131 # no KRB5_LOGIN.<name> configured in copr.conf 132 return flask.render_template("404.html"), 404 133 134 if app.config["DEBUG"] and 'TEST_REMOTE_USER' in os.environ: 135 # For local testing (without krb5 keytab and other configuration) 136 flask.request.environ['REMOTE_USER'] = os.environ['TEST_REMOTE_USER'] 137 138 if 'REMOTE_USER' not in flask.request.environ: 139 nocred = "Kerberos authentication failed (no credentials provided)" 140 return flask.render_template("403.html", message=nocred), 403 141 142 krb_username = flask.request.environ['REMOTE_USER'] 143 app.logger.debug("krb5 login attempt: " + krb_username) 144 username = krb_straighten_username(krb_username) 145 if not username: 146 message = "invalid krb5 username: " + krb_username 147 return flask.render_template("403.html", message=message), 403 148 149 krb_login = ( 150 models.Krb5Login.query 151 .filter(models.Krb5Login.config_name == key) 152 .filter(models.Krb5Login.primary == username) 153 .first() 154 ) 155 if krb_login: 156 flask.g.user = krb_login.user 157 flask.session['krb5_login'] = krb_login.user.name 158 flask.flash(u"Welcome, {0}".format(flask.g.user.name), "success") 159 return flask.redirect(oid.get_next_url()) 160 161 # We need to create row in 'krb5_login' table 162 user = models.User.query.filter(models.User.username == username).first() 163 if not user: 164 # Even the item in 'user' table does not exist, create _now_ 165 email = username + "@" + krb_config[key]['email_domain'] 166 user = create_user_wrapper(username, email) 167 db.session.add(user) 168 169 krb_login = models.Krb5Login(user=user, primary=username, config_name=key) 170 db.session.add(krb_login) 171 db.session.commit() 172 173 flask.flash(u"Welcome, {0}".format(user.name), "success") 174 flask.g.user = user 175 flask.session['krb5_login'] = user.name 176 return flask.redirect(oid.get_next_url())
177
178 179 @misc.route("/login/", methods=["GET"]) 180 @oid.loginhandler 181 -def login():
182 if not app.config['FAS_LOGIN']: 183 if app.config['KRB5_LOGIN']: 184 return krb5_login_redirect(next=oid.get_next_url()) 185 flask.flash("No auth method available", "error") 186 return flask.redirect(flask.url_for("coprs_ns.coprs_show")) 187 188 if flask.g.user is not None: 189 return flask.redirect(oid.get_next_url()) 190 else: 191 # a bit of magic 192 team_req = TeamsRequest(["_FAS_ALL_GROUPS_"]) 193 return oid.try_login(app.config["OPENID_PROVIDER_URL"], 194 ask_for=["email", "timezone"], 195 extensions=[team_req])
196
197 198 @oid.after_login 199 -def create_or_login(resp):
200 flask.session["openid"] = resp.identity_url 201 fasusername = fed_raw_name(resp.identity_url) 202 203 # kidding me.. or not 204 if fasusername and ( 205 ( 206 app.config["USE_ALLOWED_USERS"] and 207 fasusername in app.config["ALLOWED_USERS"] 208 ) or not app.config["USE_ALLOWED_USERS"]): 209 210 username = fed_raw_name(resp.identity_url) 211 user = models.User.query.filter( 212 models.User.username == username).first() 213 if not user: # create if not created already 214 user = create_user_wrapper(username, resp.email, resp.timezone) 215 else: 216 user.mail = resp.email 217 user.timezone = resp.timezone 218 if "lp" in resp.extensions: 219 team_resp = resp.extensions['lp'] # name space for the teams extension 220 user.openid_groups = {"fas_groups": team_resp.teams} 221 222 db.session.add(user) 223 db.session.commit() 224 flask.flash(u"Welcome, {0}".format(user.name), "success") 225 flask.g.user = user 226 227 if flask.request.url_root == oid.get_next_url(): 228 return flask.redirect(flask.url_for("coprs_ns.coprs_by_user", 229 username=user.name)) 230 return flask.redirect(oid.get_next_url()) 231 else: 232 flask.flash("User '{0}' is not allowed".format(fasusername)) 233 return flask.redirect(oid.get_next_url())
234
235 236 @misc.route("/logout/") 237 -def logout():
238 flask.session.pop("openid", None) 239 flask.session.pop("krb5_login", None) 240 flask.flash(u"You were signed out") 241 return flask.redirect(oid.get_next_url())
242
243 244 -def api_login_required(f):
245 @functools.wraps(f) 246 def decorated_function(*args, **kwargs): 247 token = None 248 apt_login = None 249 if "Authorization" in flask.request.headers: 250 base64string = flask.request.headers["Authorization"] 251 base64string = base64string.split()[1].strip() 252 userstring = base64.b64decode(base64string) 253 (apt_login, token) = userstring.decode("utf-8").split(":") 254 token_auth = False 255 if token and apt_login: 256 user = UsersLogic.get_by_api_login(apt_login).first() 257 if (user and user.api_token == token and 258 user.api_token_expiration >= datetime.date.today()): 259 260 if user.proxy and "username" in flask.request.form: 261 user = UsersLogic.get(flask.request.form["username"]).first() 262 263 token_auth = True 264 flask.g.user = user 265 if not token_auth: 266 url = 'https://' + app.config["PUBLIC_COPR_HOSTNAME"] 267 url = helpers.fix_protocol_for_frontend(url) 268 269 output = { 270 "output": "notok", 271 "error": "Login invalid/expired. Please visit {0}/api to get or renew your API token.".format(url), 272 } 273 jsonout = flask.jsonify(output) 274 jsonout.status_code = 401 275 return jsonout 276 return f(*args, **kwargs)
277 return decorated_function 278
279 280 -def krb5_login_redirect(next=None):
281 krbc = app.config['KRB5_LOGIN'] 282 for key in krbc: 283 # Pick the first one for now. 284 return flask.redirect(flask.url_for("misc.krb5_login", 285 name=krbc[key]['URI'], 286 next=next)) 287 flask.flash("Unable to pick krb5 login page", "error") 288 return flask.redirect(flask.url_for("coprs_ns.coprs_show"))
289
290 291 -def login_required(role=RoleEnum("user")):
292 def view_wrapper(f): 293 @functools.wraps(f) 294 def decorated_function(*args, **kwargs): 295 if flask.g.user is None: 296 return flask.redirect(flask.url_for("misc.login", 297 next=flask.request.url)) 298 299 if role == RoleEnum("admin") and not flask.g.user.admin: 300 flask.flash("You are not allowed to access admin section.") 301 return flask.redirect(flask.url_for("coprs_ns.coprs_show")) 302 303 return f(*args, **kwargs)
304 return decorated_function 305 # hack: if login_required is used without params, the "role" parameter 306 # is in fact the decorated function, so we need to return 307 # the wrapped function, not the wrapper 308 # proper solution would be to use login_required() with parentheses 309 # everywhere, even if they"re empty - TODO 310 if callable(role): 311 return view_wrapper(role) 312 else: 313 return view_wrapper 314
315 316 # backend authentication 317 -def backend_authenticated(f):
318 @functools.wraps(f) 319 def decorated_function(*args, **kwargs): 320 auth = flask.request.authorization 321 if not auth or auth.password != app.config["BACKEND_PASSWORD"]: 322 return "You have to provide the correct password\n", 401 323 324 return f(*args, **kwargs)
325 return decorated_function 326
327 328 -def intranet_required(f):
329 @functools.wraps(f) 330 def decorated_function(*args, **kwargs): 331 ip_addr = IPAddress(flask.request.remote_addr) 332 accept_ranges = set(app.config.get("INTRANET_IPS", [])) 333 accept_ranges.add("127.0.0.1") # always accept from localhost 334 if not any(ip_addr in IPNetwork(addr_or_net) for addr_or_net in accept_ranges): 335 return ("Stats can be update only from intranet hosts, " 336 "not {}, check config\n".format(flask.request.remote_addr)), 403 337 338 return f(*args, **kwargs)
339 return decorated_function 340
341 342 -def req_with_copr(f):
343 @wraps(f) 344 def wrapper(**kwargs): 345 coprname = kwargs.pop("coprname") 346 if "group_name" in kwargs: 347 group_name = kwargs.pop("group_name") 348 copr = ComplexLogic.get_group_copr_safe(group_name, coprname, with_mock_chroots=True) 349 else: 350 username = kwargs.pop("username") 351 copr = ComplexLogic.get_copr_safe(username, coprname, with_mock_chroots=True) 352 return f(copr, **kwargs)
353 return wrapper 354
355 356 -def send_build_icon(build):
357 if not build: 358 return send_file("static/status_images/unknown.png", 359 mimetype='image/png') 360 361 if build.state in ["importing", "pending", "starting", "running"]: 362 # The icon is about to change very soon, disable caches: 363 # https://help.github.com/articles/about-anonymized-image-urls/ 364 response = send_file("static/status_images/in_progress.png", 365 mimetype='image/png') 366 response.headers['Cache-Control'] = 'no-cache' 367 return response 368 369 if build.state in ["succeeded", "skipped"]: 370 return send_file("static/status_images/succeeded.png", 371 mimetype='image/png') 372 373 if build.state == "failed": 374 return send_file("static/status_images/failed.png", 375 mimetype='image/png') 376 377 return send_file("static/status_images/unknown.png", 378 mimetype='image/png')
379