• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / relman-auto-nag / #5082

14 Jun 2024 02:51PM CUT coverage: 21.852% (-0.005%) from 21.857%
#5082

push

coveralls-python

benjaminmah
Moved `stackGraph` check to another function, replaced `.get()` with direct hash map access

716 of 3602 branches covered (19.88%)

0 of 11 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

1933 of 8846 relevant lines covered (21.85%)

0.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/bugbot/iam.py
1
# This Source Code Form is subject to the terms of the Mozilla Public
2
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
3
# You can obtain one at http://mozilla.org/MPL/2.0/.
4

5
import argparse
×
6
import json
×
7
import os
×
8
from typing import Dict
×
9

10
import requests
×
11
from libmozdata.bugzilla import BugzillaUser
×
12

13
from . import logger, utils
×
14

15

16
def get_access_token():
    """Request a client-credentials access token for the Mozilla IAM API.

    The client id and secret come from ``utils.get_login_info()`` and are
    posted, together with the requested scopes, to the OAuth token endpoint.

    Returns:
        The value of the ``access_token`` field of the JSON response.

    Raises:
        RuntimeError: If the response does not contain an ``access_token``.
    """
    scopes = {
        "classification": [
            "mozilla_confidential",
            "workgroup:staff_only",
            "public",
            "workgroup",
        ],
        "display": ["staff", "ndaed", "vouched", "authenticated", "public", "none"],
        "search": ["all"],
    }
    # Flatten to space-separated "key:value" pairs.
    scope = " ".join(
        f"{key}:{value}" for key, values in scopes.items() for value in values
    )

    # Read the login info once rather than once per credential.
    login_info = utils.get_login_info()
    payload = {
        "client_id": login_info["iam_client_id"],
        "client_secret": login_info["iam_client_secret"],
        "audience": "api.sso.mozilla.com",
        "scope": scope,
        "grant_type": "client_credentials",
    }

    resp = requests.post("https://auth.mozilla.auth0.com/oauth/token", json=payload)
    access = resp.json()

    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    # The response body is deliberately left out of the message.
    if "access_token" not in access:
        raise RuntimeError(
            f"IAM token request failed (HTTP {resp.status_code}): "
            "no access_token in the response"
        )

    return access["access_token"]
45

46

47
def clean_data(d):
    """Recursively remove ``metadata`` and ``signature`` keys from *d* in place.

    Dicts lose those two keys and have their remaining values visited; lists
    have every element visited; any other value is left untouched.

    Args:
        d: Decoded JSON-like data (dict, list or scalar).

    Returns:
        None. The input is mutated.
    """
    if isinstance(d, dict):
        for key in ("metadata", "signature"):
            d.pop(key, None)

        for value in d.values():
            clean_data(value)
    elif isinstance(d, list):
        for item in d:
            clean_data(item)
58

59

60
def get_email_info(email):
    """Fetch the IAM profile registered under a primary email address.

    Args:
        email: The primary email address to look up.

    Returns:
        The decoded JSON response with every ``metadata`` and ``signature``
        key removed (see ``clean_data``).
    """
    from urllib.parse import quote

    headers = {"Authorization": f"Bearer {get_access_token()}"}
    # Percent-encode the address so characters such as "/", "?", "#" or a
    # space cannot alter the request path. "@" and "+" are legal in a path
    # segment and stay literal, so common addresses give the same URL as before.
    encoded_email = quote(email, safe="@+")
    resp = requests.get(
        f"https://person.api.sso.mozilla.com/v2/user/primary_email/{encoded_email}",
        headers=headers,
    )
    data = resp.json()
    clean_data(data)

    return data
70

71

72
def get_all_info(output_dir=""):
    """Download every page of the IAM user listing and merge them.

    The query asks for full profiles matching ``staff_information.staff=True``
    and ``active=True``. Pages are followed through the ``nextPage`` field
    until it is ``None``; each page is passed through ``clean_data`` and its
    ``users`` are appended to the first page's list.

    Args:
        output_dir: When non-empty, the merged result is also written to
            ``iam_dump.json`` inside this directory.

    Returns:
        The first page's dict with all users accumulated under ``users`` and
        the ``nextPage`` key removed.
    """
    # Single definition of the endpoint, shared by the first and later requests.
    url = (
        "https://person.api.sso.mozilla.com/v2/users/id/all/by_attribute_contains"
        "?staff_information.staff=True&active=True&fullProfiles=True"
    )
    headers = {"Authorization": f"Bearer {get_access_token()}"}

    resp = requests.get(url, headers=headers)
    data = resp.json()
    clean_data(data)

    next_page = data["nextPage"]

    while next_page is not None:
        print(next_page)
        resp = requests.get(f"{url}&nextPage={next_page}", headers=headers)
        page = resp.json()
        clean_data(page)
        data["users"] += page["users"]
        next_page = page["nextPage"]

    # The pagination cursor is meaningless once everything is merged.
    del data["nextPage"]

    if output_dir:
        dump_path = os.path.join(output_dir, "iam_dump.json")
        with open(dump_path, "w", encoding="utf-8") as dump_file:
            json.dump(
                data, dump_file, sort_keys=True, indent=4, separators=(",", ": ")
            )

    return data
101

102

103
def _load_iam_dump(output_dir):
    """Return the IAM dump cached in *output_dir*, or None when there is none."""
    if not output_dir:
        return None

    path = os.path.join(output_dir, "iam_dump.json")
    if not os.path.isfile(path):
        return None

    with open(path, "r", encoding="utf-8") as dump_file:
        return json.load(dump_file)


def _build_phonebook_entry(profile):
    """Convert one IAM profile to a phonebook entry, or None without HRIS values."""
    hris = profile["access_information"]["hris"]["values"]
    if not hris:
        return None

    identities = profile["identities"]
    staff = profile["staff_information"]

    mail = hris["primary_work_email"]
    dn = identities["mozilla_ldap_id"]["value"]
    # A person without a manager email is recorded as their own manager.
    manager_mail = hris["managers_primary_work_email"] or mail

    # Sanity check: the HRIS and LDAP primary emails must agree.
    assert mail == identities["mozilla_ldap_primary_email"]["value"]

    cn = "{} {}".format(
        profile["first_name"]["value"], profile["last_name"]["value"]
    )

    bugzilla_email = ""
    bugzilla_id = None
    if "bugzilla_mozilla_org_primary_email" in identities:
        bugzilla_email = identities["bugzilla_mozilla_org_primary_email"]["value"]
        bugzilla_id = identities["bugzilla_mozilla_org_id"]["value"]

    im = None
    usernames = profile.get("usernames", {}).get("values", None)
    if usernames is not None:
        # Fall back to the "HACK#BMOMAIL" username when no Bugzilla email
        # is set in the identities.
        if not bugzilla_email and "HACK#BMOMAIL" in usernames:
            bugzilla_email = usernames["HACK#BMOMAIL"]

        # The POSIX identifiers are dropped; the remaining usernames are
        # exported as the "im" list.
        usernames.pop("LDAP-posix_id", None)
        usernames.pop("LDAP-posix_uid", None)
        im = list(usernames.values())

    if bugzilla_email is None:
        bugzilla_email = ""

    entry = {
        "mail": mail,
        # "dn" temporarily holds the manager's email; it is replaced by the
        # manager's real dn once every entry has been built.
        "manager": {"cn": "", "dn": manager_mail},
        "ismanager": "TRUE" if staff["manager"]["value"] else "FALSE",
        "isdirector": "TRUE" if staff["director"]["value"] else "FALSE",
        "cn": cn,
        "dn": dn,
        "bugzillaEmail": bugzilla_email,
        "bugzillaID": bugzilla_id,
        "title": staff["title"]["value"],
    }

    if im:
        entry["im"] = im

    return entry


def get_phonebook_dump(output_dir=""):
    """Build the phonebook file ``./configs/people.json`` from IAM data.

    The IAM data is read from ``iam_dump.json`` in *output_dir* when that file
    exists and is non-empty; otherwise it is fetched with ``get_all_info``.
    Profiles missing a required section or without HRIS values are skipped,
    as are people whose manager is not among the exported entries. Bugzilla
    emails are then refreshed with ``update_bugzilla_emails`` before the
    result is written.

    Args:
        output_dir: Directory holding (or receiving) the temporary IAM dump.
            Empty means no cache is read or written.

    Returns:
        None. The result is written to ``./configs/people.json``.
    """
    data = _load_iam_dump(output_dir)
    if not data:
        data = get_all_info(output_dir=output_dir)

    must_have = {
        "first_name",
        "last_name",
        "identities",
        "access_information",
        "staff_information",
    }
    new_data = {}
    for user in data["users"]:
        profile = user["profile"]

        # Non-strict subset: a profile holding exactly the required keys is
        # valid too (the previous strict "<" comparison skipped it).
        if not must_have.issubset(profile):
            continue

        entry = _build_phonebook_entry(profile)
        if entry is None:
            continue

        new_data[entry["mail"]] = entry

    # Resolve each manager email to the manager's cn and dn. The lookup reads
    # the manager's top-level "cn"/"dn", which this loop never modifies.
    to_remove = []
    for mail, person in new_data.items():
        manager = new_data.get(person["manager"]["dn"])
        if manager is None:
            # no manager
            to_remove.append(mail)
            continue
        person["manager"]["cn"] = manager["cn"]
        person["manager"]["dn"] = manager["dn"]

    for mail in to_remove:
        del new_data[mail]

    update_bugzilla_emails(new_data)
    people = list(new_data.values())

    with open("./configs/people.json", "w", encoding="utf-8") as people_file:
        json.dump(people, people_file, sort_keys=True, indent=4, separators=(",", ": "))
209

210

211
def update_bugzilla_emails(data: Dict[str, dict]) -> None:
    """Update the bugzilla emails based on the Bugzilla ID and the Mozilla LDAP email.

    Every person in *data* is looked up on Bugzilla both by the key of *data*
    and, when set, by ``bugzillaID``. Each person found gets
    ``found_on_bugzilla`` set to True and ``bugzillaEmail`` set to the account
    name returned by Bugzilla; everyone else gets ``found_on_bugzilla`` set to
    False.

    Args:
        data: The data to update, modified in place.
    """

    users_by_bugzilla_id = {
        int(person["bugzillaID"]): person
        for person in data.values()
        if person["bugzillaID"]
    }

    # Currently employees can have permissions if they use their Mozilla email
    # without the need to link their Bugzilla accounts to PMO. Thus we check here
    # if employees already have a Bugzilla account using their Mozilla emails.
    #
    # Once BMO and PMO are fully integrated (plan in progress), this will be
    # changed and employees will not have permissions unless they link their
    # Bugzilla account to PMO.
    users_to_check = [*data, *users_by_bugzilla_id]

    def handler(bz_user, data):
        if bz_user["id"] in users_by_bugzilla_id:
            person = users_by_bugzilla_id[bz_user["id"]]
        elif bz_user["name"].lower() in data:
            person = data[bz_user["name"].lower()]
        else:
            raise Exception(f"Can't find {bz_user['name']} in the data")

        # Normalise both sides to str: the stored ID may be a string or an
        # integer, and a str/int comparison would always report a mismatch.
        if person.get("found_on_bugzilla") and str(bz_user["id"]) != str(
            person["bugzillaID"]
        ):
            # If the linked Bugzilla account is still active, we should not
            # overwrite it with the other account.
            return

        person["found_on_bugzilla"] = True
        if person["bugzillaEmail"] != bz_user["name"]:
            logger.info(
                "Update bugzilla email for %s from '%s' to '%s'",
                person["cn"],
                person["bugzillaEmail"],
                bz_user["name"],
            )
            person["bugzillaEmail"] = bz_user["name"]

    def fault_user_handler(bz_user, data):
        logger.debug("Can't find %s on bugzilla", bz_user["name"])

    BugzillaUser(
        users_to_check,
        include_fields=["id", "name"],
        user_handler=handler,
        user_data=data,
        fault_user_handler=fault_user_handler,
    ).wait()

    # Anyone the handler never marked was not found on Bugzilla.
    for person in data.values():
        person.setdefault("found_on_bugzilla", False)
273

274

275
if __name__ == "__main__":
    # Command-line entry point: regenerate the phonebook file, optionally
    # caching the raw IAM data in the directory given with -o/--output.
    parser = argparse.ArgumentParser(
        description="Generate an old phonebook dump using IAM api"
    )
    parser.add_argument(
        "-o",
        "--output",
        dest="output",
        action="store",
        default="",
        help="Output directory where to dump temporary IAM data",
    )
    args = parser.parse_args()
    try:
        get_phonebook_dump(output_dir=args.output)
    except Exception:
        # Any failure is logged with its traceback rather than propagated.
        logger.exception("Tool iam")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc