• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-scraper-who / 18017304751

25 Sep 2025 06:37PM UTC coverage: 93.898% (-0.06%) from 93.96%
18017304751

Pull #18

github

baripembo
reset params
Pull Request #18: modernize setup and upgrade packages

41 of 44 new or added lines in 6 files covered. (93.18%)

277 of 295 relevant lines covered (93.9%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.41
/src/hdx/scraper/who/pipeline.py
1
#!/usr/bin/python
2
"""Who scraper"""
3

4
import logging
1✔
5
import re
1✔
6
from collections import OrderedDict
1✔
7
from datetime import datetime
1✔
8
from urllib.parse import quote
1✔
9

10
from hdx.api.configuration import Configuration
1✔
11
from hdx.data.dataset import Dataset
1✔
12
from hdx.data.hdxobject import HDXError
1✔
13
from hdx.data.showcase import Showcase
1✔
14
from hdx.data.vocabulary import Vocabulary
1✔
15
from hdx.location.country import Country
1✔
16
from hdx.utilities.base_downloader import DownloadError
1✔
17
from hdx.utilities.dateparse import parse_date_range
1✔
18
from hdx.utilities.retriever import Retrieve
1✔
19
from slugify import slugify
1✔
20
from sqlalchemy import false, insert, true
1✔
21

22
from .database.db_categories import DBCategories
1✔
23
from .database.db_dimension_values import DBDimensionValues
1✔
24
from .database.db_dimensions import DBDimensions
1✔
25
from .database.db_indicator_data import DBIndicatorData
1✔
26
from .database.db_indicators import DBIndicators
1✔
27

28
logger = logging.getLogger(__name__)
1✔
29

30
_BATCH_SIZE = 1000
1✔
31
_TAG_CLEAN_TABLE = str.maketrans(
1✔
32
    {
33
        "(": "",
34
        ")": "",
35
        "/": "",
36
        ",": "",
37
    }
38
)
39

40

41
def _clean_tag(s: str) -> str:
    """Drop the punctuation the old multiple_replace handled, then trim.

    Uses the module-level _TAG_CLEAN_TABLE translation table to delete the
    punctuation characters, then strips surrounding whitespace.
    """
    without_punctuation = s.translate(_TAG_CLEAN_TABLE)
    return without_punctuation.strip()
44

45

46
class Pipeline:
    """Scrape WHO GHO indicator data and build per-country HDX datasets.

    ``populate_db`` mirrors the WHO API (dimensions, indicators, categories
    and per-indicator data) into SQLAlchemy tables via the supplied session.
    ``generate_dataset_and_showcase`` and ``generate_archived_dataset`` then
    read those tables back to produce one HDX dataset (and showcase) per
    country, with one resource per category plus a combined resource.
    """

    def __init__(
        self, configuration: Configuration, retriever: Retrieve, tempdir: str, session
    ):
        """
        Args:
            configuration (Configuration): must supply the "base_url" and
                "category_url" keys used to build WHO API URLs
            retriever (Retrieve): used to download JSON and files
            tempdir (str): directory where resource CSV files are written
            session: SQLAlchemy session used for all DB reads and writes
        """
        self._configuration = configuration
        self._retriever = retriever
        self._tempdir = tempdir
        self._session = session
        # dimension value code -> title, filled by
        # _create_dimension_value_names_dict
        self._dimension_value_names_dict = dict()
        # Output CSV column header -> HXL hashtag, used when generating
        # resources from iterables below
        self._hxltags = {
            "GHO (CODE)": "#indicator+code",
            "GHO (DISPLAY)": "#indicator+name",
            "GHO (URL)": "#indicator+url",
            "YEAR (DISPLAY)": "#date+year",
            "STARTYEAR": "#date+year+start",
            "ENDYEAR": "#date+year+end",
            "REGION (CODE)": "#region+code",
            "REGION (DISPLAY)": "#region+name",
            "COUNTRY (CODE)": "#country+code",
            "COUNTRY (DISPLAY)": "#country+name",
            "DIMENSION (TYPE)": "#dimension+type",
            "DIMENSION (CODE)": "#dimension+code",
            "DIMENSION (NAME)": "#dimension+name",
            "Numeric": "#indicator+value+num",
            "Value": "#indicator+value",
            "Low": "#indicator+value+low",
            "High": "#indicator+value+high",
        }

    def populate_db(self, populate_db: bool, create_archived_datasets: bool):
        """Populate the database and create convenience dictionaries and
        lists

        Args:
            populate_db (bool): populate the database
            create_archived_datasets (bool): also download and store data
                for indicators flagged for archiving (slow: there are
                thousands of outdated indicators)

        Returns:
            None
        """
        if populate_db:
            self._populate_dimensions_db()
        # This dictionary is needed for populating the other DBs
        self._create_dimension_value_names_dict()
        self._create_countries_dict()
        if populate_db:
            self._populate_categories_and_indicators_db()
            self._populate_indicator_data_db(create_archived_datasets)

    def get_countries(self):
        """Public method that returns countries in the format required
        for progress_starting_folder"""
        # Requires populate_db to have run first so _countries_dict exists
        return [{"Code": country_iso3} for country_iso3 in self._countries_dict.keys()]

    def _populate_dimensions_db(self):
        """The main API only provides the dimension codes. This method
        queries the dimensions in the API to get their names, that can
        be used for quickcharts, etc."""
        logger.info("Populating dimensions DB")
        dimensions_url = f"{self._configuration['base_url']}api/dimension"
        dimensions_result = self._retriever.download_json(dimensions_url)["value"]
        for dimensions_row in dimensions_result:
            dimension_code = dimensions_row["Code"]
            dimension_title = dimensions_row["Title"]

            db_dimensions_row = DBDimensions(code=dimension_code, title=dimension_title)
            self._session.add(db_dimensions_row)
            self._session.commit()
            # Each dimension's possible values come from a separate endpoint
            dimension_values_url = (
                f"{self._configuration['base_url']}api/DIMENSION/"
                f"{dimension_code}/DimensionValues"
            )
            dimension_values_result = self._retriever.download_json(
                dimension_values_url
            )["value"]
            for dimension_values_row in dimension_values_result:
                db_dimension_values_row = DBDimensionValues(
                    code=dimension_values_row["Code"],
                    title=dimension_values_row["Title"],
                    dimension_code=dimension_code,
                )
                self._session.add(db_dimension_values_row)
            # One commit per dimension's batch of values
            self._session.commit()
        logger.info("Done populating dimensions DB")

    def _create_dimension_value_names_dict(self):
        """Build the dimension value code -> title lookup from the DB."""
        results = self._session.query(DBDimensionValues).all()
        self._dimension_value_names_dict = {row.code: row.title for row in results}

    def _create_countries_dict(self):
        """Build ISO3 code -> country name for all COUNTRY dimension values."""
        results = self._session.query(DBDimensionValues).filter(
            DBDimensionValues.dimension_code == "COUNTRY"
        )

        # NOTE(review): get_country_name_from_iso3 may return None for codes
        # HDX doesn't recognise — confirm downstream handling
        self._countries_dict = {
            row.code: Country.get_country_name_from_iso3(row.code) for row in results
        }

    def _populate_categories_and_indicators_db(self):
        """Fill the indicators and categories tables from the WHO API.

        Indicators are inserted first. Categories are then inserted (title
        plus indicator code forming the composite key) and supply each
        indicator's human-readable URL; indicators referenced by a category
        get to_archive=False (others presumably keep an archived default
        set in the DBIndicators model — confirm).
        """
        # Get the indicator results
        indicator_url = f"{self._configuration['base_url']}api/indicator"
        indicator_result = self._retriever.download_json(indicator_url)["value"]

        # Loop through all indicators and add to table, checking for duplicates
        for indicator_row in indicator_result:
            db_indicators_row = DBIndicators(
                code=indicator_row["IndicatorCode"],
                title=indicator_row["IndicatorName"],
            )
            self._session.add(db_indicators_row)
        self._session.commit()

        # Get the category results
        category_url = (
            f"{self._configuration['category_url']}GHO_MODEL/SF_HIERARCHY_INDICATORS"
        )
        category_result = self._retriever.download_json(category_url)["value"]

        # Loop through categories and add to category DB table, also
        # add indicator URL to indicator.
        for category_row in category_result:
            # Some indicator codes have "\t" in them on the category page
            # which isn't present in the indicator page, such as RADON_Q602,
            # so need to .strip()
            indicator_code = category_row["INDICATOR_CODE"].strip()
            indicator_url = f"https://www.who.int/data/gho/data/indicators/indicator-details/GHO/{quote(category_row['INDICATOR_URL_NAME'])}"
            category_title = category_row["THEME_TITLE"]

            # Add the category to the table
            # Categories can repeat but should be unique in combination with
            # the indicator code, together the title and indicator code make the PK
            category_plus_indicator_exists = (
                self._session.query(DBCategories)
                .filter_by(title=category_title, indicator_code=indicator_code)
                .first()
            )
            if category_plus_indicator_exists:
                logger.warning(
                    f"Category {category_title} with indicator {indicator_code} already exists, skipping"
                )
                continue

            db_categories_row = DBCategories(
                title=category_title, indicator_code=indicator_code
            )
            self._session.add(db_categories_row)
            self._session.commit()
            # Add URL to indicator
            indicator_row = (
                self._session.query(DBIndicators)
                .filter(DBIndicators.code == indicator_code)
                .first()
            )
            if not indicator_row:
                # Category references an indicator absent from the
                # indicators endpoint; skip rather than create one
                logger.warning(
                    f"Indicator code {indicator_code} was not found on the "
                    f"indicators page"
                )
                continue
            else:
                indicator_row.url = indicator_url
                indicator_row.to_archive = False
            self._session.commit()

    def _create_tags(self, country_iso3: str, to_archive: bool):
        """Use category titles to create tags

        Titles of categories that actually have (non-archived) data for the
        country are split on "and", cleaned of punctuation, deduplicated
        preserving order, then mapped through the HDX tag vocabulary.
        Archived datasets get only the base tags.
        """
        base_tags = ["hxl", "indicators"]
        if to_archive:
            return base_tags
        tags = []

        # This is done in a roundabout way for speed, doing it as a single sql
        # query is very slow
        country_category_names = []
        all_category_names = [
            row.title
            for row in self._session.query(DBCategories.title).distinct().all()
        ]
        for category_name in all_category_names:
            # Only keep categories with at least one non-archived data row
            # for this country
            data_exists = (
                self._session.query(DBIndicatorData)
                .join(
                    DBIndicators,
                    DBIndicators.code == DBIndicatorData.indicator_code,
                )
                .join(
                    DBCategories,
                    DBCategories.indicator_code == DBIndicators.code,
                )
                .filter(DBCategories.title == category_name)
                .filter(DBIndicatorData.country_code == country_iso3)
                .filter(DBIndicators.to_archive.is_(false()))
                .first()
            )
            if data_exists:
                country_category_names.append(category_name)
        for category_name in country_category_names:
            # Split on the word "and" so e.g. "Water and Sanitation"
            # yields two candidate tags
            parts = re.split(r"\s+and\s+", category_name, flags=re.IGNORECASE)
            for part in parts:
                cleaned = _clean_tag(part)
                if cleaned:
                    tags.append(cleaned)

        # Order-preserving dedupe
        tags = list(OrderedDict.fromkeys(tags).keys())
        tags, _ = Vocabulary.get_mapped_tags(tags)
        tags = base_tags + tags
        return tags

    def _populate_indicator_data_db(self, create_archived_datasets: bool):
        """Download each indicator's data and bulk-insert its country rows.

        Rows whose SpatialDimType is not COUNTRY are skipped. Inserts are
        batched (_BATCH_SIZE rows per insert) with a single commit per
        indicator.
        """
        for db_row in self._session.query(DBIndicators).all():
            indicator_name = db_row.title
            indicator_url = db_row.url
            indicator_code = db_row.code
            to_archive = db_row.to_archive

            # If we're not creating the archived datasets,
            # save time by not downloading and populating
            # the outdated indicators (there are thousands)
            if to_archive and not create_archived_datasets:
                continue

            logger.info(f"Downloading file for indicator {indicator_name}")
            base_url = self._configuration["base_url"]
            url = f"{base_url}api/{indicator_code}"
            try:
                indicator_json = self._retriever.download_json(url)
            except (DownloadError, FileNotFoundError):
                # Some indicators have no data endpoint; skip them
                logger.warning(f"{url} has no data")
                continue
            logger.info(f"Populating DB for indicator {indicator_name}")

            batch = []
            irow = 0
            for row in indicator_json["value"]:
                if row["SpatialDimType"] != "COUNTRY":
                    continue
                country_iso3 = row["SpatialDim"]
                country_name = self._countries_dict[country_iso3]
                # Start/end years come as ISO datetimes; keep only the year
                startyear = datetime.fromisoformat(row["TimeDimensionBegin"]).strftime(
                    "%Y"
                )
                endyear = datetime.fromisoformat(row["TimeDimensionEnd"]).strftime("%Y")
                db_indicators_row = dict(
                    id=row["Id"],
                    indicator_code=indicator_code,
                    indicator_name=indicator_name,
                    indicator_url=indicator_url,
                    year=row["TimeDim"],
                    start_year=startyear,
                    end_year=endyear,
                    region_code=row["ParentLocationCode"],
                    region_display=row["ParentLocation"],
                    country_code=country_iso3,
                    country_display=country_name,
                    dimension_type=row["Dim1Type"],
                    dimension_code=row["Dim1"],
                    dimension_name=self._dimension_value_names_dict.get(row["Dim1"]),
                    numeric=row["NumericValue"],
                    value=row["Value"],
                    low=row["Low"],
                    high=row["High"],
                )
                batch.append(db_indicators_row)
                irow += 1
                # Flush full batches with an executemany-style insert
                if len(batch) >= _BATCH_SIZE:
                    logger.info(f"Added {irow} rows")
                    self._session.execute(insert(DBIndicatorData), batch)
                    batch = []
            if batch:
                self._session.execute(insert(DBIndicatorData), batch)
            self._session.commit()
            logger.info(f"Done indicator {indicator_name}")

    @staticmethod
    def get_showcase(retriever, country_iso3, country_name, slugified_name, alltags):
        """Return a Showcase pointing at the WHO country page.

        The page is downloaded first to verify it exists; on DownloadError
        a minimal Showcase carrying only the name is returned so the caller
        can still delete a stale showcase.
        """
        try:
            lower_iso3 = country_iso3.lower()
            url = f"https://www.who.int/countries/{lower_iso3}/en/"
            retriever.download_file(url)
            showcase = Showcase(
                {
                    "name": f"{slugified_name}-showcase",
                    "title": f"Indicators for {country_name}",
                    "notes": f"Health indicators for {country_name}",
                    "url": url,
                    "image_url": f"https://cdn.who.int/media/images/default-source/countries-overview/flags/{lower_iso3}.jpg",
                }
            )
            showcase.add_tags(alltags)
            return showcase
        except DownloadError:
            # If the showcase URL doesn't exist, only return the showcase id
            # so that it can be deleted if needed
            return Showcase({"name": f"{slugified_name}-showcase"})

    def generate_dataset_and_showcase(self, country):
        """Generate the HDX dataset and showcase for one country.

        Args:
            country (dict): must contain the ISO3 code under "Code"

        Returns:
            (Dataset, Showcase) on success, or (None, None) when the
            country is unknown to HDX or has no data at all
        """
        # Setup the dataset information
        country_iso3 = country["Code"]
        country_name = self._countries_dict[country_iso3]
        title = f"{country_name} - Health Indicators"

        logger.info(f"Creating dataset: {title}")
        slugified_name = slugify(f"WHO data for {country_iso3}").lower()

        # Get unique category names
        category_names = [
            row.title
            for row in self._session.query(DBCategories.title).distinct().all()
        ]
        cat_str = ", ".join(category_names)
        dataset = Dataset(
            {
                "name": slugified_name,
                "notes": f"This dataset contains data from WHO's "
                f"[data portal](https://www.who.int/gho/en/) covering "
                f"the following categories:  \n  \n"
                f"{cat_str}.  \n  \nFor links to individual indicator "
                f"metadata, see resource descriptions.",
                "title": title,
            }
        )
        dataset.set_subnational(False)
        try:
            dataset.add_country_location(country_iso3)
        except HDXError:
            logger.error(f"Couldn't find country {country_iso3}, skipping")
            return None, None
        tags = self._create_tags(country_iso3=country_iso3, to_archive=False)
        dataset.add_tags(tags)

        # Loop through categories and generate resource for each
        for category_name in category_names:
            logger.info(f"Category: {category_name}")

            all_rows_for_category = (
                self._session.query(DBIndicatorData)
                .join(
                    DBIndicators,
                    DBIndicators.code == DBIndicatorData.indicator_code,
                )
                .join(
                    DBCategories,
                    DBCategories.indicator_code == DBIndicators.code,
                )
                .filter(DBCategories.title == category_name)
                .filter(DBIndicatorData.country_code == country_iso3)
                # Create the archived dataset later
                .filter(DBIndicators.to_archive.is_(false()))
                .all()
            )

            category_data = [_parse_indicator_row(row) for row in all_rows_for_category]
            # Markdown links to each indicator's metadata page, used as
            # the resource description
            indicator_links = [
                f"[{row.title}]({row.url})"
                for row in (
                    self._session.query(DBIndicators)
                    .join(
                        DBCategories,
                        DBCategories.indicator_code == DBIndicators.code,
                    )
                    .filter(DBCategories.title == category_name)
                )
            ]

            category_link = f"*{category_name}:*\n{', '.join(indicator_links)}"
            slugified_category = slugify(category_name, separator="_")
            filename = f"{slugified_category}_indicators_{country_iso3.lower()}.csv"
            resourcedata = {
                "name": f"{category_name} Indicators for {country_name}",
                "description": category_link,
            }

            success, results = dataset.generate_resource_from_iterable(
                list(self._hxltags.keys()),
                category_data,
                self._hxltags,
                self._tempdir,
                filename,
                resourcedata,
                date_function=None,
                quickcharts=None,
            )

            # A failed category resource is logged but does not abort the
            # dataset; the combined resource below may still succeed
            if not success:
                logger.error(f"Resource for category {category_name} failed:{results}")

        # Create the dataset with all indicators

        filename = f"health_indicators_{country_iso3.lower()}.csv"
        resourcedata = {
            "name": f"All Health Indicators for {country_name}",
            "description": "See resource descriptions below for links "
            "to indicator metadata",
        }
        all_rows = (
            self._session.query(DBIndicatorData)
            .join(
                DBIndicators,
                DBIndicatorData.indicator_code == DBIndicators.code,
            )
            .filter(DBIndicatorData.country_code == country_iso3)
            .filter(DBIndicators.to_archive.is_(false()))
            .all()
        )

        all_indicators_data = [_parse_indicator_row(row) for row in all_rows]

        success_all_indicators, results_all_indicators = (
            dataset.generate_resource_from_iterable(
                list(self._hxltags.keys()),
                all_indicators_data,
                self._hxltags,
                self._tempdir,
                filename,
                resourcedata,
                date_function=_yearcol_function,
                quickcharts=None,
            )
        )

        if not success_all_indicators:
            logger.error(f"{country_name} has no data!")
            return None, None

        # Move the "all data" resource to the beginning
        # TODO: this doesn't appear to work on dev
        resources = dataset.get_resources()
        resources.insert(0, resources.pop(-2))

        showcase = self.get_showcase(
            self._retriever,
            country_iso3,
            country_name,
            slugified_name,
            tags,
        )
        return dataset, showcase

    def generate_archived_dataset(self, country):
        """Generate the archived (historical) dataset for one country.

        Args:
            country (dict): must contain the ISO3 code under "Code"

        Returns:
            Dataset on success, or None when the country is unknown to HDX
            or has no archived data
        """
        # Setup the dataset information
        country_iso3 = country["Code"]
        country_name = self._countries_dict[country_iso3]
        title = f"{country_name} - Historical Health Indicators"

        logger.info(f"Creating dataset: {title}")
        slugified_name = slugify(f"WHO historical data for {country_iso3}").lower()

        dataset = Dataset(
            {
                "name": slugified_name,
                "notes": "This dataset contains historical data from WHO's "
                "[data portal](https://www.who.int/gho/en/).",
                "title": title,
                "archived": True,
            }
        )
        dataset.set_expected_update_frequency("Never")
        dataset.set_subnational(False)
        try:
            dataset.add_country_location(country_iso3)
        except HDXError:
            logger.error(f"Couldn't find country {country_iso3}, skipping")
            return None
        tags = self._create_tags(country_iso3=country_iso3, to_archive=True)
        dataset.add_tags(tags)

        # Create the dataset with all indicators

        filename = f"historical_health_indicators_{country_iso3.lower()}.csv"
        resourcedata = {
            "name": f"All Historical Health Indicators for {country_name}",
            "description": "Historical health indicators no longer updated by WHO",
        }

        all_rows = (
            self._session.query(DBIndicatorData)
            .join(
                DBIndicators,
                DBIndicatorData.indicator_code == DBIndicators.code,
            )
            .filter(DBIndicatorData.country_code == country_iso3)
            .filter(DBIndicators.to_archive.is_(true()))
            .all()
        )
        all_indicators_data = [_parse_indicator_row(row) for row in all_rows]

        success_all_indicators, results_all_indicators = (
            dataset.generate_resource_from_iterable(
                list(self._hxltags.keys()),
                all_indicators_data,
                self._hxltags,
                self._tempdir,
                filename,
                resourcedata,
                date_function=_yearcol_function,
            )
        )

        if not success_all_indicators:
            logger.error(f"{country_name} has no data!")
            return None

        return dataset
548

549

550
def _parse_indicator_row(row):
1✔
551
    return {
1✔
552
        "GHO (CODE)": row.indicator_code,
553
        "GHO (DISPLAY)": row.indicator_name,
554
        "GHO (URL)": row.indicator_url,
555
        "YEAR (DISPLAY)": row.year,
556
        "STARTYEAR": row.start_year,
557
        "ENDYEAR": row.end_year,
558
        "REGION (CODE)": row.region_code,
559
        "REGION (DISPLAY)": row.region_display,
560
        "COUNTRY (CODE)": row.country_code,
561
        "COUNTRY (DISPLAY)": row.country_display,
562
        "DIMENSION (TYPE)": row.dimension_type,
563
        "DIMENSION (CODE)": row.dimension_code,
564
        "DIMENSION (NAME)": row.dimension_name,
565
        "Numeric": row.numeric,
566
        "Value": row.value,
567
        "Low": row.low,
568
        "High": row.high,
569
    }
570

571

572
def _yearcol_function(row):
    """Derive a date range dict from the row's "YEAR (DISPLAY)" value.

    Returns {"startdate": ..., "enddate": ...} parsed from the year, or an
    empty dict when the year value is missing/falsy.
    """
    year = row["YEAR (DISPLAY)"]
    if not year:
        return {}
    startdate, enddate = parse_date_range(str(year), date_format="%Y")
    return {"startdate": startdate, "enddate": enddate}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc