• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / RestrictedPython / 5499716159

pending completion
5499716159

push

github

web-flow
Merge pull request from GHSA-wqc8-x2pr-7jqh

* move the commented fix into this branch

* more verbose infos, and linting

* 3 tests for generators

* - add change log entry

---------

Co-authored-by: Jens Vagelpohl <jens@plyp.com>

340 of 354 branches covered (96.05%)

16 of 16 new or added lines in 2 files covered. (100.0%)

2457 of 2486 relevant lines covered (98.83%)

0.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.23
/src/RestrictedPython/transformer.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE
11
#
12
##############################################################################
13
"""
1✔
14
transformer module:
15

16
uses Python standard library ast module and its containing classes to transform
17
the parsed python code to create a modified AST for a byte code generation.
18
"""
19

20

21
import ast
1✔
22
import contextlib
1✔
23
import textwrap
1✔
24

25
from ._compat import IS_PY38_OR_GREATER
1✔
26

27

28
# Source spelling of every augmented-assignment operator, keyed by the
# operator's ast class (AugAssign handling needs the operator as a string).
IOPERATOR_TO_STR = {
    # arithmetic
    ast.Add: '+=',
    ast.Sub: '-=',
    ast.Mult: '*=',
    ast.Div: '/=',
    ast.Mod: '%=',
    ast.Pow: '**=',
    # bit shifts and bitwise operators
    ast.LShift: '<<=',
    ast.RShift: '>>=',
    ast.BitOr: '|=',
    ast.BitXor: '^=',
    ast.BitAnd: '&=',
    # remaining arithmetic
    ast.FloorDiv: '//=',
    ast.MatMult: '@=',
}

# Magic method names that may be defined even though they start with '_'.
# See also
# https://docs.python.org/3/reference/datamodel.html#special-method-names
ALLOWED_FUNC_NAMES = frozenset((
    '__init__',
    '__contains__',
    '__lt__',
    '__le__',
    '__eq__',
    '__ne__',
    '__gt__',
    '__ge__',
))


# Names that are reserved and rejected as variable names.
FORBIDDEN_FUNC_NAMES = frozenset((
    'print',
    'printed',
    'builtins',
    'breakpoint',
))

# Introspection attributes that must not be accessed. See also
# https://docs.python.org/3/library/inspect.html
INSPECT_ATTRIBUTES = frozenset((
    # traceback
    "tb_frame",
    "tb_next",
    # code
    "co_code",
    # frame
    "f_back",
    "f_builtins",
    "f_code",
    "f_globals",
    "f_locals",
    "f_trace",
    # generator
    "gi_frame",
    "gi_code",
    "gi_yieldfrom",
    # coroutine
    "cr_await",
    "cr_frame",
    "cr_code",
    "cr_origin",
))
91

92

93
# When new ast nodes are generated they have no 'lineno', 'end_lineno',
94
# 'col_offset' and 'end_col_offset'. This function copies these fields from the
95
# incoming node:
96
def copy_locations(new_node, old_node):
    """Copy the source position of ``old_node`` onto ``new_node``.

    Copies 'lineno' and 'col_offset', plus 'end_lineno' and
    'end_col_offset' when IS_PY38_OR_GREATER is true, then lets
    ``ast.fix_missing_locations`` fill in positions on child nodes.
    """
    # Same order as the attributes are documented: line first, then column.
    position_attrs = ['lineno']
    if IS_PY38_OR_GREATER:
        position_attrs.append('end_lineno')
    position_attrs.append('col_offset')
    if IS_PY38_OR_GREATER:
        position_attrs.append('end_col_offset')

    for attr in position_attrs:
        # Only node types that carry position info may be passed in.
        assert attr in new_node._attributes
        setattr(new_node, attr, getattr(old_node, attr))

    ast.fix_missing_locations(new_node)
1✔
112

113

114
class PrintInfo:
    """Flags recording whether 'print' / 'printed' were used in a scope."""

    def __init__(self):
        self.print_used = self.printed_used = False

    @contextlib.contextmanager
    def new_print_scope(self):
        """Context manager giving the enclosed code fresh (False) flags.

        The flag values seen on entry are restored on exit, also when the
        enclosed code raises.
        """
        saved = (self.print_used, self.printed_used)
        self.print_used, self.printed_used = False, False

        try:
            yield
        finally:
            self.print_used, self.printed_used = saved
1✔
132

133

134
class RestrictingNodeTransformer(ast.NodeTransformer):
1✔
135

136
    def __init__(self, errors=None, warnings=None, used_names=None):
        """Set up the transformer state.

        ``errors``, ``warnings`` and ``used_names`` are used as given when
        supplied (so the caller can read the results afterwards); otherwise
        fresh empty containers are created.
        """
        super().__init__()
        self.errors = errors if errors is not None else []
        self.warnings = warnings if warnings is not None else []

        # Every variable the incoming source uses. Internal names such as
        # those produced by 'gen_tmp_name' do not have to be added.
        # 'RestrictionCapableEval', for example, needs 'used_names' to know
        # which names it has to supply when calling the final code.
        self.used_names = used_names if used_names is not None else {}

        # Counter shared by all temporary variable names of this instance.
        self._tmp_idx = 0

        self.print_info = PrintInfo()
1✔
152

153
    def gen_tmp_name(self):
        """Return the next unused temporary name: '_tmp0', '_tmp1', ..."""
        # 'check_name' rejects user variables that start with '_', so a
        # '_tmp..' name cannot collide with a name from the source.
        idx = self._tmp_idx
        tmp_name = '_tmp%i' % idx
        self._tmp_idx = idx + 1
        return tmp_name
1✔
159

160
    def error(self, node, info):
        """Append 'Line <lineno>: <info>' to ``self.errors``.

        The line number is taken from ``node.lineno``; nodes without that
        attribute are reported as line ``None``.
        """
        line = getattr(node, 'lineno', None)
        message = f'Line {line}: {info}'
        self.errors.append(message)
165

166
    def warn(self, node, info):
        """Record a warning discovered during transformation.

        Appends 'Line <lineno>: <info>' to ``self.warnings``. The line
        number is taken from ``node.lineno``; nodes without that attribute
        are reported as line ``None``.
        """
        lineno = getattr(node, 'lineno', None)
        self.warnings.append(
            f'Line {lineno}: {info}')
171

172
    def guard_iter(self, node):
        """Wrap the iterable of a loop-like node in a guard call.

        Converts:
            for x in expr
        to
            for x in _getiter_(expr)

        When the loop target is a tuple, the iterable is wrapped in
        ``_iter_unpack_sequence_(expr, spec, _getiter_)`` instead, with the
        spec produced by 'gen_unpack_spec'.

        Also used for
        * list comprehensions
        * dict comprehensions
        * set comprehensions
        * generator expressions
        """
        node = self.node_contents_visit(node)
        original_iter = node.iter

        if isinstance(node.target, ast.Tuple):
            guard_name = '_iter_unpack_sequence_'
            guard_args = [
                original_iter,
                self.gen_unpack_spec(node.target),
                ast.Name('_getiter_', ast.Load()),
            ]
        else:
            guard_name = "_getiter_"
            guard_args = [original_iter]

        guarded_iter = ast.Call(
            func=ast.Name(guard_name, ast.Load()),
            args=guard_args,
            keywords=[])

        copy_locations(guarded_iter, original_iter)
        node.iter = guarded_iter
        return node
1✔
202

203
    def is_starred(self, ob):
        """Return True when ``ob`` is an ``ast.Starred`` node."""
        starred = isinstance(ob, ast.Starred)
        return starred
1✔
205

206
    def gen_unpack_spec(self, tpl):
        """Generate a specification for 'guarded_unpack_sequence'.

        This spec is used to protect sequence unpacking.
        The primary goal of this spec is to tell which elements in a sequence
        are sequences again. These 'child' sequences have to be protected
        again.

        For example there is a sequence like this:
            (a, (b, c), (d, (e, f))) = g

        On a higher level the spec says:
            - There is a sequence of len 3
            - The element at index 1 is a sequence again with len 2
            - The element at index 2 is a sequence again with len 2
              - The element at index 1 in this subsequence is a sequence again
                with len 2

        With this spec 'guarded_unpack_sequence' does something like this for
        protection (len checks are omitted):

            t = list(_getiter_(g))
            t[1] = list(_getiter_(t[1]))
            t[2] = list(_getiter_(t[2]))
            t[2][1] = list(_getiter_(t[2][1]))
            return t

        The 'real' spec for the case above is then:
            spec = {
                'childs': (
                    (1, {'childs': (), 'min_len': 2}),
                    (2, {
                            'childs': (
                                (1, {'childs': (), 'min_len': 2}),
                            ),
                            'min_len': 2,
                        }
                    ),
                ),
                'min_len': 3,
            }

        So finally the assignment above is converted into:
            (a, (b, c), (d, (e, f))) = guarded_unpack_sequence(g, spec)

        The spec is returned as an ``ast.Dict`` node. Literals are built as
        ``ast.Constant`` because ``ast.Str`` and ``ast.Num`` are deprecated
        aliases that newer Python versions remove.
        """
        child_specs = ast.Tuple([], ast.Load())

        # Starred elements in a sequence do not contribute to min_len.
        # For example a, b, *c = g
        # g must have at least 2 elements, not 3. 'c' is empty if g has only 2.
        min_len = len([ob for ob in tpl.elts if not self.is_starred(ob)])
        offset = 0

        for idx, val in enumerate(tpl.elts):
            # After a starred element specify the child index from the back,
            # since it is unknown how many elements of the sequence are
            # consumed by the starred element.
            # For example a, *b, (c, d) = g
            # Then (c, d) has the index '-1'
            if self.is_starred(val):
                offset = min_len + 1

            elif isinstance(val, ast.Tuple):
                child = ast.Tuple(
                    [ast.Constant(value=idx - offset),
                     self.gen_unpack_spec(val)],
                    ast.Load())
                child_specs.elts.append(child)

        return ast.Dict(
            keys=[ast.Constant(value='childs'),
                  ast.Constant(value='min_len')],
            values=[child_specs, ast.Constant(value=min_len)])
1✔
281

282
    def protect_unpack_sequence(self, target, value):
        """Build the call ``_unpack_sequence_(value, spec, _getiter_)``.

        ``spec`` is the unpack specification generated for ``target`` by
        'gen_unpack_spec'.
        """
        call_args = [
            value,
            self.gen_unpack_spec(target),
            ast.Name('_getiter_', ast.Load()),
        ]
        guard = ast.Name('_unpack_sequence_', ast.Load())
        return ast.Call(func=guard, args=call_args, keywords=[])
288

289
    def gen_unpack_wrapper(self, node, target):
        """Helper to protect tuple unpacks.

        node: the locations for the generated nodes are copied from it.
        target: the tuple which must be protected.

        Returns a tuple with two elements.

        Element 1: a temporary name node (in Store context) which must be
                   used to replace the target.

        Element 2: a try .. finally statement whose body performs the
                   protected tuple unpack of the temporary variable into
                   the original target and whose finally clause deletes
                   the temporary variable.
        """
        # The tuple is first caught in this temporary variable.
        tmp_name = self.gen_tmp_name()

        # 'wrapper(tmp_name)': the wrapper protects the sequence unpacking
        # with _getiter_.
        guarded_value = self.protect_unpack_sequence(
            target, ast.Name(tmp_name, ast.Load()))

        # Generates:
        # try:
        #     target = wrapper(tmp_name)
        # finally:
        #     del tmp_name
        unpack_stmt = ast.Assign(targets=[target], value=guarded_value)
        cleanup = ast.Try(
            body=[unpack_stmt],
            handlers=[],
            orelse=[],
            finalbody=[self.gen_del_stmt(tmp_name)])

        tmp_target = ast.Name(tmp_name, ast.Store())

        for generated in (tmp_target, cleanup):
            copy_locations(generated, node)

        return (tmp_target, cleanup)
1✔
337

338
    def gen_none_node(self):
        """Return a new AST node for the literal ``None``.

        Built as ``ast.Constant`` because ``ast.NameConstant`` is a
        deprecated alias that newer Python versions remove.
        """
        return ast.Constant(value=None)
1✔
340

341
    def gen_del_stmt(self, name_to_del):
        """Return the AST for the statement ``del <name_to_del>``."""
        doomed = ast.Name(name_to_del, ast.Del())
        return ast.Delete(targets=[doomed])
1✔
343

344
    def transform_slice(self, slice_):
        """Turn a subscript slice into an expression usable as call argument.

        ast.Slice nodes are only allowed within an ast.Subscript node.
        To pass a slice to an ast.Call it is converted into a call of the
        'slice' builtin; missing bounds become ``None``. An ast.Index is
        replaced by its value and an ast.ExtSlice by a tuple of its
        converted dimensions. Anything that already is an expression is
        returned unchanged.
        """
        if isinstance(slice_, ast.expr):
            # Python 3.9+
            return slice_

        if isinstance(slice_, ast.Index):
            return slice_.value

        if isinstance(slice_, ast.Slice):
            # Build 'slice(lower, upper, step)'.
            bounds = (slice_.lower, slice_.upper, slice_.step)
            slice_args = [
                bound if bound else self.gen_none_node() for bound in bounds
            ]
            return ast.Call(
                func=ast.Name('slice', ast.Load()),
                args=slice_args,
                keywords=[])

        if isinstance(slice_, ast.ExtSlice):
            converted = ast.Tuple([], ast.Load())
            converted.elts.extend(
                self.transform_slice(dim) for dim in slice_.dims)
            return converted

        # Index, Slice and ExtSlice are the only defined slice types.
        raise NotImplementedError(  # pragma: no cover
            f"Unknown slice type: {slice_}")
392

393
    def check_name(self, node, name, allow_magic_methods=False):
        """Record an error when ``name`` is not an allowed name.

        A name is rejected when it starts with `_` (a lone `_` is fine),
        when it ends with `__roles__`, or when it is listed in
        `FORBIDDEN_FUNC_NAMES`. At most one error is recorded per call.

        If ``allow_magic_methods is True`` names in `ALLOWED_FUNC_NAMES`
        are additionally allowed although their names start with `_`,
        provided ``node.col_offset`` is not 0.

        ``None`` is accepted silently.
        """
        if name is None:
            return

        if name.startswith('_') and name != '_':
            is_allowed_magic = (allow_magic_methods
                                and name in ALLOWED_FUNC_NAMES
                                and node.col_offset != 0)
            if not is_allowed_magic:
                self.error(
                    node,
                    '"{name}" is an invalid variable name because it '
                    'starts with "_"'.format(name=name))
                return

        if name.endswith('__roles__'):
            self.error(node, '"%s" is an invalid variable name because '
                       'it ends with "__roles__".' % name)
            return

        if name in FORBIDDEN_FUNC_NAMES:
            self.error(node, f'"{name}" is a reserved name.')
1✔
417

418
    def check_function_argument_names(self, node):
        """Run 'check_name' on every parameter name of a function node.

        Covers positional-only parameters (Python 3.8+), regular
        parameters, ``*args``, ``**kwargs`` and keyword-only parameters.
        ``node`` must have an ``args`` attribute holding an
        ``ast.arguments`` node.
        """
        arguments = node.args

        # 'posonlyargs' does not exist before Python 3.8. Without this
        # check 'def f(_name, /)' would bind an underscore name unchecked.
        for arg in getattr(arguments, 'posonlyargs', ()):
            self.check_name(node, arg.arg)

        for arg in arguments.args:
            self.check_name(node, arg.arg)

        if arguments.vararg:
            self.check_name(node, arguments.vararg.arg)

        if arguments.kwarg:
            self.check_name(node, arguments.kwarg.arg)

        for arg in arguments.kwonlyargs:
            self.check_name(node, arg.arg)
1✔
430

431
    def check_import_names(self, node):
        """Check every name bound by an import statement.

        This protects against rebinding underscore-prefixed guard names
        such as _getitem_ or _write_ via imports. Star imports are
        rejected. Both the imported name and, when present, its 'as'
        alias are passed to 'check_name'.

        => 'from _a import x' is ok, because '_a' is not added to the scope.

        Returns the node after visiting its contents.
        """
        for alias in node.names:
            if '*' in alias.name:
                self.error(node, '"*" imports are not allowed.')

            bound_names = [alias.name]
            if alias.asname:
                bound_names.append(alias.asname)
            for bound in bound_names:
                self.check_name(node, bound)

        return self.node_contents_visit(node)
1✔
447

448
    def inject_print_collector(self, node, position=0):
        """Insert '_print = _print_(_getattr_)' into ``node.body``.

        Nothing happens unless 'print' or 'printed' was used in the current
        print scope. The assignment is inserted at index ``position``. For
        a module its location attributes are set to ``position``; for any
        other node they are copied from ``node``. A warning is recorded
        when only one of 'print' / 'printed' was used.
        """
        print_used = self.print_info.print_used
        printed_used = self.print_info.printed_used

        if not (print_used or printed_used):
            return

        # '_print = _print_(_getattr_)' at the top of a function/module.
        collector_call = ast.Call(
            func=ast.Name("_print_", ast.Load()),
            args=[ast.Name("_getattr_", ast.Load())],
            keywords=[])
        _print = ast.Assign(
            targets=[ast.Name('_print', ast.Store())],
            value=collector_call)

        if isinstance(node, ast.Module):
            location_attrs = ['lineno', 'col_offset']
            if IS_PY38_OR_GREATER:
                location_attrs += ['end_lineno', 'end_col_offset']
            for attr in location_attrs:
                setattr(_print, attr, position)
            ast.fix_missing_locations(_print)
        else:
            copy_locations(_print, node)

        node.body.insert(position, _print)

        if not printed_used:
            self.warn(node, "Prints, but never reads 'printed' variable.")
        elif not print_used:
            self.warn(node, "Doesn't print, but reads 'printed' variable.")
1✔
479

480
    # Special Functions for an ast.NodeTransformer
481

482
    def generic_visit(self, node):
        """Reject ast nodes which do not have a corresponding `visit_` method.

        A warning and an error are recorded for the node. This prevents
        ast nodes introduced by new Python versions from being trusted
        before any security review.

        To access `generic_visit` on the super class use `node_contents_visit`.
        """
        unknown_node_warning = (
            '{0.__class__.__name__}'
            ' statement is not known to RestrictedPython'.format(node)
        )
        self.warn(node, unknown_node_warning)
        self.not_allowed(node)
1✔
496

497
    def not_allowed(self, node):
        """Record an error saying that this kind of node is not allowed."""
        node_kind = node.__class__.__name__
        self.error(node, f'{node_kind} statements are not allowed.')
501

502
    def node_contents_visit(self, node):
        """Visit the children of ``node`` via the parent class.

        This is the way to reach the inherited `generic_visit`, because
        this class overrides `generic_visit` to reject nodes.
        """
        visited = super().generic_visit(node)
        return visited
1✔
505

506
    # ast for Literals
507

508
    if IS_PY38_OR_GREATER:
1!
509

510
        def visit_Constant(self, node):
            """Allow constant literals, except for Ellipsis.

            Constant replaces Num, Str, Bytes, NameConstant and Ellipsis in
            Python 3.8+.
            :see: https://docs.python.org/dev/whatsnew/3.8.html#deprecated
            """
            if node.value is not Ellipsis:
                return self.node_contents_visit(node)

            # `...` is denied. ``self.not_allowed(node)`` is not used here
            # because its message would read
            # 'Constant statements are not allowed.'
            # which is only partially true.
            self.error(node, 'Ellipsis statements are not allowed.')
            return None
1✔
526

527
    else:
528

529
        def visit_Num(self, node):
            """Accept number literals unchanged.

            Replaced by Constant in Python 3.8.
            """
            visited = self.node_contents_visit(node)
            return visited
×
535

536
        def visit_Str(self, node):
            """Accept string literals unchanged.

            Replaced by Constant in Python 3.8.
            """
            visited = self.node_contents_visit(node)
            return visited
×
542

543
        def visit_Bytes(self, node):
            """Accept bytes literals unchanged.

            Replaced by Constant in Python 3.8.
            """
            visited = self.node_contents_visit(node)
            return visited
×
549

550
        def visit_Ellipsis(self, node):
            """Reject `...` by reporting the node as not allowed.

            Replaced by Constant in Python 3.8.
            """
            outcome = self.not_allowed(node)
            return outcome
×
556

557
        def visit_NameConstant(self, node):
            """Accept the constant literals True, False and None unchanged.

            Replaced by Constant in Python 3.8.
            """
            visited = self.node_contents_visit(node)
            return visited
×
565

566
    def visit_List(self, node):
        """Accept list literals; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
569

570
    def visit_Tuple(self, node):
        """Accept tuple literals; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
573

574
    def visit_Set(self, node):
        """Accept set literals; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
577

578
    def visit_Dict(self, node):
        """Accept dict literals; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
581

582
    def visit_FormattedValue(self, node):
        """Accept f-string replacement fields; their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
585

586
    def visit_JoinedStr(self, node):
        """Accept joined strings (f-strings); their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
589

590
    # ast for Variables
591

592
    def visit_Name(self, node):
        """Prevent access to protected names and rewrite print helpers.

        In Load context:
        * 'printed' becomes the call '_print()'
        * 'print' becomes the attribute '_print._call_print'
        * every other name is recorded in ``self.used_names``

        Names that are not rewritten are passed to 'check_name'.
        """
        node = self.node_contents_visit(node)
        reading = isinstance(node.ctx, ast.Load)

        if reading and node.id == 'printed':
            self.print_info.printed_used = True
            replacement = ast.Call(
                func=ast.Name("_print", ast.Load()),
                args=[],
                keywords=[])
            copy_locations(replacement, node)
            return replacement

        if reading and node.id == 'print':
            self.print_info.print_used = True
            replacement = ast.Attribute(
                value=ast.Name('_print', ast.Load()),
                attr="_call_print",
                ctx=ast.Load())
            copy_locations(replacement, node)
            return replacement

        if reading:
            self.used_names[node.id] = True

        self.check_name(node, node.id)
        return node
1✔
625

626
    def visit_Load(self, node):
        """Accept the Load expression context unchanged."""
        visited = self.node_contents_visit(node)
        return visited
1✔
631

632
    def visit_Store(self, node):
        """Accept the Store expression context unchanged."""
        visited = self.node_contents_visit(node)
        return visited
1✔
637

638
    def visit_Del(self, node):
        """Accept the Del expression context unchanged."""
        visited = self.node_contents_visit(node)
        return visited
1✔
643

644
    def visit_Starred(self, node):
        """Accept starred expressions; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
649

650
    # Expressions
651

652
    def visit_Expression(self, node):
        """Accept Expression nodes; only their contents are visited.

        These nodes are in the AST when using the `eval` compile mode.
        """
        visited = self.node_contents_visit(node)
        return visited
1✔
658

659
    def visit_Expr(self, node):
        """Accept expression statements; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
662

663
    def visit_UnaryOp(self, node):
        """Accept unary operations; only their contents are visited.

        UnaryOp is the overall element for:
        * Not --> which should be allowed
        * UAdd --> positive notation of variables (e.g. +var)
        * USub --> negative notation of variables (e.g. -var)
        """
        visited = self.node_contents_visit(node)
        return visited
1✔
671

672
    def visit_UAdd(self, node):
        """Accept the unary plus operator (e.g. +var)."""
        visited = self.node_contents_visit(node)
        return visited
1✔
675

676
    def visit_USub(self, node):
        """Accept the unary minus operator (e.g. -var)."""
        visited = self.node_contents_visit(node)
        return visited
1✔
679

680
    def visit_Not(self, node):
        """Accept the `not` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
683

684
    def visit_Invert(self, node):
        """Accept the `~` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
687

688
    def visit_BinOp(self, node):
        """Accept binary operations; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
691

692
    def visit_Add(self, node):
        """Accept the `+` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
695

696
    def visit_Sub(self, node):
        """Accept the `-` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
699

700
    def visit_Mult(self, node):
        """Accept the `*` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
703

704
    def visit_Div(self, node):
        """Accept the `/` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
707

708
    def visit_FloorDiv(self, node):
        """Accept the `//` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
711

712
    def visit_Mod(self, node):
        """Accept the `%` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
715

716
    def visit_Pow(self, node):
        """Accept the `**` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
719

720
    def visit_LShift(self, node):
        """Accept the `<<` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
723

724
    def visit_RShift(self, node):
        """Accept the `>>` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
727

728
    def visit_BitOr(self, node):
        """Accept the `|` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
731

732
    def visit_BitXor(self, node):
        """Accept the `^` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
735

736
    def visit_BitAnd(self, node):
        """Accept the `&` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
739

740
    def visit_MatMult(self, node):
        """Reject matrix multiplication (`@`); it is currently not allowed."""
        self.not_allowed(node)
        return None
1✔
743

744
    def visit_BoolOp(self, node):
        """Accept boolean operations; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
747

748
    def visit_And(self, node):
        """Accept the boolean operator `and`."""
        visited = self.node_contents_visit(node)
        return visited
1✔
751

752
    def visit_Or(self, node):
        """Accept the boolean operator `or`."""
        visited = self.node_contents_visit(node)
        return visited
1✔
755

756
    def visit_Compare(self, node):
        """Accept comparison expressions; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
759

760
    def visit_Eq(self, node):
        """Accept the `==` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
763

764
    def visit_NotEq(self, node):
        """Accept the `!=` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
767

768
    def visit_Lt(self, node):
        """Accept the `<` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
771

772
    def visit_LtE(self, node):
        """Accept the `<=` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
775

776
    def visit_Gt(self, node):
        """Accept the `>` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
779

780
    def visit_GtE(self, node):
        """Accept the `>=` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
783

784
    def visit_Is(self, node):
        """Accept the `is` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
787

788
    def visit_IsNot(self, node):
        """Accept the `is not` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
791

792
    def visit_In(self, node):
        """Accept the `in` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
795

796
    def visit_NotIn(self, node):
        """Accept the `not in` operator."""
        visited = self.node_contents_visit(node)
        return visited
1✔
799

800
    def visit_Call(self, node):
        """Check calls and guard those using '*args' or '**kwargs'.

        Direct calls of the names 'exec' and 'eval' are recorded as errors.

        Note: The following happens only if '*args' or '**kwargs' is used.

        Transforms 'foo(<all the possible ways of args>)' into
        _apply_(foo, <all the possible ways for args>)

        The thing is that '_apply_' has only '*args', '**kwargs', so it gets
        Python to collapse all the myriad ways to call functions
        into one manageable form.

        From there, '_apply_()' wraps args and kws in guarded accessors,
        then calls the function, returning the value.
        """
        callee = node.func
        if isinstance(callee, ast.Name):
            if callee.id == 'exec':
                self.error(node, 'Exec calls are not allowed.')
            elif callee.id == 'eval':
                self.error(node, 'Eval calls are not allowed.')

        # Decide before visiting: '*args' is a Starred positional argument,
        # '**kwargs' is a keyword without a name.
        has_star_args = any(
            isinstance(pos_arg, ast.Starred) for pos_arg in node.args)
        has_star_kwargs = any(
            keyword_arg.arg is None for keyword_arg in node.keywords)

        node = self.node_contents_visit(node)

        if not (has_star_args or has_star_kwargs):
            return node

        # The called object becomes the first argument of '_apply_'.
        wrapped_func = node.func
        node.args.insert(0, wrapped_func)
        node.func = ast.Name('_apply_', ast.Load())
        copy_locations(node.func, wrapped_func)
        return node
1✔
841

842
    def visit_keyword(self, node):
        """Accept keyword arguments; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
847

848
    def visit_IfExp(self, node):
        """Accept conditional expressions; only their contents are visited."""
        visited = self.node_contents_visit(node)
        return visited
1✔
851

852
    def visit_Attribute(self, node):
        """Checks and mutates attribute access/assignment.

        'a.b' becomes '_getattr_(a, "b")'
        'a.b = c' becomes '_write_(a).b = c'
        'del a.b' becomes 'del _write_(a).b'

        The _write_ function should return a security proxy.

        An error is recorded when the attribute name starts with "_" (a
        lone "_" is fine), ends with "__roles__" or is listed in
        INSPECT_ATTRIBUTES. The node is transformed even when an error was
        recorded.

        The attribute name is passed to '_getattr_' as an ``ast.Constant``
        because ``ast.Str`` is a deprecated alias that newer Python
        versions remove.
        """
        if node.attr.startswith('_') and node.attr != '_':
            self.error(
                node,
                '"{name}" is an invalid attribute name because it starts '
                'with "_".'.format(name=node.attr))

        if node.attr.endswith('__roles__'):
            self.error(
                node,
                '"{name}" is an invalid attribute name because it ends '
                'with "__roles__".'.format(name=node.attr))

        if node.attr in INSPECT_ATTRIBUTES:
            self.error(
                node,
                f'"{node.attr}" is a restricted name,'
                ' that is forbidden to access in RestrictedPython.',
            )

        if isinstance(node.ctx, ast.Load):
            node = self.node_contents_visit(node)
            new_node = ast.Call(
                func=ast.Name('_getattr_', ast.Load()),
                args=[node.value, ast.Constant(value=node.attr)],
                keywords=[])

            copy_locations(new_node, node)
            return new_node

        elif isinstance(node.ctx, (ast.Store, ast.Del)):
            node = self.node_contents_visit(node)
            # Only the object is wrapped; the attribute stays in place so
            # the assignment / deletion goes through the proxy.
            new_value = ast.Call(
                func=ast.Name('_write_', ast.Load()),
                args=[node.value],
                keywords=[])

            copy_locations(new_value, node.value)
            node.value = new_value
            return node

        else:  # pragma: no cover
            # Impossible case: only ctx Load, Store and Del are defined in ast.
            raise NotImplementedError(
                f"Unknown ctx type: {type(node.ctx)}")
905

906
    # Subscripting
907

908
    def visit_Subscript(self, node):
        """Rewrite every kind of subscript expression.

        'foo[bar]' becomes '_getitem_(foo, bar)'
        'foo[:ab]' becomes '_getitem_(foo, slice(None, ab, None))'
        'foo[ab:]' becomes '_getitem_(foo, slice(ab, None, None))'
        'foo[a:b]' becomes '_getitem_(foo, slice(a, b, None))'
        'foo[a:b:c]' becomes '_getitem_(foo, slice(a, b, c))'
        'foo[a, b:c]' becomes '_getitem_(foo, (a, slice(b, c, None)))'
        'foo[a] = c' becomes '_write_(foo)[a] = c'
        'del foo[a]' becomes 'del _write_(foo)[a]'

        The _write_ function should return a security proxy.
        """
        node = self.node_contents_visit(node)
        ctx = node.ctx

        # 'AugStore' and 'AugLoad' are listed in 'Python.asdl' as possible
        # 'expr_context' values, but according to Python/ast.c the
        # implementation never uses them. 'AugAssign' nodes are created
        # instead, and those have their own visitor.

        if isinstance(ctx, ast.Load):
            getitem_call = ast.Call(
                func=ast.Name('_getitem_', ast.Load()),
                args=[node.value, self.transform_slice(node.slice)],
                keywords=[])
            copy_locations(getitem_call, node)
            return getitem_call

        if isinstance(ctx, (ast.Del, ast.Store)):
            write_call = ast.Call(
                func=ast.Name('_write_', ast.Load()),
                args=[node.value],
                keywords=[])
            copy_locations(write_call, node)
            node.value = write_call
            return node

        # Impossible Case only ctx Load, Store and Del are defined in ast.
        raise NotImplementedError(  # pragma: no cover
            f"Unknown ctx type: {type(node.ctx)}")
952

953
    def visit_Index(self, node):
        """Allow `Index` nodes without restrictions of their own.

        Note: the `ast` documentation lists `ast.Index` as deprecated since
        Python 3.9, so this visitor matters for older interpreters only.
        """
        return self.node_contents_visit(node)
1✔
958

959
    def visit_Slice(self, node):
        """Allow `Slice` nodes (`a:b:c`) without restrictions of their own."""
        return self.node_contents_visit(node)
1✔
964

965
    def visit_ExtSlice(self, node):
        """Allow `ExtSlice` nodes without restrictions of their own.

        Note: the `ast` documentation lists `ast.ExtSlice` as deprecated
        since Python 3.9, so this visitor matters for older interpreters
        only.
        """
        return self.node_contents_visit(node)
1✔
970

971
    # Comprehensions
972

973
    def visit_ListComp(self, node):
        """Allow list comprehensions; the node is passed on to
        ``node_contents_visit``."""
        return self.node_contents_visit(node)
1✔
978

979
    def visit_SetComp(self, node):
        """Allow set comprehensions; the node is passed on to
        ``node_contents_visit``."""
        return self.node_contents_visit(node)
1✔
984

985
    def visit_GeneratorExp(self, node):
        """Allow generator expressions; the node is passed on to
        ``node_contents_visit``."""
        return self.node_contents_visit(node)
1✔
990

991
    def visit_DictComp(self, node):
        """Allow dict comprehensions; the node is passed on to
        ``node_contents_visit``."""
        return self.node_contents_visit(node)
1✔
996

997
    def visit_comprehension(self, node):
        """Route the `for ... in ...` clause of a comprehension through
        ``guard_iter`` and return its result."""
        return self.guard_iter(node)
1✔
1002

1003
    # Statements
1004

1005
    def visit_Assign(self, node):
        """Protect sequence unpacking in assignment statements.

        Assignments without a tuple target are returned as visited.
        Otherwise the statement is split into one ``ast.Assign`` per target
        and the list of new statements is returned.
        """
        node = self.node_contents_visit(node)
        targets = node.targets

        if not any(isinstance(target, ast.Tuple) for target in targets):
            return node

        # Sequence unpacking.
        # For brevity this example leaves out the cleanup of the temporary
        # variables; see 'transform_tuple_assign' for how that is done.
        #
        # - Single target (with nested support)
        # (a, (b, (c, d))) = <exp>
        # is converted to
        # (a, t1) = _getiter_(<exp>)
        # (b, t2) = _getiter_(t1)
        # (c, d) = _getiter_(t2)
        #
        # - Multi targets
        # (a, b) = (c, d) = <exp>
        # is converted to
        # (c, d) = _getiter_(<exp>)
        # (a, b) = _getiter_(<exp>)
        # Why is this valid? The original bytecode for such multi target
        # assignments behaves the same way.

        # ast.NodeTransformer accepts a list as result and splices it into
        # the parent's statements at the position of the original node.
        statements = []

        # Python fills the right-most target first.
        for target in reversed(targets):
            if isinstance(target, ast.Tuple):
                value = self.protect_unpack_sequence(target, node.value)
            else:
                value = node.value
            statements.append(ast.Assign(targets=[target], value=value))

        for statement in statements:
            copy_locations(statement, node)

        return statements
1✔
1053

1054
    def visit_AugAssign(self, node):
        """Forbid certain kinds of AugAssign and rewrite the allowed one.

        According to the language reference (and ast.c) the following
        target nodes are possible:
        Name, Attribute, Subscript

        Augmented assignment of attributes and of subscripts is reported
        as an error via ``self.error`` and the visited node is returned
        as is.  Augmented assignment of names (such as 'n += 1') is
        allowed:
        'n += 1' becomes 'n = _inplacevar_("+=", n, 1)'
        """
        node = self.node_contents_visit(node)
        target = node.target

        if isinstance(target, ast.Attribute):
            self.error(
                node,
                "Augmented assignment of attributes is not allowed.")
            return node

        elif isinstance(target, ast.Subscript):
            self.error(
                node,
                "Augmented assignment of object items "
                "and slices is not allowed.")
            return node

        elif isinstance(target, ast.Name):
            # ``ast.Constant`` replaces ``ast.Str``, which is deprecated
            # since Python 3.8 and no longer available in recent releases.
            operator_text = ast.Constant(IOPERATOR_TO_STR[type(node.op)])
            new_node = ast.Assign(
                targets=[target],
                value=ast.Call(
                    func=ast.Name('_inplacevar_', ast.Load()),
                    args=[
                        operator_text,
                        # The store target is read again in load context.
                        ast.Name(target.id, ast.Load()),
                        node.value
                    ],
                    keywords=[]))

            copy_locations(new_node, node)
            return new_node
        else:  # pragma: no cover
            # Impossible Case - Only Node Types:
            # * Name
            # * Attribute
            # * Subscript
            # defined, those are checked before.
            raise NotImplementedError(
                f"Unknown target type: {type(node.target)}")
1104

1105
    def visit_Raise(self, node):
        """Allow `raise` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1108

1109
    def visit_Assert(self, node):
        """Allow `assert` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1112

1113
    def visit_Delete(self, node):
        """Allow `del` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1116

1117
    def visit_Pass(self, node):
        """Allow `pass` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1120

1121
    # Imports
1122

1123
    def visit_Import(self, node):
        """Allow `import` statements, subject to `check_import_names`.

        The result of ``check_import_names`` is returned.
        """
        return self.check_import_names(node)
1✔
1127

1128
    def visit_ImportFrom(self, node):
        """Allow `from ... import ...` statements, subject to
        `check_import_names`.

        The result of ``check_import_names`` is returned.
        """
        return self.check_import_names(node)
1✔
1132

1133
    def visit_alias(self, node):
        """Allow the `as` part of `import` and `from ... import`
        statements."""
        return self.node_contents_visit(node)
1✔
1136

1137
    # Control flow
1138

1139
    def visit_If(self, node):
        """Allow `if` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1142

1143
    def visit_For(self, node):
        """Allow `for` loops; the node is routed through ``guard_iter``."""
        return self.guard_iter(node)
1✔
1146

1147
    def visit_While(self, node):
        """Allow `while` loops; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1150

1151
    def visit_Break(self, node):
        """Allow `break` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1154

1155
    def visit_Continue(self, node):
        """Allow `continue` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1158

1159
    def visit_Try(self, node):
        """Allow `try` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1162

1163
    def visit_TryStar(self, node):
        """Allow `try ... except*` statements (exception groups); no extra
        restrictions are applied."""
        return self.node_contents_visit(node)
×
1166

1167
    def visit_ExceptHandler(self, node):
        """Protect exception handlers.

        After visiting the handler, the name it binds (`except E as name`)
        is passed to ``check_name``.
        """
        handler = self.node_contents_visit(node)
        self.check_name(handler, handler.name)
        return handler
1✔
1172

1173
    def visit_With(self, node):
        """Protect tuple unpacking on `with` statements.

        Every `as (a, b)` tuple target is replaced by the temporary target
        obtained from ``gen_unpack_wrapper``; the matching unpack statement
        is put at the front of the `with` body.
        """
        node = self.node_contents_visit(node)

        # Walking the items backwards and always inserting at index 0
        # leaves the unpack statements in item order.
        for item in reversed(node.items):
            if not isinstance(item.optional_vars, ast.Tuple):
                continue

            replacement, unpack_stmt = self.gen_unpack_wrapper(
                node, item.optional_vars)
            item.optional_vars = replacement
            node.body.insert(0, unpack_stmt)

        return node
1✔
1187

1188
    def visit_withitem(self, node):
        """Allow the individual items of a `with` statement (context
        managers); no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1191

1192
    # Function and class definitions
1193

1194
    def visit_FunctionDef(self, node):
        """Allow function definitions (`def`) with some restrictions.

        The function name (magic method names are accepted) and the
        argument names are checked first.  The body is then visited inside
        a fresh print scope, and ``inject_print_collector`` is called on
        the visited function before that scope is left.
        """
        self.check_name(node, node.name, allow_magic_methods=True)
        self.check_function_argument_names(node)

        print_scope = self.print_info.new_print_scope()
        with print_scope:
            visited = self.node_contents_visit(node)
            self.inject_print_collector(visited)

        return visited
1✔
1203

1204
    def visit_Lambda(self, node):
        """Allow `lambda` expressions once their argument names have been
        checked."""
        self.check_function_argument_names(node)
        visited = self.node_contents_visit(node)
        return visited
1✔
1208

1209
    def visit_arguments(self, node):
        """Allow the argument list of a function or lambda; the node is
        passed on to ``node_contents_visit``."""
        return self.node_contents_visit(node)
1✔
1214

1215
    def visit_arg(self, node):
        """Allow a single argument of an argument list; the node is passed
        on to ``node_contents_visit``."""
        return self.node_contents_visit(node)
1✔
1220

1221
    def visit_Return(self, node):
        """Allow `return` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1224

1225
    def visit_Yield(self, node):
        """Allow `yield` expressions; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1228

1229
    def visit_YieldFrom(self, node):
        """Allow `yield from` expressions; no extra restrictions are
        applied."""
        return self.node_contents_visit(node)
1✔
1232

1233
    def visit_Global(self, node):
        """Allow `global` statements; no extra restrictions are applied."""
        return self.node_contents_visit(node)
1✔
1236

1237
    def visit_Nonlocal(self, node):
        """Deny `nonlocal` statements by handing them to ``not_allowed``.

        Nothing is returned explicitly.
        """
        self.not_allowed(node)
1✔
1240

1241
    def visit_ClassDef(self, node):
        """Check a class definition and force `__metaclass__` onto it.

        The class name is checked, the contents are visited and an
        explicit `metaclass` keyword is reported as an error.  The result
        is a freshly parsed `class <name>(metaclass=__metaclass__)` node
        that takes over the body, bases and decorators of the original.
        """
        self.check_name(node, node.name)
        node = self.node_contents_visit(node)

        uses_metaclass = any(
            keyword.arg == 'metaclass' for keyword in node.keywords)
        if uses_metaclass:
            self.error(
                node, 'The keyword argument "metaclass" is not allowed.')

        # Parse a minimal class statement and graft the original parts on.
        template = (
            f'class {node.name}(metaclass=__metaclass__):\n'
            '    pass\n'
        )
        replacement = ast.parse(template).body[0]
        replacement.body = node.body
        replacement.bases = node.bases
        replacement.decorator_list = node.decorator_list
        return replacement
1✔
1257

1258
    def visit_Module(self, node):
        """Visit the module and call ``inject_print_collector`` for its top.

        The position handed over is the index of the first statement that
        is not a `from __future__ import ...`, so that anything injected
        ends up after the future imports.
        """
        node = self.node_contents_visit(node)

        # Find the first statement after 'from __future__ import ....'
        position = 0
        for position, child in enumerate(node.body):  # pragma: no branch
            is_future_import = (
                isinstance(child, ast.ImportFrom)
                and child.module == '__future__'
            )
            if not is_future_import:
                break

        self.inject_print_collector(node, position)
        return node
1✔
1273

1274
    # Async and await
1275

1276
    def visit_AsyncFunctionDef(self, node):
        """Deny `async def` functions by handing them to ``not_allowed``.

        Nothing is returned explicitly.
        """
        self.not_allowed(node)
1✔
1279

1280
    def visit_Await(self, node):
        """Deny `await` expressions by handing them to ``not_allowed``.

        Nothing is returned explicitly.
        """
        self.not_allowed(node)
1✔
1283

1284
    def visit_AsyncFor(self, node):
        """Deny `async for` loops by handing them to ``not_allowed``.

        Nothing is returned explicitly.
        """
        self.not_allowed(node)
1✔
1287

1288
    def visit_AsyncWith(self, node):
        """Deny `async with` statements by handing them to ``not_allowed``.

        Nothing is returned explicitly.
        """
        self.not_allowed(node)
1✔
1291

1292
    # Assignment expressions (walrus operator ``:=``)
1293
    # New in 3.8
1294
    def visit_NamedExpr(self, node):
        """Allow assignment expressions (`:=`) with a plain name as target.

        Any other kind of target is reported through ``self.error``; the
        visited node is returned in both cases.
        """
        # The grammar requires ``node.target`` to be a ``Name``, yet the
        # abstract syntax is more permissive and accepts any ``expr``.
        # Only a ``Name`` is supported here.
        # This is safe because the expression can only add or modify local
        # variables. That may hide global variables, but the name check
        # performed implicitly guarantees (as usual) that no essential
        # global variable gets hidden.
        visited = self.node_contents_visit(node)  # checks ``node.target``

        if isinstance(visited.target, ast.Name):
            return visited

        self.error(
            visited,
            "Assignment expressions are only allowed for simple targets")
        return visited
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc