from codecs import utf_8_decode
from codecs import utf_8_encode
from hashlib import md5
import base64
import datetime
import re
import time as time_mod
from zope.interface import implementer
from pyramid.compat import (
long,
text_type,
binary_type,
url_unquote,
url_quote,
bytes_,
ascii_native_,
)
from pyramid.interfaces import (
IAuthenticationPolicy,
IDebugLogger,
)
from pyramid.security import (
Authenticated,
Everyone,
)
from pyramid.util import strings_differ
VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$")
class CallbackAuthenticationPolicy(object):
""" Abstract class """
debug = False
callback = None
def _log(self, msg, methodname, request):
logger = request.registry.queryUtility(IDebugLogger)
if logger:
cls = self.__class__
classname = cls.__module__ + '.' + cls.__name__
methodname = classname + '.' + methodname
logger.debug(methodname + ': ' + msg)
def authenticated_userid(self, request):
debug = self.debug
userid = self.unauthenticated_userid(request)
if userid is None:
debug and self._log(
'call to unauthenticated_userid returned None; returning None',
'authenticated_userid',
request)
return None
if self.callback is None:
debug and self._log(
'there was no groupfinder callback; returning %r' % (userid,),
'authenticated_userid',
request)
return userid
callback_ok = self.callback(userid, request)
if callback_ok is not None: # is not None!
debug and self._log(
'groupfinder callback returned %r; returning %r' % (
callback_ok, userid),
'authenticated_userid',
request
)
return userid
debug and self._log(
'groupfinder callback returned None; returning None',
'authenticated_userid',
request
)
def effective_principals(self, request):
debug = self.debug
effective_principals = [Everyone]
userid = self.unauthenticated_userid(request)
if userid is None:
debug and self._log(
'unauthenticated_userid returned %r; returning %r' % (
userid, effective_principals),
'effective_principals',
request
)
return effective_principals
if self.callback is None:
debug and self._log(
'groupfinder callback is None, so groups is []',
'effective_principals',
request)
groups = []
else:
groups = self.callback(userid, request)
debug and self._log(
'groupfinder callback returned %r as groups' % (groups,),
'effective_principals',
request)
if groups is None: # is None!
debug and self._log(
'returning effective principals: %r' % (
effective_principals,),
'effective_principals',
request
)
return effective_principals
effective_principals.append(Authenticated)
effective_principals.append(userid)
effective_principals.extend(groups)
debug and self._log(
'returning effective principals: %r' % (
effective_principals,),
'effective_principals',
request
)
return effective_principals
@implementer(IAuthenticationPolicy)
[docs]class RepozeWho1AuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from the :mod:`repoze.who` 1.X WSGI 'API' (the
``repoze.who.identity`` key in the WSGI environment).
Constructor Arguments
``identifier_name``
Default: ``auth_tkt``. The :mod:`repoze.who` plugin name that
performs remember/forget. Optional.
``callback``
Default: ``None``. A callback passed the :mod:`repoze.who` identity
and the :term:`request`, expected to return ``None`` if the user
represented by the identity doesn't exist or a sequence of principal
identifiers (possibly empty) representing groups if the user does
exist. If ``callback`` is None, the userid will be assumed to exist
with no group principals.
Objects of this class implement the interface described by
:class:`pyramid.interfaces.IAuthenticationPolicy`.
"""
def __init__(self, identifier_name='auth_tkt', callback=None):
self.identifier_name = identifier_name
self.callback = callback
def _get_identity(self, request):
return request.environ.get('repoze.who.identity')
def _get_identifier(self, request):
plugins = request.environ.get('repoze.who.plugins')
if plugins is None:
return None
identifier = plugins[self.identifier_name]
return identifier
def authenticated_userid(self, request):
identity = self._get_identity(request)
if identity is None:
return None
if self.callback is None:
return identity['repoze.who.userid']
if self.callback(identity, request) is not None: # is not None!
return identity['repoze.who.userid']
def unauthenticated_userid(self, request):
identity = self._get_identity(request)
if identity is None:
return None
return identity['repoze.who.userid']
def effective_principals(self, request):
effective_principals = [Everyone]
identity = self._get_identity(request)
if identity is None:
return effective_principals
if self.callback is None:
groups = []
else:
groups = self.callback(identity, request)
if groups is None: # is None!
return effective_principals
userid = identity['repoze.who.userid']
effective_principals.append(Authenticated)
effective_principals.append(userid)
effective_principals.extend(groups)
return effective_principals
def remember(self, request, principal, **kw):
identifier = self._get_identifier(request)
if identifier is None:
return []
environ = request.environ
identity = {'repoze.who.userid':principal}
return identifier.remember(environ, identity)
def forget(self, request):
identifier = self._get_identifier(request)
if identifier is None:
return []
identity = self._get_identity(request)
return identifier.forget(request.environ, identity)
@implementer(IAuthenticationPolicy)
[docs]class RemoteUserAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from the ``REMOTE_USER`` WSGI environment variable.
Constructor Arguments
``environ_key``
Default: ``REMOTE_USER``. The key in the WSGI environ which
provides the userid.
``callback``
Default: ``None``. A callback passed the userid and the request,
expected to return None if the userid doesn't exist or a sequence of
principal identifiers (possibly empty) representing groups if the
user does exist. If ``callback`` is None, the userid will be assumed
to exist with no group principals.
``debug``
Default: ``False``. If ``debug`` is ``True``, log messages to the
Pyramid debug logger about the results of various authentication
steps. The output from debugging is useful for reporting to maillist
or IRC channels when asking for support.
Objects of this class implement the interface described by
:class:`pyramid.interfaces.IAuthenticationPolicy`.
"""
def __init__(self, environ_key='REMOTE_USER', callback=None, debug=False):
self.environ_key = environ_key
self.callback = callback
self.debug = debug
def unauthenticated_userid(self, request):
return request.environ.get(self.environ_key)
def remember(self, request, principal, **kw):
return []
def forget(self, request):
return []
@implementer(IAuthenticationPolicy)
[docs]class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` :term:`authentication policy` which
obtains data from a Pyramid "auth ticket" cookie.
Constructor Arguments
``secret``
The secret (a string) used for auth_tkt cookie signing.
Required.
``callback``
Default: ``None``. A callback passed the userid and the
request, expected to return ``None`` if the userid doesn't
exist or a sequence of principal identifiers (possibly empty) if
the user does exist. If ``callback`` is ``None``, the userid
will be assumed to exist with no principals. Optional.
``cookie_name``
Default: ``auth_tkt``. The cookie name used
(string). Optional.
``secure``
Default: ``False``. Only send the cookie back over a secure
conn. Optional.
``include_ip``
Default: ``False``. Make the requesting IP address part of
the authentication data in the cookie. Optional.
``timeout``
Default: ``None``. Maximum number of seconds which a newly
issued ticket will be considered valid. After this amount of
time, the ticket will expire (effectively logging the user
out). If this value is ``None``, the ticket never expires.
Optional.
``reissue_time``
Default: ``None``. If this parameter is set, it represents the number
of seconds that must pass before an authentication token cookie is
automatically reissued as the result of a request which requires
authentication. The duration is measured as the number of seconds
since the last auth_tkt cookie was issued and 'now'. If this value is
``0``, a new ticket cookie will be reissued on every request which
requires authentication.
A good rule of thumb: if you want auto-expired cookies based on
inactivity: set the ``timeout`` value to 1200 (20 mins) and set the
``reissue_time`` value to perhaps a tenth of the ``timeout`` value
(120 or 2 mins). It's nonsensical to set the ``timeout`` value lower
than the ``reissue_time`` value, as the ticket will never be reissued
if so. However, such a configuration is not explicitly prevented.
Optional.
``max_age``
Default: ``None``. The max age of the auth_tkt cookie, in
seconds. This differs from ``timeout`` inasmuch as ``timeout``
represents the lifetime of the ticket contained in the cookie,
while this value represents the lifetime of the cookie itself.
When this value is set, the cookie's ``Max-Age`` and
``Expires`` settings will be set, allowing the auth_tkt cookie
to last between browser sessions. It is typically nonsensical
to set this to a value that is lower than ``timeout`` or
``reissue_time``, although it is not explicitly prevented.
Optional.
``path``
Default: ``/``. The path for which the auth_tkt cookie is valid.
May be desirable if the application only serves part of a domain.
Optional.
``http_only``
Default: ``False``. Hide cookie from JavaScript by setting the
HttpOnly flag. Not honored by all browsers.
Optional.
``wild_domain``
Default: ``True``. An auth_tkt cookie will be generated for the
wildcard domain.
Optional.
``debug``
Default: ``False``. If ``debug`` is ``True``, log messages to the
Pyramid debug logger about the results of various authentication
steps. The output from debugging is useful for reporting to maillist
or IRC channels when asking for support.
Objects of this class implement the interface described by
:class:`pyramid.interfaces.IAuthenticationPolicy`.
"""
def __init__(self,
secret,
callback=None,
cookie_name='auth_tkt',
secure=False,
include_ip=False,
timeout=None,
reissue_time=None,
max_age=None,
path="/",
http_only=False,
wild_domain=True,
debug=False,
):
self.cookie = AuthTktCookieHelper(
secret,
cookie_name=cookie_name,
secure=secure,
include_ip=include_ip,
timeout=timeout,
reissue_time=reissue_time,
max_age=max_age,
http_only=http_only,
path=path,
wild_domain=wild_domain,
)
self.callback = callback
self.debug = debug
def unauthenticated_userid(self, request):
result = self.cookie.identify(request)
if result:
return result['userid']
def remember(self, request, principal, **kw):
""" Accepts the following kw args: ``max_age=<int-seconds>,
``tokens=<sequence-of-ascii-strings>``"""
return self.cookie.remember(request, principal, **kw)
def forget(self, request):
return self.cookie.forget(request)
def b64encode(v):
return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'')
def b64decode(v):
return base64.b64decode(bytes_(v))
# this class licensed under the MIT license (stolen from Paste)
class AuthTicket(object):
"""
This class represents an authentication token. You must pass in
the shared secret, the userid, and the IP address. Optionally you
can include tokens (a list of strings, representing role names),
'user_data', which is arbitrary data available for your own use in
later scripts. Lastly, you can override the cookie name and
timestamp.
Once you provide all the arguments, use .cookie_value() to
generate the appropriate authentication ticket.
Usage::
token = AuthTicket('sharedsecret', 'username',
os.environ['REMOTE_ADDR'], tokens=['admin'])
val = token.cookie_value()
"""
def __init__(self, secret, userid, ip, tokens=(), user_data='',
time=None, cookie_name='auth_tkt', secure=False):
self.secret = secret
self.userid = userid
self.ip = ip
self.tokens = ','.join(tokens)
self.user_data = user_data
if time is None:
self.time = time_mod.time()
else:
self.time = time
self.cookie_name = cookie_name
self.secure = secure
def digest(self):
return calculate_digest(
self.ip, self.time, self.secret, self.userid, self.tokens,
self.user_data)
def cookie_value(self):
v = '%s%08x%s!' % (self.digest(), int(self.time),
url_quote(self.userid))
if self.tokens:
v += self.tokens + '!'
v += self.user_data
return v
# this class licensed under the MIT license (stolen from Paste)
class BadTicket(Exception):
"""
Exception raised when a ticket can't be parsed. If we get far enough to
determine what the expected digest should have been, expected is set.
This should not be shown by default, but can be useful for debugging.
"""
def __init__(self, msg, expected=None):
self.expected = expected
Exception.__init__(self, msg)
# this function licensed under the MIT license (stolen from Paste)
def parse_ticket(secret, ticket, ip):
"""
Parse the ticket, returning (timestamp, userid, tokens, user_data).
If the ticket cannot be parsed, a ``BadTicket`` exception will be raised
with an explanation.
"""
ticket = ticket.strip('"')
digest = ticket[:32]
try:
timestamp = int(ticket[32:40], 16)
except ValueError as e:
raise BadTicket('Timestamp is not a hex integer: %s' % e)
try:
userid, data = ticket[40:].split('!', 1)
except ValueError:
raise BadTicket('userid is not followed by !')
userid = url_unquote(userid)
if '!' in data:
tokens, user_data = data.split('!', 1)
else: # pragma: no cover (never generated)
# @@: Is this the right order?
tokens = ''
user_data = data
expected = calculate_digest(ip, timestamp, secret,
userid, tokens, user_data)
# Avoid timing attacks (see
# http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf)
if strings_differ(expected, digest):
raise BadTicket('Digest signature is not correct',
expected=(expected, digest))
tokens = tokens.split(',')
return (timestamp, userid, tokens, user_data)
# this function licensed under the MIT license (stolen from Paste)
def calculate_digest(ip, timestamp, secret, userid, tokens, user_data):
secret = bytes_(secret, 'utf-8')
userid = bytes_(userid, 'utf-8')
tokens = bytes_(tokens, 'utf-8')
user_data = bytes_(user_data, 'utf-8')
digest0 = md5(
encode_ip_timestamp(ip, timestamp) + secret + userid + b'\0'
+ tokens + b'\0' + user_data).hexdigest()
digest = md5(bytes_(digest0) + secret).hexdigest()
return digest
# this function licensed under the MIT license (stolen from Paste)
def encode_ip_timestamp(ip, timestamp):
ip_chars = ''.join(map(chr, map(int, ip.split('.'))))
t = int(timestamp)
ts = ((t & 0xff000000) >> 24,
(t & 0xff0000) >> 16,
(t & 0xff00) >> 8,
t & 0xff)
ts_chars = ''.join(map(chr, ts))
return bytes_(ip_chars + ts_chars)
EXPIRE = object()
[docs]class AuthTktCookieHelper(object):
"""
A helper class for use in third-party authentication policy
implementations. See
:class:`pyramid.authentication.AuthTktAuthenticationPolicy` for the
meanings of the constructor arguments.
"""
parse_ticket = staticmethod(parse_ticket) # for tests
AuthTicket = AuthTicket # for tests
BadTicket = BadTicket # for tests
now = None # for tests
userid_type_decoders = {
'int':int,
'unicode':lambda x: utf_8_decode(x)[0], # bw compat for old cookies
'b64unicode': lambda x: utf_8_decode(b64decode(x))[0],
'b64str': lambda x: b64decode(x),
}
userid_type_encoders = {
int: ('int', str),
long: ('int', str),
text_type: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])),
binary_type: ('b64str', lambda x: b64encode(x)),
}
def __init__(self, secret, cookie_name='auth_tkt', secure=False,
include_ip=False, timeout=None, reissue_time=None,
max_age=None, http_only=False, path="/", wild_domain=True):
self.secret = secret
self.cookie_name = cookie_name
self.include_ip = include_ip
self.secure = secure
self.timeout = timeout
self.reissue_time = reissue_time
self.max_age = max_age
self.http_only = http_only
self.path = path
self.wild_domain = wild_domain
static_flags = []
if self.secure:
static_flags.append('; Secure')
if self.http_only:
static_flags.append('; HttpOnly')
self.static_flags = "".join(static_flags)
def _get_cookies(self, environ, value, max_age=None):
if max_age is EXPIRE:
max_age = "; Max-Age=0; Expires=Wed, 31-Dec-97 23:59:59 GMT"
elif max_age is not None:
later = datetime.datetime.utcnow() + datetime.timedelta(
seconds=int(max_age))
# Wdy, DD-Mon-YY HH:MM:SS GMT
expires = later.strftime('%a, %d %b %Y %H:%M:%S GMT')
# the Expires header is *required* at least for IE7 (IE7 does
# not respect Max-Age)
max_age = "; Max-Age=%s; Expires=%s" % (max_age, expires)
else:
max_age = ''
cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
# While Chrome, IE, and Firefox can cope, Opera (at least) cannot
# cope with a port number in the cookie domain when the URL it
# receives the cookie from does not also have that port number in it
# (e.g via a proxy). In the meantime, HTTP_HOST is sent with port
# number, and neither Firefox nor Chrome do anything with the
# information when it's provided in a cookie domain except strip it
# out. So we strip out any port number from the cookie domain
# aggressively to avoid problems. See also
# https://github.com/Pylons/pyramid/issues/131
if ':' in cur_domain:
cur_domain = cur_domain.split(':', 1)[0]
cookies = [
('Set-Cookie', '%s="%s"; Path=%s%s%s' % (
self.cookie_name, value, self.path, max_age, self.static_flags)),
('Set-Cookie', '%s="%s"; Path=%s; Domain=%s%s%s' % (
self.cookie_name, value, self.path, cur_domain, max_age,
self.static_flags)),
]
if self.wild_domain:
wild_domain = '.' + cur_domain
cookies.append(('Set-Cookie', '%s="%s"; Path=%s; Domain=%s%s%s' % (
self.cookie_name, value, self.path, wild_domain, max_age,
self.static_flags)))
return cookies
[docs] def identify(self, request):
""" Return a dictionary with authentication information, or ``None``
if no valid auth_tkt is attached to ``request``"""
environ = request.environ
cookie = request.cookies.get(self.cookie_name)
if cookie is None:
return None
if self.include_ip:
remote_addr = environ['REMOTE_ADDR']
else:
remote_addr = '0.0.0.0'
try:
timestamp, userid, tokens, user_data = self.parse_ticket(
self.secret, cookie, remote_addr)
except self.BadTicket:
return None
now = self.now # service tests
if now is None:
now = time_mod.time()
if self.timeout and ( (timestamp + self.timeout) < now ):
# the auth_tkt data has expired
return None
userid_typename = 'userid_type:'
user_data_info = user_data.split('|')
for datum in filter(None, user_data_info):
if datum.startswith(userid_typename):
userid_type = datum[len(userid_typename):]
decoder = self.userid_type_decoders.get(userid_type)
if decoder:
userid = decoder(userid)
reissue = self.reissue_time is not None
if reissue and not hasattr(request, '_authtkt_reissued'):
if ( (now - timestamp) > self.reissue_time ):
# work around https://github.com/Pylons/pyramid/issues#issue/108
tokens = list(filter(None, tokens))
headers = self.remember(request, userid, max_age=self.max_age,
tokens=tokens)
def reissue_authtkt(request, response):
if not hasattr(request, '_authtkt_reissue_revoked'):
for k, v in headers:
response.headerlist.append((k, v))
request.add_response_callback(reissue_authtkt)
request._authtkt_reissued = True
environ['REMOTE_USER_TOKENS'] = tokens
environ['REMOTE_USER_DATA'] = user_data
environ['AUTH_TYPE'] = 'cookie'
identity = {}
identity['timestamp'] = timestamp
identity['userid'] = userid
identity['tokens'] = tokens
identity['userdata'] = user_data
return identity
[docs] def forget(self, request):
""" Return a set of expires Set-Cookie headers, which will destroy
any existing auth_tkt cookie when attached to a response"""
environ = request.environ
request._authtkt_reissue_revoked = True
return self._get_cookies(environ, '', max_age=EXPIRE)
[docs] def remember(self, request, userid, max_age=None, tokens=()):
""" Return a set of Set-Cookie headers; when set into a response,
these headers will represent a valid authentication ticket.
``max_age``
The max age of the auth_tkt cookie, in seconds. When this value is
set, the cookie's ``Max-Age`` and ``Expires`` settings will be set,
allowing the auth_tkt cookie to last between browser sessions. If
this value is ``None``, the ``max_age`` value provided to the
helper itself will be used as the ``max_age`` value. Default:
``None``.
``tokens``
A sequence of strings that will be placed into the auth_tkt tokens
field. Each string in the sequence must be of the Python ``str``
type and must match the regex ``^[A-Za-z][A-Za-z0-9+_-]*$``.
Tokens are available in the returned identity when an auth_tkt is
found in the request and unpacked. Default: ``()``.
"""
if max_age is None:
max_age = self.max_age
environ = request.environ
if self.include_ip:
remote_addr = environ['REMOTE_ADDR']
else:
remote_addr = '0.0.0.0'
user_data = ''
encoding_data = self.userid_type_encoders.get(type(userid))
if encoding_data:
encoding, encoder = encoding_data
userid = encoder(userid)
user_data = 'userid_type:%s' % encoding
new_tokens = []
for token in tokens:
if isinstance(token, text_type):
try:
token = ascii_native_(token)
except UnicodeEncodeError:
raise ValueError("Invalid token %r" % (token,))
if not (isinstance(token, str) and VALID_TOKEN.match(token)):
raise ValueError("Invalid token %r" % (token,))
new_tokens.append(token)
tokens = tuple(new_tokens)
if hasattr(request, '_authtkt_reissued'):
request._authtkt_reissue_revoked = True
ticket = self.AuthTicket(
self.secret,
userid,
remote_addr,
tokens=tokens,
user_data=user_data,
cookie_name=self.cookie_name,
secure=self.secure)
cookie_value = ticket.cookie_value()
return self._get_cookies(environ, cookie_value, max_age)
@implementer(IAuthenticationPolicy)
[docs]class SessionAuthenticationPolicy(CallbackAuthenticationPolicy):
""" A :app:`Pyramid` authentication policy which gets its data from the
configured :term:`session`. For this authentication policy to work, you
will have to follow the instructions in the :ref:`sessions_chapter` to
configure a :term:`session factory`.
Constructor Arguments
``prefix``
A prefix used when storing the authentication parameters in the
session. Defaults to 'auth.'. Optional.
``callback``
Default: ``None``. A callback passed the userid and the
request, expected to return ``None`` if the userid doesn't
exist or a sequence of principal identifiers (possibly empty) if
the user does exist. If ``callback`` is ``None``, the userid
will be assumed to exist with no principals. Optional.
``debug``
Default: ``False``. If ``debug`` is ``True``, log messages to the
Pyramid debug logger about the results of various authentication
steps. The output from debugging is useful for reporting to maillist
or IRC channels when asking for support.
"""
def __init__(self, prefix='auth.', callback=None, debug=False):
self.callback = callback
self.prefix = prefix or ''
self.userid_key = prefix + 'userid'
self.debug = debug
def remember(self, request, principal, **kw):
""" Store a principal in the session."""
request.session[self.userid_key] = principal
return []
def forget(self, request):
""" Remove the stored principal from the session."""
if self.userid_key in request.session:
del request.session[self.userid_key]
return []
def unauthenticated_userid(self, request):
return request.session.get(self.userid_key)