import python_freeipa
import srvlookup
from cryptography.fernet import Fernet
from python_freeipa.client_meta import ClientMeta as IPAClient
from python_freeipa.exceptions import BadRequest, ValidationError
from requests import RequestException
[docs]
class Client(IPAClient):
"""
Subclass the official client to add missing methods that we need.
TODO: send this upstream.
"""
[docs]
def ping(self):
"""
Checks that the server is alive.
"""
return self._request("ping")
[docs]
def otptoken_sync(self, user, password, first_code, second_code, token=None):
"""
Sync an otptoken for a user.
:param user: the user to sync the token for
:type user: string
:param password: the user's password
:type password: string
:param first_code: the first OTP token
:type first_code: string
:param second_code: the second OTP token
:type second_code: string
:param token: the token description (optional)
:type token: string
"""
data = {
'user': user,
'password': password,
'first_code': first_code,
'second_code': second_code,
'token': token,
}
url = "https://" + self._host + "/ipa/session/sync_token"
try:
response = self._session.post(url=url, data=data, verify=self._verify_ssl)
if response.ok and "Token sync rejected" not in response.text:
return response
else:
raise BadRequest(
message="The username, password or token codes are not correct."
)
except RequestException:
raise BadRequest(message="Something went wrong trying to sync OTP token.")
[docs]
def fasagreement_find(self, **kwargs):
"""
Search agreements
"""
data = self._request('fasagreement_find', params=kwargs)
return data['result']
[docs]
def fasagreement_add(self, agreement, **kwargs):
"""
Add a new agreement
:param agreement: Agreement name.
:type agreement: string
"""
data = self._request('fasagreement_add', agreement, kwargs)
return data['result']
[docs]
def fasagreement_del(self, agreement, **kwargs):
"""
Delete an agreement
:param agreement: Agreement name.
:type agreement: string
"""
return self._request('fasagreement_del', agreement, kwargs)
[docs]
def fasagreement_add_user(self, agreement, **kwargs):
"""
Add a user to an agreement
:param agreement: Agreement name.
:type agreement: string
"""
data = self._request('fasagreement_add_user', agreement, kwargs)
return data['result']
[docs]
def fasagreement_add_group(self, agreement, **kwargs):
"""
Add a group to an agreement
:param agreement: Agreement name.
:type agreement: string
"""
data = self._request('fasagreement_add_group', agreement, kwargs)
return data['result']
[docs]
def fasagreement_remove_group(self, agreement, **kwargs):
"""
Remove a group from an agreement
:param agreement: Agreement name.
:type agreement: string
"""
data = self._request('fasagreement_remove_group', agreement, kwargs)
return data['result']
[docs]
def fasagreement_disable(self, agreement, **kwargs):
"""
Disable an agreement
"""
self._request('fasagreement_disable', agreement, kwargs)
[docs]
class NoIPAServer(Exception):
"""No IPA server available."""
pass
[docs]
def raise_on_failed(result):
failed = result.get("failed", {})
num_failed = sum(
sum(len(failures) for failures in object_types.values())
for object_types in failed.values()
)
if num_failed == 0:
return # no actual failure
raise ValidationError(failed)
[docs]
def choose_server(app, session=None):
"""
Choose a server among the configured IPA server and store the result in the session.
"""
server = None
if session is not None:
server = session.get('noggin_ipa_server_hostname', None)
try:
available_servers = [
record.hostname
for record in srvlookup.lookup('ldap', domain=app.config["FREEIPA_DOMAIN"])
]
except srvlookup.SRVQueryFailure:
available_servers = []
if server is None or server not in available_servers:
try:
server = available_servers[0]
except IndexError:
app.logger.warning(
"IPA server not found. Available servers: %s",
", ".join(available_servers),
)
raise NoIPAServer
if session is not None:
session['noggin_ipa_server_hostname'] = server
return server
# Construct an IPA client from app config, but don't attempt to log in with it
# or to form a session of any kind with it. This is useful for one-off cases
# like password resets where a session isn't actually required.
[docs]
def untouched_ipa_client(app, session):
return Client(
choose_server(app, session),
verify_ssl=app.config['FREEIPA_CACERT'],
)
# Attempt to obtain an IPA session from a cookie.
#
# If we are given a token as a cookie in the request, decrypt it and see if we
# are left with a valid IPA session.
#
# NOTE: You *MUST* check the result of this function every time you call it.
# It will be None if no session was provided or was provided but invalid.
[docs]
def maybe_ipa_session(app, session):
encrypted_session = session.get('noggin_session', None)
try:
server_hostname = choose_server(app, session)
except NoIPAServer:
return None
if encrypted_session and server_hostname:
fernet = Fernet(app.config['FERNET_SECRET'])
ipa_session = fernet.decrypt(encrypted_session)
client = Client(server_hostname, verify_ssl=app.config['FREEIPA_CACERT'])
client._current_host = server_hostname
client._session.cookies['ipa_session'] = str(ipa_session, 'utf8')
# We have reconstructed a client, let's send a ping and see if we are
# successful.
try:
ping = client.ping()
client.ipa_version = ping['summary']
except python_freeipa.exceptions.Unauthorized:
return None
# If there's any other kind of exception, we let it propagate up for the
# controller (and, more practically, @with_ipa) to handle.
return client
return None
# Attempt to log in to an IPA server.
#
# On a successful login, we will encrypt the session token and put it in the
# user's session, returning the client handler to the caller.
#
# On an unsuccessful login, we'll let the exception bubble up.
[docs]
def maybe_ipa_login(app, session, username, userpassword):
# A session token is bound to a particular server, so we store the server
# in the session and just always use that. Flask sessions are signed, so we
# are safe in later assuming that the server hostname cookie has not been
# altered.
try:
client = Client(
choose_server(app, session), verify_ssl=app.config['FREEIPA_CACERT']
)
except NoIPAServer:
return None
auth = client.login(username, userpassword)
if auth and auth.logged_in:
fernet = Fernet(app.config['FERNET_SECRET'])
encrypted_session = fernet.encrypt(
bytes(client._session.cookies['ipa_session'], 'utf8')
)
session['noggin_session'] = encrypted_session
session['noggin_username'] = username
return client
return None