• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / relman-auto-nag / #4381

pending completion
#4381

push

coveralls-python

suhaibmujahid
[topcrash_add_keyword] Rename `top_crash_signatures` to `topcrash_signatures`

563 of 3085 branches covered (18.25%)

2 of 2 new or added lines in 1 file covered. (100.0%)

1798 of 7981 relevant lines covered (22.53%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/auto_nag/scripts/topcrash_add_keyword.py
1
# This Source Code Form is subject to the terms of the Mozilla Public
2
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
3
# You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
import itertools
×
6
import re
×
7
from collections import defaultdict
×
8
from typing import Dict, Iterable
×
9

10
from libmozdata.bugzilla import Bugzilla
×
11
from libmozdata.connection import Connection
×
12

13
from auto_nag import utils
×
14
from auto_nag.bzcleaner import BzCleaner
×
15
from auto_nag.constants import LOW_SEVERITY
×
16
from auto_nag.history import History
×
17
from auto_nag.topcrash import TOP_CRASH_IDENTIFICATION_CRITERIA, Topcrash
×
18

19
MAX_SIGNATURES_PER_QUERY = 30
×
20

21

22
class TopcrashAddKeyword(BzCleaner):
×
23
    def __init__(self):
×
24
        super().__init__()
×
25
        self.topcrashes = None
×
26
        self.topcrashes_restrictive = None
×
27

28
    def description(self):
×
29
        return "Bugs with missing topcrash keywords"
×
30

31
    def columns(self):
×
32
        return ["id", "summary", "severity", "added_keywords"]
×
33

34
    def handle_bug(self, bug, data):
×
35
        bugid = str(bug["id"])
×
36
        if bugid in data:
×
37
            return
×
38

39
        topcrash_signatures = self._get_topcrash_signatures(bug)
×
40
        keywords_to_add = self._get_keywords_to_be_added(bug, topcrash_signatures)
×
41
        is_keywords_removed = utils.is_keywords_removed_by_autonag(bug, keywords_to_add)
×
42

43
        autofix = {
×
44
            "comment": {
45
                "body": "",
46
            },
47
        }
48

49
        if keywords_to_add and (
×
50
            not is_keywords_removed
51
            or self._is_matching_restrictive_criteria(topcrash_signatures)
52
        ):
53
            autofix["keywords"] = {
×
54
                "add": sorted(keywords_to_add),
55
            }
56
            autofix["comment"]["body"] += self.get_matching_criteria_comment(
×
57
                topcrash_signatures, is_keywords_removed
58
            )
59

60
        ni_person = utils.get_mail_to_ni(bug)
×
61
        if (
×
62
            ni_person
63
            and bug["severity"] in LOW_SEVERITY
64
            and "meta" not in bug["keywords"]
65
            and not self._has_severity_increase_comment(bug)
66
            and not self._was_severity_set_after_topcrash(bug)
67
        ):
68
            autofix["flags"] = [
×
69
                {
70
                    "name": "needinfo",
71
                    "requestee": ni_person["mail"],
72
                    "status": "?",
73
                    "new": "true",
74
                }
75
            ]
76
            autofix["comment"]["body"] += (
×
77
                f'\n:{ ni_person["nickname"] }, '
78
                "could you consider increasing the severity of this top-crash bug?"
79
            )
80

81
        if not autofix["comment"]["body"]:
×
82
            # No comment, no action
83
            return
×
84

85
        autofix["comment"]["body"] += f"\n\n{ self.get_documentation() }\n"
×
86
        self.autofix_changes[bugid] = autofix
×
87

88
        data[bugid] = {
×
89
            "severity": bug["severity"],
90
            "added_keywords": (
91
                utils.english_list(autofix["keywords"]["add"])
92
                if "keywords" in autofix
93
                else "-"
94
            ),
95
        }
96

97
        return bug
×
98

99
    def get_matching_criteria_comment(
×
100
        self,
101
        signatures: list,
102
        is_keywords_removed: bool,
103
    ) -> str:
104
        """Generate a comment with the matching criteria for the given signatures.
105

106
        Args:
107
            signatures: The list of signatures to generate the comment for.
108
            is_keywords_removed: Whether the topcrash keywords was removed earlier.
109

110
        Returns:
111
            The comment for the matching criteria.
112
        """
113
        matching_criteria: Dict[str, bool] = defaultdict(bool)
×
114
        for signature in signatures:
×
115
            for criterion in self.topcrashes[signature]:
×
116
                matching_criteria[criterion["criterion_name"]] |= criterion[
×
117
                    "is_startup"
118
                ]
119

120
        introduction = (
×
121
            (
122
                "Sorry for removing the keyword earlier but there is a recent "
123
                "change in the ranking, so the bug is again linked to "
124
                if is_keywords_removed
125
                else "The bug is linked to "
126
            ),
127
            (
128
                "a topcrash signature, which matches "
129
                if len(signatures) == 1
130
                else "topcrash signatures, which match "
131
            ),
132
            "the following [",
133
            "criterion" if len(matching_criteria) == 1 else "criteria",
134
            "](https://wiki.mozilla.org/CrashKill/Topcrash):\n",
135
        )
136

137
        criteria = (
×
138
            " ".join(("-", criterion_name, "(startup)\n" if is_startup else "\n"))
139
            for criterion_name, is_startup in matching_criteria.items()
140
        )
141

142
        return "".join(itertools.chain(introduction, criteria))
×
143

144
    def _has_severity_increase_comment(self, bug):
×
145
        return any(
×
146
            "could you consider increasing the severity of this top-crash bug?"
147
            in comment["raw_text"]
148
            for comment in reversed(bug["comments"])
149
            if comment["creator"] == History.BOT
150
        )
151

152
    def _was_severity_set_after_topcrash(self, bug: dict) -> bool:
×
153
        """Check if the severity was set after the topcrash keyword was added.
154

155
        Note: this will not catch cases where the topcrash keyword was added
156
        added when the bug was created and the severity was set later.
157

158
        Args:
159
            bug: The bug to check.
160

161
        Returns:
162
            True if the severity was set after the topcrash keyword was added.
163
        """
164
        has_topcrash_added = False
×
165
        for history in bug["history"]:
×
166
            for change in history["changes"]:
×
167
                if not has_topcrash_added and change["field_name"] == "keywords":
×
168
                    has_topcrash_added = "topcrash" in change["added"]
×
169

170
                elif has_topcrash_added and change["field_name"] == "severity":
×
171
                    return True
×
172

173
        return False
×
174

175
    def _get_topcrash_signatures(self, bug: dict) -> list:
×
176
        topcrash_signatures = [
×
177
            signature
178
            for signature in utils.get_signatures(bug["cf_crash_signature"])
179
            if signature in self.topcrashes
180
        ]
181

182
        if not topcrash_signatures:
×
183
            raise Exception("The bug should have a topcrash signature.")
×
184

185
        return topcrash_signatures
×
186

187
    def _get_keywords_to_be_added(self, bug: dict, topcrash_signatures: list) -> set:
×
188
        existing_keywords = {
×
189
            keyword
190
            for keyword in ("topcrash", "topcrash-startup")
191
            if keyword in bug["keywords"]
192
        }
193

194
        if any(
×
195
            # Is it a startup topcrash bug?
196
            any(criterion["is_startup"] for criterion in self.topcrashes[signature])
197
            for signature in topcrash_signatures
198
        ):
199
            keywords_to_add = {"topcrash", "topcrash-startup"}
×
200

201
        else:
202
            keywords_to_add = {"topcrash"}
×
203

204
        return keywords_to_add - existing_keywords
×
205

206
    def get_bugs(self, date="today", bug_ids=[], chunk_size=None):
×
207
        self.query_url = None
×
208
        timeout = self.get_config("bz_query_timeout")
×
209
        bugs = self.get_data()
×
210
        params_list = self.get_bz_params_list(date)
×
211

212
        searches = [
×
213
            Bugzilla(
214
                params,
215
                bughandler=self.bughandler,
216
                bugdata=bugs,
217
                timeout=timeout,
218
            ).get_data()
219
            for params in params_list
220
        ]
221

222
        for search in searches:
×
223
            search.wait()
×
224

225
        return bugs
×
226

227
    def _is_matching_restrictive_criteria(self, signatures: Iterable) -> bool:
×
228
        topcrashes = self._get_restrictive_topcrash_signatures()
×
229
        return any(signature in topcrashes for signature in signatures)
×
230

231
    def _get_restrictive_topcrash_signatures(self) -> dict:
×
232
        if self.topcrashes_restrictive is None:
×
233
            restrictive_criteria = []
×
234
            for criterion in TOP_CRASH_IDENTIFICATION_CRITERIA:
×
235
                restrictive_criterion = {
×
236
                    **criterion,
237
                    "tc_limit": criterion["tc_limit"] // 2,
238
                }
239

240
                if "tc_limit_startup" in criterion:
×
241
                    restrictive_criterion["tc_limit_startup"] //= 2
×
242

243
                restrictive_criteria.append(restrictive_criterion)
×
244

245
            self.topcrashes_restrictive = Topcrash(
×
246
                criteria=restrictive_criteria
247
            ).get_signatures()
248

249
        return self.topcrashes_restrictive
×
250

251
    def get_bz_params_list(self, date):
×
252
        self.topcrashes = Topcrash(date).get_signatures()
×
253

254
        fields = [
×
255
            "triage_owner",
256
            "assigned_to",
257
            "severity",
258
            "keywords",
259
            "cf_crash_signature",
260
            "history",
261
            "comments.creator",
262
            "comments.raw_text",
263
        ]
264
        params_base = {
×
265
            "include_fields": fields,
266
            "resolution": "---",
267
        }
268
        self.amend_bzparams(params_base, [])
×
269

270
        params_list = []
×
271
        for signatures in Connection.chunks(
×
272
            list(self.topcrashes.keys()),
273
            MAX_SIGNATURES_PER_QUERY,
274
        ):
275
            params = params_base.copy()
×
276
            n = int(utils.get_last_field_num(params))
×
277
            params[f"f{n}"] = "OP"
×
278
            params[f"j{n}"] = "OR"
×
279
            for signature in signatures:
×
280
                n += 1
×
281
                params[f"f{n}"] = "cf_crash_signature"
×
282
                params[f"o{n}"] = "regexp"
×
283
                # Using `(@ |@)` instead of `@ ?` and ( \]|\]) instead of ` ?]`
284
                # is a workaround. Strangely `?` stays with the encoded form (%3F)
285
                # in Bugzilla query.
286
                # params[f"v{n}"] = f"\[@ ?{re.escape(signature)} ?\]"
287
                params[f"v{n}"] = rf"\[(@ |@){re.escape(signature)}( \]|\])"
×
288
            params[f"f{n+1}"] = "CP"
×
289
            params_list.append(params)
×
290

291
        return params_list
×
292

293

294
if __name__ == "__main__":
×
295
    TopcrashAddKeyword().run()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc