• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluggableAuthService / 10028140575

14 Jun 2024 03:56PM UTC coverage: 90.57% (+0.4%) from 90.185%
10028140575

push

github

web-flow
- Add support for Python 3.12.  - Drop support for Python 3.7. (#119)

1735 of 2189 branches covered (79.26%)

Branch coverage included in aggregate %.

20 of 24 new or added lines in 8 files covered. (83.33%)

1 existing line in 1 file now uncovered.

9588 of 10313 relevant lines covered (92.97%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.48
/src/Products/PluggableAuthService/plugins/DomainAuthHelper.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
""" DomainAuthHelper   Authentication Plugin for Domain authentication
15
"""
16

17
import re
1✔
18
import socket
1✔
19
import time
1✔
20

21
from AccessControl import ClassSecurityInfo
1✔
22
from AccessControl.class_init import InitializeClass
1✔
23
from AccessControl.Permissions import manage_users
1✔
24
from BTrees.OOBTree import OOBTree
1✔
25
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
26
from zope.interface import Interface
1✔
27

28
from ..interfaces.plugins import IAuthenticationPlugin
1✔
29
from ..interfaces.plugins import IExtractionPlugin
1✔
30
from ..interfaces.plugins import IRolesPlugin
1✔
31
from ..plugins.BasePlugin import BasePlugin
1✔
32
from ..utils import classImplements
1✔
33

34

35
try:
1✔
36
    from IPy import IP
1✔
37
except ModuleNotFoundError:
1✔
38
    IP = None
1✔
39

40

41
class IDomainAuthHelper(Interface):
    """ Marker interface.

    Declares no methods or attributes; it is only used to tag the
    DomainAuthHelper plugin class.
    """
44

45

46
class EqualsFilter:
    """ Callable filter accepting candidates equal to the match string.

    The configured string is kept on the ``match_string`` attribute.
    """

    def __init__(self, matchstring):
        # Remember the exact value candidates are compared against.
        self.match_string = matchstring

    def __call__(self, candidate):
        """ Return True if 'candidate' equals the configured string. """
        expected = self.match_string
        return candidate == expected
1✔
53

54

55
class StartsWithFilter:
    """ Callable filter accepting candidates that begin with a prefix.

    The configured prefix is kept on the ``match_string`` attribute.
    """

    def __init__(self, matchstring):
        # Remember the prefix candidates must start with.
        self.match_string = matchstring

    def __call__(self, candidate):
        """ Return True if 'candidate' starts with the configured prefix.
        """
        prefix = self.match_string
        return candidate.startswith(prefix)
1✔
62

63

64
class EndsWithFilter:
    """ Callable filter accepting candidates that end with a suffix.

    The configured suffix is kept on the ``match_string`` attribute.
    """

    def __init__(self, matchstring):
        # Remember the suffix candidates must end with.
        self.match_string = matchstring

    def __call__(self, candidate):
        """ Return True if 'candidate' ends with the configured suffix.
        """
        suffix = self.match_string
        return candidate.endswith(suffix)
1✔
71

72

73
class RegexFilter:
    """ Callable filter accepting candidates matched by a regex.

    The pattern is compiled once at construction time and kept on the
    ``regex`` attribute; an invalid pattern raises ``re.error`` there.
    """

    def __init__(self, matchstring):
        # Compile up front so every call reuses the same pattern object.
        self.regex = re.compile(matchstring)

    def __call__(self, candidate):
        """ Return the match object for 'candidate', or None.

        ``match`` anchors at the start of the candidate, so the pattern
        must match from the first character on.
        """
        pattern = self.regex
        return pattern.match(candidate)
1✔
80

81

82
# Registry of match type name -> filter class.  A filter class is
# instantiated with the match string and the instance is then called
# with a candidate host name or address.
_MATCH_TYPE_FILTERS = {
    'equals': EqualsFilter,
    'startswith': StartsWithFilter,
    'endswith': EndsWithFilter,
    'regex': RegexFilter,
}
88

89
if IP is not None:
    # The 'ip' match type is only offered when IPy could be imported.

    class IPFilter:
        """ Callable filter accepting candidates contained in an IP.

        The parsed match string is kept on the ``ip`` attribute.  A
        match string that ``IP`` rejects propagates its exception from
        the constructor.
        """

        def __init__(self, matchstring):
            self.ip = IP(matchstring)

        def __call__(self, candidate):
            """ Return True if 'candidate' parses and is in ``self.ip``.
            """
            try:
                candidate_ip = IP(candidate)
            except ValueError:
                # Not parseable as an IP (e.g. a host name): no match.
                return False
            else:
                return candidate_ip in self.ip

    _MATCH_TYPE_FILTERS.update(ip=IPFilter)
×
104

105
# Page template with the form for adding a DomainAuthHelper.
manage_addDomainAuthHelperForm = PageTemplateFile(
    'www/daAdd',
    globals(),
    __name__='manage_addDomainAuthHelperForm',
)
107

108

109
def manage_addDomainAuthHelper(self, id, title='', REQUEST=None):
    """ Factory method to instantiate a DomainAuthHelper

    Creates the plugin and stores it in the container 'self' under 'id'.
    When 'REQUEST' is given, the response is redirected to the
    container's 'manage_workspace' with a status message.
    """
    self._setObject(id, DomainAuthHelper(id, title=title))

    if REQUEST is None:
        # Called from code: nothing to redirect.
        return

    qs = 'manage_tabs_message=DomainAuthHelper+added.'
    my_url = self.absolute_url()
    REQUEST['RESPONSE'].redirect(f'{my_url}/manage_workspace?{qs}')
×
118

119

120
class DomainAuthHelper(BasePlugin):
    """ Domain Authentication plugin for the PluggableAuthService

    Authenticates requests and looks up roles by testing the client's
    host name and address against stored mappings.  Mappings live in
    ``_domain_map``, keyed by user id; the empty key ``''`` holds the
    generic mappings that are consulted for every login.
    """
    security = ClassSecurityInfo()
    meta_type = 'Domain Authentication Plugin'
    zmi_icon = 'fas fa-fingerprint'

    security.declareProtected(manage_users, 'manage_map')  # NOQA: D001
    manage_map = PageTemplateFile('www/daMatches', globals())

    security.declareProtected(manage_users, 'manage_genericmap')  # NOQA: D001
    manage_genericmap = PageTemplateFile('www/daGeneric', globals())

    manage_options = (BasePlugin.manage_options[:1] +
                      ({
                          'label': 'User Map',
                          'action': 'manage_map'
                      }, {
                          'label': 'Generic Map',
                          'action': 'manage_genericmap'
                      }) + BasePlugin.manage_options[1:])

    def __init__(self, id, title=''):
        """ Initialize a new instance """
        self.id = id
        self.title = title
        # user id -> list of match records ('' holds generic records)
        self._domain_map = OOBTree()

    @security.private
    def _getRemoteAddress(self, request):
        """ Return the client address for 'request'.

        Prefers ``request.getClientAddr()``; objects without that method
        fall back to their 'REMOTE_ADDR' entry ('' if absent).
        """
        try:
            return request.getClientAddr()
        except AttributeError:
            return request.get('REMOTE_ADDR', '')

    @security.private
    def extractCredentials(self, request):
        """ Extract credentials from 'request'.

        Returns a dict with 'remote_host' and 'remote_address' when at
        least one of them is known, otherwise an empty dict.
        """
        remote_host = request.get('REMOTE_HOST', '')
        remote_address = self._getRemoteAddress(request)

        if not (remote_host or remote_address):
            return {}

        return {
            'remote_host': remote_host,
            'remote_address': remote_address,
        }

    @security.private
    def authenticateCredentials(self, credentials):
        """ Fulfill AuthenticationPlugin requirements

        Returns ``(user_id, login)`` when a mapping matches the remote
        host or address in 'credentials', else ``(None, None)``.
        """
        login = credentials.get('login', '')
        matches = self._findMatches(login,
                                    credentials.get('remote_host', ''),
                                    credentials.get('remote_address', ''))

        if not matches:
            return (None, None)

        if login:
            return (login, login)

        # No login supplied: take the identity from the first match.
        best_match = matches[0]
        u_name = best_match.get('username', 'remote')
        return (best_match.get('user_id', u_name), u_name)

    @security.private
    def getRolesForPrincipal(self, user, request=None):
        """ Fulfill RolesPlugin requirements

        Returns the roles of the first mapping matching the request's
        remote host or address, as a tuple (empty if nothing matches).
        """
        if request is None:
            # Without request there is no way I can do anything...
            return ()

        uname = user.getUserName()

        # 'Remote User' is the placeholder name manage_addMapping stores
        # for anonymous generic mappings; only generic records apply.
        if 'Remote User' in uname:
            uname = ''

        # Resolve the address the same way extractCredentials does, so a
        # request object lacking getClientAddr() does not raise here.
        matches = self._findMatches(uname,
                                    request.get('REMOTE_HOST', ''),
                                    self._getRemoteAddress(request))

        # We want to grab the first match because it is the most specific
        if not matches:
            return ()

        return tuple(matches[0].get('roles', []))

    @security.private
    def _findMatches(self, login, r_host='', r_address=''):
        """ Find the match

        Returns a tuple of the match records whose filter accepts the
        remote host or the remote address.  Records stored for 'login'
        come before the generic ones.  A record appears once per
        accepted candidate, so it can be listed more than once.
        """
        if not r_host and not r_address:
            return ()

        all_info = list(self._domain_map.get(login, []))
        all_info.extend(self._domain_map.get('', []))

        # Fill in whichever of host/address is missing via DNS; lookup
        # failures are tolerated and leave the value empty.
        if not r_host:
            try:
                r_host = socket.gethostbyaddr(r_address)[0]
            except OSError:
                pass

        if not r_address:
            try:
                r_address = socket.gethostbyname(r_host)
            except OSError:
                pass

        candidates = [r_host, r_address]
        matches = []

        for match_info in all_info:
            match_filter = match_info.get('match_filter')

            if match_filter is None:  # legacy data
                filter_class = _MATCH_TYPE_FILTERS[match_info['match_type']]
                match_filter = filter_class(match_info['match_string'])

            matches.extend(
                match_info for x in candidates if match_filter(x))

        return tuple(matches)

    @security.protected(manage_users)
    def listMatchTypes(self):
        """ Return a sequence of possible match types """
        # A tuple snapshot rather than a live view of the registry.
        return tuple(_MATCH_TYPE_FILTERS)

    @security.protected(manage_users)
    def listMappingsForUser(self, user_id=''):
        """ List the mappings for a specific user

        Returns a list of dicts holding the displayable fields of each
        record; the stored filter object is left out.
        """
        exported = ('match_type', 'match_string', 'match_id',
                    'roles', 'username')
        return [{key: match_info[key] for key in exported}
                for match_info in self._domain_map.get(user_id, [])]

    @security.protected(manage_users)
    def manage_addMapping(self,
                          user_id='',
                          match_type='',
                          match_string='',
                          username='',
                          roles=None,
                          REQUEST=None):
        """ Add a mapping for a user

        'user_id' selects the record list ('' means generic mapping),
        'match_type' must be a registered match type and 'match_string'
        is handed to its filter.  'roles' may be None, a single role
        name or a sequence of role names; a private copy is stored.

        Invalid input raises ValueError when called without 'REQUEST';
        with 'REQUEST' the user map page is rendered with the message.
        A mapping whose type and string already exist for 'user_id' is
        not added a second time.
        """
        msg = ''
        match_filter = None

        if not match_string:
            msg = 'No match string specified'
        else:
            try:
                match_filter = _MATCH_TYPE_FILTERS[match_type](match_string)
            except KeyError:
                msg = f'Unknown match type {match_type}'
            except re.error:
                msg = f'Invalid regular expression {match_string}'
            except ValueError as e:
                msg = f'Invalid match string {match_string} ({e})'

        if msg:
            if REQUEST is not None:
                # NOTE(review): errors always render the user map page,
                # even for generic mappings -- confirm this is intended.
                return self.manage_map(manage_tabs_message=msg)

            raise ValueError(msg)

        # Never store the caller's (or a shared default) list object.
        if roles is None:
            roles = []
        elif isinstance(roles, str):
            roles = [roles]
        else:
            roles = list(roles)

        record = self._domain_map.get(user_id, [])

        # Compare on type and string only: match ids are timestamped and
        # filter instances compare by identity, so comparing whole
        # records could never detect a duplicate.
        already_present = any(
            existing['match_type'] == match_type
            and existing['match_string'] == match_string
            for existing in record)

        if already_present:
            msg = 'Match already exists'
        else:
            record.append({
                'match_type': match_type,
                'match_string': match_string,
                'match_filter': match_filter,
                'match_id': f'{user_id}_{time.time()}',
                'username': user_id or username or 'Remote User',
                'roles': roles,
            })
            self._domain_map[user_id] = record

        if REQUEST is not None:
            msg = msg or 'Match added.'
            view = self.manage_map if user_id else self.manage_genericmap
            return view(manage_tabs_message=msg)

    @security.protected(manage_users)
    def manage_removeMappings(self, user_id='', match_ids=(), REQUEST=None):
        """ Remove mappings

        Deletes the records of 'user_id' whose 'match_id' is listed in
        'match_ids'.  When there is nothing to do, the user map page is
        rendered with a message if 'REQUEST' is given; otherwise the
        call returns silently.
        """
        record = self._domain_map.get(user_id, [])

        # A missing record list takes precedence over missing ids.
        if not record:
            msg = f'No mappings for user {user_id}'
        elif not match_ids:
            msg = 'No matches specified'
        else:
            msg = ''

        if msg:
            if REQUEST is not None:
                return self.manage_map(manage_tabs_message=msg)
            return

        self._domain_map[user_id] = [
            x for x in record if x['match_id'] not in match_ids]

        if REQUEST is not None:
            msg = 'Matches deleted'
            view = self.manage_map if user_id else self.manage_genericmap
            return view(manage_tabs_message=msg)
×
357

358

359
# Declare the interfaces the plugin class implements.
classImplements(
    DomainAuthHelper,
    IDomainAuthHelper,
    IExtractionPlugin,
    IAuthenticationPlugin,
    IRolesPlugin,
)

# Finalize the class with AccessControl's class initialization.
InitializeClass(DomainAuthHelper)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc