import datetime
import re
import jwt
import python_freeipa
from flask import (
abort,
current_app,
flash,
jsonify,
redirect,
render_template,
request,
session,
url_for,
)
from flask_babel import _
from flask_mail import Message
from translitcodec import codecs
from unidecode import unidecode
from noggin.app import csrf, ipa_admin, mailer
from noggin.form.register_user import (
PasswordSetForm,
RegisteringActionForm,
ResendValidationEmailForm,
)
from noggin.l10n import guess_locale
from noggin.representation.user import User
from noggin.security.ipa import NoIPAServer, maybe_ipa_login, untouched_ipa_client
from noggin.signals import stageuser_created, user_registered
from noggin.utility.controllers import with_ipa
from noggin.utility.forms import FormError, handle_form_errors
from noggin.utility.token import Audience, make_token, read_token
from . import blueprint as bp
# FreeIPA reports validation errors against its own field names, which are not
# the names used by our forms. This mapping translates the FreeIPA name (key)
# into the matching form field name (value). See the `cli_name` in
# https://pagure.io/freeipa/blob/master/f/ipaclient/remote_plugins/2_164/user.py
IPA_TO_FORM_FIELDS = dict(
    login="username",
    first="firstname",
    last="lastname",
    password="password",
    email="mail",
)
def _send_validation_email(user):
    """Send *user* the email containing their address validation token.

    A token for the ``email_validation`` audience is built from the user's
    username and email address, rendered into the text and HTML templates, and
    sent to ``user.mail``.

    If the mail server refuses the connection, the failure is logged and a
    "danger" message is flashed; no exception is propagated in that case.
    """
    lifetime = current_app.config["ACTIVATION_TOKEN_EXPIRATION"]
    claims = {"sub": user.username, "mail": user.mail}
    token = make_token(claims, audience=Audience.email_validation, ttl=lifetime)
    # The configured lifetime is treated as a number of minutes here.
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=lifetime)
    context = dict(token=token, user=user, ttl=lifetime, valid_until=expires_at)
    text_body = render_template("email-validation.txt", **context)
    html_body = render_template("email-validation.html", **context)
    message = Message(
        body=text_body,
        html=html_body,
        recipients=[user.mail],
        subject=_("Verify your email address"),
    )
    if current_app.config["DEBUG"]:  # pragma: no cover
        current_app.logger.debug(message)
    try:
        mailer.send(message)
    except ConnectionRefusedError as e:
        current_app.logger.error(f"Impossible to send an address validation email: {e}")
        flash(
            _("We could not send you the address validation email, please retry later"),
            "danger",
        )
def _handle_registration_validation_error(username, e):
    """Turn a FreeIPA validation error into a ``FormError``; always raises.

    Messages shaped like ``invalid '<field>': <reason>`` whose field is listed
    in ``IPA_TO_FORM_FIELDS`` are attached to the matching form field. Any
    other message is logged and attached to ``non_field_errors``.

    :param username: the user being registered, only used in the log message.
    :param e: the exception to convert; its ``message`` attribute is read.
    :raises FormError: in every case.
    """
    message = e.message
    parsed = re.match(r"^invalid '([^']+)': (.+)$", message)
    if parsed and parsed.group(1) in IPA_TO_FORM_FIELDS:
        form_field = IPA_TO_FORM_FIELDS[parsed.group(1)]
        raise FormError(form_field, parsed.group(2))
    # Nothing more specific is possible: report a generic, form-level error.
    current_app.logger.error(
        f'An unhandled invalid value happened while registering user '
        f'{username}: {message}'
    )
    raise FormError("non_field_errors", message)
@bp.route('/register/spamcheck-wait')
def spamcheck_wait():
    """Show the "waiting for the spam check" page for a stage user.

    The stage user is looked up from the ``username`` query argument:

    - a missing or empty ``username`` aborts with a 400;
    - an unknown stage user flashes a warning and redirects to the
      registration tab;
    - a stage user whose status note is ``"active"`` is redirected to the
      registration confirmation page;
    - otherwise the waiting page is rendered.
    """
    username = request.args.get('username')
    if not username:
        abort(400, "No username provided")
    try:
        user = User(ipa_admin.stageuser_show(a_uid=username)["result"])
    except python_freeipa.exceptions.NotFound:
        flash(_("The registration seems to have failed, please try again."), "warning")
        return redirect(f"{url_for('.root')}?tab=register")
    if user.status_note == "active":
        # The username comes from the request: hand it to url_for() so that it
        # is URL-encoded as a query argument instead of being pasted verbatim
        # into the redirect target.
        return redirect(url_for('.confirm_registration', username=username))
    return render_template('registration-spamcheck-wait.html', user=user)
@bp.route('/register/confirm', methods=["GET", "POST"])
def confirm_registration():
    """Show the registration confirmation page and resend the validation email.

    The stage user is looked up from the ``username`` query argument. A missing
    username aborts with a 400, and an unknown stage user is sent back to the
    registration tab with a warning. When ``BASSET_URL`` is configured, stage
    users whose status note is not ``"active"`` are rejected with a 401.

    Submitting the form sends the address validation email again and redirects
    to the same URL.
    """
    username = request.args.get('username')
    if not username:
        abort(400, "No username provided")

    try:
        stage_user = ipa_admin.stageuser_show(a_uid=username)['result']
        user = User(stage_user)
    except python_freeipa.exceptions.NotFound:
        flash(_("The registration seems to have failed, please try again."), "warning")
        return redirect(f"{url_for('.root')}?tab=register")

    spamcheck_url = current_app.config["BASSET_URL"]
    if spamcheck_url and user.status_note != "active":
        abort(401, "You should not be here")

    form = ResendValidationEmailForm()
    if not form.validate_on_submit():
        return render_template('registration-confirmation.html', user=user, form=form)

    _send_validation_email(user)
    # NOTE(review): "has be sent" looks like a typo for "has been sent". It is
    # kept as is because the string is the translation msgid.
    resent_message = _(
        'The address validation email has be sent again. Make sure it did not land in '
        'your spam folder'
    )
    flash(resent_message, 'success')
    return redirect(request.url)
@bp.route('/register/activate', methods=["GET", "POST"])
def activate_account():
    """Activate a stage user from an email validation token and set a password.

    The ``token`` query argument is decoded for the ``email_validation``
    audience. Its ``sub`` claim names the stage user and its ``mail`` claim
    must equal the address the stage user registered with. Any problem with
    the token or the stage user flashes a warning and redirects to the
    registration tab.

    When the password form is submitted and valid:

    1. the stage user is activated and the ``user_registered`` signal is sent;
    2. the password is set as admin, then changed again as the user so that it
       is not left expired;
    3. a direct login is attempted so the user does not have to retype it.

    Failures raised as ``FormError`` inside ``handle_form_errors`` fall through
    to re-rendering the form. Password failures that happen after activation
    redirect to the root page with a warning, because the account exists by
    then.
    """
    register_url = f"{url_for('.root')}?tab=register"
    token_string = request.args.get('token')
    if not token_string:
        flash(
            _('No token provided, please check your email validation link.'), 'warning'
        )
        return redirect(register_url)

    try:
        token = read_token(token_string, audience=Audience.email_validation)
    except jwt.exceptions.ExpiredSignatureError:
        # Must be caught before InvalidTokenError, which is its base class.
        flash(_("This token is no longer valid, please register again."), "warning")
        return redirect(register_url)
    except jwt.exceptions.InvalidTokenError:
        # Covers undecodable tokens as well as the other validation failures
        # (for instance a token issued for another audience), which would
        # otherwise escape as an unhandled exception.
        flash(_("The token is invalid, please register again."), "warning")
        return redirect(register_url)

    try:
        user = User(ipa_admin.stageuser_show(token["sub"])["result"])
    except python_freeipa.exceptions.NotFound:
        flash(_("This user cannot be found, please register again."), "warning")
        return redirect(register_url)

    token_mail = token["mail"]
    if user.mail != token_mail:
        current_app.logger.error(
            f'User {user.username} tried to validate a token for address {token_mail} while they '
            f'are registered with address {user.mail}, something fishy may be going on.'
        )
        flash(
            _(
                "The username and the email address don't match the token you used, "
                "please register again."
            ),
            "warning",
        )
        return redirect(register_url)

    form = PasswordSetForm()
    if form.validate_on_submit():
        with handle_form_errors(form):
            password = form.password.data
            # First we activate the stage user
            try:
                ipa_admin.stageuser_activate(user.username)
            except python_freeipa.exceptions.FreeIPAError as e:
                current_app.logger.error(
                    f'An unhandled error {e.__class__.__name__} happened while activating '
                    f'stage user {user.username}: {e.message}'
                )
                raise FormError(
                    "non_field_errors",
                    _(
                        "Something went wrong while creating your account, "
                        "please try again later."
                    ),
                )
            # User activation succeeded. Send signal.
            user_registered.send(user, request=request._get_current_object())
            # Now we set the password.
            try:
                # First, set it as an admin. This will mark it as expired.
                ipa_admin.user_mod(user.username, userpassword=password)
                # And now we set it again as the user, so it is not expired any more.
                ipa = untouched_ipa_client(current_app, session)
                ipa.change_password(
                    user.username, new_password=password, old_password=password
                )
            except python_freeipa.exceptions.PWChangePolicyError as e:
                # The user is active but the password does not match the policy.
                # Tell the user what's going to happen.
                flash(
                    _(
                        'Your account has been created, but the password you chose does not '
                        'comply with the policy (%(policy_error)s) and has thus been set as '
                        'expired. You will be asked to change it after logging in.',
                        policy_error=e.policy_error,
                    ),
                    'warning',
                )
                return redirect(url_for(".root"))
            except python_freeipa.exceptions.ValidationError as e:
                # for example: invalid username. We don't know which field to link it to
                _handle_registration_validation_error(user.username, e)
            except python_freeipa.exceptions.FreeIPAError as e:
                current_app.logger.error(
                    f'An unhandled error {e.__class__.__name__} happened while changing initial '
                    f'password for user {user.username}: {e.message}'
                )
                # At this point the user has been activated, they can't register again. Send
                # them to the login page with an appropriate warning.
                flash(
                    _(
                        'Your account has been created, but an error occurred while setting your '
                        'password (%(message)s). You may need to change it after logging in.',
                        message=e.message,
                    ),
                    'warning',
                )
                return redirect(url_for(".root"))
            except NoIPAServer:
                raise FormError("non_field_errors", _("No IPA server available"))

            # Try to log them in directly, so they don't have to type their password again.
            try:
                ipa = maybe_ipa_login(current_app, session, user.username, password)
            except python_freeipa.exceptions.FreeIPAError:
                ipa = None
            if ipa:
                flash(
                    _(
                        'Congratulations, your account has been created! Welcome, %(name)s.',
                        name=user.name,
                    ),
                    'success',
                )
            else:
                # No shortcut for you, you'll have to login properly (maybe the password is
                # expired).
                flash(
                    _(
                        'Congratulations, your account has been created! Go ahead and sign in '
                        'to proceed.'
                    ),
                    'success',
                )
            return redirect(url_for('.root'))

    return render_template('registration-activation.html', user=user, form=form)
@bp.route('/register/spamcheck-hook', methods=["POST"])
@csrf.exempt
def spamcheck_hook():
    """Receive the spam check verdict for a stage user (JSON webhook).

    The JSON body must be an object with a ``token`` (decoded for the
    ``spam_check`` audience; its ``sub`` claim is the username) and a
    ``status``, one of ``"active"``, ``"spamcheck_denied"`` or
    ``"spamcheck_manual"``. The status is stored on the stage user, and the
    address validation email is sent when the status is ``"active"``.

    Responses are JSON: ``{"status": "success"}`` on success, or an ``error``
    message with status 501 (``BASSET_URL`` not configured) or 400 (bad
    payload, missing key, expired or invalid token, invalid status).
    """
    if not current_app.config.get("BASSET_URL"):
        return jsonify({"error": "Spamcheck disabled"}), 501

    data = request.get_json()
    # Valid JSON that is not an object (a list, a string, a number...) cannot
    # be indexed by key below: reject it here rather than failing on it.
    if not data or not isinstance(data, dict):
        return jsonify({"error": "Bad payload"}), 400

    try:
        token = data["token"]
        status = data["status"]
    except KeyError as e:
        return jsonify({"error": f"Missing key: {e}"}), 400

    try:
        token_data = read_token(token, audience=Audience.spam_check)
    except jwt.ExpiredSignatureError:
        # Caught first: it is a subclass of InvalidTokenError.
        return jsonify({"error": "The token has expired"}), 400
    except jwt.InvalidTokenError as e:
        return jsonify({"error": f"Invalid token: {e}"}), 400

    username = token_data["sub"]
    if status not in ("active", "spamcheck_denied", "spamcheck_manual"):
        return jsonify({"error": f"Invalid status: {status}."}), 400

    result = ipa_admin.stageuser_mod(a_uid=username, fasstatusnote=status)
    user = User(result["result"])
    if status == "active":
        # Send the address validation email
        _send_validation_email(user)
    return jsonify({"status": "success"})
@bp.route('/registering/', methods=["GET", "POST"])
@with_ipa()
def registering_users(ipa):
    """List the stage users and apply an action to one of them.

    Every stage user returned by ``ipa.stageuser_find()`` is counted per
    status. The list is then optionally narrowed by the ``status`` query
    argument and ordered by descending ``creation_time``.

    A valid form submission applies ``accept``, ``spam`` or ``delete`` to the
    selected user, who must be part of the currently listed users. On success
    a message is flashed and the browser is redirected to the same URL. On
    failure the error is attached to the form and the page is rendered again.
    """
    stage_users = [User(entry) for entry in ipa.stageuser_find()["result"]]

    statuses = [
        {"name": "", "title": _("All")},
        {"name": "spamcheck_manual", "title": _("Unknown")},
        {"name": "active", "title": _("Not Spam")},
        {"name": "spamcheck_denied", "title": _("Spam")},
        {"name": "spamcheck_awaiting", "title": _("Awaiting")},
    ]
    for status in statuses:
        status_name = status["name"]
        # The empty name stands for "All" and therefore counts every user.
        status["count"] = sum(
            1
            for su in stage_users
            if status_name == "" or su.status_note == status_name
        )

    status_filter = request.args.get("status", "")
    if status_filter:
        stage_users = [su for su in stage_users if su.status_note == status_filter]
    # Sort ascending and then reverse (rather than sorting with reverse=True),
    # which keeps the relative order of equal creation times as it has been.
    stage_users.sort(key=lambda u: u.creation_time)
    stage_users.reverse()

    form = RegisteringActionForm()
    if form.validate_on_submit():
        username = form.username.data
        action = form.action.data

        user = next((su for su in stage_users if su.username == username), None)
        if user is None:
            flash(f"Unknown user: {username}", "danger")
            return redirect(request.url)

        if action == "accept":
            try:
                current_app.logger.info(f"Accepting registering user {username}")
                ipa.stageuser_mod(username, fasstatusnote="active")
                _send_validation_email(user)
            except Exception as e:
                form.non_field_errors.errors.append(
                    f"Could not accept registering user {username}: {e}"
                )
            else:
                flash(f"Accepted registering user {username}", "success")
                return redirect(request.url)
        elif action == "spam":
            try:
                current_app.logger.info(f"Flagging registering user {username} as spam")
                ipa.stageuser_mod(username, fasstatusnote="spamcheck_denied")
            except Exception as e:
                form.non_field_errors.errors.append(
                    f"Could not flag registering user {username} as spam: {e}"
                )
            else:
                flash(f"Flagged registering user {username} as spam", "success")
                return redirect(request.url)
        elif action == "delete":
            try:
                current_app.logger.info(f"Deleting registering user {username}")
                ipa.stageuser_del(username)
            except Exception as e:
                form.non_field_errors.errors.append(
                    f"Could not delete registering user {username}: {e}"
                )
            else:
                flash(f"Deleted registering user {username}", "success")
                return redirect(request.url)
        else:
            form.non_field_errors.errors.append(f"Invalid action: {action}")

    return render_template(
        "registering.html",
        statuses=statuses,
        stage_users=stage_users,
        form=form,
        filter=status_filter,
    )