• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OCHA-DAP / hdx-ckan / #5853

12 Nov 2024 12:41PM UTC coverage: 74.418% (-0.08%) from 74.501%
#5853

push

coveralls-python

danmihaila
HDX-10191 add data for all month, even if missing in mixpanel

2 of 26 new or added lines in 1 file covered. (7.69%)

1 existing line in 1 file now uncovered.

12404 of 16668 relevant lines covered (74.42%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.77
/ckanext-hdx_theme/ckanext/hdx_theme/util/jql.py
1
import requests
1✔
2
import logging
1✔
3

4
from dogpile.cache import make_region
1✔
5
from datetime import datetime, timedelta, timezone
1✔
6
from collections import OrderedDict
1✔
7
from functools import wraps
1✔
8

9
import ckan.plugins.toolkit as tk
1✔
10

11
import ckanext.hdx_theme.util.jql_queries as jql_queries
1✔
12
from ckanext.hdx_theme.util.timer import Timer
1✔
13
from ckanext.hdx_theme.helpers.caching import dogpile_standard_config, dogpile_config_filter, \
1✔
14
    HDXRedisInvalidationStrategy
15

16
config = tk.config

log = logging.getLogger(__name__)

# Cache lifetime for JQL results: configured in hours (default 24), stored in seconds.
dogpile_config = {
    'cache.redis.expiration_time': int(config.get('hdx.analytics.hours_for_results_in_cache', 24)) * 60 * 60,
}
dogpile_config.update(dogpile_standard_config)

# Every key stored through this region is prefixed with 'jql-'.
dogpile_jql_region = make_region(key_mangler=lambda key: 'jql-' + key)
dogpile_jql_region.configure_from_config(dogpile_config, dogpile_config_filter)
# The custom invalidation strategy is only installed when the redis config prefix is in use.
if dogpile_config_filter == 'cache.redis.':
    dogpile_jql_region.region_invalidator = HDXRedisInvalidationStrategy(dogpile_jql_region)

# Used as the basic-auth username for the Mixpanel JQL endpoint.
CONFIG_API_SECRET = config.get('hdx.analytics.mixpanel.secret')

# Dataset ids are bucketed by their first character; each string lists the characters of one bucket.
MIXPANEL_GROUPS = ['0123', '4567', '89ab', 'cdef']
33

34

35
class JqlQueryExecutor(object):
    """Formats a JQL script template and posts it to the Mixpanel JQL endpoint.

    Subclasses append positional values to ``self.args``; they are substituted
    into ``query`` with ``str.format`` right before the request is made.
    """

    def __init__(self, query):
        """
        :param query: JQL script template containing ``str.format`` placeholders
        :type query: str
        """
        self.query = query
        self.args = []
        self.payload = None

    def run_query(self, transformer):
        """
        Compile the script, run it and return the transformed result.

        Exceptions raised while running the query are logged (with traceback)
        and replaced by an empty dict. Errors raised while formatting the
        script are not caught here.

        :param transformer: object whose ``transform(response)`` builds the result
        :return: the transformed result, or ``{}`` if running the query failed
        :rtype: dict
        """
        self._compile_query()
        try:
            return self._run_query(transformer)
        except Exception:
            # log.exception keeps the traceback, which a plain str(e) would drop.
            log.exception('Ran into problems when getting data from mixpanel. Returning empty dict.')
            return {}

    def _compile_query(self):
        """Store the request payload: the query template formatted with ``self.args``."""
        self.payload = {
            'script': self.query.format(*self.args)
        }

    def _run_query(self, transformer):
        """
        :param transformer: transforms the request result
        :type transformer: MappingResultTransformer
        :return: a dict mapping the key to the values
        :rtype: dict
        """
        # Never call Mixpanel when running on the test site; return an empty result instead.
        if config.get('ckan.site_id') == 'test.ckan.net':
            return {}

        # NOTE(review): no ``timeout`` is passed, so a stalled connection blocks the caller.
        response = requests.post('https://mixpanel.com/api/2.0/jql', data=self.payload,
                                 auth=(CONFIG_API_SECRET, ''))
        response.raise_for_status()
        return transformer.transform(response)
69

70

71
class JqlQueryExecutorForHoursSinceNow(JqlQueryExecutor):
    """Executor whose first two query args are the from/until dates of a period ending now (UTC)."""

    def __init__(self, query, hours_since_now):
        """
        :param query: JQL script template
        :type query: str
        :param hours_since_now: length of the period in hours; a falsy value selects the fixed start date
        :type hours_since_now: int or None
        """
        super(JqlQueryExecutorForHoursSinceNow, self).__init__(query)
        self.args += self._compute_period(hours_since_now)

    @staticmethod
    def _compute_period(hours_since_now):
        """
        :param hours_since_now: for how many hours back should the mixpanel call be made
        :type hours_since_now: int
        :return: a list with 2 iso date strings representing the beginning and ending of the period
        :rtype: list[str]
        """
        # Read the clock once so both dates derive from the same instant
        # (two separate reads could straddle midnight).
        now = datetime.now(timezone.utc)
        until_date_str = now.isoformat()[:10]

        if hours_since_now:
            from_date_str = (now - timedelta(hours=hours_since_now)).isoformat()[:10]
        else:
            # No period requested: fall back to the fixed start date.
            from_date_str = '2016-08-01'

        return [from_date_str, until_date_str]
90

91

92
class JqlQueryExecutorForWeeksSinceNow(JqlQueryExecutor):
    """Executor whose first two query args are the from/until dates of a period counted in weeks."""

    def __init__(self, query, weeks_since, since_date):
        """
        :param query: JQL script template
        :type query: str
        :param weeks_since: how many weeks back the period starts
        :type weeks_since: int
        :param since_date: the end of the period
        :type since_date: datetime
        """
        super(JqlQueryExecutorForWeeksSinceNow, self).__init__(query)
        self.args += self._compute_period(weeks_since, since_date)

    @staticmethod
    def _compute_period(weeks_since, since_date):
        """
        :param weeks_since: for how many weeks back should the mixpanel call be made ( a week starts monday )
        :type weeks_since: int
        :param since_date: the end of the period
        :type since_date: datetime
        :return: a list with 2 iso date strings representing the beginning and ending of the period
        :rtype: list[str]
        """
        until_date_str = since_date.isoformat()[:10]

        # Subtracting weekday() snaps to the Monday of since_date's week before going back whole weeks.
        from_date = since_date - timedelta(weeks=weeks_since, days=since_date.weekday())
        from_date_str = from_date.isoformat()[:10]

        return [from_date_str, until_date_str]
122

123
class JqlQueryExecutorForLast5Years(JqlQueryExecutor):
    """Executor for a period from January 1st five years ago until the end of the previous month."""

    def __init__(self, query, org_id):
        """
        :param query: JQL script template; receives from date, until date and org id, in that order
        :type query: str
        :param org_id: appended as the third query argument
        """
        super(JqlQueryExecutorForLast5Years, self).__init__(query)
        from_date_str, until_date_str, from_date, until_date = self._compute_period()
        self.args += [from_date_str, until_date_str, org_id]
        # The datetime objects are kept so callers can iterate over the months of the period.
        self.from_to_date = [from_date, until_date]

    @staticmethod
    def _compute_period():
        """
        :return: a 4 item list: the iso date strings for the beginning and ending of the period
                (since 5 years ago on January 1st until last day of previous month), followed by
                the two corresponding timezone-aware (UTC) datetime objects
        :rtype: list
        """
        today = datetime.now(timezone.utc)

        # January 1st, five years ago (the time of day of `today` is kept)
        from_date = today.replace(year=today.year - 5, month=1, day=1)

        # last day of previous month: one day before the 1st of the current month
        until_date = today.replace(day=1) - timedelta(days=1)

        return [from_date.isoformat()[:10], until_date.isoformat()[:10], from_date, until_date]
153

154

155
class JqlQueryExecutorForWeeksSinceNowWithGroupFiltering(JqlQueryExecutorForWeeksSinceNow):
    """Weeks-based executor that passes a group as an extra query argument after the two dates."""

    def __init__(self, query, weeks_since, since_date, group):
        """
        :param query: JQL script template
        :type query: str
        :param weeks_since: how many weeks back the period starts
        :type weeks_since: int
        :param since_date: the end of the period
        :type since_date: datetime
        :param group: appended as the third query argument; callers in this module pass one of the
                      MIXPANEL_GROUPS strings
        :type group: str
        """
        super(JqlQueryExecutorForWeeksSinceNowWithGroupFiltering, self).__init__(query, weeks_since, since_date)
        self.args.append(group)
169

170

171
class MappingResultTransformer(object):
    """Turns a JSON list response into a ``{key: value}`` dict."""

    def __init__(self, key_name):
        """
        :param key_name: name of the field in each response item that becomes the dict key
        :type key_name: str
        """
        self.key_name = key_name

    def transform(self, response):
        """
        :param response: the HTTP response
        :type response: requests.Response
        :return: maps each item's ``key_name`` field to its ``'value'`` field; missing fields become None
        :rtype: dict
        """
        return {item.get(self.key_name): item.get('value') for item in response.json()}
184

185

186
class MappingCustomResultTransformer(object):
    """Merges page view / download counts from a response into per-date rows."""

    def __init__(self, data_for_each_month):
        """
        :param data_for_each_month: template mapping a date string to a dict of default metric values
        :type data_for_each_month: dict[str, dict]
        """
        self.data_for_each_month = data_for_each_month

    def transform(self, response):
        """
        Overlay the counts found in the response on a copy of the template.

        The template given to the constructor is not modified, so a transformer
        can be reused without results leaking from one call into the next.

        :param response: the HTTP response
        :type response: requests.Response
        :return: rows keyed by date and sorted by date; dates not present in the template
                 only get the metrics found in the response
        :rtype: dict
        """
        # Copy each row so neither the template nor its rows are mutated.
        result = {date: OrderedDict(row) for date, row in self.data_for_each_month.items()}

        for item in response.json():
            row = result.setdefault(item.get('date'), OrderedDict())
            event_name = item.get('event_name')
            if event_name == 'page view':
                row['pageviews_unique'] = item.get('unique_count')
                row['pageviews_total'] = item.get('total_count')
            elif event_name == 'resource download':
                row['downloads_unique'] = item.get('unique_count')
                row['downloads_total'] = item.get('total_count')

        return dict(sorted(result.items()))
209

210
class MultipleValueMappingResultTransformer(MappingResultTransformer):
    """Groups response items by a main key and, inside each group, by a secondary key."""

    def __init__(self, key_name, secondary_key_name):
        """
        :param key_name: response field used as the outer key
        :type key_name: str
        :param secondary_key_name: response field used as the inner key
        :type secondary_key_name: str
        """
        super(MultipleValueMappingResultTransformer, self).__init__(key_name)
        self.secondary_key_name = secondary_key_name

    def transform(self, response):
        """
        :param response: the HTTP response
        :type response: requests.Response
        :return: ``{main_key: OrderedDict{secondary_key: {'value': ..., <secondary_key_name>: secondary_key}}}``;
                 a missing ``'value'`` defaults to 0
        :rtype: dict[str, OrderedDict]
        """
        result = {}

        for item in response.json():
            main_key = item.get(self.key_name)
            secondary_key = item.get(self.secondary_key_name)

            # Inner entries keep the order in which they appear in the response.
            group = result.setdefault(main_key, OrderedDict())
            group[secondary_key] = {'value': item.get('value', 0), self.secondary_key_name: secondary_key}

        return result
229

230

231
class MultipleValueMandatoryMappingResultTransformer(MappingResultTransformer):
    """Groups response items by a main key, guaranteeing an entry for every mandatory value."""

    def __init__(self, key_name, mandatory_key, mandatory_values):
        """
        :param key_name: response field used as the outer key
        :type key_name: str
        :param mandatory_key: response field used as the inner key
        :type mandatory_key: str
        :param mandatory_values: inner keys that must exist for every outer key
        :type mandatory_values: list[str]
        """
        super(MultipleValueMandatoryMappingResultTransformer, self).__init__(key_name)
        self.mandatory_key = mandatory_key
        self.mandatory_values = mandatory_values

        # Default (zero value) entry for each mandatory value, in the given order.
        self.template = [(item, {mandatory_key: item, 'value': 0}) for item in mandatory_values]

    def transform(self, response):
        """
        :param response: the HTTP response
        :type response: requests.Response
        :return: for every main key an OrderedDict with one entry per mandatory value (value 0 when
                 the response has none); items whose inner key is not mandatory are logged and skipped
        :rtype: dict[str, OrderedDict]
        """
        result = {}

        for item in response.json():
            main_key = item.get(self.key_name)
            secondary_key = item.get(self.mandatory_key)

            if secondary_key not in self.mandatory_values:
                log.error('{} not in mandatory values {}'.format(secondary_key, ','.join(self.mandatory_values)))
                continue

            if main_key not in result:
                # Copy the default entries so they are not shared between main keys or between calls.
                result[main_key] = OrderedDict((key, dict(entry)) for key, entry in self.template)

            result[main_key][secondary_key] = {'value': item.get('value', 0), self.mandatory_key: secondary_key}

        return result
257

258

259
def get_dataset_mp_group(dataset_id):
    """
    Find the mixpanel group whose characters include the first character of the dataset id.

    :param dataset_id: the dataset id
    :type dataset_id: str
    :return: the matching entry of MIXPANEL_GROUPS, or None (after logging an error) when the id is
             empty or its first character belongs to no group
    :rtype: str or None
    """
    # An empty or None id cannot be indexed; treat it like an id that matches no group.
    if dataset_id:
        first_letter = dataset_id[0]
        for group in MIXPANEL_GROUPS:
            if first_letter in group:
                return group
    log.error('Dataset group could not be determined for JQL query')
    return None
266

267

268
def timer_wrapper(original_caching_function):
    """
    Decorator that times each call of the wrapped function with a Timer.

    :param original_caching_function: function taking positional arguments only
    :return: wrapper that accepts the same positional arguments and returns the same result
    """
    @wraps(original_caching_function)
    def timed_caching_function(*args):
        # str() each argument so non-string arguments cannot break the join.
        args_to_name = ', '.join(str(arg) for arg in args)
        name = '{} with args ({})'.format(original_caching_function.__name__, args_to_name)
        # NOTE(review): a configured value may arrive as a string, so coerce it the same way the
        # cache expiry setting is coerced in this module - confirm Timer expects a number.
        warning_threshold = int(config.get('hdx.analytics.mixpanel.warning_threshold_seconds', 90))
        timer = Timer(name,
                      init_message='creating cache',
                      in_millis=False, log_warning_step_threshold=warning_threshold)
        result = original_caching_function(*args)
        timer.next('finished')
        return result
    return timed_caching_function
281

282

283
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def downloads_per_dataset_all_cached():
    """Cached, timed call of downloads_per_dataset() without a period limit."""
    return downloads_per_dataset()
287

288

289
def downloads_per_dataset(hours_since_now=None):
    """
    Run the DOWNLOADS_PER_DATASET query.

    :param hours_since_now: how many hours back the period starts; None uses the executor's fixed start date
    :type hours_since_now: int or None
    :return: maps each ``dataset_id`` to the ``value`` returned by the query ({} if the query failed)
    :rtype: dict
    """
    query_executor = JqlQueryExecutorForHoursSinceNow(jql_queries.DOWNLOADS_PER_DATASET, hours_since_now)
    return query_executor.run_query(MappingResultTransformer('dataset_id'))
294

295

296
def fetch_downloads_per_week_for_dataset(dataset_id):
    """
    Look up the weekly download data of one dataset in the cached result of its mixpanel group.

    :param dataset_id: the dataset id
    :type dataset_id: str
    :return: the entry stored for the dataset, or {} when there is none or no group could be determined
    :rtype: dict
    """
    mixpanel_group = get_dataset_mp_group(dataset_id)
    if not mixpanel_group:
        return {}
    return downloads_per_dataset_per_week_last_24_weeks_cached(mixpanel_group).get(dataset_id, {})
301

302

303
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def downloads_per_dataset_per_week_last_24_weeks_cached(mixpanel_group):
    """Cached, timed call of downloads_per_dataset_per_week() for the given group and 24 weeks."""
    return downloads_per_dataset_per_week(mixpanel_group, 24)
307

308

309
def downloads_per_dataset_per_week(mixpanel_group, weeks=24):
    """
    Run the DOWNLOADS_PER_DATASET_PER_WEEK query for one mixpanel group.

    :param mixpanel_group: passed to the query as the group argument
    :type mixpanel_group: str
    :param weeks: how many weeks back the period starts
    :type weeks: int
    :return: per ``dataset_id`` an ordered mapping with one entry for every mandatory week date
             ({} if the query failed)
    :rtype: dict
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    since = datetime.now(timezone.utc)
    query_executor = JqlQueryExecutorForWeeksSinceNowWithGroupFiltering(jql_queries.DOWNLOADS_PER_DATASET_PER_WEEK,
                                                                        weeks, since, mixpanel_group)

    mandatory_values = _generate_mandatory_dates(since, weeks)

    return query_executor.run_query(
        MultipleValueMandatoryMappingResultTransformer('dataset_id', 'date', mandatory_values))
320

321

322
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def downloads_per_organization_last_30_days_cached():
    """Cached, timed call of downloads_per_organization() for 30 days."""
    return downloads_per_organization(30)
326

327

328
def downloads_per_organization(days_since_now=30):
    """
    Run the DOWNLOADS_PER_ORGANIZATION query.

    :param days_since_now: how many days back the period starts (converted to hours for the executor)
    :type days_since_now: int
    :return: maps each ``org_id`` to the ``value`` returned by the query ({} if the query failed)
    :rtype: dict
    """
    query_executor = JqlQueryExecutorForHoursSinceNow(jql_queries.DOWNLOADS_PER_ORGANIZATION, days_since_now * 24)
    return query_executor.run_query(MappingResultTransformer('org_id'))
333

334

335
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def downloads_per_organization_per_week_last_24_weeks_cached():
    """Cached, timed call of downloads_per_organization_per_week() for 24 weeks."""
    return downloads_per_organization_per_week(24)
339

340

341
def downloads_per_organization_per_week(weeks=24):
    """
    Run the DOWNLOADS_PER_ORGANIZATION_PER_WEEK query.

    :param weeks: how many weeks back the period starts
    :type weeks: int
    :return: per ``org_id`` an ordered mapping with one entry for every mandatory week date
             ({} if the query failed)
    :rtype: dict
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    since = datetime.now(timezone.utc)
    query_executor = JqlQueryExecutorForWeeksSinceNow(jql_queries.DOWNLOADS_PER_ORGANIZATION_PER_WEEK, weeks, since)

    mandatory_values = _generate_mandatory_dates(since, weeks)

    return query_executor.run_query(
        MultipleValueMandatoryMappingResultTransformer('org_id', 'date', mandatory_values))
351

352

353
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def downloads_per_organization_per_dataset_last_24_weeks_cached():
    """Cached, timed call of downloads_per_organization_per_dataset() for 24 weeks."""
    return downloads_per_organization_per_dataset(24)
357

358

359
def downloads_per_organization_per_dataset(weeks=24):
    """
    Run the DOWNLOADS_PER_ORGANIZATION_PER_DATASET query.

    :param weeks: how many weeks back the period starts
    :type weeks: int
    :return: per ``org_id`` an ordered mapping keyed by ``dataset_id`` ({} if the query failed)
    :rtype: dict
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    since = datetime.now(timezone.utc)
    query_executor = JqlQueryExecutorForWeeksSinceNow(jql_queries.DOWNLOADS_PER_ORGANIZATION_PER_DATASET, weeks, since)

    return query_executor.run_query(
        MultipleValueMappingResultTransformer('org_id', 'dataset_id'))
367

368

369
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def pageviews_per_dataset_last_14_days_cached():
    """Cached, timed call of pageviews_per_dataset() for 14 days, expressed in hours."""
    hours = 14 * 24
    return pageviews_per_dataset(hours)
374

375

376
def pageviews_per_dataset(hours_since_now=None):
    """
    Run the PAGEVIEWS_PER_DATASET query.

    :param hours_since_now: how many hours back the period starts; None uses the executor's fixed start date
    :type hours_since_now: int or None
    :return: maps each ``dataset_id`` to the ``value`` returned by the query ({} if the query failed)
    :rtype: dict
    """
    query_executor = JqlQueryExecutorForHoursSinceNow(jql_queries.PAGEVIEWS_PER_DATASET, hours_since_now)
    return query_executor.run_query(MappingResultTransformer('dataset_id'))
381

382

383
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def pageviews_per_organization_last_30_days_cached():
    """Cached, timed call of pageviews_per_organization() for 30 days."""
    return pageviews_per_organization(30)
387

388

389
def pageviews_per_organization(days_since_now=30):
    """
    Run the PAGEVIEWS_PER_ORGANIZATION query.

    :param days_since_now: how many days back the period starts (converted to hours for the executor)
    :type days_since_now: int
    :return: maps each ``org_id`` to the ``value`` returned by the query ({} if the query failed)
    :rtype: dict
    """
    query_executor = JqlQueryExecutorForHoursSinceNow(jql_queries.PAGEVIEWS_PER_ORGANIZATION, days_since_now * 24)
    return query_executor.run_query(MappingResultTransformer('org_id'))
394

395

396
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def pageviews_per_organization_per_week_last_24_weeks_cached():
    """Cached, timed call of pageviews_per_organization_per_week() for 24 weeks."""
    return pageviews_per_organization_per_week(24)
400

401

402
def pageviews_per_organization_per_week(weeks=24):
    """
    Run the PAGEVIEWS_PER_ORGANIZATION_PER_WEEK query.

    :param weeks: how many weeks back the period starts
    :type weeks: int
    :return: per ``org_id`` an ordered mapping with one entry for every mandatory week date
             ({} if the query failed)
    :rtype: dict
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    since = datetime.now(timezone.utc)
    query_executor = JqlQueryExecutorForWeeksSinceNow(jql_queries.PAGEVIEWS_PER_ORGANIZATION_PER_WEEK, weeks, since)

    mandatory_values = _generate_mandatory_dates(since, weeks)

    return query_executor.run_query(
        MultipleValueMandatoryMappingResultTransformer('org_id', 'date', mandatory_values))
412

413

414
def _generate_mandatory_dates(since, weeks):
1✔
415
    '''
416
    :param since: the datetime "until" object
417
    :type since: datetime
418
    :param weeks:
419
    :type weeks: int
420
    :return: list of mandatory dates
421
    :rtype: list[str]
422
    '''
423
    mandatory_dates = []
1✔
424

425
    ''':type : list[datetime]'''
1✔
426
    for i in range(0, weeks+1):
1✔
427
        mandatory_dates.insert(0, since - timedelta(weeks=i, days=since.weekday()))
1✔
428
    mandatory_values = list(map(lambda x: x.isoformat()[:10], mandatory_dates))
1✔
429
    return mandatory_values
1✔
430

431
@dogpile_jql_region.cache_on_arguments()
@timer_wrapper
def pageviews_downloads_per_organization_last_5_years(org_id):
    """
    Monthly page view and download counts of an organization for the executor's 5 year period.

    :param org_id: passed to the PAGEVIEWS_AND_DOWNLOADS_PER_ORGANIZATION query
    :return: rows keyed by month date, zero-filled for months missing in the response;
             {} when something went wrong (the same failure value run_query uses)
    :rtype: dict
    """
    query_executor = JqlQueryExecutorForLast5Years(jql_queries.PAGEVIEWS_AND_DOWNLOADS_PER_ORGANIZATION, org_id=org_id)
    # Default to {} rather than None so a failure yields the same shape as a failed query.
    result = {}
    try:
        from_date, until_date = query_executor.from_to_date
        data_for_each_month = _generate_data_for_each_month(from_date, until_date)
        result = query_executor.run_query(MappingCustomResultTransformer(data_for_each_month))
    except Exception as ex:
        # log.exception records the traceback in addition to the message.
        log.exception(ex)

    return result
443

444
def _generate_data_for_each_month(start_date, end_date):
1✔
NEW
445
    from dateutil.relativedelta import relativedelta
×
NEW
446
    current_date = start_date
×
NEW
447
    result = OrderedDict()
×
NEW
448
    while current_date < end_date:
×
NEW
449
        current_date_iso = current_date.isoformat()[:10]
×
NEW
450
        result[current_date_iso]=OrderedDict()
×
NEW
451
        result[current_date_iso]['pageviews_unique'] = 0
×
NEW
452
        result[current_date_iso]['pageviews_total'] = 0
×
NEW
453
        result[current_date_iso]['downloads_unique'] = 0
×
NEW
454
        result[current_date_iso]['downloads_total'] = 0
×
NEW
455
        current_date += relativedelta(months=1)
×
456

457
    return result
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc