• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8628139215

10 Apr 2024 08:18AM UTC coverage: 62.44% (-0.05%) from 62.489%
8628139215

push

github

web-flow
Merge pull request #12851 from zmstone/0327-feat-add-emqx_variform

emqx_variform for string substitution and transform

206 of 238 new or added lines in 3 files covered. (86.55%)

28 existing lines in 16 files now uncovered.

34895 of 55886 relevant lines covered (62.44%)

6585.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.8
/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_consumer_worker.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4

5
-module(emqx_bridge_gcp_pubsub_consumer_worker).
6

7
-behaviour(ecpool_worker).
8
-behaviour(gen_server).
9

10
-include_lib("emqx/include/logger.hrl").
11
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
12

13
%% `ecpool_worker' API
14
-export([connect/1, health_check/1]).
15

16
%% `gen_server' API
17
-export([
18
    init/1,
19
    handle_info/2,
20
    handle_cast/2,
21
    handle_call/3,
22
    handle_continue/2,
23
    terminate/2
24
]).
25

26
-export([get_subscription/1]).
27
-export([reply_delegator/4, pull_async/1, process_pull_response/2, ensure_subscription/1]).
28

29
-type subscription_id() :: binary().
30
-type bridge_name() :: atom() | binary().
31
-type ack_id() :: binary().
32
-type message_id() :: binary().
33
-type duration() :: non_neg_integer().
34
-type config() :: #{
35
    ack_deadline := emqx_schema:timeout_duration_s(),
36
    ack_retry_interval := emqx_schema:timeout_duration_ms(),
37
    client := emqx_bridge_gcp_pubsub_client:state(),
38
    ecpool_worker_id => non_neg_integer(),
39
    forget_interval := duration(),
40
    hookpoints := [binary()],
41
    connector_resource_id := binary(),
42
    source_resource_id := binary(),
43
    mqtt_config => emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
44
    project_id := emqx_bridge_gcp_pubsub_client:project_id(),
45
    pull_max_messages := non_neg_integer(),
46
    pull_retry_interval := emqx_schema:timeout_duration_ms(),
47
    request_ttl := emqx_schema:duration_ms() | infinity,
48
    subscription_id => subscription_id(),
49
    topic => emqx_bridge_gcp_pubsub_client:topic()
50
}.
51
-type state() :: #{
52
    ack_deadline := emqx_schema:timeout_duration_s(),
53
    ack_retry_interval := emqx_schema:timeout_duration_ms(),
54
    ack_timer := undefined | reference(),
55
    async_workers := #{pid() => reference()},
56
    client := emqx_bridge_gcp_pubsub_client:state(),
57
    ecpool_worker_id := non_neg_integer(),
58
    forget_interval := duration(),
59
    hookpoints := [binary()],
60
    connector_resource_id := binary(),
61
    source_resource_id := binary(),
62
    mqtt_config := #{} | emqx_bridge_gcp_pubsub_impl_consumer:mqtt_config(),
63
    pending_acks := #{message_id() => ack_id()},
64
    project_id := emqx_bridge_gcp_pubsub_client:project_id(),
65
    pull_max_messages := non_neg_integer(),
66
    pull_retry_interval := emqx_schema:timeout_duration_ms(),
67
    pull_timer := undefined | reference(),
68
    request_ttl := emqx_schema:duration_ms() | infinity,
69
    %% In order to avoid re-processing the same message twice due to race conditions
70
    %% between acknowledging a message and receiving a duplicate pulled message, we need
71
    %% to keep the seen message IDs for a while...
72
    seen_message_ids := sets:set(message_id()),
73
    subscription_id := subscription_id(),
74
    topic := emqx_bridge_gcp_pubsub_client:topic()
75
}.
76
-type decoded_message() :: map().
77

78
%% initialization states
79
-define(ensure_subscription, ensure_subscription).
80
-define(patch_subscription, patch_subscription).
81

82
-define(HEALTH_CHECK_TIMEOUT, 10_000).
83
-define(OPTVAR_SUB_OK(PID), {?MODULE, subscription_ok, PID}).
84

85
%%-------------------------------------------------------------------------------------------------
86
%% API used by `reply_delegator'
87
%%-------------------------------------------------------------------------------------------------
88

89
%% Sends a fire-and-forget `pull_async' cast to the given worker process.
-spec pull_async(pid()) -> ok.
pull_async(WorkerPid) ->
    Request = pull_async,
    gen_server:cast(WorkerPid, Request).
92

93
%% Hands the raw body of a pull response over to the worker process, as a cast.
-spec process_pull_response(pid(), binary()) -> ok.
process_pull_response(WorkerPid, RespBody) ->
    Request = {process_pull_response, RespBody},
    gen_server:cast(WorkerPid, Request).
96

97
%% Sends a fire-and-forget `ensure_subscription' cast to the given worker process.
-spec ensure_subscription(pid()) -> ok.
ensure_subscription(WorkerPid) ->
    Request = ensure_subscription,
    gen_server:cast(WorkerPid, Request).
100

101
%% Translates the outcome of a pull request into the next cast sent to the worker:
%%   * 200 response     -> hand the response body over for processing;
%%   * `{error, timeout}' -> simply pull again;
%%   * any other error  -> log it, then either re-create the subscription (404) or
%%                         pull again.
-spec reply_delegator(pid(), pull_async, binary(), {ok, map()} | {error, timeout | term()}) -> ok.
reply_delegator(WorkerPid, pull_async = _Action, SourceResId, Result) ->
    ?tp(gcp_pubsub_consumer_worker_reply_delegator, #{result => Result}),
    case Result of
        {ok, #{status_code := 200, body := RespBody}} ->
            ?MODULE:process_pull_response(WorkerPid, RespBody);
        {error, timeout} ->
            ?MODULE:pull_async(WorkerPid);
        {error, Reason} ->
            ?tp(
                warning,
                "gcp_pubsub_consumer_worker_pull_error",
                #{
                    instance_id => SourceResId,
                    reason => Reason
                }
            ),
            next_step_after_pull_error(Reason, WorkerPid)
    end.

%% Chooses the follow-up action after a failed (non-timeout) pull.
next_step_after_pull_error(#{status_code := 404}, WorkerPid) ->
    %% the subscription was not found; deleted?!
    ?MODULE:ensure_subscription(WorkerPid);
next_step_after_pull_error(_Reason, WorkerPid) ->
    ?MODULE:pull_async(WorkerPid).
126

127
%%-------------------------------------------------------------------------------------------------
128
%% Debugging API
129
%%-------------------------------------------------------------------------------------------------
130

131
%% Debugging helper: synchronously asks the worker for its subscription, giving up
%% after the call timeout below.
-spec get_subscription(pid()) -> {ok, map()} | {error, term()}.
get_subscription(WorkerPid) ->
    CallTimeout = 5_000,
    gen_server:call(WorkerPid, get_subscription, CallTimeout).
134

135
%%-------------------------------------------------------------------------------------------------
136
%% `ecpool' health check
137
%%-------------------------------------------------------------------------------------------------
138

139
%% Reads the status stored under this worker's optvar key, waiting at most
%% `?HEALTH_CHECK_TIMEOUT'.  Returns the stored status, or `timeout' if the read
%% timed out.
-spec health_check(pid()) -> subscription_ok | topic_not_found | timeout.
health_check(WorkerPid) ->
    Key = ?OPTVAR_SUB_OK(WorkerPid),
    case optvar:read(Key, ?HEALTH_CHECK_TIMEOUT) of
        timeout ->
            timeout;
        {ok, Status} ->
            Status
    end.
147

148
%%-------------------------------------------------------------------------------------------------
149
%% `ecpool' API
150
%%-------------------------------------------------------------------------------------------------
151

152
%% Builds the worker config from the `Opts0' proplist and starts the worker.  Every
%% key matched below is mandatory.  Each worker serves a single topic: the topic
%% mapping is sorted by key and the entry is picked by the worker id modulo the
%% number of mappings.
connect(Opts0) ->
    Opts = maps:from_list(Opts0),
    #{
        ack_deadline := AckDeadlineSeconds,
        ack_retry_interval := AckRetryInterval,
        bridge_name := BridgeName,
        client := Client,
        ecpool_worker_id := WorkerId,
        forget_interval := ForgetInterval,
        hookpoints := Hookpoints,
        connector_resource_id := ConnectorResId,
        source_resource_id := SourceResId,
        project_id := ProjectId,
        pull_max_messages := PullMaxMessages,
        pull_retry_interval := PullRetryInterval,
        request_ttl := RequestTTL,
        topic_mapping := TopicMapping
    } = Opts,
    SortedMappings = lists:keysort(1, maps:to_list(TopicMapping)),
    Position = (WorkerId rem map_size(TopicMapping)) + 1,
    {Topic, MQTTConfig} = lists:nth(Position, SortedMappings),
    SubscriptionId = subscription_id(BridgeName, Topic),
    Config = #{
        ack_deadline => AckDeadlineSeconds,
        ack_retry_interval => AckRetryInterval,
        %% Note: the `client' value here must be immutable and not changed by the
        %% bridge during `on_get_status', since we have handed it over to the pull
        %% workers.
        client => Client,
        connector_resource_id => ConnectorResId,
        forget_interval => ForgetInterval,
        hookpoints => Hookpoints,
        mqtt_config => MQTTConfig,
        project_id => ProjectId,
        pull_max_messages => PullMaxMessages,
        pull_retry_interval => PullRetryInterval,
        request_ttl => RequestTTL,
        source_resource_id => SourceResId,
        subscription_id => SubscriptionId,
        topic => Topic
    },
    ?tp(gcp_pubsub_consumer_worker_about_to_spawn, #{}),
    start_link(Config).
194

195
%%-------------------------------------------------------------------------------------------------
196
%% `gen_server' API
197
%%-------------------------------------------------------------------------------------------------
198

199
%% Traps exits, extends the config with the (initially empty) runtime fields and
%% defers the subscription setup to `handle_continue/2'.
-spec init(config()) -> {ok, state(), {continue, ?ensure_subscription}}.
init(Config) ->
    process_flag(trap_exit, true),
    RuntimeFields = #{
        ack_timer => undefined,
        pull_timer => undefined,
        async_workers => #{},
        pending_acks => #{},
        seen_message_ids => sets:new([{version, 2}])
    },
    State = maps:merge(Config, RuntimeFields),
    ?tp(gcp_pubsub_consumer_worker_init, #{topic => maps:get(topic, State)}),
    {ok, State, {continue, ?ensure_subscription}}.
211

212
%% Initialization steps, chained through `{continue, _}' returns.
%%
%% `ensure_subscription': tries to create the subscription.  A fresh subscription
%% starts pulling right away, an existing one is patched first, transient errors are
%% retried, and fatal outcomes stop the worker.
%%
%% `patch_subscription': updates the existing subscription, then starts pulling; on
%% failure it retries after a short random pause.
handle_continue(?ensure_subscription, State) ->
    case ensure_subscription_exists(State) of
        continue ->
            #{source_resource_id := SourceResId} = State,
            announce_subscription_ready(),
            ?tp(
                debug,
                "gcp_pubsub_consumer_worker_subscription_ready",
                #{instance_id => SourceResId}
            ),
            {noreply, State};
        already_exists ->
            {noreply, State, {continue, ?patch_subscription}};
        retry ->
            {noreply, State, {continue, ?ensure_subscription}};
        not_found ->
            %% there's nothing much to do if the topic suddenly doesn't exist anymore.
            {stop, {error, topic_not_found}, State};
        Fatal when Fatal =:= bad_credentials; Fatal =:= permission_denied ->
            {stop, {error, Fatal}, State}
    end;
handle_continue(?patch_subscription, State) ->
    ?tp(gcp_pubsub_consumer_worker_patch_subscription_enter, #{}),
    case patch_subscription(State) of
        error ->
            %% retry; add a random delay for the case where multiple workers step on each
            %% other's toes before retrying.
            timer:sleep(rand:uniform(500)),
            {noreply, State, {continue, ?patch_subscription}};
        ok ->
            #{source_resource_id := SourceResId} = State,
            announce_subscription_ready(),
            ?tp(
                debug,
                "gcp_pubsub_consumer_worker_subscription_ready",
                #{instance_id => SourceResId}
            ),
            {noreply, State}
    end.

%% Kicks off the first pull and publishes `subscription_ok' under this worker's
%% optvar key, which is what `health_check/1' reads.
announce_subscription_ready() ->
    ?MODULE:pull_async(self()),
    optvar:set(?OPTVAR_SUB_OK(self()), subscription_ok).
256

257
%% Synchronous requests: only `get_subscription' is supported; anything else is
%% answered with `{error, unknown_call}'.
handle_call(get_subscription, _From, State) ->
    {reply, do_get_subscription(State), State};
handle_call(_Unknown, _From, State) ->
    {reply, {error, unknown_call}, State}.
262

263
%% Asynchronous requests: start a pull, process a pull response, or re-run the
%% subscription setup.  Unknown casts are ignored.
handle_cast(pull_async, State) ->
    {noreply, do_pull_async(State)};
handle_cast({process_pull_response, RespBody}, State) ->
    ?tp(gcp_pubsub_consumer_worker_pull_response_received, #{}),
    {noreply, do_process_pull_response(State, RespBody)};
handle_cast(ensure_subscription, State) ->
    {noreply, State, {continue, ?ensure_subscription}};
handle_cast(_Unknown, State) ->
    {noreply, State}.
274

275
%% Out-of-band messages:
%%   * `ack' / `pull' timeouts are honoured only when they carry the timer reference
%%     currently stored in the state;
%%   * `DOWN' from a monitored async worker drops it from the map and pulls again;
%%   * `forget_message_ids' removes the given ids from the set of seen messages;
%%   * anything else is logged at debug level and ignored.
handle_info({timeout, TRef, ack}, State = #{ack_timer := TRef}) ->
    {noreply, acknowledge(State)};
handle_info({timeout, TRef, pull}, State = #{pull_timer := TRef}) ->
    {noreply, do_pull_async(State#{pull_timer := undefined})};
handle_info(
    {'DOWN', _MRef, process, AsyncWorkerPid, _Reason}, State0 = #{async_workers := Workers}
) when
    is_map_key(AsyncWorkerPid, Workers)
->
    State1 = State0#{async_workers := maps:remove(AsyncWorkerPid, Workers)},
    State = do_pull_async(State1),
    ?tp(gcp_pubsub_consumer_worker_handled_async_worker_down, #{async_worker_pid => AsyncWorkerPid}),
    {noreply, State};
handle_info({forget_message_ids, MsgIds}, State0) ->
    Seen = maps:get(seen_message_ids, State0),
    State = State0#{seen_message_ids := sets:subtract(Seen, MsgIds)},
    ?tp(gcp_pubsub_consumer_worker_message_ids_forgotten, #{message_ids => MsgIds}),
    {noreply, State};
handle_info(Msg, State) ->
    #{
        source_resource_id := SourceResId,
        topic := Topic
    } = State,
    ?SLOG(debug, #{
        msg => "gcp_pubsub_consumer_worker_unexpected_message",
        unexpected_msg => Msg,
        instance_id => SourceResId,
        topic => Topic
    }),
    {noreply, State}.
310

311
%% Always clears this worker's optvar entry.  For the fatal setup errors
%% (`topic_not_found', `bad_credentials', `permission_denied') the source is also
%% marked as unhealthy with that reason.
%%
%% The `_'-prefixed variables are referenced only from `?tp' arguments, so their
%% names are kept as they are.
terminate({error, Reason}, State) when
    Reason =:= topic_not_found orelse
        Reason =:= bad_credentials orelse
        Reason =:= permission_denied
->
    #{
        source_resource_id := SourceResId,
        topic := _Topic
    } = State,
    optvar:unset(?OPTVAR_SUB_OK(self())),
    emqx_bridge_gcp_pubsub_impl_consumer:mark_as_unhealthy(SourceResId, Reason),
    ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => {error, Reason}, topic => _Topic}),
    ok;
terminate(_Reason, _State) ->
    optvar:unset(?OPTVAR_SUB_OK(self())),
    ?tp(gcp_pubsub_consumer_worker_terminate, #{reason => _Reason, topic => maps:get(topic, _State)}),
    ok.
328

329
%%-------------------------------------------------------------------------------------------------
330
%% Internal fns
331
%%-------------------------------------------------------------------------------------------------
332

333
%% Starts a linked, unregistered `gen_server' running this module with `Config' as
%% the `init/1' argument.
-spec start_link(config()) -> gen_server:start_ret().
start_link(Config) ->
    StartOpts = [],
    gen_server:start_link(?MODULE, Config, StartOpts).
336

337
%% Starts the ack retry timer when there are pending acks and no timer is running;
%% otherwise returns the state untouched.
-spec ensure_ack_timer(state()) -> state().
ensure_ack_timer(State = #{ack_timer := TRef, pending_acks := PendingAcks}) ->
    NeedsTimer = map_size(PendingAcks) =/= 0 andalso not is_reference(TRef),
    case NeedsTimer of
        true ->
            #{ack_retry_interval := AckRetryInterval} = State,
            NewTRef = emqx_utils:start_timer(AckRetryInterval, ack),
            State#{ack_timer := NewTRef};
        false ->
            State
    end.
346

347
%% Makes sure a pull retry timer is running: keeps the current one if present,
%% otherwise starts a new one with `pull_retry_interval'.
-spec ensure_pull_timer(state()) -> state().
ensure_pull_timer(State = #{pull_timer := Running}) when is_reference(Running) ->
    State;
ensure_pull_timer(State = #{pull_retry_interval := Interval}) ->
    NewTRef = emqx_utils:start_timer(Interval, pull),
    State#{pull_timer := NewTRef}.
352

353
%% Sends a synchronous `put' request for the subscription resource and maps the
%% response onto the next initialization step:
%%   200         -> `continue'          (subscription created)
%%   409         -> `already_exists'
%%   404         -> `not_found'         (nonexistent topic)
%%   403         -> `permission_denied'
%%   401         -> `bad_credentials'
%%   other error -> `retry'
-spec ensure_subscription_exists(state()) ->
    continue | retry | not_found | permission_denied | bad_credentials | already_exists.
ensure_subscription_exists(State) ->
    ?tp(gcp_pubsub_consumer_worker_create_subscription_enter, #{}),
    #{
        client := Client,
        source_resource_id := SourceResId,
        request_ttl := RequestTTL,
        subscription_id := SubscriptionId,
        topic := Topic
    } = State,
    Request =
        {prepared_request, {put, path(State, create), body(State, create)}, #{
            request_ttl => RequestTTL
        }},
    case emqx_bridge_gcp_pubsub_client:query_sync(Request, Client) of
        {ok, #{status_code := 200}} ->
            ?tp(
                debug,
                "gcp_pubsub_consumer_worker_subscription_created",
                #{
                    instance_id => SourceResId,
                    topic => Topic,
                    subscription_id => SubscriptionId
                }
            ),
            continue;
        {error, #{status_code := 409}} ->
            %% already exists
            ?tp(
                debug,
                "gcp_pubsub_consumer_worker_subscription_already_exists",
                #{
                    instance_id => SourceResId,
                    topic => Topic,
                    subscription_id => SubscriptionId
                }
            ),
            already_exists;
        {error, #{status_code := 401}} ->
            %% bad credentials
            ?tp(
                warning,
                "gcp_pubsub_consumer_worker_bad_credentials",
                #{
                    instance_id => SourceResId,
                    topic => Topic
                }
            ),
            bad_credentials;
        {error, #{status_code := 403}} ->
            %% permission denied
            ?tp(
                warning,
                "gcp_pubsub_consumer_worker_permission_denied",
                #{
                    instance_id => SourceResId,
                    topic => Topic
                }
            ),
            permission_denied;
        {error, #{status_code := 404}} ->
            %% nonexistent topic
            ?tp(
                warning,
                "gcp_pubsub_consumer_worker_nonexistent_topic",
                #{
                    instance_id => SourceResId,
                    topic => Topic
                }
            ),
            not_found;
        {error, Reason} ->
            ?tp(
                error,
                "gcp_pubsub_consumer_worker_subscription_error",
                #{
                    instance_id => SourceResId,
                    topic => Topic,
                    reason => Reason
                }
            ),
            retry
    end.
439

440
%% Sends a synchronous `patch' request for the existing subscription.  Any `{ok, _}'
%% response counts as success; any `{error, _}' is logged and reported as `error'.
-spec patch_subscription(state()) -> ok | error.
patch_subscription(State) ->
    #{
        client := Client,
        source_resource_id := SourceResId,
        subscription_id := SubscriptionId,
        request_ttl := RequestTTL,
        topic := Topic
    } = State,
    %% the subscription lives at the same path that was used to create it
    Request =
        {prepared_request, {patch, path(State, create), body(State, patch_subscription)}, #{
            request_ttl => RequestTTL
        }},
    Res = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
    case Res of
        {error, Reason} ->
            ?tp(
                warning,
                "gcp_pubsub_consumer_worker_subscription_patch_error",
                #{
                    instance_id => SourceResId,
                    topic => Topic,
                    subscription_id => SubscriptionId,
                    reason => Reason
                }
            ),
            error;
        {ok, _} ->
            ?tp(
                debug,
                "gcp_pubsub_consumer_worker_subscription_patched",
                #{
                    instance_id => SourceResId,
                    topic => Topic,
                    subscription_id => SubscriptionId,
                    result => Res
                }
            ),
            ok
    end.
481

482
%% We use async requests so that this process will be more responsive to system messages.
%%
%% Issues an asynchronous pull whose result is delivered to `reply_delegator/4'.
%% The pull retry timer is ensured in both outcomes; when a pool worker accepted
%% the request, that worker is also monitored.
-spec do_pull_async(state()) -> state().
do_pull_async(State0) ->
    ?tp_span(
        gcp_pubsub_consumer_worker_pull_async,
        #{topic => maps:get(topic, State0), subscription_id => maps:get(subscription_id, State0)},
        begin
            #{
                client := Client,
                source_resource_id := SourceResId,
                request_ttl := RequestTTL
            } = State0,
            Request =
                {prepared_request, {post, path(State0, pull), body(State0, pull)}, #{
                    request_ttl => RequestTTL
                }},
            ReplyFunAndArgs = {fun ?MODULE:reply_delegator/4, [self(), pull_async, SourceResId]},
            case emqx_bridge_gcp_pubsub_client:query_async(Request, ReplyFunAndArgs, Client) of
                {error, no_pool_worker_available} ->
                    ensure_pull_timer(State0);
                {ok, AsyncWorkerPid} ->
                    ensure_async_worker_monitored(ensure_pull_timer(State0), AsyncWorkerPid)
            end
        end
    ).
514

515
%% Monitors `AsyncWorkerPid' unless it is already tracked in `async_workers', and
%% records the monitor reference under the pid.
-spec ensure_async_worker_monitored(state(), pid()) -> state().
ensure_async_worker_monitored(State = #{async_workers := Workers}, AsyncWorkerPid) ->
    case maps:is_key(AsyncWorkerPid, Workers) of
        false ->
            MRef = monitor(process, AsyncWorkerPid),
            State#{async_workers := Workers#{AsyncWorkerPid => MRef}};
        true ->
            State
    end.
525

526
%% Decodes a pull response, dispatches every message that has not been seen yet,
%% records the ack ids of all received messages, acknowledges them and finally
%% requests the next pull.
%%
%% A message counts as a redelivery when its id is pending acknowledgement or is in
%% the set of recently seen ids.  The check is made against the fold accumulators
%% (not the state as it was before the fold), so an id that shows up more than once
%% within the same response is dispatched only once.
-spec do_process_pull_response(state(), binary()) -> state().
do_process_pull_response(State0, RespBody) ->
    #{
        pending_acks := PendingAcks,
        seen_message_ids := SeenMsgIds
    } = State0,
    Messages = decode_response(RespBody),
    ?tp(gcp_pubsub_consumer_worker_decoded_messages, #{messages => Messages}),
    {NewPendingAcks, NewSeenMsgIds} =
        lists:foldl(
            fun(
                Msg = #{
                    <<"ackId">> := AckId,
                    <<"message">> := #{<<"messageId">> := MsgId}
                },
                {AccAck, AccSeen}
            ) ->
                case is_map_key(MsgId, AccAck) orelse sets:is_element(MsgId, AccSeen) of
                    true ->
                        ?tp(message_redelivered, #{message => Msg}),
                        %% even though it was redelivered, pubsub might change the ack
                        %% id...  we should ack this latest value.
                        {AccAck#{MsgId => AckId}, AccSeen};
                    false ->
                        _ = handle_message(State0, Msg),
                        {AccAck#{MsgId => AckId}, sets:add_element(MsgId, AccSeen)}
                end
            end,
            {PendingAcks, SeenMsgIds},
            Messages
        ),
    State1 = State0#{pending_acks := NewPendingAcks, seen_message_ids := NewSeenMsgIds},
    State2 = acknowledge(State1),
    pull_async(self()),
    State2.
561

562
%% Acknowledges the pending messages, if there are any; with nothing pending the
%% state is returned unchanged.
-spec acknowledge(state()) -> state().
acknowledge(State = #{pending_acks := PendingAcks}) ->
    case map_size(PendingAcks) of
        0 ->
            State;
        _ ->
            do_acknowledge(State)
    end.
570

571
%% Sends all pending ack ids in a single synchronous request.  The `ack_timer' field
%% is reset to `undefined' first.  On a 200 response the acknowledged message ids are
%% scheduled to be forgotten after `forget_interval' and the pending map is emptied;
%% on any other outcome the acks are kept and the retry timer is ensured.
do_acknowledge(State0) ->
    ?tp(gcp_pubsub_consumer_worker_acknowledge_enter, #{}),
    State = State0#{ack_timer := undefined},
    #{
        client := Client,
        forget_interval := ForgetInterval,
        request_ttl := RequestTTL,
        pending_acks := PendingAcks
    } = State,
    AckIds = maps:values(PendingAcks),
    Request =
        {prepared_request, {post, path(State, ack), body(State, ack, #{ack_ids => AckIds})}, #{
            request_ttl => RequestTTL
        }},
    ?tp(gcp_pubsub_consumer_worker_will_acknowledge, #{acks => PendingAcks}),
    case emqx_bridge_gcp_pubsub_client:query_sync(Request, Client) of
        {ok, #{status_code := 200}} ->
            ?tp(gcp_pubsub_consumer_worker_acknowledged, #{acks => PendingAcks}),
            forget_message_ids_after(maps:keys(PendingAcks), ForgetInterval),
            State#{pending_acks := #{}};
        {ok, Details} ->
            ?tp(
                warning,
                "gcp_pubsub_consumer_worker_ack_error",
                #{details => Details}
            ),
            ensure_ack_timer(State);
        {error, Reason} ->
            ?tp(
                warning,
                "gcp_pubsub_consumer_worker_ack_error",
                #{reason => Reason}
            ),
            ensure_ack_timer(State)
    end.
609

610
%% Fetches the subscription with a synchronous `get' request.  A 200 response is
%% JSON-decoded into a map; any other response or error is logged and returned as
%% `{error, _}'.
-spec do_get_subscription(state()) -> {ok, emqx_utils_json:json_term()} | {error, term()}.
do_get_subscription(State) ->
    #{
        client := Client,
        request_ttl := RequestTTL
    } = State,
    Request =
        {prepared_request,
            {get, path(State, get_subscription), body(State, get_subscription)}, #{
                request_ttl => RequestTTL
            }},
    case emqx_bridge_gcp_pubsub_client:query_sync(Request, Client) of
        {ok, #{status_code := 200, body := RespBody}} ->
            {ok, emqx_utils_json:decode(RespBody, [return_maps])};
        {ok, Details} ->
            ?SLOG(warning, #{
                msg => "gcp_pubsub_consumer_worker_get_subscription_unexpected_response",
                details => Details
            }),
            {error, Details};
        {error, Reason} ->
            ?SLOG(warning, #{
                msg => "gcp_pubsub_consumer_worker_get_subscription_error",
                reason => Reason
            }),
            {error, Reason}
    end.
639

640
%% Derives the subscription id for a bridge/topic pair:
%% `emqx-sub-<bridge name>-<topic>', URI-quoted.
-spec subscription_id(bridge_name(), emqx_bridge_gcp_pubsub_client:topic()) -> subscription_id().
subscription_id(BridgeName0, Topic) ->
    %% The real GCP PubSub accepts colons in subscription names, but its emulator
    %% doesn't...  We currently validate bridge names to not include that character.  The
    %% exception is the prefix from the probe API.
    BridgeName1 = to_bin(BridgeName0),
    %% `global' is required: without it only the first colon would be replaced.
    BridgeName = binary:replace(BridgeName1, <<":">>, <<"-">>, [global]),
    to_bin(uri_string:quote(<<"emqx-sub-", BridgeName/binary, "-", Topic/binary>>)).
648

649
%% Builds the request path for the given operation: `/v1/<subscription resource>',
%% followed by `:pull' or `:acknowledge' for those two operations.  The project id is
%% taken from the client map.
-spec path(state(), pull | create | ack | get_subscription) -> binary().
path(State, Type) ->
    #{
        client := #{project_id := ProjectId},
        subscription_id := SubscriptionId
    } = State,
    Resource = subscription_resource(ProjectId, SubscriptionId),
    Suffix =
        case Type of
            pull -> <<":pull">>;
            create -> <<>>;
            ack -> <<":acknowledge">>;
            get_subscription -> <<>>
        end,
    <<"/v1/", Resource/binary, Suffix/binary>>.
666

667
%% Builds the request body for the given operation:
%%   * `pull'               - maximum number of messages to return;
%%   * `create'             - topic resource and ack deadline;
%%   * `patch_subscription' - full subscription object plus an update mask;
%%   * `get_subscription'   - empty body.
-spec body(state(), pull | create | patch_subscription | get_subscription) -> binary().
body(#{pull_max_messages := PullMaxMessages} = _State, pull) ->
    emqx_utils_json:encode(#{<<"maxMessages">> => PullMaxMessages});
body(State, create) ->
    #{
        ack_deadline := AckDeadlineSeconds,
        project_id := ProjectId,
        topic := PubSubTopic
    } = State,
    emqx_utils_json:encode(#{
        <<"topic">> => <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
        <<"ackDeadlineSeconds">> => AckDeadlineSeconds
    });
body(State, patch_subscription) ->
    #{
        ack_deadline := AckDeadlineSeconds,
        project_id := ProjectId,
        topic := PubSubTopic,
        subscription_id := SubscriptionId
    } = State,
    TopicResource = <<"projects/", ProjectId/binary, "/topics/", PubSubTopic/binary>>,
    Subscription = #{
        <<"ackDeadlineSeconds">> => AckDeadlineSeconds,
        <<"name">> => subscription_resource(ProjectId, SubscriptionId),
        <<"topic">> => TopicResource
    },
    emqx_utils_json:encode(#{
        <<"subscription">> => Subscription,
        %% topic is immutable; don't add it here.
        <<"updateMask">> => <<"ackDeadlineSeconds">>
    });
body(_State, get_subscription) ->
    <<>>.
705

706
%% Builds the body of an acknowledge request from the `ack_ids' option.
-spec body(state(), ack, map()) -> binary().
body(_State, ack, #{ack_ids := AckIds} = _Opts) ->
    emqx_utils_json:encode(#{<<"ackIds">> => AckIds}).
711

712
%% Returns `projects/<project id>/subscriptions/<subscription id>'.
-spec subscription_resource(emqx_bridge_gcp_pubsub_client:project_id(), subscription_id()) ->
    binary().
subscription_resource(ProjectId, SubscriptionId) ->
    Prefix = <<"projects/", ProjectId/binary>>,
    <<Prefix/binary, "/subscriptions/", SubscriptionId/binary>>.
716

717
%% Decodes the JSON body of a pull response into its list of received messages,
%% base64-decoding each message's `data' field when present.  A response without
%% `receivedMessages' yields an empty list.
-spec decode_response(binary()) -> [decoded_message()].
decode_response(RespBody) ->
    DecodeOne =
        fun(Received = #{<<"message">> := Inner0}) ->
            Inner = emqx_utils_maps:update_if_present(
                <<"data">>, fun base64:decode/1, Inner0
            ),
            Received#{<<"message">> := Inner}
        end,
    case emqx_utils_json:decode(RespBody, [return_maps]) of
        #{<<"receivedMessages">> := ReceivedMessages} ->
            lists:map(DecodeOne, ReceivedMessages);
        #{} ->
            []
    end.
733

734
%% Turns one decoded message into the internal message map (message id, publish
%% time, topic, plus `value' / `attributes' / `ordering_key' when present in the
%% original), publishes it through the legacy MQTT config if one is set, runs every
%% hookpoint with it and bumps the `received' metric of the source.
-spec handle_message(state(), decoded_message()) -> ok.
handle_message(State, #{<<"ackId">> := AckId, <<"message">> := InnerMsg} = _Message) ->
    ?tp_span(
        debug,
        "gcp_pubsub_consumer_worker_handle_message",
        #{message_id => maps:get(<<"messageId">>, InnerMsg), message => _Message, ack_id => AckId},
        begin
            #{
                source_resource_id := SourceResId,
                hookpoints := Hookpoints,
                mqtt_config := MQTTConfig,
                topic := Topic
            } = State,
            #{
                <<"messageId">> := MessageId,
                <<"publishTime">> := PublishTime
            } = InnerMsg,
            Base = #{
                message_id => MessageId,
                publish_time => PublishTime,
                topic => Topic
            },
            %% optional fields are copied only when present in the original message
            WithValue = add_if_present(<<"data">>, InnerMsg, value, Base),
            WithAttrs = add_if_present(<<"attributes">>, InnerMsg, attributes, WithValue),
            FullMessage = add_if_present(<<"orderingKey">>, InnerMsg, ordering_key, WithAttrs),
            legacy_maybe_publish_mqtt_message(MQTTConfig, SourceResId, FullMessage),
            RunHook = fun(Hookpoint) -> emqx_hooks:run(Hookpoint, [FullMessage]) end,
            lists:foreach(RunHook, Hookpoints),
            emqx_resource_metrics:received_inc(SourceResId),
            ok
        end
    ).
777

778
%% Publishes the message to MQTT when the config carries a payload template, a QoS
%% and a non-empty MQTT topic; with any other config this is a no-op.
legacy_maybe_publish_mqtt_message(
    #{payload_template := PayloadTemplate, qos := MQTTQoS, mqtt_topic := MQTTTopic},
    SourceResId,
    FullMessage
) when MQTTTopic =/= <<>> ->
    Payload = render(FullMessage, PayloadTemplate),
    _ = emqx:publish(emqx_message:make(SourceResId, MQTTQoS, MQTTTopic, Payload)),
    ok;
legacy_maybe_publish_mqtt_message(_MQTTConfig, _SourceResId, _FullMessage) ->
    ok.
793

794
%% Copies the value stored under `FromKey' in `Message' into `Map' under `ToKey'.
%% `Map' is returned unchanged when the key is absent or its value is `undefined'.
-spec add_if_present(any(), map(), any(), map()) -> map().
add_if_present(FromKey, Message, ToKey, Map) ->
    case maps:find(FromKey, Message) of
        {ok, Value} when Value =/= undefined ->
            Map#{ToKey => Value};
        _ ->
            Map
    end.
802

803
%% Renders the payload template against the message, requesting the result as a
%% single binary (`return => full_binary').
render(FullMessage, PayloadTemplate) ->
    emqx_placeholder:proc_tmpl(PayloadTemplate, FullMessage, #{return => full_binary}).
806

807
%% Schedules a `{forget_message_ids, Set}' message to the calling process after
%% `Timeout', where `Set' holds the given message ids.
forget_message_ids_after(MsgIds0, Timeout) ->
    Notice = {forget_message_ids, sets:from_list(MsgIds0, [{version, 2}])},
    _ = erlang:send_after(Timeout, self(), Notice),
    ok.
811

812
%% Converts an atom, an iolist or a binary into a binary.
to_bin(Bin) when is_binary(Bin) -> Bin;
to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom);
to_bin(IoList) when is_list(IoList) -> iolist_to_binary(IoList).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc