• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / relman-auto-nag / #4569

pending completion
#4569

push

coveralls-python

web-flow
Turn off BugBug for the Web Compatibility product (#2104)

It was already disabled on all existing components. However, we recently
added another component called "Knowledge Base", where we also do not
want to get nagged, as some flags depend on internal processes.

Instead of adding another Component-exception, let's turn it off for the
entire product.

641 of 3223 branches covered (19.89%)

1823 of 8011 relevant lines covered (22.76%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/bugbot/iam.py
1
# This Source Code Form is subject to the terms of the Mozilla Public
2
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
3
# You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
import argparse
×
6
import json
×
7
import os
×
8
from typing import Dict
×
9

10
import requests
×
11
from libmozdata.bugzilla import BugzillaUser
×
12

13
from . import logger, utils
×
14

15

16
def get_access_token():
×
17
    scope = {
×
18
        "classification": [
19
            "mozilla_confidential",
20
            "workgroup:staff_only",
21
            "public",
22
            "workgroup",
23
        ],
24
        "display": ["staff", "ndaed", "vouched", "authenticated", "public", "none"],
25
        "search": ["all"],
26
    }
27
    scope = " ".join(
×
28
        f"{key}:{value}" for key, values in scope.items() for value in values
29
    )
30

31
    payload = {
×
32
        "client_id": utils.get_login_info()["iam_client_id"],
33
        "client_secret": utils.get_login_info()["iam_client_secret"],
34
        "audience": "api.sso.mozilla.com",
35
        "scope": scope,
36
        "grant_type": "client_credentials",
37
    }
38

39
    resp = requests.post("https://auth.mozilla.auth0.com/oauth/token", json=payload)
×
40
    access = resp.json()
×
41

42
    assert "access_token" in access.keys()
×
43

44
    return access["access_token"]
×
45

46

47
def clean_data(d):
×
48
    if isinstance(d, dict):
×
49
        for k in ["metadata", "signature"]:
×
50
            if k in d:
×
51
                del d[k]
×
52

53
        for v in d.values():
×
54
            clean_data(v)
×
55
    elif isinstance(d, list):
×
56
        for v in d:
×
57
            clean_data(v)
×
58

59

60
def get_email_info(email):
×
61
    headers = {"Authorization": f"Bearer {get_access_token()}"}
×
62
    resp = requests.get(
×
63
        f"https://person.api.sso.mozilla.com/v2/user/primary_email/{email}",
64
        headers=headers,
65
    )
66
    data = resp.json()
×
67
    clean_data(data)
×
68

69
    return data
×
70

71

72
def get_all_info(output_dir=""):
×
73
    headers = {"Authorization": f"Bearer {get_access_token()}"}
×
74
    resp = requests.get(
×
75
        "https://person.api.sso.mozilla.com/v2/users/id/all/by_attribute_contains?staff_information.staff=True&active=True&fullProfiles=True",
76
        headers=headers,
77
    )
78
    data = resp.json()
×
79
    clean_data(data)
×
80

81
    next_page = data["nextPage"]
×
82

83
    while next_page is not None:
×
84
        print(f"{next_page}")
×
85
        resp = requests.get(
×
86
            f"https://person.api.sso.mozilla.com/v2/users/id/all/by_attribute_contains?staff_information.staff=True&active=True&fullProfiles=True&nextPage={next_page}",
87
            headers=headers,
88
        )
89
        d = resp.json()
×
90
        clean_data(d)
×
91
        data["users"] += d["users"]
×
92
        next_page = d["nextPage"]
×
93

94
    del data["nextPage"]
×
95

96
    if output_dir:
×
97
        with open(os.path.join(output_dir, "iam_dump.json"), "w") as Out:
×
98
            json.dump(data, Out, sort_keys=True, indent=4, separators=(",", ": "))
×
99

100
    return data
×
101

102

103
def get_phonebook_dump(output_dir=""):
×
104
    data = None
×
105
    if output_dir:
×
106
        path = os.path.join(output_dir, "iam_dump.json")
×
107
        if os.path.isfile(path):
×
108
            with open(path, "r") as In:
×
109
                data = json.load(In)
×
110
    if not data:
×
111
        data = get_all_info(output_dir=output_dir)
×
112

113
    all_cns = {}
×
114
    all_dns = {}
×
115

116
    must_have = {
×
117
        "first_name",
118
        "last_name",
119
        "identities",
120
        "access_information",
121
        "staff_information",
122
    }
123
    new_data = {}
×
124
    for person in data["users"]:
×
125
        person = person["profile"]
×
126

127
        if not (must_have < set(person.keys())):
×
128
            continue
×
129

130
        if not person["access_information"]["hris"]["values"]:
×
131
            continue
×
132
        mail = person["access_information"]["hris"]["values"]["primary_work_email"]
×
133
        dn = person["identities"]["mozilla_ldap_id"]["value"]
×
134
        manager_mail = person["access_information"]["hris"]["values"][
×
135
            "managers_primary_work_email"
136
        ]
137
        if not manager_mail:
×
138
            manager_mail = mail
×
139

140
        _mail = person["identities"]["mozilla_ldap_primary_email"]["value"]
×
141
        assert mail == _mail
×
142

143
        ismanager = person["staff_information"]["manager"]["value"]
×
144
        isdirector = person["staff_information"]["director"]["value"]
×
145
        cn = "{} {}".format(person["first_name"]["value"], person["last_name"]["value"])
×
146
        bugzillaEmail = ""
×
147
        bugzillaID = None
×
148
        if "bugzilla_mozilla_org_primary_email" in person["identities"]:
×
149
            bugzillaEmail = person["identities"]["bugzilla_mozilla_org_primary_email"][
×
150
                "value"
151
            ]
152
            bugzillaID = person["identities"]["bugzilla_mozilla_org_id"]["value"]
×
153

154
        im = None
×
155

156
        values = person.get("usernames", {}).get("values", None)
×
157
        if values is not None:
×
158
            if not bugzillaEmail and "HACK#BMOMAIL" in values:
×
159
                bugzillaEmail = values["HACK#BMOMAIL"]
×
160

161
            values.pop("LDAP-posix_id", None)
×
162
            values.pop("LDAP-posix_uid", None)
×
163
            im = list(values.values())
×
164

165
        if bugzillaEmail is None:
×
166
            bugzillaEmail = ""
×
167

168
        title = person["staff_information"]["title"]["value"]
×
169
        all_cns[mail] = cn
×
170
        all_dns[mail] = dn
×
171

172
        new = {
×
173
            "mail": mail,
174
            "manager": {"cn": "", "dn": manager_mail},
175
            "ismanager": "TRUE" if ismanager else "FALSE",
176
            "isdirector": "TRUE" if isdirector else "FALSE",
177
            "cn": cn,
178
            "dn": dn,
179
            "bugzillaEmail": bugzillaEmail,
180
            "bugzillaID": bugzillaID,
181
            "title": title,
182
        }
183

184
        if im:
×
185
            new["im"] = im
×
186

187
        new_data[mail] = new
×
188

189
    to_remove = []
×
190
    for mail, person in new_data.items():
×
191
        manager_mail = person["manager"]["dn"]
×
192
        if manager_mail not in all_cns:
×
193
            # no manager
194
            to_remove.append(mail)
×
195
            continue
×
196
        manager_cn = all_cns[manager_mail]
×
197
        manager_dn = all_dns[manager_mail]
×
198
        person["manager"]["cn"] = manager_cn
×
199
        person["manager"]["dn"] = manager_dn
×
200

201
    for mail in to_remove:
×
202
        del new_data[mail]
×
203

204
    update_bugzilla_emails(new_data)
×
205
    new_data = list(new_data.values())
×
206

207
    with open("./configs/people.json", "w") as Out:
×
208
        json.dump(new_data, Out, sort_keys=True, indent=4, separators=(",", ": "))
×
209

210

211
def update_bugzilla_emails(data: Dict[str, dict]) -> None:
×
212
    """Update the bugzilla emails based on the Bugzilla ID and the Mozilla LDAP email.
213

214
    Args:
215
        data: The data to update.
216
    """
217

218
    users_by_bugzilla_id = {
×
219
        int(person["bugzillaID"]): person
220
        for person in data.values()
221
        if person["bugzillaID"]
222
    }
223

224
    # Currently employees can have permissions if they use their Mozilla email
225
    # without the need to link their Bugzilla accounts to PMO. Thus we check here
226
    # if employees already have a Bugzilla account using their Mozilla emails.
227
    #
228
    # Once BMO and PMO are fully integrated (plan in progress), this will be
229
    # changed and employees will not have permissions unless they link their
230
    # Bugzilla account to PMO.
231
    users_to_check = [*data, *users_by_bugzilla_id]
×
232

233
    def handler(bz_user, data):
×
234
        if bz_user["id"] in users_by_bugzilla_id:
×
235
            person = users_by_bugzilla_id[bz_user["id"]]
×
236
        elif bz_user["name"] in data:
×
237
            person = data[bz_user["name"]]
×
238
        else:
239
            raise Exception(f"Can't find {bz_user['name']} in the data")
×
240

241
        if (
×
242
            person.get("found_on_bugzilla")
243
            and str(bz_user["id"]) != person["bugzillaID"]
244
        ):
245
            # If the linked Bugzilla account is still active, we should not
246
            # overwrite it with the other account.
247
            return
×
248

249
        person["found_on_bugzilla"] = True
×
250
        if person["bugzillaEmail"] != bz_user["name"]:
×
251
            logger.info(
×
252
                "Update bugzilla email for %s from '%s' to '%s'",
253
                person["cn"],
254
                person["bugzillaEmail"],
255
                bz_user["name"],
256
            )
257
            person["bugzillaEmail"] = bz_user["name"]
×
258

259
    def fault_user_handler(bz_user, data):
×
260
        logger.debug("Can't find %s on bugzilla", bz_user["name"])
×
261

262
    BugzillaUser(
×
263
        users_to_check,
264
        include_fields=["id", "name"],
265
        user_handler=handler,
266
        user_data=data,
267
        fault_user_handler=fault_user_handler,
268
    ).wait()
269

270
    for person in data.values():
×
271
        if "found_on_bugzilla" not in person:
×
272
            person["found_on_bugzilla"] = False
×
273

274

275
if __name__ == "__main__":
×
276
    parser = argparse.ArgumentParser(
×
277
        description="Generate an old phonebook dump using IAM api"
278
    )
279
    parser.add_argument(
×
280
        "-o",
281
        "--output",
282
        dest="output",
283
        action="store",
284
        default="",
285
        help="Output directory where to dump temporary IAM data",
286
    )
287
    args = parser.parse_args()
×
288
    try:
×
289
        get_phonebook_dump(output_dir=args.output)
×
290
    except Exception:
×
291
        logger.exception("Tool iam")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc