• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / relman-auto-nag / #4176

pending completion
#4176

push

coveralls-python

suhaibmujahid
Generate the release cycle span based on the Firefox Trains API instead of the wiki page

553 of 3095 branches covered (17.87%)

7 of 7 new or added lines in 1 file covered. (100.0%)

1799 of 8002 relevant lines covered (22.48%)

0.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.15
/auto_nag/utils.py
1
# This Source Code Form is subject to the terms of the Mozilla Public
2
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
3
# You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
import copy
1✔
6
import datetime
1✔
7
import json
1✔
8
import os
1✔
9
import random
1✔
10
import re
1✔
11
from typing import Iterable, Union
1✔
12
from urllib.parse import urlencode
1✔
13

14
import dateutil.parser
1✔
15
import humanize
1✔
16
import pytz
1✔
17
import requests
1✔
18
from dateutil.relativedelta import relativedelta
1✔
19
from libmozdata import release_calendar as rc
1✔
20
from libmozdata import utils as lmdutils
1✔
21
from libmozdata import versions as lmdversions
1✔
22
from libmozdata.bugzilla import Bugzilla, BugzillaShorten
1✔
23
from libmozdata.fx_trains import FirefoxTrains
1✔
24
from libmozdata.hgmozilla import Mercurial
1✔
25
from requests.exceptions import HTTPError
1✔
26

27
from auto_nag.constants import (
1✔
28
    BOT_MAIN_ACCOUNT,
29
    HIGH_PRIORITY,
30
    HIGH_SEVERITY,
31
    OLD_SEVERITY_MAP,
32
)
33

34
# Module-level caches. Each one starts as None and is filled on first use by
# the matching accessor defined further down in this module.
_CONFIG = None  # parsed tools.json; filled by _get_config()
_CYCLE_SPAN = None  # "YYYYMMDD-YYYYMMDD" string; filled by get_cycle_span()
_MERGE_DAY = None  # filled by get_merge_day()
_TRIAGE_OWNERS = None  # filled by get_triage_owners()
_DEFAULT_ASSIGNEES = None  # filled by get_default_assignees()
_CURRENT_VERSIONS = None  # NOTE(review): no reader/writer visible in this part of the file
# Directory (relative to the working directory) that holds the JSON config files.
_CONFIG_PATH = "./auto_nag/scripts/configs/"


# A Bugzilla search parameter name: one of f/o/v/j followed by its numeric index.
BZ_FIELD_PAT = re.compile(r"^[fovj]([0-9]+)$")
# Text enclosed in parentheses, square brackets and angle brackets respectively.
PAR_PAT = re.compile(r"\([^\)]*\)")
BRA_PAT = re.compile(r"\[[^\]]*\]")
DIA_PAT = re.compile("<[^>]*>")
# "UTC+" followed by any run of characters other than space or tab.
UTC_PAT = re.compile(r"UTC\+[^ \t]*")
# A colon followed by any run of non-colon characters.
COL_PAT = re.compile(":[^:]*")
# Text starting with "back out", "backs out" or "backed out" (case-insensitive,
# optional spaces/tabs before "out").
BACKOUT_PAT = re.compile("^back(s|(ed))?[ \t]*out", re.I)
# Text starting with "bug" or "bugs" followed by a number (captured in group 1).
BUG_PAT = re.compile(r"^bug[s]?[ \t]*([0-9]+)", re.I)
# A whiteboard tag of the form "[access-sN]" where N is a single digit.
WHITEBOARD_ACCESS_PAT = re.compile(r"\[access\-s\d\]")

# URLs longer than this are shortened (or split) by shorten_long_bz_url().
MAX_URL_LENGTH = 512
1✔
54

55

56
def get_weekdays():
    """Return a mapping from abbreviated English day names to weekday numbers.

    Monday maps to 0 and Sunday to 6.
    """
    day_names = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    return {name: index for index, name in enumerate(day_names)}
1✔
58

59

60
def _get_config():
    """Return the tools configuration, loading ``tools.json`` on first use.

    The parsed JSON is cached in the module-level ``_CONFIG``. If the file
    cannot be opened or read (``IOError``), an empty dict is cached instead.
    """
    global _CONFIG
    if _CONFIG is not None:
        return _CONFIG

    try:
        with open(_CONFIG_PATH + "/tools.json", "r") as config_file:
            _CONFIG = json.load(config_file)
    except IOError:
        _CONFIG = {}
    return _CONFIG
1✔
69

70

71
def get_config(name, entry, default=None):
    """Look up a configuration entry for a tool.

    The tool's own section is consulted first; when the tool has no section,
    or its section lacks the entry, the "common" section is used.

    Args:
        name: the tool (section) name.
        entry: the key to look up.
        default: the value returned when the entry is found nowhere.

    Returns:
        The configured value, or ``default``.
    """
    conf = _get_config()
    # _get_config() yields an empty dict when tools.json is unreadable, so the
    # "common" section may be absent; treat that as "nothing configured"
    # instead of failing with a KeyError.
    common_conf = conf.get("common", {})
    tool_conf = conf.get(name, common_conf)
    if entry in tool_conf:
        return tool_conf[entry]
    return common_conf.get(entry, default)
1✔
80

81

82
def get_receivers(tool_name):
    """Return the de-duplicated list of receivers configured for a tool.

    The "receivers" and "additional_receivers" entries are combined, in that
    order, keeping the first occurrence of each address.
    """
    named_lists = get_config("common", "receiver_list", default={})

    def resolve(config_entry):
        value = get_config(tool_name, config_entry, [])
        # A string value is the name of a shared list under "receiver_list".
        return named_lists[value] if isinstance(value, str) else value

    receivers = resolve("receivers")
    additional_receivers = resolve("additional_receivers")

    # dict.fromkeys() drops duplicates while preserving order.
    return list(dict.fromkeys([*receivers, *additional_receivers]))
×
94

95

96
def init_random():
    """Seed the global ``random`` generator with the current time."""
    seed = datetime.datetime.utcnow().timestamp()
    random.seed(seed)
1✔
100

101

102
def get_signatures(sgns):
    """Extract the set of signatures from a "[@ sig1]\\n[@ sig2]" style string.

    Args:
        sgns: the raw signature field; may be empty or None.

    Returns:
        A set with the text of every "[@ ...]" entry. An entry without a
        closing bracket is kept as is.
    """
    signatures = set()
    if not sgns:
        return signatures

    for chunk in sgns.split("[@"):
        chunk = chunk.strip()
        if not chunk:
            continue
        # Cut at the last "]" so brackets inside the signature are preserved.
        closing = chunk.rfind("]")
        signatures.add(chunk if closing == -1 else chunk[:closing].strip())
    return signatures
1✔
115

116

117
def add_signatures(old, new):
    """Append signatures to an existing signature field value.

    Args:
        old: the current field content (may be empty).
        new: an iterable of signatures to add; they are emitted sorted.

    Returns:
        The field content with one "[@ signature]" line per new signature
        appended after ``old``.
    """
    joined = "]\n[@ ".join(sorted(new))
    added_sgns = "[@ " + joined + "]"
    if not old:
        return added_sgns
    return "\n".join((old, added_sgns))
1✔
122

123

124
def get_empty_assignees(params, negation=False):
    """Add a "bug has no real assignee" clause to Bugzilla search params.

    The clause is an OR group matching an assignee equal to
    nobody@mozilla.org, an assignee ending in ".bugs", or an empty assignee.

    Args:
        params: the search parameters dict; it is updated in place.
        negation: when true, the group is flagged with an ``n<index>`` key
            set to 1.

    Returns:
        The same ``params`` dict.
    """
    n = int(get_last_field_num(params))
    clause = {
        f"j{n}": "OR",
        f"f{n}": "OP",
        f"f{n + 1}": "assigned_to",
        f"o{n + 1}": "equals",
        f"v{n + 1}": "nobody@mozilla.org",
        f"f{n + 2}": "assigned_to",
        f"o{n + 2}": "regexp",
        f"v{n + 2}": r"^.*\.bugs$",
        f"f{n + 3}": "assigned_to",
        f"o{n + 3}": "isempty",
        f"f{n + 4}": "CP",
    }
    if negation:
        clause[f"n{n}"] = 1

    params.update(clause)
    return params
1✔
146

147

148
def is_no_assignee(mail):
    """Tell whether an assignee email stands for "nobody".

    True for nobody@mozilla.org, for the empty string, and for any address
    ending in ".bugs".
    """
    if mail in ("nobody@mozilla.org", ""):
        return True
    return mail.endswith(".bugs")
1✔
150

151

152
def get_login_info():
    """Return the parsed content of ``config.json`` from the config directory."""
    with open(_CONFIG_PATH + "config.json", "r") as config_file:
        login_info = json.load(config_file)
    return login_info
×
155

156

157
def get_private():
    """Return the "private" section of ``config.json``."""
    with open(_CONFIG_PATH + "config.json", "r") as config_file:
        config = json.load(config_file)
    return config["private"]
×
160

161

162
def plural(sword, data, pword=""):
    """Pick the singular or plural form of a word.

    Args:
        sword: the singular word.
        data: either a count (int) or a sized collection whose length is the
            count.
        pword: the plural word; when empty, ``sword + "s"`` is used.

    Returns:
        ``sword`` when the count is exactly 1, the plural form otherwise.
    """
    count = data if isinstance(data, int) else len(data)
    if count == 1:
        return sword
    return pword or sword + "s"
×
172

173

174
def english_list(items):
    """Join items the English way: "a, b and c".

    Args:
        items: a non-empty sequence of strings.

    Returns:
        The single item, or all items comma-separated with "and" before the
        last one.
    """
    assert len(items) > 0
    last = items[-1]
    if len(items) == 1:
        return last

    leading = ", ".join(items[:-1])
    return f"{leading} and {last}"
1✔
180

181

182
def shorten_long_bz_url(url):
    """Return a short form of a Bugzilla URL when it exceeds MAX_URL_LENGTH.

    Line lengths are limited in the email protocol
    (https://datatracker.ietf.org/doc/html/rfc5322#section-2.1.1), so long
    URLs are replaced by a short one obtained through ``BugzillaShorten``.

    Args:
        url: the URL to shorten; empty values and short URLs are returned
            unchanged.

    Returns:
        The shortened URL, or, when the shortening request fails with an
        ``HTTPError``, the original URL split into newline-separated chunks
        of at most MAX_URL_LENGTH characters.
    """
    if not url or len(url) <= MAX_URL_LENGTH:
        return url

    shortened = {}

    def store_short_url(short_url, target):
        target["url"] = short_url

    try:
        BugzillaShorten(
            url, url_data=shortened, url_handler=store_short_url
        ).wait()
    except HTTPError:  # workaround for https://github.com/mozilla/relman-auto-nag/issues/1402
        chunks = (
            url[start : start + MAX_URL_LENGTH]
            for start in range(0, len(url), MAX_URL_LENGTH)
        )
        return "\n".join(chunks)

    return shortened["url"]
×
202

203

204
def get_cycle_span() -> str:
    """Return the cycle span in the format YYYYMMDD-YYYYMMDD.

    The span goes from the "nightly_start" date to the "merge_day" date of
    the nightly release schedule reported by ``FirefoxTrains``. The result is
    cached in the module-level ``_CYCLE_SPAN``.
    """
    global _CYCLE_SPAN
    if _CYCLE_SPAN is not None:
        return _CYCLE_SPAN

    schedule = FirefoxTrains().get_release_schedule("nightly")
    start = lmdutils.get_date_ymd(schedule["nightly_start"])
    end = lmdutils.get_date_ymd(schedule["merge_day"])

    # Sanity check: today must fall inside the reported cycle.
    today = lmdutils.get_date_ymd("today")
    assert start <= today <= end

    _CYCLE_SPAN = "-".join(day.strftime("%Y%m%d") for day in (start, end))
    return _CYCLE_SPAN
×
218

219

220
def get_next_release_date():
    """Return the next release date as reported by libmozdata's release calendar."""
    next_release = rc.get_next_release_date()
    return next_release
×
222

223

224
def get_release_calendar():
    """Return the release calendar provided by libmozdata."""
    calendar = rc.get_calendar()
    return calendar
×
226

227

228
def get_merge_day():
    """Return the "merge" value of the first release calendar entry.

    The value is cached in the module-level ``_MERGE_DAY``.
    """
    global _MERGE_DAY
    if _MERGE_DAY is not None:
        return _MERGE_DAY

    first_entry = get_release_calendar()[0]
    _MERGE_DAY = first_entry["merge"]
    return _MERGE_DAY
×
234

235

236
def is_merge_day():
    """Tell whether today is the merge day returned by get_merge_day()."""
    return get_merge_day() == lmdutils.get_date_ymd("today")
×
241

242

243
def get_report_bugs(channel, op="+"):
    """Return the bug ids listed by the release tracking report.

    Args:
        channel: the channel used in the "approval-mozilla-<channel>" flag.
        op: the flag status to query (e.g. "+").

    Returns:
        A list of bug ids (strings) for the current cycle span.
    """
    query = "approval-mozilla-{}:{}:{}:0:and:".format(channel, op, get_cycle_span())

    # allow_redirects=False avoids loading the data: the redirect target
    # already carries all the bug ids we need.
    response = requests.get(
        "https://bugzilla.mozilla.org/page.cgi?id=release_tracking_report.html",
        params={"q": query},
        allow_redirects=False,
    )

    # something like https://bugzilla.mozilla.org/buglist.cgi?bug_id=1493711,1502766,1499908
    redirect_target = response.headers["Location"]
    bug_ids = redirect_target.split("=")[1]
    return bug_ids.split(",")
×
257

258

259
def get_flag(version, name, channel):
    """Build the Bugzilla flag/field name for a version and channel.

    Args:
        version: the version number used in the flag name.
        name: "status", "tracking" or "approval".
        channel: the channel name; "esr" selects the ESR variant.

    Returns:
        The flag name, or None when ``name`` is not one of the supported kinds.
    """
    is_esr = channel == "esr"
    if name in ("status", "tracking"):
        esr_marker = "_esr" if is_esr else ""
        return f"cf_{name}_firefox{esr_marker}{version}"
    if name == "approval":
        target = f"esr{version}" if is_esr else channel
        return f"approval-mozilla-{target}"
    return None
×
268

269

270
def get_needinfo(bug, days=-1):
    """Yield the pending needinfo flags of a bug that are old enough.

    Args:
        bug: the bug dict; its "flags" list is inspected when present.
        days: minimum age, in whole days, of the flag's modification date.

    Yields:
        Every flag named "needinfo" with status "?" whose modification date
        is at least ``days`` days old.
    """
    now = datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
    for flag in bug.get("flags", []):
        if flag.get("name", "") != "needinfo" or flag["status"] != "?":
            continue
        modified = dateutil.parser.parse(flag["modification_date"])
        age = now - modified
        if age.days >= days:
            yield flag
×
278

279

280
def get_last_field_num(params):
    """Return the next free index for Bugzilla search fields, as a string.

    Keys matching BZ_FIELD_PAT (f<N>, o<N>, v<N>, j<N>) are considered; the
    result is the highest N plus one, or "1" when no such key exists.
    """
    matches = (BZ_FIELD_PAT.match(key) for key in params.keys())
    used_indexes = [int(match.group(1)) for match in matches if match]
    if not used_indexes:
        return "1"
    return str(max(used_indexes) + 1)
1✔
289

290

291
def add_prod_comp_to_query(params, prod_comp):
    """Add an OR group of (product AND component) clauses to search params.

    Args:
        params: the Bugzilla search parameters dict; it is updated in place.
        prod_comp: an iterable of "Product::Component" strings.

    Raises:
        ValueError: if an entry does not contain "::".
    """
    n = int(get_last_field_num(params))
    # Open the outer OR group.
    params[f"j{n}"] = "OR"
    params[f"f{n}"] = "OP"
    n += 1
    for pc in prod_comp:
        # Split on the first "::" only, as get_components() and
        # get_products_components() do, so that a component name that itself
        # contains "::" is kept whole instead of breaking the unpacking.
        prod, comp = pc.split("::", 1)
        # Open the inner AND group for this product/component pair.
        params[f"j{n}"] = "AND"
        params[f"f{n}"] = "OP"
        n += 1
        params[f"f{n}"] = "product"
        params[f"o{n}"] = "equals"
        params[f"v{n}"] = prod
        n += 1
        params[f"f{n}"] = "component"
        params[f"o{n}"] = "equals"
        params[f"v{n}"] = comp
        n += 1
        # Close the inner group.
        params[f"f{n}"] = "CP"
        n += 1
    # Close the outer group.
    params[f"f{n}"] = "CP"
×
312

313

314
def get_bz_search_url(params):
    """Return the Bugzilla bug list URL for the given search parameters.

    Sequence values are expanded into repeated query parameters.
    """
    query = urlencode(params, doseq=True)
    return f"https://bugzilla.mozilla.org/buglist.cgi?{query}"
1✔
316

317

318
def has_bot_set_ni(bug):
    """Tell whether a pending needinfo on the bug was set by the bot.

    The setter of each pending needinfo flag is checked for membership in
    the "bot_bz_mail" entry of the common configuration.
    """
    bot = get_config("common", "bot_bz_mail")
    return any(flag["setter"] in bot for flag in get_needinfo(bug))
×
324

325

326
def get_triage_owners():
    """Return a mapping from triage owner to their "Product::Component" list.

    The data is fetched from the Bugzilla REST product endpoint for the
    products listed in the common configuration, and cached in the
    module-level ``_TRIAGE_OWNERS``. Components whose triage owner is empty
    or a "no assignee" address are skipped.
    """
    global _TRIAGE_OWNERS
    if _TRIAGE_OWNERS is not None:
        return _TRIAGE_OWNERS

    # accessible is the union of:
    #  selectable (all product we can see)
    #  enterable (all product a user can file bugs into).
    query = {
        "type": "accessible",
        "include_fields": ["name", "components.name", "components.triage_owner"],
        "names": get_config("common", "products"),
    }
    response = requests.get("https://bugzilla.mozilla.org/rest/product", params=query)
    products = response.json()["products"]

    _TRIAGE_OWNERS = {}
    for product in products:
        product_name = product["name"]
        for component in product["components"]:
            owner = component["triage_owner"]
            if not owner or is_no_assignee(owner):
                continue
            component_name = component["name"]
            _TRIAGE_OWNERS.setdefault(owner, []).append(
                f"{product_name}::{component_name}"
            )
    return _TRIAGE_OWNERS
×
356

357

358
def get_default_assignees():
    """Return the default assignee of every component, grouped by product.

    The result maps each product name to a dict mapping component names to
    their "default_assigned_to" value. It is fetched from the Bugzilla REST
    product endpoint and cached in the module-level ``_DEFAULT_ASSIGNEES``.
    """
    global _DEFAULT_ASSIGNEES
    if _DEFAULT_ASSIGNEES is not None:
        return _DEFAULT_ASSIGNEES

    # accessible is the union of:
    #  selectable (all product we can see)
    #  enterable (all product a user can file bugs into).
    query = {
        "type": "accessible",
        "include_fields": ["name", "components.name", "components.default_assigned_to"],
        "names": get_config("common", "products"),
    }
    response = requests.get("https://bugzilla.mozilla.org/rest/product", params=query)
    products = response.json()["products"]

    _DEFAULT_ASSIGNEES = {}
    for product in products:
        per_component = {}
        _DEFAULT_ASSIGNEES[product["name"]] = per_component
        for component in product["components"]:
            component_name = component["name"]
            default_assignee = component["default_assigned_to"]
            per_component[component_name] = default_assignee
    return _DEFAULT_ASSIGNEES
×
384

385

386
def organize(bugs, columns, key=None):
    """Project bugs onto the given columns and return them sorted.

    Args:
        bugs: an iterable of bug data dicts, or a dict whose values are the
            bug data dicts.
        columns: the keys to extract from each bug, in order; must not be
            empty.
        key: optional sort key applied to each extracted row. By default rows
            are sorted column by column, with the "id" column sorted in
            descending numeric order.

    Returns:
        A sorted list of tuples (one value per column) when there are two or
        more columns, or a sorted list of bare values when there is a single
        column.
    """
    if isinstance(bugs, dict):
        # we suppose that the values are the bugdata dict
        bugs = bugs.values()

    def identity(x):
        return x

    def bugid_key(x):
        # Negated so that the most recent (highest) bug ids come first.
        return -int(x)

    transforms = {"id": bugid_key}

    if len(columns) >= 2:
        rows = [tuple(info[c] for c in columns) for info in bugs]

        def default_key(row):
            return tuple(
                transforms.get(c, identity)(value) for value, c in zip(row, columns)
            )

    else:
        column = columns[0]
        rows = [info[column] for info in bugs]
        # Rows are bare values here, not tuples: the transform must be applied
        # to the value itself. Zipping the value with `columns` would only
        # look at the first character of a string and fail on an int.
        default_key = transforms.get(column, identity)

    return sorted(rows, key=key if key else default_key)
×
409

410

411
def merge_bz_changes(c1, c2):
    """Merge two Bugzilla change dicts that have no key in common.

    Args:
        c1: the first changes dict (may be empty or None).
        c2: the second changes dict (may be empty or None).

    Returns:
        ``c2`` when ``c1`` is empty, ``c1`` when ``c2`` is empty, otherwise a
        deep copy of ``c1`` updated with the entries of ``c2``.
    """
    if not c1:
        return c2
    if not c2:
        return c1

    common_keys = set(c1.keys()).intersection(c2.keys())
    assert not common_keys, "Merge changes with common keys is not a good idea"

    merged = copy.deepcopy(c1)
    merged.update(c2)
    return merged
×
424

425

426
def is_test_file(path):
    """Tell whether a path looks like a test file.

    The path must contain "test" and its extension must not be one of
    ini, list, in, py, json or manifest (case-insensitive).
    """
    if "test" not in path:
        return False
    _, extension = os.path.splitext(path)
    excluded = {"ini", "list", "in", "py", "json", "manifest"}
    return extension[1:].lower() not in excluded
×
429

430

431
def get_better_name(name):
    """Strip decorations from a user's display name.

    Parenthesised, square-bracketed and angle-bracketed parts are removed,
    as are ":something" fragments that do not start the name and "UTC+..."
    markers. A leading colon is dropped. Names starting with "Nobody;"
    become "Nobody".

    Args:
        name: the raw display name; may be empty or None.

    Returns:
        The cleaned name, or "" when ``name`` is empty.
    """
    if not name:
        return ""

    if name.startswith("Nobody;"):
        cleaned = "Nobody"
    else:

        def keep_leading(match):
            # Only a ":nick" located at the very start of the name is kept.
            return match.group(0) if match.start(0) == 0 else ""

        cleaned = name
        for pattern in (PAR_PAT, BRA_PAT, DIA_PAT):
            cleaned = pattern.sub("", cleaned)
        cleaned = COL_PAT.sub(keep_leading, cleaned)
        cleaned = UTC_PAT.sub("", cleaned).strip()
        if cleaned.startswith(":"):
            cleaned = cleaned[1:]

    # UTF-8 round trip kept as is: a no-op for text that encodes cleanly.
    return cleaned.encode("utf-8").decode("utf-8")
×
452

453

454
def is_backout(json):
    """Tell whether a changeset is related to a backout.

    True when the changeset has a non-empty "backedoutby" value, or when
    BACKOUT_PAT matches its "desc".
    """
    if json.get("backedoutby", "") != "":
        return True
    return BACKOUT_PAT.search(json["desc"]) is not None
1✔
456

457

458
def get_pushlog(startdate, enddate, channel="nightly"):
    """Get the pushlog from hg.mozilla.org.

    Args:
        startdate: start of the period (inclusive).
        enddate: end of the period (inclusive).
        channel: the channel whose repository is queried.

    Returns:
        The decoded JSON answer of the repository's "json-pushes" endpoint.
    """
    # We want the pushes where startdate <= pushdate <= enddate, but the
    # pushlog uses strict inequalities: widen the range by one second on
    # each side.
    one_second = relativedelta(seconds=1)
    fmt = "%Y-%m-%d %H:%M:%S"
    query = {
        "startdate": (startdate - one_second).strftime(fmt),
        "enddate": (enddate + one_second).strftime(fmt),
        "version": 2,
        "full": 1,
    }
    url = "{}/json-pushes".format(Mercurial.get_repo_url(channel))
    response = requests.get(url, params=query)
    return response.json()
×
473

474

475
def get_bugs_from_desc(desc):
    """Get the bug numbers from the patch description.

    Returns:
        The list of numbers (as strings) captured by BUG_PAT in ``desc``.
        The pattern is anchored at the start of the description, so the
        list holds at most one number.
    """
    bug_numbers = BUG_PAT.findall(desc)
    return bug_numbers
×
478

479

480
def get_bugs_from_pushlog(startdate, enddate, channel="nightly"):
    """Return the set of bug numbers found in the pushlog for a period.

    Changesets with a non-empty "backedoutby" value are ignored.
    """
    pushlog = get_pushlog(startdate, enddate, channel=channel)
    bugs = set()
    for push in pushlog["pushes"].values():
        landed = (
            chgset
            for chgset in push["changesets"]
            if chgset.get("backedoutby", "") == ""
        )
        for chgset in landed:
            bugs.update(get_bugs_from_desc(chgset["desc"]))
    return bugs
×
491

492

493
def get_checked_versions():
    """Return the current versions per channel, or {} when they look unreliable.

    There are different reasons to not return versions:
      i) it is merge day: the versions are changing;
      ii) the release, beta and central version numbers are not consecutive;
      iii) Bugzilla updated the nightly version but product-details did not
           (or the other way around).

    Returns:
        A dict mapping channel names to version strings (with "central" set
        to the nightly version), or an empty dict.
    """
    if is_merge_day():
        return {}

    versions = lmdversions.get(base=True)
    versions["central"] = versions["nightly"]

    release, beta, central = (versions[k] for k in ("release", "beta", "central"))
    versions = {channel: str(number) for channel, number in versions.items()}

    if not (release + 2 == beta + 1 == central):
        from . import logger

        logger.info("Not consecutive versions in product/details")
        return {}

    if central != get_nightly_version_from_bz():
        from . import logger

        logger.info("Versions mismatch between Bugzilla and product-details")
        return {}

    return versions
×
520

521

522
def get_info_from_hg(json):
    """Summarize a Mercurial changeset description.

    Args:
        json: the changeset data; "pushdate" (whose first element is a
            timestamp) and "desc" are read, "backedoutby" is optional.

    Returns:
        A dict with "date" (the push date as a string), "backedout" (bool)
        and "bugid" (the number captured by BUG_PAT, or "").
    """
    push_time = datetime.datetime.utcfromtimestamp(json["pushdate"][0])
    push_date = lmdutils.get_date_str(lmdutils.as_utc(push_time))
    backed_out = json.get("backedoutby", "") != ""
    bug_match = BUG_PAT.search(json["desc"])

    return {
        "date": push_date,
        "backedout": backed_out,
        "bugid": bug_match.group(1) if bug_match else "",
    }
×
533

534

535
def bz_ignore_case(s):
    """Build a regular expression matching ``s`` whatever its letter case.

    Each character becomes a two-character class holding its lower and upper
    case forms, e.g. "ab" gives "[aA][bB]".

    Args:
        s: the literal text to match. Characters are not escaped, so it
            should not contain characters that are special inside a
            character class.

    Returns:
        The regular expression as a string.
    """
    # Lower-case explicitly: pairing the character with its upper case alone
    # would turn an upper-case input letter into e.g. "[AA]", which no longer
    # matches the lower-case letter.
    return "[" + "][".join(c.lower() + c.upper() for c in s) + "]"
×
537

538

539
def check_product_component(data, bug):
    """Tell whether the bug's product/component is listed in ``data``.

    ``data`` may hold "Product::Component" entries or bare component names.
    """
    product, component = bug["product"], bug["component"]
    if "::".join((product, component)) in data:
        return True
    return component in data
×
544

545

546
def get_components(data):
    """Return the component names from "Product::Component" or bare entries.

    For entries containing "::", everything after the first "::" is kept.
    """
    return [
        entry.split("::", 1)[1] if "::" in entry else entry
        for entry in data
    ]
×
553

554

555
def get_products_components(data):
    """Split "Product::Component" entries into product and component sets.

    Entries without "::" are treated as bare component names.

    Returns:
        A ``(products, components)`` tuple of sets.
    """
    prods, comps = set(), set()
    for entry in data:
        product, separator, component = entry.partition("::")
        if separator:
            prods.add(product)
            comps.add(component)
        else:
            comps.add(entry)
    return prods, comps
×
566

567

568
def ireplace(old, repl, text):
    """Replace every case-insensitive occurrence of ``old`` in ``text``.

    ``old`` is matched literally, and ``repl`` is inserted literally (no
    backslash or group reference processing).
    """
    pattern = re.compile("(?i)" + re.escape(old))

    def literal_replacement(_match):
        return repl

    return pattern.sub(literal_replacement, text)
×
570

571

572
def get_human_lag(date):
    """Return the time elapsed since ``date`` as a human readable text.

    Args:
        date: a date string (parsed with dateutil) or a date object that can
            be subtracted from a UTC-aware datetime.
    """
    now = pytz.utc.localize(datetime.datetime.utcnow())
    if isinstance(date, str):
        date = dateutil.parser.parse(date)

    return humanize.naturaldelta(now - date)
×
577

578

579
def get_nightly_version_from_bz():
    """Return the highest Firefox version that has a status flag in Bugzilla.

    The fields of bug 1234567 are fetched, and the largest N among the
    "cf_status_firefoxN" field names is returned.
    """
    prefix = "cf_status_firefox"

    def collect_versions(bug, versions):
        suffixes = (
            field[len(prefix) :] for field in bug.keys() if field.startswith(prefix)
        )
        # Skip non numeric suffixes (e.g. the "_esrN" fields).
        versions.extend(int(suffix) for suffix in suffixes if suffix.isdigit())

    found = []
    Bugzilla(
        bugids=["1234567"], bughandler=collect_versions, bugdata=found
    ).get_data().wait()

    return max(found)
×
593

594

595
def nice_round(val):
    """Convert a ratio into a percentage rounded to the nearest integer."""
    percentage = 100 * val
    return int(round(percentage))
×
597

598

599
def is_bot_email(email: str) -> bool:
    """Check if the email belongs to a bot or component-watching account.

    Addresses ending in ".bugs" or ".tld" are considered bot accounts, except
    those ending in "@disabled.tld".

    Args:
        email: the account login email.
    """
    if email.endswith("@disabled.tld"):
        return False

    return email.endswith((".bugs", ".tld"))
1✔
609

610

611
def get_last_no_bot_comment_date(bug: dict) -> str:
    """Get the creation date of the last comment posted by a non-bot account.

    Args:
        bug: the bug dictionary; it must have the comments list.

    Returns:
        The creation time of the most recent comment whose creator is not a
        bot. If all comments were posted by bots, the creation time of the
        first comment is returned (presumably the bug description, i.e. the
        creation date of the bug itself).

    Raises:
        IndexError: if the comments list is empty.
    """
    comments = bug["comments"]
    human_comments = (
        comment for comment in reversed(comments) if not is_bot_email(comment["creator"])
    )
    for comment in human_comments:
        return comment["creation_time"]

    return comments[0]["creation_time"]
×
626

627

628
def get_sort_by_bug_importance_key(bug):
    """Build the sort key that orders bugs by importance.

    We need bugs with high severity (S1 or S2) or high priority (P1 or P2) to be
    first (do not need to be high in both). Next, bugs with higher priority and
    severity are preferred. Finally, for bugs with the same severity and priority,
    we favour recently changed or created bugs.

    Args:
        bug: the bug dict; "priority" and "severity" are required, and either
            "last_change_time" or "id" is used for the time ordering.

    Returns:
        A tuple suitable as a ``key`` for sorting: smaller sorts first.
    """
    # Placeholders for an unset or unknown priority/severity. The values are
    # compared as strings, so the placeholder must sort after every real
    # value: "P10"/"S10" would land between "P1"/"S1" and "P2"/"S2" and rank
    # unset bugs above real ones.
    lowest_priority = "P9"
    lowest_severity = "S9"

    is_important = bug["priority"] in HIGH_PRIORITY or bug["severity"] in HIGH_SEVERITY
    priority = bug["priority"] if bug["priority"].startswith("P") else lowest_priority
    severity = (
        bug["severity"]
        if bug["severity"].startswith("S")
        else OLD_SEVERITY_MAP.get(bug["severity"], lowest_severity)
    )
    time_order = (
        lmdutils.get_timestamp(bug["last_change_time"])
        if "last_change_time" in bug
        else int(bug["id"])  # Bug ID reflects the creation order
    )

    return (
        not is_important,
        severity,
        priority,
        # Negated so that the most recent bugs come first.
        time_order * -1,
    )
655

656

657
def get_mail_to_ni(bug: dict) -> Union[dict, None]:
    """Get the person that should be needinfoed about the bug.

    If the bug is assigned, the assignee will be selected. Otherwise, will
    fall back to the triage owner.

    Args:
        bug: The bug that you need to send a needinfo request about.

    Returns:
        A dict with the nickname ("nickname") and the email ("mail") of the
        person that should receive the needinfo request. If not available,
        will return None.
    """
    candidates = (
        (field, bug.get(field, "")) for field in ("assigned_to", "triage_owner")
    )
    for field, person in candidates:
        if is_no_assignee(person):
            continue
        detail = bug[f"{field}_detail"]
        return {"mail": person, "nickname": detail["nick"]}

    return None
×
678

679

680
def get_name_from_user_detail(detail: dict) -> str:
    """Get the name of the user from the detail object.

    The "real_name" is preferred ("nobody" is used instead when the email is
    a "no assignee" address); a blank value falls back to "name", then to
    "email".

    Returns:
        The name of the user or the email as a fallback.
    """
    real_name = detail["real_name"]
    name = "nobody" if is_no_assignee(detail["email"]) else real_name
    if name.strip():
        return name

    name = detail["name"]
    return name if name.strip() else detail["email"]
×
695

696

697
def is_weekend(date: Union[datetime.datetime, str]) -> bool:
    """Tell whether the provided date is a weekend day (Saturday or Sunday)."""
    saturday = 5  # weekday() counts from Monday == 0
    return lmdutils.get_date_ymd(date).weekday() >= saturday
×
701

702

703
def get_whiteboard_access_rating(whiteboard: str) -> str:
    """Get the access rating tag from the whiteboard.

    Args:
        whiteboard: a whiteboard string that contains exactly one access
            rating tag (of the form "[access-sN]").

    Returns:
        An access rating tag.
    """
    matches = WHITEBOARD_ACCESS_PAT.findall(whiteboard)
    assert len(matches) == 1, "Should have only one access tag"

    (tag,) = matches
    return tag
×
717

718

719
def create_bug(bug_data: dict) -> dict:
    """Create a new bug.

    The data is posted as JSON to the Bugzilla API, and an unsuccessful HTTP
    status raises an exception.

    Args:
        bug_data: The bug data to create.

    Returns:
        A dictionary with the bug id of the newly created bug.
    """
    response = requests.post(
        url=Bugzilla.API_URL,
        json=bug_data,
        headers=Bugzilla([]).get_header(),
        verify=True,
        timeout=Bugzilla.TIMEOUT,
    )
    response.raise_for_status()
    created = response.json()
    return created
×
737

738

739
def is_keywords_removed_by_autonag(bug: dict, keywords: Iterable) -> bool:
    """Check if the bug had any of the provided keywords removed by autonag.

    Only history entries authored by BOT_MAIN_ACCOUNT and changes to the
    "keywords" field are considered.

    Args:
        bug: The bug to check; it must have the history list.
        keywords: The keywords to check.

    Returns:
        True if any of the keywords was removed by autonag, False otherwise.
    """
    for entry in bug["history"]:
        if entry["who"] != BOT_MAIN_ACCOUNT:
            continue
        for change in entry["changes"]:
            if change["field_name"] != "keywords":
                continue
            # NOTE(review): if "removed" is a plain string this is a substring
            # test, so a keyword also matches inside a longer one; confirm.
            for keyword in keywords:
                if keyword in change["removed"]:
                    return True
    return False
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc