• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zopefoundation / Zope / 6096135193

06 Sep 2023 10:54AM UTC coverage: 82.228% (-0.006%) from 82.234%
6096135193

push

github

dataflake
- vb and update to the latest dependencies

4329 of 6918 branches covered (0.0%)

Branch coverage included in aggregate %.

27296 of 31542 relevant lines covered (86.54%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.04
/src/App/ApplicationManager.py
1
##############################################################################
2
#
3
# Copyright (c) 2002 Zope Foundation and Contributors.
4
#
5
# This software is subject to the provisions of the Zope Public License,
6
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
7
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
8
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
9
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
10
# FOR A PARTICULAR PURPOSE.
11
#
12
##############################################################################
13

14
import os
1✔
15
import sys
1✔
16
import time
1✔
17
import types
1✔
18
from _thread import get_ident
1✔
19
from urllib import parse
1✔
20

21
from AccessControl.class_init import InitializeClass
1✔
22
from AccessControl.requestmethod import requestmethod
1✔
23
from Acquisition import Implicit
1✔
24
from App.CacheManager import CacheManager
1✔
25
from App.config import getConfiguration
1✔
26
from App.DavLockManager import DavLockManager
1✔
27
from App.Management import Tabs
1✔
28
from App.special_dtml import DTMLFile
1✔
29
from App.Undo import UndoSupport
1✔
30
from App.version_txt import version_txt
1✔
31
from DateTime.DateTime import DateTime
1✔
32
from OFS.Traversable import Traversable
1✔
33
from Persistence import Persistent
1✔
34
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
1✔
35

36

37
class FakeConnection:
1✔
38
    # Supports the methods of Connection that CacheManager needs
39

40
    def __init__(self, db, parent_jar):
1✔
41
        self._db = db
1✔
42

43
    def db(self):
1✔
44
        return self._db
1✔
45

46

47
class DatabaseChooser(Tabs, Traversable, Implicit):
1✔
48
    """ Choose which database to view
49
    """
50

51
    __allow_access_to_unprotected_subobjects__ = 1
1✔
52

53
    id = 'Database'
1✔
54
    name = title = 'Database Management'
1✔
55
    meta_type = 'Database Management'
1✔
56

57
    manage_main = manage_workspace = PageTemplateFile('www/chooseDatabase.pt',
1✔
58
                                                      globals())
59
    manage_main.__name__ = 'manage_main'
1✔
60
    manage_main._need__name__ = 0
1✔
61
    manage_options = (
1✔
62
        {'label': 'Control Panel', 'action': '../manage_main'},
63
        {'label': 'Databases', 'action': 'manage_main'},
64
        {'label': 'Configuration', 'action': '../Configuration/manage_main'},
65
        {'label': 'DAV Locks', 'action': '../DavLocks/manage_main'},
66
        {'label': 'Debug Information', 'action': '../DebugInfo/manage_main'},
67
    )
68
    MANAGE_TABS_NO_BANNER = True
1✔
69

70
    def getDatabaseNames(self, quote=False):
1✔
71
        configuration = getConfiguration()
1✔
72
        names = configuration.dbtab.listDatabaseNames()
1✔
73
        names.sort()
1✔
74
        if quote:
1✔
75
            return [(name, parse.quote(name)) for name in names]
1✔
76
        return names
1✔
77

78
    def __getitem__(self, name):
1✔
79
        configuration = getConfiguration()
1✔
80
        db = configuration.dbtab.getDatabase(name=name)
1✔
81
        m = AltDatabaseManager()
1✔
82
        m.id = name
1✔
83
        m._p_jar = FakeConnection(db, self.getPhysicalRoot()._p_jar)
1✔
84
        return m.__of__(self)
1✔
85

86
    def __bobo_traverse__(self, request, name):
1✔
87
        configuration = getConfiguration()
1✔
88
        if configuration.dbtab.hasDatabase(name):
1✔
89
            return self[name]
1✔
90
        return getattr(self, name)
1✔
91

92

93
InitializeClass(DatabaseChooser)
1✔
94

95

96
class ConfigurationViewer(Tabs, Traversable, Implicit):
1✔
97
    """ Provides information about the running configuration
98
    """
99
    manage = manage_main = manage_workspace = DTMLFile('dtml/cpConfiguration',
1✔
100
                                                       globals())
101
    manage_main._setName('manage_main')
1✔
102
    id = 'Configuration'
1✔
103
    name = title = 'Configuration Viewer'
1✔
104
    meta_type = name
1✔
105
    zmi_icon = 'fa fa-cog'
1✔
106
    manage_options = (
1✔
107
        {'label': 'Control Panel', 'action': '../manage_main'},
108
        {'label': 'Databases', 'action': '../Database/manage_main'},
109
        {'label': 'Configuration', 'action': 'manage_main'},
110
        {'label': 'DAV Locks', 'action': '../DavLocks/manage_main'},
111
        {'label': 'Debug Information', 'action': '../DebugInfo/manage_main'},
112
    )
113
    MANAGE_TABS_NO_BANNER = True
1✔
114

115
    def manage_getSysPath(self):
1✔
116
        return sorted(sys.path)
1✔
117

118
    def manage_getConfiguration(self):
1✔
119
        config_results = []
1✔
120
        config = getConfiguration()
1✔
121

122
        try:
1✔
123
            keys = config.getSectionAttributes()
1✔
124
        except AttributeError:
1✔
125
            # This happens in unit tests with a DefaultConfiguration object
126
            keys = config.__dict__.keys()
1✔
127

128
        for key in keys:
1✔
129

130
            # Databases are visible on the Database Chooser already
131
            if key == 'databases':
1!
132
                continue
×
133

134
            config_results.append({'name': key,
1✔
135
                                   'value': str(getattr(config, key))})
136
        return config_results
1✔
137

138

139
InitializeClass(ConfigurationViewer)
1✔
140

141

142
# refcount snapshot info
143
_v_rcs = None
1✔
144
_v_rst = None
1✔
145

146

147
class DebugManager(Tabs, Traversable, Implicit):
1✔
148
    """ Debug and profiling information
149
    """
150
    manage = manage_main = manage_workspace = DTMLFile('dtml/debug', globals())
1✔
151
    manage_main._setName('manage_main')
1✔
152
    id = 'DebugInfo'
1✔
153
    name = title = 'Debug Information'
1✔
154
    meta_type = name
1✔
155
    zmi_icon = 'fas fa-bug'
1✔
156

157
    manage_options = (
1✔
158
        {'label': 'Control Panel', 'action': '../manage_main'},
159
        {'label': 'Databases', 'action': '../Database/manage_main'},
160
        {'label': 'Configuration', 'action': '../Configuration/manage_main'},
161
        {'label': 'DAV Locks', 'action': '../DavLocks/manage_main'},
162
        {'label': 'Debug Information', 'action': 'manage_main'},
163
    )
164

165
    def refcount(self, n=None, t=(type(Implicit), type(object))):
1✔
166
        # return class reference info
167
        counts = {}
1✔
168
        for m in list(sys.modules.values()):
1✔
169
            if m is None:
1!
170
                continue
×
171
            if not isinstance(m, types.ModuleType) or 'six.' in m.__name__:
1!
172
                continue
×
173
            for sym in dir(m):
1✔
174
                ob = getattr(m, sym)
1✔
175
                if type(ob) in t:
1✔
176
                    counts[ob] = sys.getrefcount(ob)
1✔
177
        pairs = []
1✔
178
        for ob, v in counts.items():
1✔
179
            ob_name = getattr(ob, "__name__", "unknown")
1✔
180
            if hasattr(ob, '__module__'):
1!
181
                name = f'{ob.__module__}.{ob_name}'
1✔
182
            else:
183
                name = '%s' % ob_name
×
184
            pairs.append((v, name))
1✔
185
        pairs.sort()
1✔
186
        pairs.reverse()
1✔
187
        if n is not None:
1!
188
            pairs = pairs[:n]
×
189
        return pairs
1✔
190

191
    def refdict(self):
1✔
192
        counts = {}
1✔
193
        for v, n in self.refcount():
1✔
194
            counts[n] = v
1✔
195
        return counts
1✔
196

197
    def rcsnapshot(self):
1✔
198
        global _v_rcs
199
        global _v_rst
200
        _v_rcs = self.refdict()
1✔
201
        _v_rst = DateTime()
1✔
202

203
    def rcdate(self):
1✔
204
        return _v_rst
1✔
205

206
    def rcdeltas(self):
1✔
207
        if _v_rcs is None:
1!
208
            self.rcsnapshot()
×
209
        nc = self.refdict()
1✔
210
        rc = _v_rcs
1✔
211
        rd = []
1✔
212
        for n, c in nc.items():
1✔
213
            try:
1✔
214
                prev = rc.get(n, 0)
1✔
215
                if c > prev:
1✔
216
                    rd.append((c - prev, (c, prev, n)))
1✔
217
            except Exception:
×
218
                pass
×
219
        rd.sort()
1✔
220
        rd.reverse()
1✔
221
        return [{'name': n[1][2],
1✔
222
                 'delta': n[0],
223
                 'pc': n[1][1],
224
                 'rc': n[1][0],
225
                 } for n in rd]
226

227
    def dbconnections(self):
1✔
228
        import Zope2  # for data
×
229
        return Zope2.DB.connectionDebugInfo()
×
230

231
    def manage_getSysPath(self):
1✔
232
        return list(sys.path)
1✔
233

234

235
InitializeClass(DebugManager)
1✔
236

237

238
class ApplicationManager(CacheManager,
1✔
239
                         Persistent,
240
                         Tabs,
241
                         Traversable,
242
                         Implicit):
243
    """System management
244
    """
245
    __allow_access_to_unprotected_subobjects__ = 1
1✔
246
    __roles__ = ('Manager',)
1✔
247

248
    id = 'Control_Panel'
1✔
249
    name = title = 'Control Panel'
1✔
250
    meta_type = 'Control Panel'
1✔
251
    zmi_icon = 'fa fa-cog'
1✔
252
    process_start = int(time.time())
1✔
253

254
    Database = DatabaseChooser()
1✔
255
    Configuration = ConfigurationViewer()
1✔
256
    DavLocks = DavLockManager()
1✔
257
    DebugInfo = DebugManager()
1✔
258

259
    manage = manage_main = DTMLFile('dtml/cpContents', globals())
1✔
260
    manage_main._setName('manage_main')
1✔
261
    manage_options = (
1✔
262
        {'label': 'Control Panel', 'action': 'manage_main'},
263
        {'label': 'Databases', 'action': 'Database/manage_main'},
264
        {'label': 'Configuration', 'action': 'Configuration/manage_main'},
265
        {'label': 'DAV Locks', 'action': 'DavLocks/manage_main'},
266
        {'label': 'Debug Information', 'action': 'DebugInfo/manage_main'},
267
    )
268
    MANAGE_TABS_NO_BANNER = True
1✔
269

270
    def version_txt(self):
1✔
271
        if not hasattr(self, '_v_version_txt'):
1!
272
            self._v_version_txt = version_txt()
1✔
273

274
        return self._v_version_txt
1✔
275

276
    def process_id(self):
1✔
277
        return os.getpid()
×
278

279
    def process_time(self, _when=None):
1✔
280
        if _when is None:
1!
281
            _when = time.time()
×
282
        s = int(_when) - self.process_start
1✔
283
        d = int(s / 86400)
1✔
284
        s = s - (d * 86400)
1✔
285
        h = int(s / 3600)
1✔
286
        s = s - (h * 3600)
1✔
287
        m = int(s / 60)
1✔
288
        s = s - (m * 60)
1✔
289
        d = d and ('%d day%s' % (d, (d != 1 and 's' or ''))) or ''
1✔
290
        h = h and ('%d hour%s' % (h, (h != 1 and 's' or ''))) or ''
1✔
291
        m = m and ('%d min' % m) or ''
1✔
292
        s = '%d sec' % s
1✔
293
        return f'{d} {h} {m} {s}'
1✔
294

295
    def sys_version(self):
1✔
296
        return sys.version
1✔
297

298
    def sys_platform(self):
1✔
299
        return sys.platform
1✔
300

301
    def thread_get_ident(self):
1✔
302
        return get_ident()
×
303

304
    def debug_mode(self):
1✔
305
        return getConfiguration().debug_mode
×
306

307
    def getINSTANCE_HOME(self):
1✔
308
        return getConfiguration().instancehome
1✔
309

310
    def getCLIENT_HOME(self):
1✔
311
        return getConfiguration().clienthome
1✔
312

313

314
class AltDatabaseManager(Traversable, UndoSupport):
1✔
315
    """ Database management DBTab-style
316
    """
317
    id = 'DatabaseManagement'
1✔
318
    name = title = 'Database Management'
1✔
319
    meta_type = 'Database Management'
1✔
320

321
    manage = manage_main = DTMLFile('dtml/dbMain', globals())
1✔
322
    manage_main._setName('manage_main')
1✔
323
    manage_options = (
1✔
324
        {'label': 'Control Panel', 'action': '../../manage_main'},
325
        {'label': 'Databases', 'action': '../manage_main'},
326
        {'label': 'Database', 'action': 'manage_main'},
327
    ) + UndoSupport.manage_options
328
    MANAGE_TABS_NO_BANNER = True
1✔
329

330
    def _getDB(self):
1✔
331
        return self._p_jar.db()
1✔
332

333
    def cache_length(self):
1✔
334
        return self._getDB().cacheSize()
×
335

336
    def cache_length_bytes(self):
1✔
337
        return self._getDB().getCacheSizeBytes()
×
338

339
    def cache_detail_length(self):
1✔
340
        return self._getDB().cacheDetailSize()
×
341

342
    def cache_size(self):
1✔
343
        db = self._getDB()
1✔
344
        return db.getCacheSize()
1✔
345

346
    def database_size(self):
1✔
347
        return self._getDB().objectCount()
×
348

349
    def db_name(self):
1✔
350
        return self._getDB().getName()
1✔
351

352
    def db_size(self):
1✔
353
        s = self._getDB().getSize()
1✔
354
        if isinstance(s, str):
1✔
355
            return s
1✔
356

357
        if s >= 1048576.0:
1✔
358
            return '%.1fM' % (s / 1048576.0)
1✔
359
        return '%.1fK' % (s / 1024.0)
1✔
360

361
    @requestmethod('POST')
1✔
362
    def manage_minimize(self, value=1, REQUEST=None):
1✔
363
        "Perform a full sweep through the cache"
364
        # XXX Add a deprecation warning about value?
365
        self._getDB().cacheMinimize()
×
366

367
        if REQUEST is not None:
×
368
            msg = 'ZODB in-memory caches minimized.'
×
369
            url = f'{REQUEST["URL1"]}/manage_main?manage_tabs_message={msg}'
×
370
            REQUEST.RESPONSE.redirect(url)
×
371

372
    @requestmethod('POST')
1✔
373
    def manage_pack(self, days=0, REQUEST=None):
1✔
374
        """Pack the database"""
375
        if not isinstance(days, (int, float)):
1✔
376
            try:
1✔
377
                days = float(days)
1✔
378
            except ValueError:
1✔
379
                days = None
1✔
380

381
        if days is not None:
1✔
382
            t = time.time() - (days * 86400)
1✔
383
            self._getDB().pack(t)
1✔
384
            msg = 'Database packed to %s days' % str(days)
1✔
385
        else:
386
            t = None
1✔
387
            msg = 'Invalid days value %s' % str(days)
1✔
388

389
        if REQUEST is not None:
1!
390
            url = f'{REQUEST["URL1"]}/manage_main?manage_tabs_message={msg}'
×
391
            REQUEST['RESPONSE'].redirect(url)
×
392

393
        return t
1✔
394

395

396
InitializeClass(AltDatabaseManager)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc