• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluggableAuthService / 5303493172

pending completion
5303493172

push

github

web-flow
Drop support for Python 2.7, 3.5, 3.6. (#116)

* Drop zserver extra in setup.py. Thus dropping FTP support.
* Drop support for Zope < 5.
Co-authored-by: Jens Vagelpohl <jens@plyp.com>

1288 of 1745 branches covered (73.81%)

Branch coverage included in aggregate %.

127 of 127 new or added lines in 30 files covered. (100.0%)

9619 of 10349 relevant lines covered (92.95%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.71
/src/Products/PluggableAuthService/utils.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
import binascii
1✔
15
import functools
1✔
16
import inspect
1✔
17
import logging
1✔
18
import os
1✔
19
from hashlib import sha1
1✔
20
from urllib.parse import urlparse
1✔
21
from urllib.parse import urlunparse
1✔
22

23
from AccessControl import ClassSecurityInfo
1✔
24
# BBB import
25
from AccessControl.requestmethod import postonly  # noqa
1✔
26
from App.Common import package_home
1✔
27
from zExceptions import Forbidden
1✔
28
from zope import interface
1✔
29
from zope.publisher.interfaces.browser import IBrowserRequest  # noqa
1✔
30

31

32
logger = logging.getLogger('PluggableAuthService')
1✔
33

34

35
def directlyProvides(obj, *interfaces):
1✔
36
    normalized_interfaces = []
1✔
37
    for i in interfaces:
1✔
38
        normalized_interfaces.append(i)
1✔
39
    return interface.directlyProvides(obj,  # NOQA: D001
1✔
40
                                      *normalized_interfaces)
41

42

43
def classImplements(class_, *interfaces):
1✔
44
    normalized_interfaces = []
1✔
45
    for i in interfaces:
1✔
46
        normalized_interfaces.append(i)
1✔
47
    return interface.classImplements(class_,  # NOQA: D001
1✔
48
                                     *normalized_interfaces)
49

50

51
product_dir = package_home(globals())
1✔
52
product_prefix = os.path.split(product_dir)[0]
1✔
53
_wwwdir = os.path.join(product_dir, 'www')
1✔
54

55

56
def makestr(s):
1✔
57
    """Converts 's' to a non-Unicode string"""
58
    if isinstance(s, bytes):
1✔
59
        return s
1✔
60
    if not isinstance(s, str):
1✔
61
        s = repr(s)
1✔
62
    if isinstance(s, str):
1!
63
        s = s.encode('utf-8')
1✔
64
    return s
1✔
65

66

67
def createViewName(method_name, user_handle=None):
1✔
68
    """
69
        Centralized place for creating the "View Name" that identifies
70
        a ZCacheable record in a RAMCacheManager
71
    """
72
    if not user_handle:
1✔
73
        return makestr(method_name)
1✔
74
    else:
75
        return b'%s-%s' % (makestr(method_name), makestr(user_handle))
1✔
76

77

78
def createKeywords(**kw):
1✔
79
    """
80
        Centralized place for creating the keywords that identify
81
        a ZCacheable record in a RAMCacheManager.
82

83
        Keywords are hashed so we don't accidentally expose sensitive
84
        information.
85
    """
86
    keywords = sha1()
1✔
87
    for k, v in sorted(kw.items()):
1✔
88
        keywords.update(makestr(k))
1✔
89
        keywords.update(makestr(v))
1✔
90

91
    return {'keywords': keywords.hexdigest()}
1✔
92

93

94
def getCSRFToken(request):
1✔
95
    session = getattr(request, 'SESSION', None)
1✔
96
    if session is not None:
1!
97
        token = session.get('_csrft_', None)
1✔
98
        if token is None:
1✔
99
            token = session['_csrft_'] = binascii.hexlify(os.urandom(20))
1✔
100
    else:
101
        # Can happen in tests.
102
        token = binascii.hexlify(os.urandom(20))
×
103
    if isinstance(token, bytes):
1✔
104
        token = token.decode('utf8')
1✔
105
    return token
1✔
106

107

108
def checkCSRFToken(request, token='csrf_token', raises=True):
1✔
109
    """ Check CSRF token in session against token formdata.
110

111
    If the values don't match, and 'raises' is True, raise a Forbidden.
112

113
    If the values don't match, and 'raises' is False, return False.
114

115
    If the values match, return True.
116
    """
117
    if getattr(request, 'SESSION', None) is None:
1!
118
        # Sessioning is not available at all, just give up
119
        logger.warning(
×
120
            'Built-in CSRF check disabled - sessioning not available')
121
        return True
×
122

123
    if request.form.get(token) != getCSRFToken(request):
1✔
124
        if raises:
1✔
125
            raise Forbidden('incorrect CSRF token')
1✔
126
        return False
1✔
127
    return True
1✔
128

129

130
class CSRFToken:
1✔
131
    # View helper for rendering CSRF token in templates.
132
    #
133
    # E.g., in every protected form, add this::
134
    #
135
    #   <input type="hidden" name="csrf_token"
136
    #          tal:attributes="value context/@@csrf_token" />
137

138
    security = ClassSecurityInfo()
1✔
139
    security.declareObjectPublic()
1✔
140

141
    def __init__(self, context, request):
1✔
142
        self.context = context
1✔
143
        self.request = request
1✔
144

145
    def __call__(self):
1✔
146
        raise Forbidden()
1✔
147

148
    def token(self):
1✔
149
        # API for template use
150
        return getCSRFToken(self.request)
1✔
151

152

153
def csrf_only(wrapped):
1✔
154
    wrapped_spec = inspect.getfullargspec(wrapped)
1✔
155
    args, varargs, kwargs, defaults = wrapped_spec[:4]
1✔
156
    if 'REQUEST' not in args:
1✔
157
        raise ValueError("Method doesn't name request")
1✔
158
    r_index = args.index('REQUEST')
1✔
159

160
    arglen = len(args)
1✔
161
    if defaults is not None:
1✔
162
        defaults = list(zip(args[arglen - len(defaults):], defaults))
1✔
163
        arglen -= len(defaults)
1✔
164

165
    spec = (args, varargs, kwargs, defaults)
1✔
166
    signature = inspect.signature(wrapped)
1✔
167
    new_parameters = []
1✔
168
    for param in signature.parameters.values():
1✔
169
        if param.default is not inspect.Parameter.empty:
1✔
170
            param = param.replace(default=None)
1✔
171
        new_parameters.append(param)
1✔
172
    argspec = str(signature.replace(parameters=new_parameters))
1✔
173
    lines = ['def wrapper' + argspec + ':',
1✔
174
             '    if IBrowserRequest.providedBy(REQUEST):',
175
             '        checkCSRFToken(REQUEST)',
176
             '    return wrapped(' + ','.join(args) + ')',
177
             ]
178
    g = globals().copy()
1✔
179
    l_copy = locals().copy()
1✔
180
    g['wrapped'] = wrapped
1✔
181
    exec('\n'.join(lines), g, l_copy)
1✔
182

183
    return functools.wraps(wrapped)(l_copy['wrapper'])
1✔
184

185

186
def url_local(url):
1✔
187
    """Helper to convert a URL into a site-local URL
188

189
    This function removes the protocol and host parts of a URL in order to
190
    prevent open redirect issues.
191
    """
192
    if url:
1✔
193
        parsed = urlparse(url)
1✔
194
        url = urlunparse(('', '') + parsed[2:])
1✔
195
    return url
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc