• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8625278842

10 Apr 2024 02:52AM UTC coverage: 62.489% (-0.002%) from 62.491%
8625278842

push

github

web-flow
Merge pull request #12855 from JimMoen/fix-share-queue-format

fix(mgmt): $queue shared topics format in mgmt topics api

0 of 1 new or added line in 1 file covered. (0.0%)

49 existing lines in 10 files now uncovered.

34606 of 55379 relevant lines covered (62.49%)

6756.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.7
/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_bridge_iotdb_connector).
5

6
-behaviour(emqx_connector_examples).
7

8
-behaviour(emqx_resource).
9

10
-include("emqx_bridge_iotdb.hrl").
11
-include_lib("emqx/include/logger.hrl").
12
-include_lib("hocon/include/hoconsc.hrl").
13
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
14
-include_lib("emqx_resource/include/emqx_resource.hrl").
15

16
%% `emqx_resource' API
17
-export([
18
    callback_mode/0,
19
    on_start/2,
20
    on_stop/2,
21
    on_get_status/2,
22
    on_query/3,
23
    on_query_async/4,
24
    on_add_channel/4,
25
    on_remove_channel/3,
26
    on_get_channels/1,
27
    on_get_channel_status/3
28
]).
29

30
-export([
31
    namespace/0,
32
    roots/0,
33
    fields/1,
34
    desc/1,
35
    connector_examples/1,
36
    connector_example_values/0
37
]).
38

39
%% emqx_connector_resource behaviour callbacks
40
-export([connector_config/2]).
41

42
-type config() ::
43
    #{
44
        base_url := #{
45
            scheme := http | https,
46
            host := iolist(),
47
            port := inet:port_number(),
48
            path := _
49
        },
50
        connect_timeout := pos_integer(),
51
        pool_type := random | hash,
52
        pool_size := pos_integer(),
53
        iotdb_version := atom(),
54
        request => undefined | map(),
55
        atom() => _
56
    }.
57

58
-type state() ::
59
    #{
60
        base_path := _,
61
        connect_timeout := pos_integer(),
62
        pool_type := random | hash,
63
        channels := map(),
64
        iotdb_version := atom(),
65
        request => undefined | map(),
66
        atom() => _
67
    }.
68

69
-type manager_id() :: binary().
70

71
-define(CONNECTOR_TYPE, iotdb).
72
-define(IOTDB_PING_PATH, <<"ping">>).
73

74
-import(hoconsc, [mk/2, enum/1, ref/2]).
75

76
%%-------------------------------------------------------------------------------------
77
%% connector examples
78
%%-------------------------------------------------------------------------------------
79
connector_examples(Method) ->
80
    [
1,566✔
81
        #{
82
            <<"iotdb">> =>
83
                #{
84
                    summary => <<"Apache IoTDB Connector">>,
85
                    value => emqx_connector_schema:connector_values(
86
                        Method, ?CONNECTOR_TYPE, connector_example_values()
87
                    )
88
                }
89
        }
90
    ].
91

92
connector_example_values() ->
93
    #{
1,566✔
94
        name => <<"iotdb_connector">>,
95
        type => iotdb,
96
        enable => true,
97
        iotdb_version => ?VSN_1_1_X,
98
        authentication => #{
99
            <<"username">> => <<"root">>,
100
            <<"password">> => <<"******">>
101
        },
102
        base_url => <<"http://iotdb.local:18080/">>,
103
        connect_timeout => <<"15s">>,
104
        pool_type => <<"random">>,
105
        pool_size => 8,
106
        enable_pipelining => 100,
107
        ssl => #{enable => false}
108
    }.
109

110
%%-------------------------------------------------------------------------------------
111
%% schema
112
%%-------------------------------------------------------------------------------------
113
namespace() -> "iotdb".
1,730✔
114

115
roots() ->
116
    [{config, #{type => hoconsc:ref(?MODULE, config)}}].
×
117

118
fields(config) ->
119
    proplists_without(
120
        [url, request, retry_interval, headers],
121
        emqx_bridge_http_schema:fields("config_connector")
122
    ) ++
9,068✔
123
        fields("connection_fields");
124
fields("connection_fields") ->
125
    [
9,068✔
126
        {base_url,
127
            mk(
128
                emqx_schema:url(),
129
                #{
130
                    required => true,
131
                    desc => ?DESC(emqx_bridge_iotdb, "config_base_url")
132
                }
133
            )},
134
        {iotdb_version,
135
            mk(
136
                hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
137
                #{
138
                    desc => ?DESC(emqx_bridge_iotdb, "config_iotdb_version"),
139
                    default => ?VSN_1_1_X
140
                }
141
            )},
142
        {authentication,
143
            mk(
144
                hoconsc:union([ref(?MODULE, auth_basic)]),
145
                #{
146
                    default => auth_basic, desc => ?DESC("config_authentication")
147
                }
148
            )}
149
    ];
150
fields(auth_basic) ->
151
    [
449✔
152
        {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})},
153
        {password,
154
            emqx_schema_secret:mk(#{
155
                required => true,
156
                desc => ?DESC("config_auth_basic_password")
157
            })}
158
    ];
159
fields("post") ->
160
    emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields(config);
242✔
161
fields("put") ->
162
    fields(config);
113✔
163
fields("get") ->
164
    emqx_bridge_schema:status_fields() ++ fields("post").
108✔
165

166
desc(config) ->
167
    ?DESC("desc_config");
103✔
168
desc(auth_basic) ->
169
    "Basic Authentication";
128✔
170
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
171
    ["Configuration for IoTDB using `", string:to_upper(Method), "` method."];
31✔
172
desc(_) ->
173
    undefined.
×
174

175
connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) ->
176
    #{
40✔
177
        base_url := BaseUrl,
178
        authentication :=
179
            #{
180
                username := Username,
181
                password := Password0
182
            }
183
    } = Conf,
184

185
    Password = emqx_secret:unwrap(Password0),
40✔
186
    BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
40✔
187

188
    WebhookConfig =
40✔
189
        Conf#{
190
            url => BaseUrl,
191
            headers => [
192
                {<<"Content-type">>, <<"application/json">>},
193
                {<<"Authorization">>, BasicToken}
194
            ]
195
        },
196
    ParseConfs(
40✔
197
        <<"http">>,
198
        Name,
199
        WebhookConfig
200
    ).
201

202
proplists_without(Keys, List) ->
203
    [El || El = {K, _} <- List, not lists:member(K, Keys)].
9,068✔
204

205
%%-------------------------------------------------------------------------------------
206
%% `emqx_resource' API
207
%%-------------------------------------------------------------------------------------
208
callback_mode() -> async_if_possible.
40✔
209

210
-spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
211
on_start(InstanceId, #{iotdb_version := Version} = Config) ->
212
    %% [FIXME] The configuration passed in here is pre-processed and transformed
213
    %% in emqx_bridge_resource:parse_confs/2.
214
    case emqx_bridge_http_connector:on_start(InstanceId, Config) of
38✔
215
        {ok, State} ->
216
            ?SLOG(info, #{
38✔
217
                msg => "iotdb_bridge_started",
218
                instance_id => InstanceId,
219
                request => emqx_utils:redact(maps:get(request, State, <<>>))
220
            }),
38✔
221
            ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
38✔
222
            {ok, State#{iotdb_version => Version, channels => #{}}};
38✔
223
        {error, Reason} ->
224
            ?SLOG(error, #{
×
225
                msg => "failed_to_start_iotdb_bridge",
226
                instance_id => InstanceId,
227
                request => emqx_utils:redact(maps:get(request, Config, <<>>)),
228
                reason => Reason
229
            }),
×
230
            throw(failed_to_start_iotdb_bridge)
×
231
    end.
232

233
-spec on_stop(manager_id(), state()) -> ok | {error, term()}.
234
on_stop(InstanceId, State) ->
235
    ?SLOG(info, #{
38✔
236
        msg => "stopping_iotdb_bridge",
237
        connector => InstanceId
238
    }),
38✔
239
    Res = emqx_bridge_http_connector:on_stop(InstanceId, State),
38✔
240
    ?tp(iotdb_bridge_stopped, #{instance_id => InstanceId}),
38✔
241
    Res.
38✔
242

243
-spec on_get_status(manager_id(), state()) ->
244
    connected | connecting | {disconnected, state(), term()}.
245
on_get_status(InstanceId, #{base_path := BasePath} = State) ->
246
    Func = fun(Worker, Timeout) ->
298✔
247
        Request = {?IOTDB_PING_PATH, [], undefined},
2,384✔
248
        NRequest = emqx_bridge_http_connector:formalize_request(get, BasePath, Request),
2,384✔
249
        Result0 = ehttpc:request(Worker, get, NRequest, Timeout),
2,384✔
250
        case emqx_bridge_http_connector:transform_result(Result0) of
2,384✔
251
            {ok, 200, _, Body} ->
252
                case emqx_utils_json:decode(Body) of
2,345✔
253
                    #{<<"code">> := 200} ->
254
                        ok;
2,345✔
255
                    Json ->
256
                        {error, {unexpected_status, Json}}
×
257
                end;
258
            {error, _} = Error ->
259
                Error;
39✔
260
            Result ->
261
                {error, {unexpected_ping_result, Result}}
×
262
        end
263
    end,
264
    emqx_bridge_http_connector:on_get_status(InstanceId, State, Func).
298✔
265

266
-spec on_query(manager_id(), {send_message, map()}, state()) ->
267
    {ok, pos_integer(), [term()], term()}
268
    | {ok, pos_integer(), [term()]}
269
    | {error, term()}.
270
on_query(
271
    InstanceId,
272
    {ChannelId, _Message} = Req,
273
    #{iotdb_version := IoTDBVsn, channels := Channels} = State
274
) ->
275
    ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
24✔
276
    ?SLOG(debug, #{
24✔
277
        msg => "iotdb_bridge_on_query_called",
278
        instance_id => InstanceId,
279
        send_message => Req,
280
        state => emqx_utils:redact(State)
281
    }),
24✔
282

283
    case try_render_message(Req, IoTDBVsn, Channels) of
24✔
284
        {ok, IoTDBPayload} ->
285
            handle_response(
14✔
286
                emqx_bridge_http_connector:on_query(
287
                    InstanceId, {ChannelId, IoTDBPayload}, State
288
                )
289
            );
290
        Error ->
291
            Error
10✔
292
    end.
293

294
-spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
295
    {ok, pid()} | {error, empty_request}.
296
on_query_async(
297
    InstanceId,
298
    {ChannelId, _Message} = Req,
299
    ReplyFunAndArgs0,
300
    #{iotdb_version := IoTDBVsn, channels := Channels} = State
301
) ->
302
    ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
8✔
303
    ?SLOG(debug, #{
8✔
304
        msg => "iotdb_bridge_on_query_async_called",
305
        instance_id => InstanceId,
306
        send_message => Req,
307
        state => emqx_utils:redact(State)
308
    }),
8✔
309
    case try_render_message(Req, IoTDBVsn, Channels) of
8✔
310
        {ok, IoTDBPayload} ->
311
            ReplyFunAndArgs =
4✔
312
                {
313
                    fun(Result) ->
314
                        Response = handle_response(Result),
4✔
315
                        emqx_resource:apply_reply_fun(ReplyFunAndArgs0, Response)
4✔
316
                    end,
317
                    []
318
                },
319
            emqx_bridge_http_connector:on_query_async(
4✔
320
                InstanceId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State
321
            );
322
        Error ->
323
            Error
4✔
324
    end.
325

326
on_add_channel(
327
    InstanceId,
328
    #{iotdb_version := Version, channels := Channels} = OldState0,
329
    ChannelId,
330
    #{
331
        parameters := #{data := Data} = Parameter
332
    }
333
) ->
334
    case maps:is_key(ChannelId, Channels) of
46✔
335
        true ->
336
            {error, already_exists};
2✔
337
        _ ->
338
            %% update HTTP channel
339
            InsertTabletPathV1 = <<"rest/v1/insertTablet">>,
44✔
340
            InsertTabletPathV2 = <<"rest/v2/insertTablet">>,
44✔
341

342
            Path =
44✔
343
                case Version of
344
                    ?VSN_1_1_X -> InsertTabletPathV2;
22✔
345
                    _ -> InsertTabletPathV1
22✔
346
                end,
347

348
            HTTPReq = #{
44✔
349
                parameters => Parameter#{
350
                    path => Path,
351
                    method => <<"post">>
352
                }
353
            },
354

355
            {ok, OldState} = emqx_bridge_http_connector:on_add_channel(
44✔
356
                InstanceId, OldState0, ChannelId, HTTPReq
357
            ),
358

359
            %% update IoTDB channel
360
            DeviceId = maps:get(device_id, Parameter, <<>>),
44✔
361
            Channel = Parameter#{
44✔
362
                device_id => emqx_placeholder:preproc_tmpl(DeviceId),
363
                data := preproc_data_template(Data)
364
            },
365
            Channels2 = Channels#{ChannelId => Channel},
44✔
366
            {ok, OldState#{channels := Channels2}}
44✔
367
    end.
368

369
on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) ->
370
    {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId),
46✔
371
    Channels2 = maps:remove(ChannelId, Channels),
46✔
372
    {ok, OldState#{channels => Channels2}}.
46✔
373

374
on_get_channels(InstanceId) ->
375
    emqx_bridge_v2:get_channels_for_connector(InstanceId).
268✔
376

377
on_get_channel_status(InstanceId, _ChannelId, State) ->
378
    case on_get_status(InstanceId, State) of
128✔
379
        ?status_connected ->
380
            ?status_connected;
125✔
381
        _ ->
382
            ?status_disconnected
3✔
383
    end.
384

385
%%--------------------------------------------------------------------
386
%% Internal Functions
387
%%--------------------------------------------------------------------
388

389
get_payload(#{payload := Payload}) ->
390
    Payload;
32✔
391
get_payload(#{<<"payload">> := Payload}) ->
392
    Payload;
×
393
get_payload(Payload) ->
394
    Payload.
×
395

396
parse_payload(ParsedPayload) when is_map(ParsedPayload) ->
397
    ParsedPayload;
2✔
398
parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) ->
399
    emqx_utils_json:decode(UnparsedPayload);
30✔
400
parse_payload(UnparsedPayloads) when is_list(UnparsedPayloads) ->
401
    lists:map(fun parse_payload/1, UnparsedPayloads).
×
402

403
preproc_data_list(DataList) ->
404
    lists:foldl(
26✔
405
        fun preproc_data/2,
406
        [],
407
        DataList
408
    ).
409

410
preproc_data(
411
    #{
412
        <<"measurement">> := Measurement,
413
        <<"data_type">> := DataType,
414
        <<"value">> := Value
415
    } = Data,
416
    Acc
417
) ->
418
    [
80✔
419
        #{
420
            timestamp => maybe_preproc_tmpl(
421
                maps:get(<<"timestamp">>, Data, <<"now">>)
422
            ),
423
            measurement => emqx_placeholder:preproc_tmpl(Measurement),
424
            data_type => emqx_placeholder:preproc_tmpl(DataType),
425
            value => maybe_preproc_tmpl(Value)
426
        }
427
        | Acc
428
    ];
429
preproc_data(_NoMatch, Acc) ->
430
    ?SLOG(
4✔
431
        warning,
4✔
432
        #{
433
            msg => "iotdb_bridge_preproc_data_failed",
434
            required_fields => ['measurement', 'data_type', 'value'],
435
            received => _NoMatch
436
        }
×
437
    ),
438
    Acc.
4✔
439

440
maybe_preproc_tmpl(Value) when is_binary(Value) ->
441
    emqx_placeholder:preproc_tmpl(Value);
76✔
442
maybe_preproc_tmpl(Value) ->
443
    Value.
84✔
444

445
proc_data(PreProcessedData, Msg) ->
446
    NowNS = erlang:system_time(nanosecond),
24✔
447
    Nows = #{
24✔
448
        now_ms => erlang:convert_time_unit(NowNS, nanosecond, millisecond),
449
        now_us => erlang:convert_time_unit(NowNS, nanosecond, microsecond),
450
        now_ns => NowNS
451
    },
452
    proc_data(PreProcessedData, Msg, Nows, []).
24✔
453

454
proc_data(
455
    [
456
        #{
457
            timestamp := TimestampTkn,
458
            measurement := Measurement,
459
            data_type := DataType0,
460
            value := ValueTkn
461
        }
462
        | T
463
    ],
464
    Msg,
465
    Nows,
466
    Acc
467
) ->
468
    DataType = list_to_binary(
82✔
469
        string:uppercase(binary_to_list(emqx_placeholder:proc_tmpl(DataType0, Msg)))
470
    ),
471
    try
82✔
472
        proc_data(T, Msg, Nows, [
82✔
473
            #{
474
                timestamp => iot_timestamp(TimestampTkn, Msg, Nows),
475
                measurement => emqx_placeholder:proc_tmpl(Measurement, Msg),
476
                data_type => DataType,
477
                value => proc_value(DataType, ValueTkn, Msg)
478
            }
479
            | Acc
480
        ])
481
    catch
482
        throw:Reason ->
483
            {error, Reason};
2✔
484
        Error:Reason:Stacktrace ->
485
            ?SLOG(debug, #{exception => Error, reason => Reason, stacktrace => Stacktrace}),
4✔
486
            {error, invalid_data}
4✔
487
    end;
488
proc_data([], _Msg, _Nows, Acc) ->
489
    {ok, lists:reverse(Acc)}.
18✔
490

491
iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) ->
492
    Timestamp;
52✔
493
iot_timestamp(TimestampTkn, Msg, Nows) ->
494
    iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows).
30✔
495

496
iot_timestamp(<<"now_us">>, #{now_us := NowUs}) ->
497
    NowUs;
2✔
498
iot_timestamp(<<"now_ns">>, #{now_ns := NowNs}) ->
499
    NowNs;
2✔
500
iot_timestamp(Timestamp, #{now_ms := NowMs}) when
501
    Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>>
502
->
503
    NowMs;
24✔
504
iot_timestamp(Timestamp, _) when is_binary(Timestamp) ->
505
    binary_to_integer(Timestamp).
2✔
506

507
proc_value(<<"TEXT">>, ValueTkn, Msg) ->
508
    case emqx_placeholder:proc_tmpl(ValueTkn, Msg) of
4✔
509
        <<"undefined">> -> null;
×
510
        Val -> Val
4✔
511
    end;
512
proc_value(<<"BOOLEAN">>, ValueTkn, Msg) ->
513
    convert_bool(replace_var(ValueTkn, Msg));
34✔
514
proc_value(Int, ValueTkn, Msg) when Int =:= <<"INT32">>; Int =:= <<"INT64">> ->
515
    convert_int(replace_var(ValueTkn, Msg));
30✔
516
proc_value(Int, ValueTkn, Msg) when Int =:= <<"FLOAT">>; Int =:= <<"DOUBLE">> ->
517
    convert_float(replace_var(ValueTkn, Msg));
12✔
518
proc_value(Type, _, _) ->
519
    throw(#{reason => invalid_type, type => Type}).
2✔
520

521
replace_var(Tokens, Data) when is_list(Tokens) ->
522
    [Val] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
44✔
523
    Val;
44✔
524
replace_var(Val, _Data) ->
525
    Val.
32✔
526

527
convert_bool(B) when is_boolean(B) -> B;
10✔
528
convert_bool(null) -> null;
2✔
UNCOV
529
convert_bool(undefined) -> null;
×
530
convert_bool(1) -> true;
2✔
531
convert_bool(0) -> false;
2✔
532
convert_bool(<<"1">>) -> true;
2✔
533
convert_bool(<<"0">>) -> false;
2✔
534
convert_bool(<<"true">>) -> true;
2✔
535
convert_bool(<<"True">>) -> true;
2✔
536
convert_bool(<<"TRUE">>) -> true;
2✔
537
convert_bool(<<"false">>) -> false;
2✔
538
convert_bool(<<"False">>) -> false;
2✔
539
convert_bool(<<"FALSE">>) -> false.
2✔
540

541
convert_int(Int) when is_integer(Int) -> Int;
4✔
542
convert_int(Float) when is_float(Float) -> floor(Float);
4✔
543
convert_int(Str) when is_binary(Str) ->
544
    try
22✔
545
        binary_to_integer(Str)
22✔
546
    catch
547
        _:_ ->
548
            convert_int(binary_to_float(Str))
2✔
549
    end;
550
convert_int(null) ->
UNCOV
551
    null;
×
552
convert_int(undefined) ->
UNCOV
553
    null.
×
554

555
convert_float(Float) when is_float(Float) -> Float;
4✔
556
convert_float(Int) when is_integer(Int) -> Int * 10 / 10;
4✔
557
convert_float(Str) when is_binary(Str) ->
558
    try
4✔
559
        binary_to_float(Str)
4✔
560
    catch
561
        _:_ ->
UNCOV
562
            convert_float(binary_to_integer(Str))
×
563
    end;
564
convert_float(null) ->
UNCOV
565
    null;
×
566
convert_float(undefined) ->
UNCOV
567
    null.
×
568

569
make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) ->
570
    InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
18✔
571
    Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn),
18✔
572
    {ok,
18✔
573
        maps:merge(Rows, #{
574
            iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned,
575
            iotdb_field_key(device_id, IoTDBVsn) => DeviceId
576
        })}.
577

578
replace_dtypes(Rows0, IoTDBVsn) ->
579
    {Types, Rows} = maps:take(dtypes, Rows0),
18✔
580
    Rows#{iotdb_field_key(data_types, IoTDBVsn) => Types}.
18✔
581

582
aggregate_rows(DataList, InitAcc) ->
583
    lists:foldr(
18✔
584
        fun(
585
            #{
586
                timestamp := Timestamp,
587
                measurement := Measurement,
588
                data_type := DataType,
589
                value := Data
590
            },
591
            #{
592
                timestamps := AccTs,
593
                measurements := AccM,
594
                dtypes := AccDt,
595
                values := AccV
596
            } = Acc
597
        ) ->
598
            Timestamps = [Timestamp | AccTs],
76✔
599
            case index_of(Measurement, AccM) of
76✔
600
                0 ->
601
                    Acc#{
48✔
602
                        timestamps => Timestamps,
603
                        values => [pad_value(Data, length(AccTs)) | pad_existing_values(AccV)],
604
                        measurements => [Measurement | AccM],
605
                        dtypes => [DataType | AccDt]
606
                    };
607
                Index ->
608
                    Acc#{
28✔
609
                        timestamps => Timestamps,
610
                        values => insert_value(Index, Data, AccV),
611
                        measurements => AccM,
612
                        dtypes => AccDt
613
                    }
614
            end
615
        end,
616
        InitAcc,
617
        DataList
618
    ).
619

620
pad_value(Data, N) ->
621
    [Data | lists:duplicate(N, null)].
48✔
622

623
pad_existing_values(Values) ->
624
    [[null | Value] || Value <- Values].
48✔
625

626
index_of(E, List) ->
627
    string:str(List, [E]).
76✔
628

629
insert_value(_Index, _Data, []) ->
630
    [];
28✔
631
insert_value(1, Data, [Value | Values]) ->
632
    [[Data | Value] | insert_value(0, Data, Values)];
28✔
633
insert_value(Index, Data, [Value | Values]) ->
634
    [[null | Value] | insert_value(Index - 1, Data, Values)].
10✔
635

636
iotdb_field_key(is_aligned, ?VSN_1_1_X) ->
637
    <<"is_aligned">>;
9✔
638
iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
UNCOV
639
    <<"is_aligned">>;
×
640
iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
641
    <<"isAligned">>;
9✔
642
iotdb_field_key(device_id, ?VSN_1_1_X) ->
643
    <<"device">>;
9✔
644
iotdb_field_key(device_id, ?VSN_1_0_X) ->
UNCOV
645
    <<"device">>;
×
646
iotdb_field_key(device_id, ?VSN_0_13_X) ->
647
    <<"deviceId">>;
9✔
648
iotdb_field_key(data_types, ?VSN_1_1_X) ->
649
    <<"data_types">>;
9✔
650
iotdb_field_key(data_types, ?VSN_1_0_X) ->
UNCOV
651
    <<"data_types">>;
×
652
iotdb_field_key(data_types, ?VSN_0_13_X) ->
653
    <<"dataTypes">>.
9✔
654

655
to_list(List) when is_list(List) -> List;
2✔
656
to_list(Data) -> [Data].
30✔
657

658
%% If device_id is missing from the channel data, try to find it from the payload
659
device_id(Message, Payloads, Channel) ->
660
    case maps:get(device_id, Channel, []) of
32✔
661
        [] ->
662
            maps:get(<<"device_id">>, hd(Payloads), undefined);
28✔
663
        DeviceIdTkn ->
664
            emqx_placeholder:proc_tmpl(DeviceIdTkn, Message)
4✔
665
    end.
666

667
handle_response({ok, 200, _Headers, Body} = Resp) ->
668
    eval_response_body(Body, Resp);
18✔
669
handle_response({ok, 200, Body} = Resp) ->
UNCOV
670
    eval_response_body(Body, Resp);
×
671
handle_response({ok, Code, _Headers, Body}) ->
UNCOV
672
    {error, #{code => Code, body => Body}};
×
673
handle_response({ok, Code, Body}) ->
UNCOV
674
    {error, #{code => Code, body => Body}};
×
675
handle_response({error, _} = Error) ->
UNCOV
676
    Error.
×
677

678
eval_response_body(Body, Resp) ->
679
    case emqx_utils_json:decode(Body) of
18✔
680
        #{<<"code">> := 200} -> Resp;
18✔
UNCOV
681
        Reason -> {error, Reason}
×
682
    end.
683

684
preproc_data_template(DataList) ->
685
    Atom2Bin = fun
44✔
686
        (Atom) when is_atom(Atom) ->
687
            erlang:atom_to_binary(Atom);
2✔
688
        (Bin) ->
689
            Bin
2✔
690
    end,
691
    lists:map(
44✔
692
        fun(
693
            #{
694
                timestamp := Timestamp,
695
                measurement := Measurement,
696
                data_type := DataType,
697
                value := Value
698
            }
699
        ) ->
700
            #{
2✔
701
                timestamp => emqx_placeholder:preproc_tmpl(Atom2Bin(Timestamp)),
702
                measurement => emqx_placeholder:preproc_tmpl(Measurement),
703
                data_type => emqx_placeholder:preproc_tmpl(Atom2Bin(DataType)),
704
                value => emqx_placeholder:preproc_tmpl(Value)
705
            }
706
        end,
707
        DataList
708
    ).
709

710
try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) ->
711
    case maps:find(ChannelId, Channels) of
32✔
712
        {ok, Channel} ->
713
            render_channel_message(Channel, IoTDBVsn, Msg);
32✔
714
        _ ->
UNCOV
715
            {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
×
716
    end.
717

718
render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) ->
719
    Payloads = to_list(parse_payload(get_payload(Message))),
32✔
720
    case device_id(Message, Payloads, Channel) of
32✔
721
        undefined ->
722
            {error, device_id_missing};
4✔
723
        DeviceId ->
724
            case get_data_template(Channel, Payloads) of
28✔
725
                [] ->
726
                    {error, invalid_template};
4✔
727
                DataTemplate ->
728
                    case proc_data(DataTemplate, Message) of
24✔
729
                        {ok, DataList} ->
730
                            make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn);
18✔
731
                        Error ->
732
                            Error
6✔
733
                    end
734
            end
735
    end.
736

737
%% Get the message template.
738
%% In order to be compatible with 4.4, the template version has higher priority
739
%% This is a template, using it
740
get_data_template(#{data := Data}, _Payloads) when Data =/= [] ->
741
    Data;
2✔
742
%% This is a self-describing message
743
get_data_template(#{data := []}, Payloads) ->
744
    preproc_data_list(Payloads).
26✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc