Source code for noggin.controller.user

import os
from base64 import b32encode

import jwt
import python_freeipa
from flask import (
    current_app,
    flash,
    g,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from flask_babel import _
from flask_mail import Message
from markupsafe import Markup
from pyotp import TOTP
from werkzeug.datastructures import MultiDict

from noggin.app import mailer
from noggin.form.base import BaseForm
from noggin.form.edit_user import (
    UserSettingsAddOTPForm,
    UserSettingsAgreementSign,
    UserSettingsConfirmOTPForm,
    UserSettingsEmailForm,
    UserSettingsKeysForm,
    UserSettingsOTPNameChange,
    UserSettingsOTPStatusChange,
    UserSettingsProfileForm,
)
from noggin.representation.agreement import Agreement
from noggin.representation.group import Group
from noggin.representation.otptoken import OTPToken
from noggin.representation.user import User
from noggin.security.ipa import maybe_ipa_login
from noggin.utility import messaging
from noggin.utility.controllers import require_self, user_or_404, with_ipa
from noggin.utility.forms import FormError, handle_form_errors
from noggin.utility.token import Audience, make_token, read_token
from noggin_messages import UserUpdateV1

from . import blueprint as bp


# Must be the same as KEY_LENGTH in ipaserver/plugins/otptoken.py
# For maximum compatibility, must be a multiple of 5.
OTP_KEY_LENGTH = 35


[docs] @bp.route('/user/<username>/') @with_ipa() def user(ipa, username): user = User(user_or_404(ipa, username)) # As a speed optimization, we make two separate calls. # Just doing a group_find (with all=True) is super slow here, with a lot of # groups. batch_methods = [ {"method": "group_show", "params": [[name], {"no_members": True}]} for name in user.groups ] # Don't call remote batch method with an empty list if batch_methods: member_groups = [ Group(g["result"]) for g in ipa.batch(batch_methods)["results"] if g["result"].get("fasgroup", False) ] else: member_groups = [] managed_groups = [ Group(group) for group in ipa.group_find( o_membermanager_user=username, o_all=False, fasgroup=True )['result'] ] groups = sorted(list(set(managed_groups + member_groups)), key=lambda g: g.name) # Privacy setting if user != g.current_user and user.is_private: user.anonymize() return render_template( 'user.html', user=user, groups=groups, managed_groups=managed_groups, member_groups=member_groups, )
def _user_mod(ipa, form, user, details, redirect_to): with handle_form_errors(form): try: updated_user = User( ipa.user_mod(user.username, **details, all=True)['result'] ) except python_freeipa.exceptions.BadRequest as e: if e.message == 'no modifications to be performed': raise FormError("non_field_errors", e.message) else: current_app.logger.error( f'An error happened while editing user {user.username}: {e.message}' ) raise FormError("non_field_errors", e.message) flash( Markup( f'Profile Updated: <a href=\"{url_for(".user", username=user.username)}\">' 'view your profile</a>' ), 'success', ) messaging.publish( UserUpdateV1( { "msg": { "agent": user.username, "user": user.username, "fields": user.diff_fields(updated_user), } } ) ) return redirect(url_for(redirect_to, username=user.username))
[docs] @bp.route('/user/<username>/settings/profile/', methods=['GET', 'POST']) @with_ipa() @require_self def user_settings_profile(ipa, username): user = User(user_or_404(ipa, username)) form = UserSettingsProfileForm(obj=user) if form.validate_on_submit(): changes = { user.get_attr_option(field.short_name): getattr(form, field.short_name).data for field in form if field.short_name in user } fullname = f"{form.firstname.data} {form.lastname.data}" changes["o_cn"] = changes["o_displayname"] = changes["o_gecos"] = fullname result = _user_mod( ipa, form, user, changes, ".user_settings_profile", ) if result: return result if not form.errors: form.ircnick.append_entry() return render_template( 'user-settings-profile.html', user=user, form=form, activetab="profile" )
def _send_validation_email(user, attr, value): token = make_token( {"sub": user.username, "attr": attr, "mail": value}, audience=Audience.email_validation, ttl=current_app.config["ACTIVATION_TOKEN_EXPIRATION"], ) email_context = {"token": token, "user": user, "address": value} email = Message( body=render_template("settings-email-validation.txt", **email_context), html=render_template("settings-email-validation.html", **email_context), recipients=[value], subject=_("Verify your email address"), ) if current_app.config["DEBUG"]: # pragma: no cover current_app.logger.debug(email) mailer.send(email)
[docs] @bp.route('/user/<username>/settings/email/', methods=['GET', 'POST']) @with_ipa() @require_self def user_settings_email(ipa, username): user = User(user_or_404(ipa, username)) form = UserSettingsEmailForm(obj=user) attrs = ["mail", "rhbz_mail"] if form.validate_on_submit(): change_now = {} needs_validation = {} for attr in attrs: value = getattr(form, attr).data old_value = getattr(user, attr) or "" option_name = user.get_attr_option(attr) if value != old_value: if not value: # email has been removed change_now[option_name] = value else: needs_validation[attr] = value should_redirect = False if change_now: should_redirect = _user_mod( ipa, form, user, change_now, ".user_settings_email", ) if needs_validation: for attr, value in needs_validation.items(): try: _send_validation_email(user, attr, value) except ConnectionRefusedError as e: current_app.logger.error( f"Impossible to send an address validation email: {e}" ) form["non_field_errors"].errors.append( _( "We could not send you the address validation email, please retry later" ) ) break flash( _( "The email address %(mail)s needs to be validated. Please check your " "inbox and click on the link to proceed. If you can't find the email " "in a couple minutes, check your spam folder.", mail=value, ), "info", ) should_redirect = redirect( url_for('.user_settings_email', username=user.username) ) if should_redirect: return should_redirect if not change_now and not needs_validation: form["non_field_errors"].errors.append(_("No modifications.")) return render_template( 'user-settings-email.html', user=user, form=form, activetab="email" )
[docs] @bp.route('/user/<username>/settings/email/validate', methods=['GET', 'POST']) @with_ipa() @require_self def user_settings_email_validate(ipa, username): user = User(user_or_404(ipa, username)) url = url_for('.user_settings_email', username=user.username) token_string = request.args.get('token') if not token_string: flash( _('No token provided, please check your email validation link.'), 'warning' ) return redirect(url) try: token = read_token(token_string, audience=Audience.email_validation) except jwt.exceptions.DecodeError: flash(_("The token is invalid, please set the email again."), "warning") return redirect(url) except jwt.exceptions.ExpiredSignatureError: flash( _("This token is no longer valid, please set the email again."), "warning" ) return redirect(url) if token["sub"] != user.username: flash(_("This token does not belong to you."), "warning") return redirect(url) attr = token["attr"] value = token["mail"] form = BaseForm() if form.validate_on_submit(): option_name = user.get_attr_option(token["attr"]) result = _user_mod( ipa, form, user, {option_name: value}, ".user_settings_email", ) if result: return result return render_template( 'user-settings-email-validation.html', form=form, user=user, attr_label=UserSettingsEmailForm()[attr].label, value=value, )
[docs] @bp.route('/user/<username>/settings/keys/', methods=['GET', 'POST']) @with_ipa() @require_self def user_settings_keys(ipa, username): user = User(user_or_404(ipa, username)) form = UserSettingsKeysForm(obj=user) if form.validate_on_submit(): result = _user_mod( ipa, form, user, {'o_ipasshpubkey': form.sshpubkeys.data, 'fasgpgkeyid': form.gpgkeys.data}, ".user_settings_keys", ) if result: return result # if the form has errors, we don't want to add new fields. otherwise, # more fields will show up with every validation error if not form.errors: # Append 2 empty entries at the bottom of the gpgkeys fieldlist for i in range(2): form.gpgkeys.append_entry() form.sshpubkeys.append_entry() return render_template( 'user-settings-keys.html', user=user, form=form, activetab="keys" )
[docs] @bp.route('/user/<username>/settings/otp/', methods=['GET', 'POST']) @with_ipa() @require_self def user_settings_otp(ipa, username): addotpform = UserSettingsAddOTPForm(prefix="add-") confirmotpform = UserSettingsConfirmOTPForm(prefix="confirm-") user = User(user_or_404(ipa, username)) secret = None if addotpform.validate_on_submit(): description = addotpform.description.data password = addotpform.password.data if addotpform.otp.data: password += addotpform.otp.data try: maybe_ipa_login(current_app, session, username, password) except python_freeipa.exceptions.InvalidSessionPassword: addotpform.password.errors.append(_("Incorrect password")) else: secret = b32encode(os.urandom(OTP_KEY_LENGTH)).decode('ascii') # Prefill the form for the next step confirmotpform.process( MultiDict( {"confirm-secret": secret, "confirm-description": description} ) ) if confirmotpform.validate_on_submit(): try: ipa.otptoken_add( o_ipatokenowner=username, o_description=confirmotpform.description.data, o_ipatokenotpkey=confirmotpform.secret.data, ) except python_freeipa.exceptions.FreeIPAError as e: current_app.logger.error( f'An error happened while creating an OTP token for user {username}: {e.message}' ) confirmotpform.non_field_errors.errors.append(_('Cannot create the token.')) else: flash(_('The token has been created.'), "success") return redirect(url_for('.user_settings_otp', username=username)) if confirmotpform.is_submitted(): # This form is inside the modal. Keep a value in otp_uri or the modal will not open # to show the errors. secret = confirmotpform.secret.data # Compute the token URI if secret: description = addotpform.description.data or confirmotpform.description.data token = TOTP(secret) otp_uri = token.provisioning_uri(name=description, issuer_name=user.krbname) else: otp_uri = None # List existing tokens tokens = [ OTPToken(t) for t in ipa.otptoken_find(o_ipatokenowner=username)["result"] ] tokens.sort(key=lambda t: t.description or "") return render_template( 'user-settings-otp.html', addotpform=addotpform, confirmotpform=confirmotpform, user=user, activetab="otp", tokens=tokens, otp_uri=otp_uri, )
[docs] @bp.route('/user/<username>/settings/otp/rename/', methods=['POST']) @with_ipa() @require_self def user_settings_otp_rename(ipa, username): form = UserSettingsOTPNameChange() if form.validate_on_submit(): try: ipa.otptoken_mod( a_ipatokenuniqueid=form.token.data, o_description=form.description.data, ) except ( python_freeipa.exceptions.BadRequest, python_freeipa.exceptions.FreeIPAError, ) as e: if e.message != "no modifications to be performed": flash(_('Cannot rename the token.'), 'danger') current_app.logger.error( f'Something went wrong renaming an OTP token for user {username}: {e}' ) for field_errors in form.errors.values(): for error in field_errors: flash(error, 'danger') return redirect(url_for('.user_settings_otp', username=username))
[docs] @bp.route('/user/<username>/settings/otp/disable/', methods=['POST']) @with_ipa() @require_self def user_settings_otp_disable(ipa, username): form = UserSettingsOTPStatusChange() if form.validate_on_submit(): token = form.token.data try: ipa.otptoken_mod(a_ipatokenuniqueid=token, o_ipatokendisabled=True) except python_freeipa.exceptions.BadRequest as e: if ( e.message == "Server is unwilling to perform: Can't disable last active token" ): flash(_('Sorry, You cannot disable your last active token.'), 'warning') else: flash(_('Cannot disable the token.'), 'danger') current_app.logger.error( f'Something went wrong disabling an OTP token for user {username}: {e}' ) except python_freeipa.exceptions.FreeIPAError as e: flash(_('Cannot disable the token.'), 'danger') current_app.logger.error( f'Something went wrong disabling an OTP token for user {username}: {e}' ) for field_errors in form.errors.values(): for error in field_errors: flash(error, 'danger') return redirect(url_for('.user_settings_otp', username=username))
[docs] @bp.route('/user/<username>/settings/otp/enable/', methods=['POST']) @with_ipa() @require_self def user_settings_otp_enable(ipa, username): form = UserSettingsOTPStatusChange() if form.validate_on_submit(): token = form.token.data try: ipa.otptoken_mod(a_ipatokenuniqueid=token, o_ipatokendisabled=False) except ( python_freeipa.exceptions.BadRequest, python_freeipa.exceptions.FreeIPAError, ) as e: flash( _('Cannot enable the token. %(errormessage)s', errormessage=e), 'danger' ) current_app.logger.error( f'Something went wrong enabling an OTP token for user {username}: {e}' ) for field_errors in form.errors.values(): for error in field_errors: flash(error, 'danger') return redirect(url_for('.user_settings_otp', username=username))
[docs] @bp.route('/user/<username>/settings/otp/delete/', methods=['POST']) @with_ipa() @require_self def user_settings_otp_delete(ipa, username): form = UserSettingsOTPStatusChange() if form.validate_on_submit(): username = session.get('noggin_username') token = form.token.data try: ipa.otptoken_del(a_ipatokenuniqueid=token) except python_freeipa.exceptions.BadRequest as e: if ( e.message == "Server is unwilling to perform: Can't delete last active token" ): flash(_('Sorry, You cannot delete your last active token.'), 'warning') else: flash(_('Cannot delete the token.'), 'danger') current_app.logger.error( f'Something went wrong deleting OTP token for user {username}: {e}' ) except python_freeipa.exceptions.FreeIPAError as e: flash(_('Cannot delete the token.'), 'danger') current_app.logger.error( f'Something went wrong deleting OTP token for user {username}: {e}' ) for field_errors in form.errors.values(): for error in field_errors: flash(error, 'danger') return redirect(url_for('.user_settings_otp', username=username))
[docs] @bp.route('/user/<username>/settings/agreements/', methods=['GET', 'POST']) @with_ipa() @require_self def user_settings_agreements(ipa, username): user = User(user_or_404(ipa, username)) agreements = [ Agreement(a) for a in ipa.fasagreement_find(all=False, ipaenabledflag=True) ] form = UserSettingsAgreementSign() if form.validate_on_submit(): agreement_name = form.agreement.data if agreement_name not in [a.name for a in agreements]: flash(_("Unknown agreement: %(name)s.", name=agreement_name), "warning") return redirect(url_for('.user_settings_agreements', username=username)) try: ipa.fasagreement_add_user(agreement_name, user=user.username) except python_freeipa.exceptions.BadRequest as e: current_app.logger.error( f"Cannot sign the agreement {agreement_name!r}: {e}" ) flash( _( 'Cannot sign the agreement "%(name)s": %(error)s', name=agreement_name, error=e, ), 'danger', ) else: flash( _('You signed the "%(name)s" agreement.', name=agreement_name), "success", ) return redirect(url_for('.user_settings_agreements', username=username)) return render_template( 'user-settings-agreements.html', user=user, activetab="agreements", agreementslist=agreements, raw=ipa.fasagreement_find(all=True), )