• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.PluggableAuthService / 5235276870

pending completion
5235276870

push

github

web-flow
Drop support for Python 2.7, 3.5, 3.6. (#116)

* Drop zserver extra in setup.py. Thus dropping FTP support.
* Drop support for Zope < 5.
Co-authored-by: Jens Vagelpohl <jens@plyp.com>

1288 of 1745 branches covered (73.81%)

Branch coverage included in aggregate %.

127 of 127 new or added lines in 30 files covered. (100.0%)

9619 of 10349 relevant lines covered (92.95%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.87
/src/Products/PluggableAuthService/plugins/DomainAuthHelper.py
1
##############################################################################
2
#
3
# Copyright (c) 2001 Zope Foundation and Contributors
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
7
# distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
""" DomainAuthHelper   Authentication Plugin for Domain authentication
1✔
15
"""
16

17
import re
1✔
18
import socket
1✔
19
import time
1✔
20

21
from AccessControl import ClassSecurityInfo
1✔
22
from AccessControl.class_init import InitializeClass
1✔
23
from AccessControl.Permissions import manage_users
1✔
24
from BTrees.OOBTree import OOBTree
1✔
25
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
26
from zope.interface import Interface
1✔
27

28
from ..interfaces.plugins import IAuthenticationPlugin
1✔
29
from ..interfaces.plugins import IExtractionPlugin
1✔
30
from ..interfaces.plugins import IRolesPlugin
1✔
31
from ..plugins.BasePlugin import BasePlugin
1✔
32
from ..utils import classImplements
1✔
33

34

35
try:
1✔
36
    from IPy import IP
1✔
37
except ImportError:
38
    IP = None
39

40

41
class IDomainAuthHelper(Interface):
    """ Marker interface for the domain authentication plugin.

    Declares no attributes or methods of its own.
    """
44

45

46
class EqualsFilter:
    """ Callable predicate accepting candidates equal to a fixed string.
    """

    def __init__(self, matchstring):
        # The attribute name must stay 'match_string': filter instances are
        # kept inside mapping records, so renaming it would break instances
        # that already exist.
        self.match_string = matchstring

    def __call__(self, candidate):
        """ Return True when 'candidate' equals the configured string.
        """
        wanted = self.match_string
        return candidate == wanted
1✔
53

54

55
class StartsWithFilter:
    """ Callable predicate accepting candidates with a fixed prefix.
    """

    def __init__(self, matchstring):
        # Attribute name is part of the stored state of existing instances;
        # keep it as 'match_string'.
        self.match_string = matchstring

    def __call__(self, candidate):
        """ Return True when 'candidate' begins with the configured prefix.
        """
        prefix = self.match_string
        return candidate.startswith(prefix)
1✔
62

63

64
class EndsWithFilter:
    """ Callable predicate accepting candidates with a fixed suffix.
    """

    def __init__(self, matchstring):
        # Attribute name is part of the stored state of existing instances;
        # keep it as 'match_string'.
        self.match_string = matchstring

    def __call__(self, candidate):
        """ Return True when 'candidate' ends with the configured suffix.
        """
        suffix = self.match_string
        return candidate.endswith(suffix)
1✔
71

72

73
class RegexFilter:
    """ Callable predicate accepting candidates matched by a regex.

    The pattern is compiled once at construction time, so an invalid
    pattern raises 're.error' from the constructor.
    """

    def __init__(self, matchstring):
        compiled = re.compile(matchstring)
        # Keep the attribute name 'regex' for already-existing instances.
        self.regex = compiled

    def __call__(self, candidate):
        """ Return the match object (truthy) or None (falsy).

        Uses 'match', so the pattern is anchored at the start of
        'candidate' but not at its end.
        """
        pattern = self.regex
        return pattern.match(candidate)
1✔
80

81

82
# Registry of match type name -> filter class. Each class is instantiated
# with the match string and the instance is then called with a candidate
# host name or address.
_MATCH_TYPE_FILTERS = {
    'equals': EqualsFilter,
    'startswith': StartsWithFilter,
    'endswith': EndsWithFilter,
    'regex': RegexFilter,
}

# The 'ip' match type is only offered when the optional IPy package
# could be imported.
if IP is not None:

    class IPFilter:
        """ Callable predicate accepting addresses contained in an IPy
        'IP' object built from the match string.
        """

        def __init__(self, matchstring):
            self.ip = IP(matchstring)

        def __call__(self, candidate):
            """ Return False for unparsable candidates, otherwise whether
            the parsed candidate is contained in the configured 'IP'.
            """
            try:
                parsed = IP(candidate)
            except ValueError:
                return False
            else:
                return parsed in self.ip

    _MATCH_TYPE_FILTERS.update(ip=IPFilter)
×
101

102

103
# Add form for the plugin, rendered from the 'www/daAdd' page template.
manage_addDomainAuthHelperForm = PageTemplateFile(
    'www/daAdd',
    globals(),
    __name__='manage_addDomainAuthHelperForm',
)
105

106

107
def manage_addDomainAuthHelper(self, id, title='', REQUEST=None):
    """ Factory method to instantiate a DomainAuthHelper

    Creates the plugin and stores it in 'self' under 'id'. When a
    REQUEST is given, its response is redirected to the container's
    'manage_workspace' view with a status message.
    """
    self._setObject(id, DomainAuthHelper(id, title=title))

    # Nothing to redirect when called from code.
    if REQUEST is None:
        return

    container_url = self.absolute_url()
    query = 'manage_tabs_message=DomainAuthHelper+added.'
    target = '{}/manage_workspace?{}'.format(container_url, query)
    REQUEST['RESPONSE'].redirect(target)
×
116

117

118
class DomainAuthHelper(BasePlugin):
    """ Domain Authentication plugin for the PluggableAuthService

    Keeps a map of user id -> list of mapping records. Each record
    describes how a remote host name or address is matched (match type
    and match string) and which user name and roles apply on a match.
    Records stored under the empty user id apply to any login.
    """
    security = ClassSecurityInfo()
    meta_type = 'Domain Authentication Plugin'
    zmi_icon = 'fas fa-fingerprint'

    security.declareProtected(manage_users, 'manage_map')  # NOQA: D001
    manage_map = PageTemplateFile('www/daMatches', globals())

    security.declareProtected(manage_users, 'manage_genericmap')  # NOQA: D001
    manage_genericmap = PageTemplateFile('www/daGeneric', globals())

    manage_options = (BasePlugin.manage_options[:1]
                      + ({'label': 'User Map', 'action': 'manage_map'},
                         {'label': 'Generic Map',
                          'action': 'manage_genericmap'})
                      + BasePlugin.manage_options[1:])

    def __init__(self, id, title=''):
        """ Initialize a new instance """
        self.id = id
        self.title = title
        # user id ('' for generic mappings) -> list of mapping dicts
        self._domain_map = OOBTree()

    @security.private
    def extractCredentials(self, request):
        """ Extract credentials from 'request'.

        Returns a dict with 'remote_host' and 'remote_address' when at
        least one of them is known, otherwise an empty dict.
        """
        remote_host = request.get('REMOTE_HOST', '')

        try:
            remote_address = request.getClientAddr()
        except AttributeError:
            # Request objects without getClientAddr
            remote_address = request.get('REMOTE_ADDR', '')

        if not (remote_host or remote_address):
            return {}

        return {'remote_host': remote_host,
                'remote_address': remote_address}

    @security.private
    def authenticateCredentials(self, credentials):
        """ Fulfill AuthenticationPlugin requirements

        Returns a (user_id, login) tuple when a mapping matches the
        remote host/address in 'credentials', else (None, None).
        """
        login = credentials.get('login', '')
        r_host = credentials.get('remote_host', '')
        r_address = credentials.get('remote_address', '')
        matches = self._findMatches(login, r_host, r_address)

        if not matches:
            return (None, None)

        if login:
            return (login, login)

        # No login supplied: take the identity from the first match.
        best_match = matches[0]
        u_name = best_match.get('username', 'remote')
        return (best_match.get('user_id', u_name), u_name)

    @security.private
    def getRolesForPrincipal(self, user, request=None):
        """ Fulfill RolesPlugin requirements

        Returns the roles of the first mapping matching the user and the
        remote host/address of 'request' as a tuple; an empty tuple when
        there is no request or no match.
        """
        if request is None:
            # Without request there is no way I can do anything...
            return ()

        uname = user.getUserName()

        # 'Remote User' is the user name manage_addMapping stores for
        # mappings without a user id; look those up as generic mappings.
        if 'Remote User' in uname:
            uname = ''

        # Same fallback as extractCredentials, so both code paths accept
        # the same kinds of request objects.
        try:
            remote_address = request.getClientAddr()
        except AttributeError:
            remote_address = request.get('REMOTE_ADDR', '')

        matches = self._findMatches(uname, request.get('REMOTE_HOST', ''),
                                    remote_address)

        # We want to grab the first match because it is the most specific
        if matches:
            return tuple(matches[0].get('roles', []))

        return ()

    @security.private
    def _findMatches(self, login, r_host='', r_address=''):
        """ Find the mappings matching a login and remote host/address.

        Mappings stored for 'login' are checked before the generic ones
        (empty user id). A missing host name or address is looked up via
        DNS from the other value; lookup failures are ignored. A mapping
        is included once for every candidate (host name, address) that
        its filter accepts. Returns a tuple of mapping dicts.
        """
        if not r_host and not r_address:
            return ()

        all_info = list(self._domain_map.get(login, []))
        all_info.extend(self._domain_map.get('', []))

        if not r_host:
            try:
                r_host = socket.gethostbyaddr(r_address)[0]
            except OSError:
                pass

        if not r_address:
            try:
                r_address = socket.gethostbyname(r_host)
            except OSError:
                pass

        if not r_host and not r_address:
            return ()

        candidates = [r_host, r_address]
        matches = []

        for match_info in all_info:
            match_filter = match_info.get('match_filter')

            if match_filter is None:  # legacy data
                m_type = match_info['match_type']
                m_string = match_info['match_string']
                match_filter = _MATCH_TYPE_FILTERS[m_type](m_string)

            matches.extend(
                [match_info for x in candidates if match_filter(x)])

        return tuple(matches)

    @security.protected(manage_users)
    def listMatchTypes(self):
        """ Return a sequence of possible match types """
        # A list rather than a dict view, so the result really is a
        # sequence and is detached from the registry.
        return list(_MATCH_TYPE_FILTERS)

    @security.protected(manage_users)
    def listMappingsForUser(self, user_id=''):
        """ List the mappings for a specific user

        Returns a list of dicts holding the displayable fields of each
        mapping (the filter object is left out).
        """
        fields = ('match_type', 'match_string', 'match_id', 'roles',
                  'username')
        record = self._domain_map.get(user_id, [])
        return [{name: match_info[name] for name in fields}
                for match_info in record]

    @security.protected(manage_users)
    def manage_addMapping(self, user_id='', match_type='', match_string='',
                          username='', roles=None, REQUEST=None):
        """ Add a mapping for a user

        An empty 'user_id' adds a generic mapping. Invalid input raises
        ValueError when called without REQUEST; with REQUEST the message
        is shown on the management page instead. A mapping equal to an
        existing one (same match type, match string, user name and
        roles) is not added again.
        """
        # A literal [] default would be one list shared by all calls and
        # stored in every mapping created without explicit roles.
        if roles is None:
            roles = []

        msg = ''
        match_filter = None

        try:
            match_filter = _MATCH_TYPE_FILTERS[match_type](match_string)
        except KeyError:
            msg = f'Unknown match type {match_type}'
        except re.error:
            msg = f'Invalid regular expression {match_string}'
        except ValueError as e:
            msg = f'Invalid match string {match_string} ({e})'

        if not match_string:
            msg = 'No match string specified'

        if msg:
            if REQUEST is not None:
                return self.manage_map(manage_tabs_message=msg)

            raise ValueError(msg)

        mapped_name = user_id or username or 'Remote User'

        # Work on a copy and store it back as a new value instead of
        # mutating the stored list in place.
        record = list(self._domain_map.get(user_id, []))

        # Compare only the user-supplied fields: 'match_id' is time based
        # and the filter instances have no equality of their own, so
        # comparing whole records would never find a duplicate.
        is_duplicate = any(
            info.get('match_type') == match_type
            and info.get('match_string') == match_string
            and info.get('username') == mapped_name
            and info.get('roles') == roles
            for info in record)

        if is_duplicate:
            msg = 'Match already exists'
        else:
            record.append({'match_type': match_type,
                           'match_string': match_string,
                           'match_filter': match_filter,
                           'match_id': f'{user_id}_{time.time()}',
                           'username': mapped_name,
                           'roles': roles})
            self._domain_map[user_id] = record

        if REQUEST is not None:
            msg = msg or 'Match added.'
            if user_id:
                return self.manage_map(manage_tabs_message=msg)
            else:
                return self.manage_genericmap(manage_tabs_message=msg)

    @security.protected(manage_users)
    def manage_removeMappings(self, user_id='', match_ids=None,
                              REQUEST=None):
        """ Remove mappings

        Removes the mappings of 'user_id' whose 'match_id' is contained
        in 'match_ids'. Does nothing when no ids are given or the user
        has no mappings.
        """
        # Avoid a shared mutable default argument.
        if match_ids is None:
            match_ids = []

        msg = ''

        if len(match_ids) < 1:
            msg = 'No matches specified'

        record = self._domain_map.get(user_id, [])

        if len(record) < 1:
            msg = 'No mappings for user %s' % user_id

        if msg:
            if REQUEST is not None:
                return self.manage_map(manage_tabs_message=msg)
            else:
                return

        # Store a new list rather than mutating the stored one in place.
        self._domain_map[user_id] = [
            x for x in record if x['match_id'] not in match_ids]

        if REQUEST is not None:
            msg = 'Matches deleted'
            if user_id:
                return self.manage_map(manage_tabs_message=msg)
            else:
                return self.manage_genericmap(manage_tabs_message=msg)
×
343

344

345
# Declare the interfaces provided by DomainAuthHelper instances.
classImplements(
    DomainAuthHelper,
    IDomainAuthHelper,
    IExtractionPlugin,
    IAuthenticationPlugin,
    IRolesPlugin,
)

# Finish class setup once all declarations above are in place.
InitializeClass(DomainAuthHelper)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc