• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / relman-auto-nag / #4350

pending completion
#4350

push

coveralls-python

web-flow
Update the protocol from git to https in Readme file (#1984)

563 of 3079 branches covered (18.29%)

1795 of 7971 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.76
/auto_nag/utils.py
1
# This Source Code Form is subject to the terms of the Mozilla Public
2
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
3
# You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
import copy
1✔
6
import datetime
1✔
7
import json
1✔
8
import os
1✔
9
import random
1✔
10
import re
1✔
11
from typing import Iterable, Union
1✔
12
from urllib.parse import urlencode
1✔
13

14
import dateutil.parser
1✔
15
import humanize
1✔
16
import pytz
1✔
17
import requests
1✔
18
from dateutil.relativedelta import relativedelta
1✔
19
from libmozdata import utils as lmdutils
1✔
20
from libmozdata import versions as lmdversions
1✔
21
from libmozdata.bugzilla import Bugzilla, BugzillaShorten
1✔
22
from libmozdata.fx_trains import FirefoxTrains
1✔
23
from libmozdata.hgmozilla import Mercurial
1✔
24
from requests.exceptions import HTTPError
1✔
25

26
from auto_nag.constants import (
1✔
27
    BOT_MAIN_ACCOUNT,
28
    HIGH_PRIORITY,
29
    HIGH_SEVERITY,
30
    OLD_SEVERITY_MAP,
31
)
32

33
_CONFIG = None
1✔
34
_CYCLE_SPAN = None
1✔
35
_MERGE_DAY = None
1✔
36
_TRIAGE_OWNERS = None
1✔
37
_DEFAULT_ASSIGNEES = None
1✔
38
_CURRENT_VERSIONS = None
1✔
39
_CONFIG_PATH = "./auto_nag/scripts/configs/"
1✔
40

41

42
BZ_FIELD_PAT = re.compile(r"^[fovj]([0-9]+)$")
1✔
43
PAR_PAT = re.compile(r"\([^\)]*\)")
1✔
44
BRA_PAT = re.compile(r"\[[^\]]*\]")
1✔
45
DIA_PAT = re.compile("<[^>]*>")
1✔
46
UTC_PAT = re.compile(r"UTC\+[^ \t]*")
1✔
47
COL_PAT = re.compile(":[^:]*")
1✔
48
BACKOUT_PAT = re.compile("^back(s|(ed))?[ \t]*out", re.I)
1✔
49
BUG_PAT = re.compile(r"^bug[s]?[ \t]*([0-9]+)", re.I)
1✔
50
WHITEBOARD_ACCESS_PAT = re.compile(r"\[access\-s\d\]")
1✔
51

52
MAX_URL_LENGTH = 512
1✔
53

54

55
def get_weekdays():
    """Map abbreviated English weekday names to their index (Mon=0 ... Sun=6)."""
    names = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    return {name: index for index, name in enumerate(names)}
57

58

59
def _get_config():
    """Load and cache the tools configuration.

    ``tools.json`` (looked up under ``_CONFIG_PATH``) is parsed on the first
    call and kept in the module-level ``_CONFIG`` cache for later calls.

    Returns:
        The parsed configuration, or an empty dict when the file cannot be
        opened.
    """
    global _CONFIG
    if _CONFIG is None:
        try:
            # JSON is UTF-8: do not depend on the locale's default encoding.
            path = os.path.join(_CONFIG_PATH, "tools.json")
            with open(path, "r", encoding="utf-8") as config_file:
                _CONFIG = json.load(config_file)
        except OSError:
            # Best effort: a missing/unreadable file means "no configuration".
            _CONFIG = {}
    return _CONFIG
68

69

70
def get_config(name, entry, default=None):
    """Get a configuration entry for a tool, falling back to the common section.

    Args:
        name: the tool name; when it has no dedicated section, the "common"
            section is used instead.
        entry: the key to look up.
        default: the value returned when the entry is found neither in the
            tool section nor in the "common" section.

    Returns:
        The configured value or `default`.
    """
    conf = _get_config()
    # The configuration may be empty (e.g. the config file is missing), so the
    # "common" section cannot be assumed to exist.
    common_conf = conf.get("common", {})
    tool_conf = conf.get(name, common_conf)
    if entry in tool_conf:
        return tool_conf[entry]
    return common_conf.get(entry, default)
79

80

81
def get_receivers(tool_name):
    """Get the de-duplicated list of receivers configured for a tool.

    The "receivers" and "additional_receivers" entries may each be either a
    list or the name of a list declared in the common "receiver_list" entry.

    Args:
        tool_name: the tool to get the receivers for.

    Returns:
        The receivers followed by the additional receivers, without
        duplicates and in first-seen order.
    """
    named_lists = get_config("common", "receiver_list", default={})

    def resolve(entry):
        value = get_config(tool_name, entry, [])
        # A string is a reference to a named list.
        return named_lists[value] if isinstance(value, str) else value

    main = resolve("receivers")
    extra = resolve("additional_receivers")

    unique = {}
    for receiver in [*main, *extra]:
        unique[receiver] = None
    return list(unique)
93

94

95
def init_random():
    """Seed the global `random` generator with the current POSIX timestamp."""
    # An aware datetime is required here: calling `timestamp()` on the naive
    # value returned by the deprecated `utcnow()` interprets it as local time.
    now = datetime.datetime.now(datetime.timezone.utc)
    random.seed(now.timestamp())
99

100

101
def get_signatures(sgns):
    """Extract the set of signatures from a ``[@ sig1] [@ sig2]`` style string.

    Args:
        sgns: the raw signatures string (may be empty or None).

    Returns:
        A set with the text found after each ``[@`` marker, cut at the last
        ``]`` when there is one.
    """
    if not sgns:
        return set()

    signatures = set()
    for chunk in sgns.split("[@"):
        chunk = chunk.strip()
        if not chunk:
            continue
        closing = chunk.rfind("]")
        # Keep the chunk untouched when the closing bracket is missing.
        signatures.add(chunk if closing == -1 else chunk[:closing].strip())
    return signatures
114

115

116
def add_signatures(old, new):
    """Append signatures to an existing signatures string.

    Args:
        old: the current signatures string (may be empty).
        new: an iterable of signatures to add; they are added sorted, one
            ``[@ signature]`` per line.

    Returns:
        The updated signatures string.
    """
    joined = "]\n[@ ".join(sorted(new))
    formatted = "[@ " + joined + "]"
    if not old:
        return formatted
    return "\n".join((old, formatted))
121

122

123
def get_empty_assignees(params, negation=False):
    """Add to a Bugzilla query an OR-group matching bugs without a real assignee.

    The group matches `assigned_to` equal to nobody@mozilla.org, ending with
    ".bugs", or empty.

    Args:
        params: the query parameters; updated in place.
        negation: when true, the whole group is negated.

    Returns:
        The same `params` dictionary.
    """
    first = int(get_last_field_num(params))

    group = {f"j{first}": "OR", f"f{first}": "OP"}
    conditions = (
        ("equals", "nobody@mozilla.org"),
        ("regexp", r"^.*\.bugs$"),
        ("isempty", None),
    )
    for offset, (operator, value) in enumerate(conditions, start=1):
        num = first + offset
        group[f"f{num}"] = "assigned_to"
        group[f"o{num}"] = operator
        if value is not None:
            group[f"v{num}"] = value
    group[f"f{first + 4}"] = "CP"
    params.update(group)

    if negation:
        params[f"n{first}"] = 1

    return params
145

146

147
def is_no_assignee(mail):
    """Check if the email denotes the absence of a real assignee.

    Args:
        mail: the account email; an empty or missing (None) value is treated
            as no assignee.

    Returns:
        True for an empty value, nobody@mozilla.org, or a ".bugs" account.
    """
    if not mail:
        # Covers "" and None (the latter used to raise on `.endswith`).
        return True
    return mail == "nobody@mozilla.org" or mail.endswith(".bugs")
149

150

151
def get_login_info():
    """Load and return the parsed content of ``config.json`` from ``_CONFIG_PATH``."""
    # Explicit UTF-8, consistent with the other JSON loaders of this module.
    with open(_CONFIG_PATH + "config.json", "r", encoding="utf-8") as config_file:
        return json.load(config_file)
154

155

156
def get_private():
    """Return the "private" entry of ``config.json`` from ``_CONFIG_PATH``."""
    # Explicit UTF-8, consistent with the other JSON loaders of this module.
    with open(_CONFIG_PATH + "config.json", "r", encoding="utf-8") as config_file:
        return json.load(config_file)["private"]
159

160

161
def get_gcp_service_account_info() -> dict:
    """Read the GCP service account key file and return its parsed content."""
    key_path = _CONFIG_PATH + "gcp_service_account.json"
    with open(key_path, "r", encoding="utf-8") as key_file:
        return json.load(key_file)
167

168

169
def plural(sword, data, pword=""):
    """Pick the singular or plural form of a word according to a quantity.

    Args:
        sword: the singular word.
        data: either the count itself (int) or a sized collection.
        pword: the plural word; when empty, an "s" is appended to `sword`.

    Returns:
        `sword` when the quantity is exactly 1, the plural form otherwise.
    """
    count = data if isinstance(data, int) else len(data)
    if count == 1:
        return sword
    return pword or sword + "s"
179

180

181
def english_list(items):
    """Join items the English way: "a", "a and b", "a, b and c".

    Args:
        items: a non-empty sequence of strings.
    """
    assert len(items) > 0
    if len(items) == 1:
        return items[0]

    leading = ", ".join(items[:-1])
    return f"{leading} and {items[-1]}"
187

188

189
def shorten_long_bz_url(url):
    """Shorten a Bugzilla URL when it is longer than ``MAX_URL_LENGTH``.

    Very long URLs are a problem because line lengths are limited in the
    email protocol:
    https://datatracker.ietf.org/doc/html/rfc5322#section-2.1.1

    Args:
        url: the URL to shorten.

    Returns:
        The URL itself when it is empty or short enough, otherwise the short
        URL. When the shortening request fails with an HTTP error, the URL is
        returned split over lines of at most ``MAX_URL_LENGTH`` characters.
    """
    if not url or len(url) <= MAX_URL_LENGTH:
        return url

    shortened = {}

    def url_handler(u, data):
        data["url"] = u

    try:
        BugzillaShorten(url, url_data=shortened, url_handler=url_handler).wait()
    except HTTPError:  # workaround for https://github.com/mozilla/relman-auto-nag/issues/1402
        pieces = []
        for start in range(0, len(url), MAX_URL_LENGTH):
            pieces.append(url[start : start + MAX_URL_LENGTH])
        return "\n".join(pieces)

    return shortened["url"]
209

210

211
def get_cycle_span() -> str:
    """Return the current nightly cycle span in the format YYYYMMDD-YYYYMMDD.

    The span goes from the nightly start to the merge day of the nightly
    release schedule. The value is computed once and cached in
    ``_CYCLE_SPAN``.
    """
    global _CYCLE_SPAN
    if _CYCLE_SPAN is not None:
        return _CYCLE_SPAN

    schedule = FirefoxTrains().get_release_schedule("nightly")
    cycle_start = lmdutils.get_date_ymd(schedule["nightly_start"])
    cycle_end = lmdutils.get_date_ymd(schedule["merge_day"])

    today = lmdutils.get_date_ymd("today")
    assert cycle_start <= today <= cycle_end

    _CYCLE_SPAN = "-".join(
        (cycle_start.strftime("%Y%m%d"), cycle_end.strftime("%Y%m%d"))
    )
    return _CYCLE_SPAN
225

226

227
def get_next_release_date() -> datetime.datetime:
    """Return the release date of the beta schedule, with the time part zeroed."""
    beta_schedule = FirefoxTrains().get_release_schedule("beta")
    parsed = lmdutils.get_date_ymd(beta_schedule["release"])
    return parsed.replace(hour=0, minute=0, second=0, microsecond=0)
233

234

235
def is_merge_day(date: datetime.datetime = None) -> bool:
    """Check if the date is a merge day.

    Both the start of the current nightly cycle (the last merge) and its
    merge day (the next merge) are considered.

    Args:
        date: the date to check. If None, the current date is used.

    Returns:
        True if the date equals the last or the next merge date.
    """
    if date is None:
        date = lmdutils.get_date_ymd("today")

    nightly = FirefoxTrains().get_release_schedule("nightly")
    last_merge = lmdutils.get_date_ymd(nightly["nightly_start"])
    next_merge = lmdutils.get_date_ymd(nightly["merge_day"])

    return date == next_merge or date == last_merge
252

253

254
def get_report_bugs(channel, op="+"):
    """Get the bug ids of the release tracking report for an approval flag.

    Args:
        channel: the channel used in the ``approval-mozilla-<channel>`` flag.
        op: the flag status to look for.

    Returns:
        The list of bug ids (strings) found in the redirection URL.
    """
    report_url = "https://bugzilla.mozilla.org/page.cgi?id=release_tracking_report.html"
    query = f"approval-mozilla-{channel}:{op}:{get_cycle_span()}:0:and:"

    # allow_redirects=False avoids to load the data
    # and we'll just get the redirected url to get all the bug ids we need
    response = requests.get(report_url, params={"q": query}, allow_redirects=False)

    # something like https://bugzilla.mozilla.org/buglist.cgi?bug_id=1493711,1502766,1499908
    redirect_url = response.headers["Location"]
    bug_ids = redirect_url.split("=")[1]

    return bug_ids.split(",")
268

269

270
def get_flag(version, name, channel):
    """Build the Bugzilla flag name for a version and channel.

    Args:
        version: the version number.
        name: the flag kind: "status", "tracking" or "approval".
        channel: the channel; "esr" has dedicated flag names.

    Returns:
        The flag name, or None when `name` is not a supported kind.
    """
    on_esr = channel == "esr"

    if name in ("status", "tracking"):
        if on_esr:
            return f"cf_{name}_firefox_esr{version}"
        return f"cf_{name}_firefox{version}"

    if name == "approval":
        if on_esr:
            return f"approval-mozilla-esr{version}"
        return f"approval-mozilla-{channel}"

    return None
279

280

281
def get_needinfo(bug, days=-1):
    """Yield the pending needinfo flags of a bug that are old enough.

    Args:
        bug: the bug dictionary; its "flags" entry is inspected.
        days: the minimal age, in days, of the flag's last modification.

    Yields:
        Each flag named "needinfo" with the status "?" that was modified at
        least `days` days ago.
    """
    now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
    for flag in bug.get("flags", []):
        if flag.get("name", "") != "needinfo" or flag["status"] != "?":
            continue
        modified = dateutil.parser.parse(flag["modification_date"])
        age = now - modified
        if age.days >= days:
            yield flag
289

290

291
def get_last_field_num(params):
    """Get the next free field number of a Bugzilla advanced query.

    Args:
        params: the query parameters; keys matching ``BZ_FIELD_PAT`` (such as
            f1, o1, v1, j1) are considered.

    Returns:
        The highest used number plus one, as a string ("1" when none is used).
    """
    used = [
        int(match.group(1))
        for match in map(BZ_FIELD_PAT.match, params.keys())
        if match
    ]
    return str(max(used, default=0) + 1)
300

301

302
def add_prod_comp_to_query(params, prod_comp):
    """Restrict a Bugzilla query to a list of product/component pairs.

    An OR-group is appended to the query, containing one AND-group
    (product equals X and component equals Y) per pair.

    Args:
        params: the query parameters; updated in place.
        prod_comp: an iterable of "Product::Component" strings.
    """
    n = int(get_last_field_num(params))
    params[f"j{n}"] = "OR"
    params[f"f{n}"] = "OP"
    n += 1
    for pc in prod_comp:
        # Split on the first separator only, like the other helpers of this
        # module do, so a "::" inside the component name does not break the
        # unpacking.
        prod, comp = pc.split("::", 1)
        params[f"j{n}"] = "AND"
        params[f"f{n}"] = "OP"
        n += 1
        params[f"f{n}"] = "product"
        params[f"o{n}"] = "equals"
        params[f"v{n}"] = prod
        n += 1
        params[f"f{n}"] = "component"
        params[f"o{n}"] = "equals"
        params[f"v{n}"] = comp
        n += 1
        params[f"f{n}"] = "CP"
        n += 1
    # Close the enclosing OR-group.
    params[f"f{n}"] = "CP"
323

324

325
def get_bz_search_url(params):
    """Build the Bugzilla bug list URL for the given query parameters.

    Sequence values are expanded into repeated parameters.
    """
    query_string = urlencode(params, doseq=True)
    return f"https://bugzilla.mozilla.org/buglist.cgi?{query_string}"
327

328

329
def has_bot_set_ni(bug):
    """Check if a pending needinfo on the bug was set by the bot.

    The bot account(s) come from the "bot_bz_mail" common configuration entry.
    """
    bot = get_config("common", "bot_bz_mail")
    return any(flag["setter"] in bot for flag in get_needinfo(bug))
335

336

337
def get_triage_owners():
    """Get the components owned by each triage owner.

    The products listed in the "products" common configuration entry are
    fetched from the Bugzilla REST API once; the result is cached in
    ``_TRIAGE_OWNERS``.

    Returns:
        A dictionary mapping each triage owner email to the list of its
        "Product::Component" names. Components without a real triage owner
        are skipped.
    """
    global _TRIAGE_OWNERS
    if _TRIAGE_OWNERS is not None:
        return _TRIAGE_OWNERS

    # accessible is the union of:
    #  selectable (all product we can see)
    #  enterable (all product a user can file bugs into).
    query = {
        "type": "accessible",
        "include_fields": ["name", "components.name", "components.triage_owner"],
        "names": get_config("common", "products"),
    }
    response = requests.get("https://bugzilla.mozilla.org/rest/product", params=query)
    products = response.json()["products"]

    _TRIAGE_OWNERS = owners = {}
    for product in products:
        product_name = product["name"]
        for component in product["components"]:
            owner = component["triage_owner"]
            if not owner or is_no_assignee(owner):
                continue
            component_name = component["name"]
            owners.setdefault(owner, []).append(f"{product_name}::{component_name}")
    return _TRIAGE_OWNERS
367

368

369
def get_default_assignees():
    """Get the default assignee of every component of the configured products.

    The products listed in the "products" common configuration entry are
    fetched from the Bugzilla REST API once; the result is cached in
    ``_DEFAULT_ASSIGNEES``.

    Returns:
        A dictionary mapping each product name to a dictionary mapping its
        component names to their default assignee.
    """
    global _DEFAULT_ASSIGNEES
    if _DEFAULT_ASSIGNEES is not None:
        return _DEFAULT_ASSIGNEES

    # accessible is the union of:
    #  selectable (all product we can see)
    #  enterable (all product a user can file bugs into).
    query = {
        "type": "accessible",
        "include_fields": ["name", "components.name", "components.default_assigned_to"],
        "names": get_config("common", "products"),
    }
    response = requests.get("https://bugzilla.mozilla.org/rest/product", params=query)
    products = response.json()["products"]

    _DEFAULT_ASSIGNEES = {}
    for product in products:
        per_component = _DEFAULT_ASSIGNEES[product["name"]] = {}
        for component in product["components"]:
            per_component[component["name"]] = component["default_assigned_to"]
    return _DEFAULT_ASSIGNEES
395

396

397
def organize(bugs, columns, key=None):
    """Extract the requested columns from the bugs and sort the result.

    Args:
        bugs: an iterable of bug data dictionaries, or a dictionary whose
            values are the bug data dictionaries.
        columns: the names of the fields to extract (at least one).
        key: an optional sort key function applied to the extracted rows.
            By default rows are sorted column by column, the "id" column
            being sorted by decreasing numerical value.

    Returns:
        A sorted list of tuples (one item per column) when several columns
        are requested, or a sorted list of plain values for a single column.
    """
    if isinstance(bugs, dict):
        # we suppose that the values are the bugdata dict
        bugs = bugs.values()

    # Per-column transformations used to build the default sort key.
    transforms = {"id": lambda bug_id: -int(bug_id)}

    def column_key(value, column):
        transform = transforms.get(column)
        return transform(value) if transform else value

    if len(columns) >= 2:
        rows = [tuple(info[c] for c in columns) for info in bugs]

        def default_key(row):
            return tuple(column_key(v, c) for v, c in zip(row, columns))

    else:
        column = columns[0]
        rows = [info[column] for info in bugs]

        # Rows are plain values here, not tuples: the transformation must be
        # applied to the value itself (zipping it with `columns` would only
        # look at the first character of a string id and fail on an int id).
        def default_key(value):
            return column_key(value, column)

    return sorted(rows, key=key or default_key)
420

421

422
def merge_bz_changes(c1, c2):
    """Merge two dictionaries of Bugzilla changes that have no key in common.

    Args:
        c1: the first changes (may be empty or None).
        c2: the second changes (may be empty or None).

    Returns:
        The other argument when one of them is empty; otherwise a new
        dictionary made of a deep copy of `c1` updated with `c2`.
    """
    if not c1:
        return c2
    if not c2:
        return c1

    shared_keys = set(c1.keys()) & set(c2.keys())
    assert not shared_keys, "Merge changes with common keys is not a good idea"

    merged = copy.deepcopy(c1)
    for field, change in c2.items():
        merged[field] = change

    return merged
435

436

437
def is_test_file(path):
    """Check if a path looks like a test file.

    The path must contain "test" and must not have one of the extensions
    excluded below (ini, list, in, py, json, manifest).
    """
    _, extension = os.path.splitext(path)
    excluded = {"ini", "list", "in", "py", "json", "manifest"}
    return "test" in path and extension[1:].lower() not in excluded
440

441

442
def get_better_name(name):
    """Clean up a Bugzilla user name for display.

    Parts between parentheses, square brackets or angle brackets are removed,
    as well as ``:nick`` like parts (unless at the very beginning) and
    ``UTC+...`` mentions. Names starting with "Nobody;" become "Nobody".

    Args:
        name: the name to clean (may be empty or None).

    Returns:
        The cleaned name, or an empty string when no name is provided.
    """
    if not name:
        return ""

    if name.startswith("Nobody;"):
        cleaned = "Nobody"
    else:

        def keep_only_leading(match):
            # A ":xxx" part is kept only when it starts the name.
            return match.group(0) if match.start(0) == 0 else ""

        cleaned = name
        for pattern in (PAR_PAT, BRA_PAT, DIA_PAT):
            cleaned = pattern.sub("", cleaned)
        cleaned = COL_PAT.sub(keep_only_leading, cleaned)
        cleaned = UTC_PAT.sub("", cleaned).strip()
        if cleaned.startswith(":"):
            cleaned = cleaned[1:]

    return cleaned.encode("utf-8").decode("utf-8")
463

464

465
def is_backout(json):
    """Check if a changeset has been backed out or is itself a backout.

    Args:
        json: the changeset data; "backedoutby" (optional) and "desc" are read.

    Returns:
        True when "backedoutby" is set or when the description matches
        ``BACKOUT_PAT``.
    """
    if json.get("backedoutby", "") != "":
        return True
    return BACKOUT_PAT.search(json["desc"]) is not None
467

468

469
def get_pushlog(startdate, enddate, channel="nightly"):
    """Get the pushlog from hg.mozilla.org

    Args:
        startdate: the lower bound of the push dates (included).
        enddate: the upper bound of the push dates (included).
        channel: the channel whose repository is queried.

    Returns:
        The decoded JSON response of the ``json-pushes`` endpoint.
    """
    # Get the pushes where startdate <= pushdate <= enddate
    # pushlog uses strict inequality, it's why we add +/- 1 second
    one_second = relativedelta(seconds=1)
    date_format = "%Y-%m-%d %H:%M:%S"
    query = {
        "startdate": (startdate - one_second).strftime(date_format),
        "enddate": (enddate + one_second).strftime(date_format),
        "version": 2,
        "full": 1,
    }
    repo_url = Mercurial.get_repo_url(channel)
    response = requests.get(f"{repo_url}/json-pushes", params=query)
    return response.json()
484

485

486
def get_bugs_from_desc(desc):
    """Get the bug numbers matched by ``BUG_PAT`` in the patch description.

    Returns:
        A list with the matched bug numbers, as strings.
    """
    bug_numbers = BUG_PAT.findall(desc)
    return bug_numbers
489

490

491
def get_bugs_from_pushlog(startdate, enddate, channel="nightly"):
    """Get the bugs referenced by the changesets pushed in a date range.

    Changesets that have been backed out are ignored.

    Args:
        startdate: the lower bound of the push dates.
        enddate: the upper bound of the push dates.
        channel: the channel whose pushlog is queried.

    Returns:
        A set of bug numbers (strings).
    """
    pushes = get_pushlog(startdate, enddate, channel=channel)["pushes"]
    bug_ids = set()
    for push in pushes.values():
        for changeset in push["changesets"]:
            if changeset.get("backedoutby", "") == "":
                bug_ids.update(get_bugs_from_desc(changeset["desc"]))
    return bug_ids
502

503

504
def get_checked_versions():
    """Get the current versions per channel, only when they look consistent.

    Returns:
        A dictionary mapping each channel to its version (as a string), or an
        empty dictionary when the versions cannot be trusted.
    """
    # There are different reasons to not return versions:
    # i) we're merge day: the versions are changing
    # ii) not consecutive versions numbers
    # iii) bugzilla updated nightly version but p-d is not updated
    if is_merge_day():
        return {}

    versions = lmdversions.get(base=True)
    versions["central"] = versions["nightly"]

    release, beta, central = (versions[ch] for ch in ("release", "beta", "central"))

    if not (release + 2 == beta + 1 == central):
        from . import logger

        logger.info("Not consecutive versions in product/details")
        return {}

    if central != get_nightly_version_from_bz():
        from . import logger

        logger.info("Versions mismatch between Bugzilla and product-details")
        return {}

    return {channel: str(number) for channel, number in versions.items()}
531

532

533
def get_info_from_hg(json):
    """Summarize a changeset returned by hg.mozilla.org.

    Args:
        json: the changeset data; "pushdate", "desc" and the optional
            "backedoutby" entries are read.

    Returns:
        A dictionary with the push "date" (string), the "backedout" status
        (bool) and the "bugid" found in the description ("" if none).
    """
    pushed_at = datetime.datetime.utcfromtimestamp(json["pushdate"][0])
    pushed_at = lmdutils.as_utc(pushed_at)

    info = {
        "date": lmdutils.get_date_str(pushed_at),
        "backedout": json.get("backedoutby", "") != "",
    }
    bug_match = BUG_PAT.search(json["desc"])
    info["bugid"] = bug_match.group(1) if bug_match else ""

    return info
544

545

546
def bz_ignore_case(s):
    """Build a regular expression matching a string whatever its case.

    Each character is replaced by a character class holding its lower and
    upper case forms, e.g. "ab" gives "[aA][bB]".

    Args:
        s: the string to match; characters are not escaped, so it is
            presumably meant for plain alphanumeric text.

    Returns:
        The regular expression, or an empty string for an empty input.
    """
    # Lower AND upper are both needed: with `c + c.upper()` an upper case
    # input character only matched itself (e.g. "A" gave "[AA]").
    # An empty input yields no class at all rather than an invalid "[]".
    return "".join(f"[{c.lower()}{c.upper()}]" for c in s)
548

549

550
def check_product_component(data, bug):
    """Check if the bug's component is listed in `data`.

    Args:
        data: a container of "Product::Component" and/or bare component names.
        bug: the bug dictionary; "product" and "component" are read.

    Returns:
        True when either the full "Product::Component" name or the component
        name alone is in `data`.
    """
    product = bug["product"]
    component = bug["component"]
    full_name = product + "::" + component
    if full_name in data:
        return True
    return component in data
555

556

557
def get_components(data):
    """Get the component names out of "Product::Component" entries.

    Entries without a "::" separator are kept as they are.

    Returns:
        The list of component names, in the order of `data`.
    """
    return [
        entry.split("::", 1)[1] if "::" in entry else entry
        for entry in data
    ]
564

565

566
def get_products_components(data):
    """Split "Product::Component" entries into products and components.

    Entries without a "::" separator are considered to be component names.

    Returns:
        A tuple with the set of products and the set of components.
    """
    products = set()
    components = set()
    for entry in data:
        product, separator, component = entry.partition("::")
        if separator:
            products.add(product)
            components.add(component)
        else:
            components.add(entry)
    return products, components
577

578

579
def ireplace(old, repl, text):
    """Replace every occurrence of `old` in `text`, ignoring the case.

    Both `old` and `repl` are taken literally (no regular expression or
    backreference interpretation).
    """
    pattern = re.compile("(?i)" + re.escape(old))

    def literal_replacement(_match):
        # A callable prevents `repl` from being parsed as a template.
        return repl

    return pattern.sub(literal_replacement, text)
581

582

583
def get_human_lag(date):
    """Get a human readable description of the time elapsed since a date.

    Args:
        date: a datetime, or a string that is parsed with dateutil.

    Returns:
        The natural language delta between now (UTC) and the date.
    """
    if isinstance(date, str):
        date = dateutil.parser.parse(date)
    now = pytz.utc.localize(datetime.datetime.utcnow())

    return humanize.naturaldelta(now - date)
588

589

590
def get_nightly_version_from_bz():
    """Get the nightly version known by Bugzilla.

    Bug 1234567 is fetched and the highest ``cf_status_firefox<N>`` field
    number found on it is returned.
    """
    prefix = "cf_status_firefox"

    def bug_handler(bug, data):
        for field in bug.keys():
            if not field.startswith(prefix):
                continue
            suffix = field[len(prefix) :]
            # Skips fields such as the ESR ones, whose suffix is not a number.
            if suffix.isdigit():
                data.append(int(suffix))

    found_versions = []
    Bugzilla(
        bugids=["1234567"], bughandler=bug_handler, bugdata=found_versions
    ).get_data().wait()

    return max(found_versions)
604

605

606
def nice_round(val):
    """Convert a ratio into a percentage rounded to the nearest integer."""
    percentage = 100 * val
    return int(round(percentage))
608

609

610
def is_bot_email(email: str) -> bool:
    """Check if the email belongs to a bot or component-watching account.

    Accounts ending with "@disabled.tld" are not considered to be bots.

    Args:
        email: the account login email.
    """
    if email.endswith("@disabled.tld"):
        return False

    return email.endswith((".bugs", ".tld"))
620

621

622
def get_last_no_bot_comment_date(bug: dict) -> str:
    """Get the creation date of the last comment posted by a non-bot account.

    Args:
        bug: the bug dictionary; it must have a non-empty comments list.

    Returns:
        The creation time of the most recent comment whose creator is not a
        bot. If all comments are posted by bots, the creation time of the
        first comment is returned.
    """
    comments = bug["comments"]
    human_dates = (
        comment["creation_time"]
        for comment in reversed(comments)
        if not is_bot_email(comment["creator"])
    )
    for creation_time in human_dates:
        return creation_time

    return bug["comments"][0]["creation_time"]
637

638

639
def get_sort_by_bug_importance_key(bug):
    """Build the sort key used to order bugs by importance.

    We need bugs with high severity (S1 or S2) or high priority (P1 or P2) to be
    first (do not need to be high in both). Next, bugs with higher priority and
    severity are preferred. Finally, for bugs with the same severity and priority,
    we favour recently changed or created bugs.

    Args:
        bug: the bug dictionary; "priority" and "severity" are required,
            "last_change_time" is used when present, "id" otherwise.

    Returns:
        A tuple (not important, severity rank, priority rank, negated time
        order) that sorts the most important bugs first. Ranks are integers;
        unset or unknown values get the rank 10 so they sort last.
    """

    def rank(label, prefix):
        # Ranks are compared as numbers: as strings, the fallback "P10"/"S10"
        # sorts between "P1"/"S1" and "P2"/"S2" instead of last.
        digits = label[len(prefix) :]
        if label.startswith(prefix) and digits.isdigit():
            return int(digits)
        return 10

    is_important = bug["priority"] in HIGH_PRIORITY or bug["severity"] in HIGH_SEVERITY

    severity = bug["severity"]
    if not severity.startswith("S"):
        # Presumably maps legacy severity names to "S<n>" labels.
        severity = OLD_SEVERITY_MAP.get(severity, "S10")

    time_order = (
        lmdutils.get_timestamp(bug["last_change_time"])
        if "last_change_time" in bug
        else int(bug["id"])  # Bug ID reflects the creation order
    )

    return (
        not is_important,
        rank(severity, "S"),
        rank(bug["priority"], "P"),
        time_order * -1,
    )
666

667

668
def get_mail_to_ni(bug: dict) -> Union[dict, None]:
    """Get the person that should be needinfoed about the bug.

    If the bug is assigned, the assignee will be selected. Otherwise, will
    fall back to the triage owner.

    Args:
        bug: The bug that you need to send a needinfo request about.

    Returns:
        A dict with the nickname and the email of the person that should
        receive the needinfo request. If not available will return None.

    """
    for field in ("assigned_to", "triage_owner"):
        person = bug.get(field, "")
        if is_no_assignee(person):
            continue
        detail = bug[f"{field}_detail"]
        return {"mail": person, "nickname": detail["nick"]}

    return None
689

690

691
def get_name_from_user_detail(detail: dict) -> str:
    """Get the name of the user from the detail object.

    Args:
        detail: the user detail; "real_name" and "email" are always read,
            "name" only when the real name is blank.

    Returns:
        "nobody" for a no-assignee account; otherwise the real name, falling
        back to the name and then to the email when blank.
    """
    real_name = detail["real_name"]
    candidate = "nobody" if is_no_assignee(detail["email"]) else real_name
    if candidate.strip() != "":
        return candidate

    candidate = detail["name"]
    if candidate.strip() != "":
        return candidate

    return detail["email"]
706

707

708
def is_weekend(date: Union[datetime.datetime, str]) -> bool:
    """Tell if the provided date is a weekend day (Saturday or Sunday)"""
    saturday = 5
    day_of_week = lmdutils.get_date_ymd(date).weekday()
    return day_of_week >= saturday
712

713

714
def get_whiteboard_access_rating(whiteboard: str) -> str:
    """Get the access rating tag from the whiteboard.

    Args:
        whiteboard: a whiteboard string that contains exactly one access
            rating tag (as matched by ``WHITEBOARD_ACCESS_PAT``).

    Returns:
        The access rating tag.
    """
    found_tags = WHITEBOARD_ACCESS_PAT.findall(whiteboard)
    assert len(found_tags) == 1, "Should have only one access tag"

    (access_tag,) = found_tags
    return access_tag
728

729

730
def create_bug(bug_data: dict) -> dict:
    """Create a new bug.

    Args:
        bug_data: The bug data to create.

    Returns:
        The decoded JSON response, a dictionary with the bug id of the newly
        created bug.

    Raises:
        requests.exceptions.HTTPError: if the response has an error status.
    """
    request_options = {
        "url": Bugzilla.API_URL,
        "json": bug_data,
        "headers": Bugzilla([]).get_header(),
        "verify": True,
        "timeout": Bugzilla.TIMEOUT,
    }
    response = requests.post(**request_options)
    response.raise_for_status()
    return response.json()
748

749

750
def is_keywords_removed_by_autonag(bug: dict, keywords: Iterable) -> bool:
    """Check if the bug had any of the provided keywords removed by autonag.

    Args:
        bug: The bug to check; it must have the history list.
        keywords: The keywords to check.

    Returns:
        True if any of the keywords was removed by autonag, False otherwise.
    """
    for entry in bug["history"]:
        if entry["who"] != BOT_MAIN_ACCOUNT:
            continue
        for change in entry["changes"]:
            if change["field_name"] != "keywords":
                continue
            # NOTE(review): `in` is a containment test on the "removed" value;
            # if that value is a string, this is a substring match — confirm.
            for keyword in keywords:
                if keyword in change["removed"]:
                    return True
    return False
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc