• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Products.ZSQLMethods / 12348259030

06 Jun 2024 07:13AM UTC coverage: 73.402% (-0.8%) from 74.196%
12348259030

push

github

web-flow
Drop support for Python 3.7. (#49)

337 of 630 branches covered (53.49%)

Branch coverage included in aggregate %.

2144 of 2750 relevant lines covered (77.96%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.76
/src/Shared/DC/ZRDB/Aqueduct.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE
11
#
12
##############################################################################
13

14
import binascii
1✔
15
import os
1✔
16
import re
1✔
17
from io import StringIO
1✔
18

19
from Acquisition import Implicit
1✔
20
from App.Common import package_home
1✔
21
from DateTime.DateTime import DateTime
1✔
22
from DocumentTemplate import HTML
1✔
23
from DocumentTemplate import File
1✔
24
from DocumentTemplate.html_quote import html_quote
1✔
25
from OFS.role import RoleManager
1✔
26
from OFS.SimpleItem import Item
1✔
27
from Persistence import Persistent
1✔
28
from zExceptions import Redirect
1✔
29

30

31
dtml_dir = os.path.join(package_home(globals()), 'dtml')
1✔
32
InvalidParameter = 'Invalid Parameter'
1✔
33

34

35
class BaseQuery(Persistent, Item, Implicit, RoleManager):
1✔
36

37
    _col = None
1✔
38
    _arg = {}
1✔
39
    query_date = DateTime()
1✔
40
    manage_options = ()
1✔
41
    MissingArgumentError = 'Bad Request'
1✔
42

43
    def query_year(self):
1✔
44
        return self.query_date.year()
×
45

46
    def query_month(self):
1✔
47
        return self.query_date.month()
×
48

49
    def query_day(self):
1✔
50
        return self.query_date.day()
×
51

52
    def quoted_input(self):
1✔
53
        return quotedHTML(self.input_src)
×
54

55
    def quoted_report(self):
1✔
56
        return quotedHTML(self.report_src)
×
57

58
    def _convert(self):
1✔
59
        self._arg = parse(self.arguments_src)
×
60

61
    def _argdata(self, REQUEST):
1✔
62
        r = {}
1✔
63

64
        try:
1✔
65
            args = self._arg
1✔
66
        except Exception:
×
67
            self._convert()
×
68
            args = self._arg
×
69

70
        id = self.id
1✔
71
        missing = []
1✔
72

73
        for name in args.keys():
1✔
74
            idname = f'{id}/{name}'
1✔
75
            try:
1✔
76
                r[name] = REQUEST[idname]
1✔
77
            except Exception:
1✔
78
                try:
1✔
79
                    r[name] = REQUEST[name]
1✔
80
                except Exception:
1✔
81
                    arg = args[name]
1✔
82
                    try:
1✔
83
                        r[name] = arg['default']
1✔
84
                    except Exception:
1✔
85
                        try:
1✔
86
                            if not arg['optional']:
1!
87
                                missing.append(name)
×
88
                        except Exception:
1✔
89
                            missing.append(name)
1✔
90

91
        # Note: the code above tries to check if an argument of the
92
        # ZSQL method above has the "optional" flag set (in case the
93
        # argument is omitted from the ZSQL function call). But there
94
        # is neither corresponding code inside the parse() function to
95
        # check for the "optional" parameter nor any documentation.
96
        # So we omit the check for the optional parameter. There will
97
        # be probably no code break but there will be hopefully more code
98
        # to work as supposed to work.
99

100
#        if missing:
101
#            raise self.MissingArgumentError(  \
102
#                "The following arguments were omitted " \
103
#                " from the ZSQL method call: %s" % str(missing))
104
#
105

106
        return r
1✔
107

108

109
class Searchable(BaseQuery):
1✔
110

111
    def _searchable_arguments(self):
1✔
112

113
        try:
×
114
            return self._arg
×
115
        except Exception:
×
116
            self._convert()
×
117
            return self._arg
×
118

119
    def _searchable_result_columns(self):
1✔
120
        return self._col
×
121

122
    def manage_testForm(self, REQUEST):
1✔
123
        """Provide testing interface"""
124
        input_src = default_input_form(self.title_or_id(),
×
125
                                       self._searchable_arguments(),
126
                                       'manage_test')
127
        return HTML(input_src)(self, REQUEST)
×
128

129
    def manage_test(self, REQUEST):
1✔
130
        """Perform an actual query"""
131

132
        result = self(REQUEST)
×
133
        report = HTML(custom_default_report(self.id, result))
×
134
        return report(*(self, REQUEST), **{self.id: result})
×
135

136
    def index_html(self, URL1):
1✔
137
        """ """
138
        raise Redirect('%s/manage_testForm' % URL1)
×
139

140

141
class Composite:
1✔
142

143
    def _getquery(self, id):
1✔
144

145
        o = self
×
146
        i = 0
×
147
        while 1:
×
148
            __traceback_info__ = o
×
149
            q = getattr(o, id)
×
150
            try:
×
151
                if hasattr(q, '_searchable_arguments'):
×
152
                    try:
×
153
                        q = q.__of__(self.aq_parent)
×
154
                    except Exception:
×
155
                        pass
×
156
                    return q
×
157
            except Exception:
×
158
                pass
×
159
            if i > 100:
×
160
                raise AttributeError(id)
×
161
            i = i + 1
×
162
            o = o.aq_parent
×
163

164
    def myQueryIds(self):
1✔
165
        return map(
×
166
            lambda k, queries=self.queries:
167
            {'id': k, 'selected': k in queries},
168
            self.ZQueryIds())
169

170

171
def default_input_form(id, arguments, action='query', tabs=''):
1✔
172
    if arguments:
×
173
        items = arguments.items()
×
174
        return (
×
175
            '{}\n{}{}'.format(
176
                '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN"'
177
                ' "http://www.w3.org/TR/REC-html40/loose.dtd">\n'
178
                '<html lang="en"><head><title>%s Input Data</title></head>\n'
179
                '<body bgcolor="#FFFFFF" link="#000099" vlink="#555555">\n%s\n'
180
                '<form action="&dtml-URL2;/&dtml-id;/%s" '
181
                'method="get">\n'
182
                '<h2>%s Input Data</h2>\n'
183
                'Enter query parameters:<br>'
184
                '<table>\n'
185
                % (id, tabs, action, id),
186
                '/n'.join(
187
                    map(
188
                        lambda a:
189
                        ('<tr> <th>%s</th>\n'
190
                         '     <td><input name="%s"\n'
191
                         '                size="30" value="%s">'
192
                         '     </td></tr>'
193
                         % (html_quote(nicify(a[0])),
194
                            'type' in a[1]
195
                             and ('{}:{}'.format(a[0], a[1]['type']))
196
                             or a[0],
197
                            'default' in a[1] and a[1]['default'] or '')),
198
                            items)),
199
                '\n<tr><td colspan=2 align=center>\n'
200
                '<input type="SUBMIT" name="SUBMIT" value="Submit Query">\n'
201
                '<dtml-if HTTP_REFERER>\n'
202
                '  <input type="SUBMIT" name="SUBMIT" value="Cancel">\n'
203
                '  <INPUT NAME="CANCEL_ACTION" TYPE="HIDDEN"\n'
204
                '         VALUE="&dtml-HTTP_REFERER;">\n'
205
                '</dtml-if>\n'
206
                '</td></tr>\n</table>\n</form>\n</body>\n</html>\n'))
207
    else:
208
        return (
×
209
            '<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN" '
210
            '"http://www.w3.org/TR/REC-html40/loose.dtd">\n'
211
            '<html lang="en"><head><title>%s Input Data</title></head>\n'
212
            '<body bgcolor="#FFFFFF" link="#000099" vlink="#555555">\n%s\n'
213
            '<form action="&dtml-URL2;/&dtml-id;/%s" '
214
            'method="get">\n'
215
            '<h2>%s Input Data</h2>\n'
216
            'This query requires no input.<p>\n'
217
            '<input type="SUBMIT" name="SUBMIT" value="Submit Query">\n'
218
            '<dtml-if HTTP_REFERER>\n'
219
            '  <input type="SUBMIT" name="SUBMIT" value="Cancel">\n'
220
            '  <INPUT NAME="CANCEL_ACTION" TYPE="HIDDEN"\n'
221
            '         VALUE="&dtml-HTTP_REFERER;">\n'
222
            '</dtml-if>\n'
223
            '</td></tr>\n</table>\n</form>\n</body>\n</html>\n'
224
            % (id, tabs, action, id))
225

226

227
custom_default_report_src = File(
1✔
228
    os.path.join(dtml_dir, 'customDefaultReport.dtml'))
229
custom_default_zpt_report_src = File(
1✔
230
    os.path.join(dtml_dir, 'customDefaultZPTReport.dtml'))
231

232

233
def custom_default_report(id, result, action='', no_table=0,
1✔
234
                          goofy=re.compile(r'\W').search):
235

236
    columns = result._searchable_result_columns()
1✔
237
    __traceback_info__ = columns
1✔
238
    heading = ('<tr>\n%s        </tr>' %
1✔
239
               ''.join(
240
                   map(lambda c:
241
                       '          <th>%s</th>\n' %
242
                       html_quote(nicify(c['name'])),
243
                       columns)))
244

245
    if no_table:
1✔
246
        tr, _tr, td, _td, delim = '<p>', '</p>', '', '', ',\n'
1✔
247
    else:
248
        tr, _tr, td, _td, delim = '<tr>', '</tr>', '<td>', '</td>', '\n'
1✔
249

250
    row = []
1✔
251
    for c in columns:
1✔
252
        n = c['name']
1✔
253
        if goofy(n) is not None:
1✔
254
            n = 'expr="_[\'%s]"' % (repr('"' + n)[2:])
1✔
255
        row.append('          %s<dtml-var %s%s html_quote>%s'
1✔
256
                   % (td, n, c['type'] != 's' and ' null=""' or '', _td))
257

258
    row = ('     {}\n{}\n        {}'.format(
1✔
259
        tr, delim.join(row), _tr))
260

261
    return custom_default_report_src(
1✔
262
        id=id, heading=heading, row=row, action=action, no_table=no_table)
263

264

265
def custom_default_zpt_report(id, result, action='', no_table=0,
1✔
266
                              goofy=re.compile(r'\W').search):
267

268
    columns = result._searchable_result_columns()
1✔
269
    __traceback_info__ = columns
1✔
270
    heading = ('<tr>\n%s        </tr>' %
1✔
271
               ''.join(
272
                   map(lambda c:
273
                       '          <th>%s</th>\n' %
274
                       html_quote(nicify(c['name'])),
275
                       columns)))
276

277
    if no_table:
1!
278
        tr, _tr, td, _td, delim = '<p>', '</p>', '', '', ',\n'
1✔
279
    else:
280
        tr, _tr, td, _td, delim = '<tr>', '</tr>', '<td>', '</td>', '\n'
×
281

282
    row = []
1✔
283
    for c in columns:
1✔
284
        n = c['name']
1✔
285
        tpl = '          %s<span tal:replace="result/%s">%s goes here</span>%s'
1✔
286
        row.append(tpl % (td, n, n, _td))
1✔
287

288
    row = ('     {}\n{}\n        {}'.format(
1✔
289
        tr, delim.join(row), _tr))
290

291
    return custom_default_zpt_report_src(
1✔
292
        id=id, heading=heading, row=row, action=action, no_table=no_table)
293

294

295
def detypify(arg):
1✔
296
    idx = arg.find(':')
×
297
    if idx > 0:
×
298
        arg = arg[:idx]
×
299
    return arg
×
300

301

302
def decode(input, output):
1✔
303
    while 1:
×
304
        line = input.readline()
×
305
        if not line:
×
306
            break
×
307
        s = binascii.a2b_base64(line[:-1])
×
308
        output.write(s)
×
309

310

311
def decodestring(s):
1✔
312
    f = StringIO(s)
×
313
    g = StringIO()
×
314
    decode(f, g)
×
315
    return g.getvalue()
×
316

317

318
class Args:
1✔
319

320
    def __init__(self, data=None, keys=None):
1✔
321
        self._data = data or {}
1✔
322
        self._keys = keys or []
1✔
323

324
    def items(self):
1✔
325
        return map(lambda k, d=self._data: (k, d[k]), self._keys)
1✔
326

327
    def values(self):
1✔
328
        return map(lambda k, d=self._data: d[k], self._keys)
1✔
329

330
    def keys(self):
1✔
331
        return list(self._keys)
1✔
332

333
    def has_key(self, key):
1✔
334
        return key in self._data
×
335

336
    def __contains__(self, key):
1✔
337
        return key in self._data
1✔
338

339
    def __getitem__(self, key):
1✔
340
        return self._data[key]
1✔
341

342
    def __setitem__(self, key, v):
1✔
343
        self._data[key] = v
1✔
344

345
    def __delitem__(self, key):
1✔
346
        del self._data[key]
1✔
347

348
    def __len__(self):
1✔
349
        return len(self._data)
1✔
350

351

352
def parse(text,
1✔
353
          result=None,
354
          keys=None,
355
          unparmre=re.compile(
356
              r'([\000- ]*([^\000- ="]+))'),
357
          parmre=re.compile(
358
              r'([\000- ]*([^\000- ="]+)=([^\000- ="]+))'),
359
          qparmre=re.compile(
360
              r'([\000- ]*([^\000- ="]+)="([^"]*)")'),
361
          ):
362

363
    if result is None:
1✔
364
        result = {}
1✔
365
        keys = []
1✔
366

367
    __traceback_info__ = text
1✔
368

369
    mo = parmre.match(text)
1✔
370

371
    if mo:
1!
372
        name = mo.group(2)
×
373
        value = {'default': mo.group(3)}
×
374
        group_len = len(mo.group(1))
×
375

376
    else:
377
        mo = qparmre.match(text)
1✔
378

379
        if mo:
1!
380
            name = mo.group(2)
×
381
            value = {'default': mo.group(3)}
×
382
            group_len = len(mo.group(1))
×
383

384
        else:
385
            mo = unparmre.match(text)
1✔
386

387
            if mo:
1✔
388
                name = mo.group(2)
1✔
389
                value = {}
1✔
390
                group_len = len(mo.group(1))
1✔
391
            else:
392
                if not text or not text.strip():
1!
393
                    return Args(result, keys)
1✔
394
                raise InvalidParameter(text)
×
395

396
    lt = name.find(':')
1✔
397
    if lt > 0:
1!
398
        value['type'] = name[lt + 1:]
×
399
        name = name[:lt]
×
400

401
    result[name] = value
1✔
402
    keys.append(name)
1✔
403

404
    return parse(text[group_len:], result, keys)
1✔
405

406

407
def quotedHTML(text,
1✔
408
               character_entities=(
409
                   ('&', '&amp;'),
410
                   ('<', '&lt;'),
411
                   ('>', '&gt;'),
412
                   ('"', '&quot;'))):
413

414
    for char, name in character_entities:
×
415
        text = text.replace(char, name)
×
416

417
    return text
×
418

419

420
def nicify(name):
1✔
421
    name = name.strip().replace('_', ' ')
1✔
422
    return name[:1].upper() + name[1:]
1✔
423

424

425
def decapitate(html, RESPONSE=None,
1✔
426
               header_re=re.compile(
427
                   r'(('
428
                   r'[^\000- <>:]+:[^\n]*\n'
429
                   r'|'
430
                   r'[ \011]+[^\000- ][^\n]*\n'
431
                   r')+)[ \t]*\n([\000-\377]+)'),  # please kill me now
432
               space_re=re.compile(r'([ \t]+)'),
433
               name_re=re.compile(r'([^\000- <>:]+):([^\n]*)'),
434
               ):
435

436
    mo = header_re.match(html)
×
437
    if mo is None:
×
438
        return html
×
439

440
    headers, html = mo.group(1, 3)
×
441

442
    headers = headers.split('\n')
×
443

444
    i = 1
×
445
    while i < len(headers):
×
446
        if not headers[i]:
×
447
            del headers[i]
×
448
        else:
449
            mo = space_re.match(headers[i])
×
450
            if mo:
×
451
                headers[i - 1] = '{} {}'.format(
×
452
                    headers[i - 1], headers[i][len(mo.group(1)):])
453
                del headers[i]
×
454
            else:
455
                i = i + 1
×
456

457
    for i in range(len(headers)):
×
458
        mo = name_re.match(headers[i])
×
459
        if mo:
×
460
            k, v = mo.group(1, 2)
×
461
            v = v.strip()
×
462
        else:
463
            raise ValueError('Invalid Header (%d): %s ' % (i, headers[i]))
×
464
        RESPONSE.setHeader(k, v)
×
465

466
    return html
×
467

468

469
def delimited_output(results, REQUEST, RESPONSE):
1✔
470
    delim = REQUEST['output-delimiter']
×
471
    try:
×
472
        output_type = REQUEST['output-type']
×
473
    except Exception:
×
474
        output_type = 'text/plain'
×
475

476
    RESPONSE.setHeader('content-type', output_type)
×
477
    return '{}\n{}\n'.format(
×
478
        delim.join(results.names()),
479
        '\n'.join(map(lambda row, delim=delim:
480
                  delim.join(map(str, row)),
481
                  results)),
482
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc