• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / z3c.rml / 16098868126

14 Apr 2025 06:50AM UTC coverage: 87.385%. Remained the same
16098868126

push

github

icemac
Back to development: 5.1

561 of 792 branches covered (70.83%)

Branch coverage included in aggregate %.

3990 of 4416 relevant lines covered (90.35%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.31
/src/z3c/rml/pdfinclude.py
1
##############################################################################
2
#
3
# Copyright (c) 2012 Zope Foundation and Contributors.
4
# All Rights Reserved.
5
#
6
# This software is subject to the provisions of the Zope Public License,
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
9
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
11
# FOR A PARTICULAR PURPOSE.
12
#
13
##############################################################################
14
"""``pdfInclude`` Directive.
15
"""
16
__docformat__ = "reStructuredText"
1✔
17

18
import io
1✔
19
import logging
1✔
20
import os
1✔
21
import subprocess
1✔
22

23
from backports import tempfile
1✔
24

25

26
try:
1✔
27
    import pikepdf
1✔
28
    from pikepdf import Dictionary  # noqa: F401 imported but unused
1✔
29
except ImportError:
30
    pikepdf = None
31
from reportlab.platypus import flowables
1✔
32

33
from z3c.rml import attr
1✔
34
from z3c.rml import flowable
1✔
35
from z3c.rml import interfaces
1✔
36
from z3c.rml import occurence
1✔
37

38

39
log = logging.getLogger(__name__)
1✔
40

41
# by default False to avoid burping on
42
# PdfReadWarning: Multiple definitions in dictionary at byte xxx
43
STRICT = False
1✔
44

45

46
def _letter(val, base=ord('A'), radix=26):
1✔
47
    __traceback_info__ = val, base
×
48
    index = val - 1
×
49
    if index < 0:
×
50
        raise ValueError('Value must be greater than 0.')
×
51
    s = ''
×
52
    while True:
×
53
        val, off = divmod(index, radix)
×
54
        index = val - 1
×
55
        s = chr(base + off) + s
×
56
        if not val:
×
57
            return s
×
58

59

60
def do(cmd, cwd=None, captureOutput=True, ignoreErrors=False):
    """Run ``cmd`` through the shell and return its standard output.

    :param cmd: Command line as a single string; it is executed with
        ``shell=True``, so shell syntax such as redirection is honoured.
    :param cwd: Working directory for the child process, or ``None``.
    :param captureOutput: When true, stdout and stderr are captured via
        pipes; otherwise the child writes to the inherited streams.
    :param ignoreErrors: When true, a non-zero exit status is not treated
        as an error.
    :returns: The captured stdout as returned by ``communicate()``, or the
        placeholder ``"See output above"`` when nothing was captured.
    :raises ValueError: If the command exits with a non-zero status and
        ``ignoreErrors`` is false.
    """
    log.debug('Command: ' + cmd)
    pipe = subprocess.PIPE if captureOutput else None
    proc = subprocess.Popen(
        cmd, stdout=pipe, stderr=pipe, shell=True, cwd=cwd)
    out, err = proc.communicate()

    # Without capturing, communicate() hands back None for both streams;
    # substitute a hint so that the messages below stay readable.
    out = "See output above" if out is None else out
    err = "See output above" if err is None else err

    failed = proc.returncode != 0
    if failed and not ignoreErrors:
        log.error(f'An error occurred while running command: {cmd}')
        log.error(f'Error Output: \n{err}')
        raise ValueError(
            f'Shell Process had non-zero error code: {proc.returncode}. \n'
            f'Stdout: {out}\n'
            f'StdErr: {err}'
        )

    log.debug(f'Output: \n{out}')
    return out
×
84

85

86
class ConcatenationPostProcessor:
    """Post-processor that places pages of other PDFs into the rendered PDF.

    ``operations`` is a list of tuples ``(start_page, pdf_file, page_ranges,
    num_pages, on_first_page)``; each ``page_ranges`` entry is a zero-based,
    upper-bound exclusive ``(start, end)`` pair into the included PDF.
    """

    def __init__(self):
        self.operations = []

    def process(self, inputFile1):
        """Apply all recorded operations and return the resulting PDF.

        :param inputFile1: The rendered PDF (anything ``pikepdf.open``
            accepts).
        :returns: An ``io.BytesIO`` holding the merged document.
        """
        input1 = pikepdf.open(inputFile1)
        # Number of pages inserted so far; later operations recorded their
        # start page before those insertions happened.
        offset = 0
        for (
                start_page, inputFile2, page_ranges, num_pages, on_first_page
        ) in self.operations:
            sp = start_page + offset
            # Running index of the included page within this operation,
            # counted across *all* page ranges. Each range contributes only
            # its own pages ``[prs, pre)`` so that several ranges land one
            # after another instead of overwriting each other.
            position = 0
            for prs, pre in page_ranges:
                # NOTE(review): the source PDF is deliberately not closed
                # here since its pages are still referenced by ``input1``
                # until it is saved -- confirm against pikepdf's docs.
                input2 = pikepdf.open(inputFile2)
                for source_index in range(prs, pre):
                    source_page = input2.pages[source_index]
                    if on_first_page and position > 0:
                        # The platypus pipeline doesn't insert blank pages
                        # if we are including on the first page. So we need
                        # to insert our additional pages between start_page
                        # and the next.
                        input1.pages.insert(sp + position, source_page)
                        offset += 1
                    else:
                        # Here, Platypus has added more blank pages, so
                        # we'll emplace our pages. Doing this copy will
                        # preserve references to the original pages if
                        # there is a TOC/Bookmarks.
                        input1.pages.append(source_page)
                        input1.pages[sp + position].emplace(input1.pages[-1])
                        del input1.pages[-1]
                    position += 1

        outputFile = io.BytesIO()
        input1.save(outputFile)
        return outputFile
1✔
121

122

123
class PdfTkConcatenationPostProcessor:
    """Concatenation post-processor that shells out to ``pdftk``.

    ``operations`` has the same layout as for the pikepdf based processor:
    ``(start_page, pdf_file, page_ranges, num_pages, on_first_page)``.
    """

    # Name (or path) of the pdftk executable.
    EXECUTABLE = 'pdftk'
    # When true, the document info dumped from the main file is applied to
    # the merged result in a second pdftk run.
    PRESERVE_OUTLINE = True

    def __init__(self):
        self.operations = []

    def _process(self, inputFile1, dir):
        """Merge the files inside the working directory ``dir``.

        :param inputFile1: File-like object with the rendered PDF; it is
            read from its current position.
        :param dir: Existing directory used for all intermediate files.
        :returns: An ``io.BytesIO`` with the resulting PDF.
        :raises ValueError: Propagated from ``do`` when pdftk fails.
        """
        main_path = os.path.join(dir, 'A.pdf')
        with open(main_path, 'wb') as file:
            file.write(inputFile1.read())

        # pdftk handle letter -> file path; 'A' is always the main file.
        file_map = {'A': main_path}
        file_id = 2
        merges = []

        # NOTE(review): ``on_first_page`` is not taken into account here and
        # pages of the main file after the last operation are never added to
        # ``merges`` -- confirm whether that is intended.
        curr_page = 0
        for (
                start_page, inputFile2, page_ranges, num_pages, on_first_page
        ) in self.operations:
            # Catch up with the main file.
            if curr_page < start_page:
                # Convert curr_page to human counting, start_page is okay,
                # since pdftk is upper-bound inclusive.
                merges.append('A%i-%i' % (curr_page + 1, start_page))
            curr_page = start_page + num_pages

            # Store file.
            file_letter = _letter(file_id)
            file_path = os.path.join(dir, file_letter + '.pdf')
            inputFile2.seek(0)
            with open(file_path, 'wb') as file:
                file.write(inputFile2.read())
            file_map[file_letter] = file_path
            file_id += 1

            for (prs, pre) in page_ranges:
                # pdftk uses lower and upper bound inclusive.
                merges.append('%s%i-%i' % (file_letter, prs + 1, pre))

        # Every path is wrapped in double quotes, like the handle map, so
        # that a working directory containing spaces does not split the
        # command line.
        mergedFile = os.path.join(dir, 'merged.pdf')
        do('{} {} cat {} output "{}"'.format(
            self.EXECUTABLE,
            ' '.join(f'{l_}="{p}"' for l_, p in file_map.items()),
            ' '.join(merges),
            mergedFile))

        if not self.PRESERVE_OUTLINE:
            with open(mergedFile, 'rb') as file:
                return io.BytesIO(file.read())

        infoFile = os.path.join(dir, 'in.info')
        outputFile = os.path.join(dir, 'output.pdf')
        do('{} "{}" dump_data > "{}"'.format(
            self.EXECUTABLE, main_path, infoFile))
        do('{} "{}" update_info "{}" output "{}"'.format(
            self.EXECUTABLE, mergedFile, infoFile, outputFile))

        with open(outputFile, 'rb') as file:
            return io.BytesIO(file.read())

    def process(self, inputFile1):
        """Run ``_process`` inside a temporary directory.

        The directory and all intermediate files are removed when the
        method returns.
        """
        with tempfile.TemporaryDirectory() as tmpdirname:
            return self._process(inputFile1, tmpdirname)
×
187

188

189
class IncludePdfPagesFlowable(flowables.Flowable):
    """Placeholder flowable that reserves pages for an included PDF.

    It records an operation on the concatenation processor and emits
    page breaks; the processor later fills those pages with the real
    content.
    """

    def __init__(self, pdf_file, pages, concatprocessor,
                 included_on_first_page):
        """Store the include request.

        :param pdf_file: The PDF to include, as accepted by
            ``pikepdf.open``.
        :param pages: Sequence of ``(start, end)`` page ranges, or a false
            value to include the whole file.
        :param concatprocessor: Object with an ``operations`` list to which
            the include operation is appended.
        :param included_on_first_page: True when this flowable is the very
            first one in the flow.
        """
        flowables.Flowable.__init__(self)
        self.pdf_file = pdf_file
        self.proc = concatprocessor
        self.pages = pages
        self.included_on_first_page = included_on_first_page

        if self.included_on_first_page:
            self.width = 0
            self.height = 0
        else:
            # NOTE(review): the oversized dimensions presumably make
            # platypus call ``split`` instead of drawing -- confirm.
            self.width = 10 << 32
            self.height = 10 << 32

    def draw(self):
        """Record the include operation when placed on the first page."""
        if self.included_on_first_page:
            self.split(None, None)

    def split(self, availWidth, availheight):
        """Record the include operation and return placeholder flowables.

        :returns: A list with one ``Spacer``/``PageBreak`` pair per included
            page, optionally followed by a final ``Spacer``.
        """
        pages = self.pages
        if not pages:
            # Only the page count is needed, so release the PDF right away
            # instead of leaving it open.
            with pikepdf.open(self.pdf_file) as pdf:
                pages = [(0, len(pdf.pages))]

        num_pages = sum(pr[1] - pr[0] for pr in pages)

        start_page = self.canv.getPageNumber()
        if self.included_on_first_page:
            start_page -= 1
        self.proc.operations.append(
            (start_page, self.pdf_file, pages,
             num_pages, self.included_on_first_page))

        # Insert blank pages instead of pdf for now, to correctly number the
        # pages. We will replace these blank pages with included PDF in
        # ConcatenationPostProcessor.
        result = []
        for _ in range(num_pages):
            # Add an empty spacer so platypus doesn't complain about too
            # many empty pages.
            result.append(flowables.Spacer(0, 0))
            result.append(flowables.PageBreak())
        # NOTE(review): this compares the start page with the number of
        # page *ranges*, not the number of pages -- confirm the intent.
        if start_page >= len(pages):
            # Make sure we get a flowable at the end of the document for the
            # last page.
            result.append(flowables.Spacer(0, 0))
        return result
1✔
239

240

241
class IIncludePdfPages(interfaces.IRMLDirectiveSignature):
    """Inserts a set of pages from a given PDF."""

    # Mandatory: the PDF file whose pages are included.
    filename = attr.File(
        required=True,
        title='Path to file',
        description='The pdf file to include.')

    # Optional page selection; page numbers in the RML source are 1-based.
    # Without it the whole file is included.
    pages = attr.IntegerSequence(
        required=False,
        numberingStartsAt=1,
        title='Pages',
        description='A list of pages to insert.')
254

255

256
class IncludePdfPages(flowable.Flowable):
    # Directive handler for ``<includePdfPages>``: it appends an
    # ``IncludePdfPagesFlowable`` to the parent flow and makes sure a
    # concatenation post-processor is registered.
    signature = IIncludePdfPages

    # Factory used to create the post-processor on first use; subclasses may
    # override it to plug in another implementation.
    ConcatenationPostProcessorFactory = ConcatenationPostProcessor

    def getProcessor(self):
        """Return the 'CONCAT' post-processor, registering it if missing."""
        manager = attr.getManager(self, interfaces.IPostProcessorManager)
        procs = dict(manager.postProcessors)
        try:
            return procs['CONCAT']
        except KeyError:
            pass
        log.debug(
            'Using concatenation post-processor: %s',
            self.ConcatenationPostProcessorFactory)
        proc = self.ConcatenationPostProcessorFactory()
        manager.postProcessors.append(('CONCAT', proc))
        return proc

    def process(self):
        """Append the include flowable to the parent flow.

        :raises Exception: If pikepdf could not be imported.
        """
        if pikepdf is None:
            raise Exception(
                'pikepdf is not installed, so this feature is not available.')
        args = dict(self.getAttributeValues())
        proc = self.getProcessor()
        # Must be evaluated before the append below: the include sits on the
        # first page exactly when the flow is still empty.
        on_first_page = not self.parent.flow
        self.parent.flow.append(
            IncludePdfPagesFlowable(
                args['filename'], args.get('pages'), proc, on_first_page
            ))
283

284

285
# Register the directive: map the ``includePdfPages`` tag to its handler and
# extend the ``directives`` tagged value of ``IFlow`` so the tag may occur
# zero or more times inside a flow.
flowable.Flow.factories['includePdfPages'] = IncludePdfPages
flowable.IFlow.setTaggedValue(
    'directives',
    flowable.IFlow.getTaggedValue('directives') +
    (occurence.ZeroOrMore('includePdfPages', IIncludePdfPages),)
)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc