• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-scraper-wfp-foodprices / 14844429278

05 May 2025 07:21PM UTC coverage: 78.703% (+1.3%) from 77.382%
14844429278

push

github

mcarans
Iterate over files repeatedly so as not to run out of memory

144 of 158 new or added lines in 6 files covered. (91.14%)

2 existing lines in 1 file now uncovered.

728 of 925 relevant lines covered (78.7%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.03
/src/hdx/scraper/wfp/foodprices/world/hapi_output.py
1
import logging
1✔
2
from copy import deepcopy
1✔
3
from os.path import join
1✔
4
from typing import Dict, List, Optional
1✔
5

6
from dateutil.relativedelta import relativedelta
1✔
7

8
from hdx.api.configuration import Configuration
1✔
9
from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
1✔
10
from hdx.location.adminlevel import AdminLevel
1✔
11
from hdx.location.country import Country
1✔
12
from hdx.utilities.dateparse import iso_string_from_datetime, parse_date
1✔
13
from hdx.utilities.dictandlist import write_list_to_csv
1✔
14
from hdx.utilities.downloader import Download
1✔
15
from hdx.utilities.retriever import Retrieve
1✔
16

17
logger = logging.getLogger(__name__)
1✔
18

19

20
class HAPIOutput:
    """Produce HAPI-shaped rows and files from WFP food price data.

    Markets are processed first (``process_markets``): each market's admin
    information is resolved once and cached by market code so that the much
    larger prices files can reuse it row by row (``create_prices_files``).
    """

    def __init__(
        self,
        configuration: Configuration,
        downloader: Download,
        folder: str,
        error_handler: HDXErrorHandler,
    ) -> None:
        """Store collaborators and initialise empty admin/market caches.

        Args:
            configuration: Scraper configuration. The keys ``unused_adm1``,
                ``unused_adm2``, ``adm1_only`` and ``hapi_dataset`` are read
                by the methods of this class.
            downloader: Used to read the yearly prices files.
            folder: Default folder for generated prices files.
            error_handler: Receives warnings raised while matching admin names.
        """
        self._configuration = configuration
        self._downloader = downloader
        self._folder = folder
        self._error_handler = error_handler
        # Index 0 holds the admin level 1 object, index 1 the admin level 2 one
        self._admins = []
        # market code -> base row produced by complete_admin
        self._base_rows = {}

    def setup_admins(
        self,
        retriever: Retrieve,
        countryiso3s: Optional[List[str]] = None,
    ) -> None:
        """Create and populate the admin level 1 and 2 lookups.

        Args:
            retriever: Passed to ``AdminLevel`` to obtain the p-code datasets.
            countryiso3s: Passed through to
                ``AdminLevel.setup_from_libhxl_dataset``. Defaults to None.
        """
        # Both datasets are cached because each is consumed once per level
        libhxl_dataset = AdminLevel.get_libhxl_dataset(retriever=retriever).cache()
        libhxl_format_dataset = AdminLevel.get_libhxl_dataset(
            url=AdminLevel.formats_url, retriever=retriever
        ).cache()
        self._admins = []
        for admin_level in (1, 2):
            admin = AdminLevel(admin_level=admin_level, retriever=retriever)
            admin.setup_from_libhxl_dataset(
                libhxl_dataset=libhxl_dataset,
                countryiso3s=countryiso3s,
            )
            admin.load_pcode_formats_from_libhxl_dataset(libhxl_format_dataset)
            self._admins.append(admin)

    def complete_admin(self, row: Dict, base_row: Dict) -> None:
        """Fill ``base_row`` with market, location and admin information.

        ``base_row`` is modified in place. It must already contain the keys
        ``admin1_code``, ``admin1_name``, ``admin2_code``, ``admin2_name``
        (which are only overwritten when a p-code is matched) and a ``warning``
        set to which short warning strings are added.

        Matching depends on the country's configuration:

        * ``unused_adm1``: the provider's admin 2 name is matched against
          admin level 1.
        * ``unused_adm2``: the provider's admin 1 name is matched against
          admin level 2 and admin 1 is derived from the matched unit's parent.
        * otherwise: the provider's admin 1 and admin 2 names are matched
          against admin levels 1 and 2 respectively (level 2 is skipped for
          ``adm1_only`` countries).

        Args:
            row: Market row with keys ``market``, ``market_id``,
                ``countryiso3``, ``latitude``, ``longitude``, ``admin1`` and
                ``admin2``.
            base_row: Row to complete.
        """
        market_name = row["market"]
        base_row["market_name"] = market_name
        base_row["market_code"] = row["market_id"]
        countryiso3 = row["countryiso3"]
        base_row["location_code"] = countryiso3
        base_row["has_hrp"] = (
            "Y" if Country.get_hrp_status_from_iso3(countryiso3) else "N"
        )
        base_row["in_gho"] = (
            "Y" if Country.get_gho_status_from_iso3(countryiso3) else "N"
        )
        # Falsy provider values are normalised to empty strings in the output
        base_row["lat"] = row["latitude"] or ""
        base_row["lon"] = row["longitude"] or ""
        provider_admin1_name = row["admin1"]
        provider_admin2_name = row["admin2"]
        base_row["provider_admin1_name"] = provider_admin1_name or ""
        base_row["provider_admin2_name"] = provider_admin2_name or ""

        if countryiso3 in self._configuration["unused_adm1"]:
            # The provider's admin 2 name is looked up as an admin 1 unit
            if provider_admin2_name:
                base_row["admin_level"] = 1
                adm1_code, _ = self._admins[0].get_pcode(
                    countryiso3, provider_admin2_name
                )
                if adm1_code:
                    base_row["admin1_code"] = adm1_code
                    base_row["admin1_name"] = self._admins[0].pcode_to_name[adm1_code]
            else:
                base_row["admin_level"] = 0
                self._error_handler.add_missing_value_message(
                    "WFPFoodPrice",
                    countryiso3,
                    "admin 1 name for market",
                    market_name,
                    message_type="warning",
                )
                base_row["warning"].add("no adm1 name in prov2 name")
            return

        if countryiso3 in self._configuration["unused_adm2"]:
            # The provider's admin 1 name is looked up as an admin 2 unit
            if provider_admin1_name:
                base_row["admin_level"] = 2
                adm2_code, _ = self._admins[1].get_pcode(
                    countryiso3, provider_admin1_name
                )
                if adm2_code:
                    base_row["admin2_code"] = adm2_code
                    base_row["admin2_name"] = self._admins[1].pcode_to_name[adm2_code]
                    adm1_code = self._admins[1].pcode_to_parent.get(adm2_code)
                    if adm1_code:
                        base_row["admin1_code"] = adm1_code
                        # Previously this wrote to "admin2_name", clobbering
                        # the admin 2 name and leaving admin1_name empty.
                        base_row["admin1_name"] = self._admins[0].pcode_to_name[
                            adm1_code
                        ]
            else:
                base_row["admin_level"] = 0
                self._error_handler.add_missing_value_message(
                    "WFPFoodPrice",
                    countryiso3,
                    "admin 2 name for market",
                    market_name,
                    message_type="warning",
                )
                base_row["warning"].add("no adm2 name in prov1 name")
            return

        if provider_admin1_name:
            base_row["admin_level"] = 1
            adm1_code, _ = self._admins[0].get_pcode(countryiso3, provider_admin1_name)
            if adm1_code:
                base_row["admin1_code"] = adm1_code
                base_row["admin1_name"] = self._admins[0].pcode_to_name[adm1_code]
        else:
            adm1_code = ""
            base_row["admin_level"] = 0
            self._error_handler.add_missing_value_message(
                "WFPFoodPrice",
                countryiso3,
                "admin 1 name for market",
                market_name,
                message_type="warning",
            )
            base_row["warning"].add("no adm1 name")

        if countryiso3 in self._configuration["adm1_only"]:
            return

        if provider_admin2_name:
            base_row["admin_level"] = 2
            adm2_code, _ = self._admins[1].get_pcode(
                countryiso3, provider_admin2_name, parent=adm1_code
            )
            if adm2_code:
                base_row["admin2_code"] = adm2_code
                base_row["admin2_name"] = self._admins[1].pcode_to_name[adm2_code]
                parent_code = self._admins[1].pcode_to_parent.get(adm2_code)
                if adm1_code and adm1_code != parent_code:
                    # The matched admin 2 unit's parent wins over the admin 1
                    # match made from the provider's name
                    message = f"PCode mismatch {adm1_code}->{parent_code} (parent)"
                    self._error_handler.add_message(
                        "WFPFoodPrice",
                        f"{countryiso3}-{adm2_code}",
                        message,
                        market_name,
                        message_type="warning",
                    )
                    base_row["warning"].add(message)
                    base_row["admin1_code"] = parent_code
                    base_row["admin1_name"] = self._admins[0].pcode_to_name[parent_code]
            return

        # No provider admin 2 name: report it using the most specific
        # identifier available
        if adm1_code:
            identifier = f"{countryiso3}-{adm1_code}"
        elif provider_admin1_name:
            identifier = f"{countryiso3}-{provider_admin1_name}"
        else:
            identifier = countryiso3
        self._error_handler.add_missing_value_message(
            "WFPFoodPrice",
            identifier,
            "admin 2 name for market",
            market_name,
            message_type="warning",
        )
        base_row["warning"].add("no adm2 name")

    def process_currencies(
        self, currencies: List[Dict], dataset_id: str, resource_id: str
    ) -> List[Dict]:
        """Add the HDX dataset and resource ids to every currency row.

        The rows are modified in place and the same list is returned.

        Args:
            currencies: Currency rows.
            dataset_id: Value stored under ``dataset_hdx_id``.
            resource_id: Value stored under ``resource_hdx_id``.

        Returns:
            The ``currencies`` list that was passed in.
        """
        logger.info("Processing HAPI currencies output")
        for row in currencies:
            row["dataset_hdx_id"] = dataset_id
            row["resource_hdx_id"] = resource_id
        return currencies

    def process_commodities(
        self, commodities: List[Dict], dataset_id: str, resource_id: str
    ) -> List[Dict]:
        """Convert commodity rows to HAPI rows sorted by category, name, code.

        Args:
            commodities: Rows with keys ``commodity_id``, ``category`` and
                ``commodity``.
            dataset_id: Value stored under ``dataset_hdx_id``.
            resource_id: Value stored under ``resource_hdx_id``.

        Returns:
            New list of HAPI commodity rows.
        """
        logger.info("Processing HAPI commodities output")
        hapi_rows = [
            {
                "code": row["commodity_id"],
                "category": row["category"],
                "name": row["commodity"],
                "dataset_hdx_id": dataset_id,
                "resource_hdx_id": resource_id,
            }
            for row in commodities
        ]
        return sorted(
            hapi_rows, key=lambda row: (row["category"], row["name"], row["code"])
        )

    def process_markets(
        self, markets: List[Dict], dataset_id: str, resource_id: str
    ) -> List[Dict]:
        """Convert market rows to HAPI rows and cache their admin information.

        For every market a base row is completed with ``complete_admin`` and
        cached by market code for later use by ``create_prices_files``. The
        returned rows are copies in which the ``warning`` and ``error`` sets
        have been flattened to "|"-separated strings.

        Args:
            markets: Market rows in the form expected by ``complete_admin``.
            dataset_id: Value stored under ``dataset_hdx_id``.
            resource_id: Value stored under ``resource_hdx_id``.

        Returns:
            HAPI market rows sorted by location, admin codes, provider admin
            names, market name and market code.
        """
        logger.info("Processing HAPI markets output")
        hapi_rows = []
        for row in markets:
            hapi_base_row = {
                "admin1_code": "",
                "admin1_name": "",
                "admin2_code": "",
                "admin2_name": "",
                "warning": set(),
                "error": set(),
            }
            self.complete_admin(row, hapi_base_row)
            self._base_rows[hapi_base_row["market_code"]] = hapi_base_row
            # A shallow copy suffices: the only mutable values (the warning
            # and error sets) are replaced below, never mutated in place.
            hapi_row = dict(hapi_base_row)
            hapi_row["dataset_hdx_id"] = dataset_id
            hapi_row["resource_hdx_id"] = resource_id
            hapi_row["warning"] = "|".join(sorted(hapi_row["warning"]))
            hapi_row["error"] = "|".join(sorted(hapi_row["error"]))
            hapi_rows.append(hapi_row)
        return sorted(
            hapi_rows,
            key=lambda row: (
                row["location_code"],
                row["admin1_code"],
                row["admin2_code"],
                row["provider_admin1_name"],
                row["provider_admin2_name"],
                row["market_name"],
                row["market_code"],
            ),
        )

    def create_prices_files(
        self,
        year_to_path: Dict,
        dataset_id: str,
        year_to_prices_resource_id: Dict,
        output_dir: str = "",
    ) -> Dict:
        """Write one HAPI prices CSV per year.

        Years are processed one at a time, most recent first, so only a single
        year's rows are held in memory. ``process_markets`` must have been
        called beforehand because each price row starts from the cached base
        row of its market.

        Args:
            year_to_path: Mapping from year to the path of that year's prices
                file.
            dataset_id: Value stored under ``dataset_hdx_id``.
            year_to_prices_resource_id: Mapping from year to the value stored
                under ``resource_hdx_id``.
            output_dir: Folder to write to. Defaults to "" which means the
                folder given to the constructor.

        Returns:
            Mapping from year to the path of the file written for that year.
        """
        logger.info("Processing HAPI prices output")
        configuration = self._configuration["hapi_dataset"]["resources"][0]
        hxltags = configuration["hxltags"]
        headers = list(hxltags.keys())
        if not output_dir:
            output_dir = self._folder

        hapi_year_to_path = {}
        for year in sorted(year_to_path, reverse=True):
            # The HXL tags mapping is written as the first row of each file
            rows = [hxltags]
            filepath = year_to_path[year]
            _, iterator = self._downloader.get_tabular_rows(
                filepath, has_hxl=True, dict_form=True, encoding="utf-8"
            )
            logger.info(f"Reading global prices from {filepath}")
            for row in iterator:
                market_id = row["market_id"]
                # Skip rows whose market id is a HXL hashtag rather than data
                if market_id.startswith("#"):
                    continue
                # A shallow copy suffices: the only mutable values (the
                # warning and error sets) are replaced below, never mutated.
                hapi_row = dict(self._base_rows[market_id])
                hapi_row["dataset_hdx_id"] = dataset_id
                hapi_row["resource_hdx_id"] = year_to_prices_resource_id[year]
                hapi_row["commodity_category"] = row["category"]
                hapi_row["commodity_name"] = row["commodity"]
                hapi_row["commodity_code"] = row["commodity_id"]
                hapi_row["unit"] = row["unit"]
                hapi_row["price_flag"] = row["priceflag"]
                hapi_row["price_type"] = row["pricetype"]
                hapi_row["currency_code"] = row["currency"]
                hapi_row["price"] = row["price"]
                hapi_row["usd_price"] = row["usdprice"]
                reference_period_start = parse_date(row["date"], date_format="%Y-%m-%d")
                hapi_row["reference_period_start"] = iso_string_from_datetime(
                    reference_period_start
                )
                reference_period_end = reference_period_start + relativedelta(
                    months=1,
                    days=-1,
                    hours=23,
                    minutes=59,
                    seconds=59,
                    microseconds=999999,
                )  # food price reference period is one month
                hapi_row["reference_period_end"] = iso_string_from_datetime(
                    reference_period_end
                )
                hapi_row["warning"] = "|".join(sorted(hapi_row["warning"]))
                hapi_row["error"] = "|".join(sorted(hapi_row["error"]))
                rows.append(hapi_row)
            filename = configuration["filename"].format(year)
            filepath = join(output_dir, filename)
            write_list_to_csv(filepath, rows, columns=headers)
            hapi_year_to_path[year] = filepath

        return hapi_year_to_path
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc