• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8644497754

11 Apr 2024 09:24AM UTC coverage: 62.388% (-0.05%) from 62.44%
8644497754

push

github

web-flow
Merge pull request #12858 from zmstone/0410-fix-variform-number-handling

fix(variform): allow numbers to be numbers

2 of 3 new or added lines in 1 file covered. (66.67%)

67 existing lines in 12 files now uncovered.

34873 of 55897 relevant lines covered (62.39%)

6476.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.66
/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%--------------------------------------------------------------------
4
-module(emqx_bridge_greptimedb_connector).
5

6
-include_lib("emqx_connector/include/emqx_connector.hrl").
7
-include_lib("emqx_resource/include/emqx_resource.hrl").
8
-include_lib("hocon/include/hoconsc.hrl").
9
-include_lib("typerefl/include/types.hrl").
10
-include_lib("emqx/include/logger.hrl").
11
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
12

13
-import(hoconsc, [mk/2, enum/1, ref/2]).
14

15
-behaviour(emqx_resource).
16

17
%% callbacks of behaviour emqx_resource
18
-export([
19
    callback_mode/0,
20
    on_start/2,
21
    on_stop/2,
22
    on_add_channel/4,
23
    on_remove_channel/3,
24
    on_get_channel_status/3,
25
    on_get_channels/1,
26
    on_query/3,
27
    on_batch_query/3,
28
    on_query_async/4,
29
    on_batch_query_async/4,
30
    on_get_status/2
31
]).
32
-export([reply_callback/2]).
33

34
-export([
35
    roots/0,
36
    namespace/0,
37
    fields/1,
38
    desc/1
39
]).
40

41
-export([precision_field/0]).
42

43
%% only for test
44
-ifdef(TEST).
45
-export([is_unrecoverable_error/1]).
46
-endif.
47

48
-type ts_precision() :: ns | us | ms | s.
49

50
%% Allocatable resources
51
-define(greptime_client, greptime_client).
52

53
-define(GREPTIMEDB_DEFAULT_PORT, 4001).
54

55
-define(DEFAULT_DB, <<"public">>).
56

57
-define(GREPTIMEDB_HOST_OPTIONS, #{
58
    default_port => ?GREPTIMEDB_DEFAULT_PORT
59
}).
60

61
-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
62

63
-define(AUTO_RECONNECT_S, 1).
64

65
-define(CONNECT_TIMEOUT, 5_000).
66

67
%% -------------------------------------------------------------------------------------------------
68
%% resource callback
69
callback_mode() -> async_if_possible.
65✔
70

71
on_add_channel(
72
    _InstanceId,
73
    #{channels := Channels} = OldState,
74
    ChannelId,
75
    #{parameters := Parameters} = ChannelConfig0
76
) ->
77
    #{write_syntax := WriteSyntaxTmpl} = Parameters,
54✔
78
    Precision = maps:get(precision, Parameters, ms),
54✔
79
    ChannelConfig = maps:merge(
54✔
80
        Parameters,
81
        ChannelConfig0#{
82
            precision => Precision,
83
            write_syntax => to_config(WriteSyntaxTmpl, Precision)
84
        }
85
    ),
86
    {ok, OldState#{
54✔
87
        channels => Channels#{ChannelId => ChannelConfig}
88
    }}.
89

90
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
91
    NewState = State#{channels => maps:remove(ChannelId, Channels)},
55✔
92
    {ok, NewState}.
55✔
93

94
on_get_channel_status(InstanceId, _ChannelId, State) ->
95
    case on_get_status(InstanceId, State) of
138✔
96
        ?status_connected -> ?status_connected;
126✔
97
        _ -> ?status_connecting
12✔
98
    end.
99

100
on_get_channels(InstanceId) ->
101
    emqx_bridge_v2:get_channels_for_connector(InstanceId).
388✔
102

103
on_start(InstId, Config) ->
104
    %% InstID as pool would be handled by greptimedb client
105
    %% so there is no need to allocate pool_name here
106
    %% See: greptimedb:start_client/1
107
    start_client(InstId, Config).
70✔
108

109
on_stop(InstId, _State) ->
110
    case emqx_resource:get_allocated_resources(InstId) of
55✔
111
        #{?greptime_client := Client} ->
112
            Res = greptimedb:stop_client(Client),
54✔
113
            ?tp(greptimedb_client_stopped, #{instance_id => InstId}),
54✔
114
            Res;
54✔
115
        _ ->
116
            ok
1✔
117
    end.
118

119
on_query(InstId, {Channel, Message}, State) ->
UNCOV
120
    #{
×
121
        channels := #{Channel := #{write_syntax := SyntaxLines}},
122
        client := Client,
123
        dbname := DbName
124
    } = State,
UNCOV
125
    case data_to_points(Message, DbName, SyntaxLines) of
×
126
        {ok, Points} ->
UNCOV
127
            ?tp(
×
128
                greptimedb_connector_send_query,
129
                #{points => Points, batch => false, mode => sync}
130
            ),
UNCOV
131
            do_query(InstId, Client, Points);
×
132
        {error, ErrorPoints} ->
133
            ?tp(
×
134
                greptimedb_connector_send_query_error,
135
                #{batch => false, mode => sync, error => ErrorPoints}
136
            ),
137
            log_error_points(InstId, ErrorPoints),
×
138
            {error, ErrorPoints}
×
139
    end.
140

141
%% Once a Batched Data trans to points failed.
142
%% This batch query failed
143
on_batch_query(InstId, [{Channel, _} | _] = BatchData, State) ->
UNCOV
144
    #{
×
145
        channels := #{Channel := #{write_syntax := SyntaxLines}},
146
        client := Client,
147
        dbname := DbName
148
    } = State,
UNCOV
149
    case parse_batch_data(InstId, DbName, BatchData, SyntaxLines) of
×
150
        {ok, Points} ->
UNCOV
151
            ?tp(
×
152
                greptimedb_connector_send_query,
153
                #{points => Points, batch => true, mode => sync}
154
            ),
UNCOV
155
            do_query(InstId, Client, Points);
×
156
        {error, Reason} ->
157
            ?tp(
×
158
                greptimedb_connector_send_query_error,
159
                #{batch => true, mode => sync, error => Reason}
160
            ),
161
            {error, {unrecoverable_error, Reason}}
×
162
    end.
163

164
on_query_async(InstId, {Channel, Message}, {ReplyFun, Args}, State) ->
165
    #{
34✔
166
        channels := #{Channel := #{write_syntax := SyntaxLines}},
167
        client := Client,
168
        dbname := DbName
169
    } = State,
170
    case data_to_points(Message, DbName, SyntaxLines) of
34✔
171
        {ok, Points} ->
172
            ?tp(
28✔
173
                greptimedb_connector_send_query,
174
                #{points => Points, batch => false, mode => async}
175
            ),
176
            do_async_query(InstId, Client, Points, {ReplyFun, Args});
28✔
177
        {error, ErrorPoints} = Err ->
178
            ?tp(
6✔
179
                greptimedb_connector_send_query_error,
180
                #{batch => false, mode => async, error => ErrorPoints}
181
            ),
182
            log_error_points(InstId, ErrorPoints),
6✔
183
            Err
6✔
184
    end.
185

186
on_batch_query_async(InstId, [{Channel, _} | _] = BatchData, {ReplyFun, Args}, State) ->
187
    #{
32✔
188
        channels := #{Channel := #{write_syntax := SyntaxLines}},
189
        client := Client,
190
        dbname := DbName
191
    } = State,
192
    case parse_batch_data(InstId, DbName, BatchData, SyntaxLines) of
32✔
193
        {ok, Points} ->
194
            ?tp(
26✔
195
                greptimedb_connector_send_query,
196
                #{points => Points, batch => true, mode => async}
197
            ),
198
            do_async_query(InstId, Client, Points, {ReplyFun, Args});
26✔
199
        {error, Reason} ->
200
            ?tp(
6✔
201
                greptimedb_connector_send_query_error,
202
                #{batch => true, mode => async, error => Reason}
203
            ),
204
            {error, {unrecoverable_error, Reason}}
6✔
205
    end.
206

207
on_get_status(_InstId, #{client := Client}) ->
208
    case greptimedb:is_alive(Client) of
332✔
209
        true ->
210
            ?status_connected;
316✔
211
        false ->
212
            ?status_disconnected
16✔
213
    end.
214

215
%% -------------------------------------------------------------------------------------------------
216
%% schema
217
namespace() -> connector_greptimedb.
1✔
218

219
roots() ->
220
    [
2✔
221
        {config, #{
222
            type => hoconsc:union(
223
                [
224
                    hoconsc:ref(?MODULE, greptimedb)
225
                ]
226
            )
227
        }}
228
    ].
229

230
fields("connector") ->
231
    [server_field()] ++
9,155✔
232
        credentials_fields() ++
233
        emqx_connector_schema_lib:ssl_fields();
234
%% ============ begin: schema for old bridge configs ============
235
fields(common) ->
236
    [
358✔
237
        server_field(),
238
        precision_field()
239
    ];
240
fields(greptimedb) ->
241
    fields(common) ++
358✔
242
        credentials_fields() ++
243
        emqx_connector_schema_lib:ssl_fields().
244
%% ============ end: schema for old bridge configs ============
245

246
desc(common) ->
247
    ?DESC("common");
1✔
248
desc(greptimedb) ->
249
    ?DESC("greptimedb").
2✔
250

251
precision_field() ->
252
    {precision,
952✔
253
        %% The greptimedb only supports these 4 precision
254
        mk(enum([ns, us, ms, s]), #{
255
            required => false, default => ms, desc => ?DESC("precision")
256
        })}.
257

258
server_field() ->
259
    {server, server()}.
9,513✔
260

261
server() ->
262
    Meta = #{
9,514✔
263
        required => false,
264
        default => <<"127.0.0.1:4001">>,
265
        desc => ?DESC("server"),
266
        converter => fun convert_server/2
267
    },
268
    emqx_schema:servers_sc(Meta, ?GREPTIMEDB_HOST_OPTIONS).
9,514✔
269

270
credentials_fields() ->
271
    [
9,513✔
272
        {dbname, mk(binary(), #{required => true, desc => ?DESC("dbname")})},
273
        {username, mk(binary(), #{desc => ?DESC("username")})},
274
        {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
275
    ].
276

277
%% -------------------------------------------------------------------------------------------------
278
%% internal functions
279

280
start_client(InstId, Config) ->
281
    ClientConfig = client_config(InstId, Config),
70✔
282
    ?SLOG(info, #{
70✔
283
        msg => "starting_greptimedb_connector",
284
        connector => InstId,
285
        config => emqx_utils:redact(Config),
286
        client_config => emqx_utils:redact(ClientConfig)
287
    }),
70✔
288
    try do_start_client(InstId, ClientConfig, Config) of
70✔
289
        Res = {ok, #{client := Client}} ->
290
            ok = emqx_resource:allocate_resource(InstId, ?greptime_client, Client),
58✔
291
            Res;
58✔
292
        {error, Reason} ->
293
            {error, Reason}
8✔
294
    catch
295
        E:R:S ->
296
            ?tp(greptimedb_connector_start_exception, #{error => {E, R}}),
4✔
297
            ?SLOG(warning, #{
4✔
298
                msg => "start_greptimedb_connector_error",
299
                connector => InstId,
300
                error => E,
301
                reason => emqx_utils:redact(R),
302
                stack => emqx_utils:redact(S)
303
            }),
×
304
            {error, R}
4✔
305
    end.
306

307
do_start_client(
308
    InstId,
309
    ClientConfig,
310
    Config
311
) ->
312
    case greptimedb:start_client(ClientConfig) of
74✔
313
        {ok, Client} ->
314
            case greptimedb:is_alive(Client, true) of
62✔
315
                true ->
316
                    State = #{
58✔
317
                        client => Client,
318
                        dbname => proplists:get_value(dbname, ClientConfig, ?DEFAULT_DB),
319
                        channels => #{}
320
                    },
321
                    ?SLOG(info, #{
58✔
322
                        msg => "starting_greptimedb_connector_success",
323
                        connector => InstId,
324
                        client => redact_auth(Client),
325
                        state => redact_auth(State)
326
                    }),
58✔
327
                    {ok, State};
58✔
328
                {false, Reason} ->
329
                    ?tp(greptimedb_connector_start_failed, #{
4✔
330
                        error => greptimedb_client_not_alive, reason => Reason
331
                    }),
332
                    ?SLOG(warning, #{
4✔
333
                        msg => "failed_to_start_greptimedb_connector",
334
                        connector => InstId,
335
                        client => redact_auth(Client),
336
                        reason => Reason
337
                    }),
×
338
                    %% no leak
339
                    _ = greptimedb:stop_client(Client),
4✔
340
                    {error, greptimedb_client_not_alive}
4✔
341
            end;
342
        {error, {already_started, Client0}} ->
343
            ?tp(greptimedb_connector_start_already_started, #{}),
4✔
344
            ?SLOG(info, #{
4✔
345
                msg => "restarting_greptimedb_connector_found_already_started_client",
346
                connector => InstId,
347
                old_client => redact_auth(Client0)
348
            }),
4✔
349
            _ = greptimedb:stop_client(Client0),
4✔
350
            do_start_client(InstId, ClientConfig, Config);
4✔
351
        {error, Reason} ->
352
            ?tp(greptimedb_connector_start_failed, #{error => Reason}),
4✔
353
            ?SLOG(warning, #{
4✔
354
                msg => "failed_to_start_greptimedb_connector",
355
                connector => InstId,
356
                reason => Reason
357
            }),
×
358
            {error, Reason}
4✔
359
    end.
360

361
grpc_config() ->
362
    #{
70✔
363
        sync_start => true,
364
        connect_timeout => ?CONNECT_TIMEOUT
365
    }.
366

367
client_config(
368
    InstId,
369
    Config = #{
370
        server := Server
371
    }
372
) ->
373
    #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?GREPTIMEDB_HOST_OPTIONS),
70✔
374
    [
375
        {endpoints, [{http, str(Host), Port}]},
376
        {pool_size, erlang:system_info(schedulers)},
377
        {pool, InstId},
378
        {pool_type, random},
379
        {auto_reconnect, ?AUTO_RECONNECT_S},
380
        {gprc_options, grpc_config()}
381
    ] ++ protocol_config(Config).
70✔
382

383
protocol_config(
384
    #{
385
        dbname := DbName,
386
        ssl := SSL
387
    } = Config
388
) ->
389
    [
390
        {dbname, str(DbName)}
391
    ] ++ auth(Config) ++
70✔
392
        ssl_config(SSL).
393

394
ssl_config(#{enable := false}) ->
395
    [
70✔
396
        {https_enabled, false}
397
    ];
398
ssl_config(SSL = #{enable := true}) ->
399
    [
×
400
        {https_enabled, true},
401
        {transport, ssl},
402
        {transport_opts, emqx_tls_lib:to_client_opts(SSL)}
403
    ].
404

405
auth(#{username := Username, password := Password}) ->
406
    [
70✔
407
        %% TODO: teach `greptimedb` to accept 0-arity closures as passwords.
408
        {auth, {basic, #{username => str(Username), password => emqx_secret:unwrap(Password)}}}
409
    ];
410
auth(_) ->
411
    [].
×
412

413
redact_auth(Term) ->
414
    emqx_utils:redact(Term, fun is_auth_key/1).
4✔
415

416
is_auth_key(Key) when is_binary(Key) ->
417
    string:equal("authorization", Key, true);
2✔
418
is_auth_key(_) ->
419
    false.
61✔
420

421
%% -------------------------------------------------------------------------------------------------
422
%% Query
423
do_query(InstId, Client, Points) ->
UNCOV
424
    case greptimedb:write_batch(Client, Points) of
×
425
        {ok, #{response := {affected_rows, #{value := Rows}}}} ->
426
            ?SLOG(debug, #{
×
427
                msg => "greptimedb_write_point_success",
428
                connector => InstId,
429
                points => Points
430
            }),
×
431
            {ok, {affected_rows, Rows}};
×
432
        {error, {unauth, _, _}} ->
433
            ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
×
434
            ?SLOG(error, #{
×
435
                msg => "greptimedb_authorization_failed",
436
                client => redact_auth(Client),
437
                connector => InstId
438
            }),
×
439
            {error, {unrecoverable_error, <<"authorization failure">>}};
×
440
        {error, Reason} = Err ->
UNCOV
441
            ?tp(greptimedb_connector_do_query_failure, #{error => Reason}),
×
UNCOV
442
            ?SLOG(error, #{
×
443
                msg => "greptimedb_write_point_failed",
444
                connector => InstId,
445
                reason => Reason
446
            }),
×
UNCOV
447
            case is_unrecoverable_error(Err) of
×
448
                true ->
449
                    {error, {unrecoverable_error, Reason}};
×
450
                false ->
UNCOV
451
                    {error, {recoverable_error, Reason}}
×
452
            end
453
    end.
454

455
do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
456
    ?SLOG(info, #{
54✔
457
        msg => "greptimedb_write_point_async",
458
        connector => InstId,
459
        points => Points
460
    }),
54✔
461
    WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
54✔
462
    ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).
54✔
463

464
reply_callback(ReplyFunAndArgs, {error, {unauth, _, _}}) ->
465
    ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
4✔
466
    Result = {error, {unrecoverable_error, <<"authorization failure">>}},
4✔
467
    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
4✔
468
reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
UNCOV
469
    case is_unrecoverable_error(Error) of
×
470
        true ->
471
            Result = {error, {unrecoverable_error, Reason}},
×
472
            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
×
473
        false ->
UNCOV
474
            Result = {error, {recoverable_error, Reason}},
×
UNCOV
475
            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
×
476
    end;
477
reply_callback(ReplyFunAndArgs, Result) ->
478
    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
50✔
479

480
%% -------------------------------------------------------------------------------------------------
481
%% Tags & Fields Config Trans
482

483
to_config(Lines, Precision) ->
484
    to_config(Lines, [], Precision).
54✔
485

486
to_config([], Acc, _Precision) ->
487
    lists:reverse(Acc);
54✔
488
to_config([Item0 | Rest], Acc, Precision) ->
489
    Ts0 = maps:get(timestamp, Item0, ?DEFAULT_TIMESTAMP_TMPL),
54✔
490
    {Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
54✔
491
    Item = #{
54✔
492
        measurement => emqx_placeholder:preproc_tmpl(maps:get(measurement, Item0)),
493
        timestamp => Ts,
494
        precision => {FromPrecision, ToPrecision},
495
        tags => to_kv_config(maps:get(tags, Item0)),
496
        fields => to_kv_config(maps:get(fields, Item0))
497
    },
498
    to_config(Rest, [Item | Acc], Precision).
54✔
499

500
%% pre-process the timestamp template
501
%% returns a tuple of three elements:
502
%% 1. The timestamp template itself.
503
%% 2. The source timestamp precision (ms if the template ${timestamp} is used).
504
%% 3. The target timestamp precision (configured for the client).
505
preproc_tmpl_timestamp(undefined, Precision) ->
506
    %% not configured, we default it to the message timestamp
507
    preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision);
38✔
508
preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
509
    %% a const value is used which is very much unusual, but we have to add a special handling
510
    {Ts, Precision, Precision};
×
511
preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
512
    preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
54✔
513
preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
514
    {emqx_placeholder:preproc_tmpl(Ts), ms, Precision};
46✔
515
preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
516
    %% a placehold is in use. e.g. ${payload.my_timestamp}
517
    %% we can only hope it the value will be of the same precision in the configs
518
    {emqx_placeholder:preproc_tmpl(Ts), Precision, Precision}.
8✔
519

520
to_kv_config(KVfields) ->
521
    lists:foldl(
108✔
522
        fun({K, V}, Acc) -> to_maps_config(K, V, Acc) end,
328✔
523
        #{},
524
        KVfields
525
    ).
526

527
to_maps_config(K, V, Res) ->
528
    NK = emqx_placeholder:preproc_tmpl(bin(K)),
328✔
529
    NV = emqx_placeholder:preproc_tmpl(bin(V)),
328✔
530
    Res#{NK => NV}.
328✔
531

532
%% -------------------------------------------------------------------------------------------------
533
%% Tags & Fields Data Trans
534
parse_batch_data(InstId, DbName, BatchData, SyntaxLines) ->
535
    {Points, Errors} = lists:foldl(
32✔
536
        fun({_, Data}, {ListOfPoints, ErrAccIn}) ->
537
            case data_to_points(Data, DbName, SyntaxLines) of
32✔
538
                {ok, Points} ->
539
                    {[Points | ListOfPoints], ErrAccIn};
26✔
540
                {error, ErrorPoints} ->
541
                    log_error_points(InstId, ErrorPoints),
6✔
542
                    {ListOfPoints, ErrAccIn + 1}
6✔
543
            end
544
        end,
545
        {[], 0},
546
        BatchData
547
    ),
548
    case Errors of
32✔
549
        0 ->
550
            {ok, lists:flatten(Points)};
26✔
551
        _ ->
552
            ?SLOG(error, #{
6✔
553
                msg => "greptimedb_trans_point_failed",
554
                error_count => Errors,
555
                connector => InstId,
556
                reason => points_trans_failed
557
            }),
×
558
            {error, points_trans_failed}
6✔
559
    end.
560

561
-spec data_to_points(
562
    map(),
563
    binary(),
564
    [
565
        #{
566
            fields := [{binary(), binary()}],
567
            measurement := binary(),
568
            tags := [{binary(), binary()}],
569
            timestamp := emqx_placeholder:tmpl_token() | integer(),
570
            precision := {From :: ts_precision(), To :: ts_precision()}
571
        }
572
    ]
573
) -> {ok, [map()]} | {error, term()}.
574
data_to_points(Data, DbName, SyntaxLines) ->
575
    lines_to_points(Data, DbName, SyntaxLines, [], []).
66✔
576

577
%% When converting multiple rows data into Greptimedb Line Protocol, they are considered to be strongly correlated.
578
%% And once a row fails to convert, all of them are considered to have failed.
579
lines_to_points(_Data, _DbName, [], Points, ErrorPoints) ->
580
    case ErrorPoints of
66✔
581
        [] ->
582
            {ok, Points};
54✔
583
        _ ->
584
            %% ignore trans succeeded points
585
            {error, ErrorPoints}
12✔
586
    end;
587
lines_to_points(
588
    Data, DbName, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc
589
) when
590
    is_list(Ts)
591
->
592
    TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
66✔
593
    case parse_timestamp(emqx_placeholder:proc_tmpl(Ts, Data, TransOptions)) of
66✔
594
        {ok, TsInt} ->
595
            Item1 = Item#{timestamp => TsInt},
62✔
596
            continue_lines_to_points(Data, DbName, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
62✔
597
        {error, BadTs} ->
598
            lines_to_points(Data, DbName, Rest, ResultPointsAcc, [
4✔
599
                {error, {bad_timestamp, BadTs}} | ErrorPointsAcc
600
            ])
601
    end;
602
lines_to_points(
603
    Data, DbName, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, ErrorPointsAcc
604
) when
605
    is_integer(Ts)
606
->
607
    continue_lines_to_points(Data, DbName, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
×
608

609
parse_timestamp([TsInt]) when is_integer(TsInt) ->
610
    {ok, TsInt};
58✔
611
parse_timestamp([TsBin]) ->
612
    try
8✔
613
        {ok, binary_to_integer(TsBin)}
8✔
614
    catch
615
        _:_ ->
616
            {error, TsBin}
4✔
617
    end.
618

619
continue_lines_to_points(Data, DbName, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
620
    case line_to_point(Data, DbName, Item) of
62✔
621
        {_, [#{fields := Fields}]} when map_size(Fields) =:= 0 ->
622
            %% greptimedb client doesn't like empty field maps...
623
            ErrorPointsAcc1 = [{error, no_fields} | ErrorPointsAcc],
8✔
624
            lines_to_points(Data, DbName, Rest, ResultPointsAcc, ErrorPointsAcc1);
8✔
625
        Point ->
626
            lines_to_points(Data, DbName, Rest, [Point | ResultPointsAcc], ErrorPointsAcc)
54✔
627
    end.
628

629
line_to_point(
630
    Data,
631
    DbName,
632
    #{
633
        measurement := Measurement,
634
        tags := Tags,
635
        fields := Fields,
636
        timestamp := Ts,
637
        precision := {_, ToPrecision} = Precision
638
    } = Item
639
) ->
640
    {_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
62✔
641
    {_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
62✔
642
    TableName = emqx_placeholder:proc_tmpl(Measurement, Data),
62✔
643
    Metric = #{dbname => DbName, table => TableName, timeunit => ToPrecision},
62✔
644
    {Metric, [
62✔
645
        maps:without([precision, measurement], Item#{
646
            tags => EncodedTags,
647
            fields => EncodedFields,
648
            timestamp => maybe_convert_time_unit(Ts, Precision)
649
        })
650
    ]}.
651

652
maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) ->
653
    erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)).
62✔
654

655
time_unit(s) -> second;
×
656
time_unit(ms) -> millisecond;
60✔
657
time_unit(us) -> microsecond;
×
658
time_unit(ns) -> nanosecond.
64✔
659

660
maps_config_to_data(K, V, {Data, Res}) ->
661
    KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
408✔
662
    VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
408✔
663
    NK0 = emqx_placeholder:proc_tmpl(K, Data, KTransOptions),
408✔
664
    NV = emqx_placeholder:proc_tmpl(V, Data, VTransOptions),
408✔
665
    case {NK0, NV} of
408✔
666
        {[undefined], _} ->
667
            {Data, Res};
48✔
668
        %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
669
        {_, [undefined | _]} ->
670
            {Data, Res};
96✔
671
        _ ->
672
            NK = list_to_binary(NK0),
264✔
673
            {Data, Res#{NK => value_type(NV)}}
264✔
674
    end.
675

676
value_type([Int, <<"i">>]) when
677
    is_integer(Int)
678
->
679
    greptimedb_values:int64_value(Int);
52✔
680
value_type([UInt, <<"u">>]) when
681
    is_integer(UInt)
682
->
683
    greptimedb_values:uint64_value(UInt);
48✔
684
value_type([<<"t">>]) ->
685
    greptimedb_values:boolean_value(true);
4✔
686
value_type([<<"T">>]) ->
687
    greptimedb_values:boolean_value(true);
4✔
688
value_type([true]) ->
689
    greptimedb_values:boolean_value(true);
12✔
690
value_type([<<"TRUE">>]) ->
691
    greptimedb_values:boolean_value(true);
4✔
692
value_type([<<"True">>]) ->
693
    greptimedb_values:boolean_value(true);
4✔
694
value_type([<<"f">>]) ->
695
    greptimedb_values:boolean_value(false);
4✔
696
value_type([<<"F">>]) ->
697
    greptimedb_values:boolean_value(false);
4✔
698
value_type([false]) ->
699
    greptimedb_values:boolean_value(false);
4✔
700
value_type([<<"FALSE">>]) ->
701
    greptimedb_values:boolean_value(false);
4✔
702
value_type([<<"False">>]) ->
703
    greptimedb_values:boolean_value(false);
4✔
704
value_type([Float]) when is_float(Float) ->
705
    Float;
8✔
706
value_type(Val) ->
707
    #{values => #{string_values => Val}, datatype => 'STRING'}.
108✔
708

709
key_filter(undefined) -> undefined;
48✔
710
key_filter(Value) -> emqx_utils_conv:bin(Value).
48✔
711

712
data_filter(undefined) -> undefined;
96✔
713
data_filter(Int) when is_integer(Int) -> Int;
158✔
714
data_filter(Number) when is_number(Number) -> Number;
8✔
715
data_filter(Bool) when is_boolean(Bool) -> Bool;
16✔
716
data_filter(Data) -> bin(Data).
136✔
717

718
bin(Data) -> emqx_utils_conv:bin(Data).
792✔
719

720
%% helper funcs
721
log_error_points(InstId, Errs) ->
722
    lists:foreach(
12✔
723
        fun({error, Reason}) ->
724
            ?SLOG(error, #{
12✔
725
                msg => "greptimedb_trans_point_failed",
726
                connector => InstId,
727
                reason => Reason
728
            })
×
729
        end,
730
        Errs
731
    ).
732

733
convert_server(<<"http://", Server/binary>>, HoconOpts) ->
734
    convert_server(Server, HoconOpts);
×
735
convert_server(<<"https://", Server/binary>>, HoconOpts) ->
736
    convert_server(Server, HoconOpts);
×
737
convert_server(Server, HoconOpts) ->
738
    emqx_schema:convert_servers(Server, HoconOpts).
205✔
739

740
str(A) when is_atom(A) ->
741
    atom_to_list(A);
×
742
str(B) when is_binary(B) ->
743
    binary_to_list(B);
140✔
744
str(S) when is_list(S) ->
745
    S.
70✔
746

747
is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
748
    true;
×
749
is_unrecoverable_error(_) ->
UNCOV
750
    false.
×
751

752
%%===================================================================
753
%% eunit tests
754
%%===================================================================
755

756
-ifdef(TEST).
757
-include_lib("eunit/include/eunit.hrl").
758

759
is_auth_key_test_() ->
760
    [
2✔
761
        ?_assert(is_auth_key(<<"Authorization">>)),
1✔
762
        ?_assertNot(is_auth_key(<<"Something">>)),
1✔
763
        ?_assertNot(is_auth_key(89))
1✔
764
    ].
765

766
%% for coverage
767
desc_test_() ->
768
    [
1✔
769
        ?_assertMatch(
1✔
770
            {desc, _, _},
1✔
771
            desc(common)
772
        ),
773
        ?_assertMatch(
1✔
774
            {desc, _, _},
1✔
775
            desc(greptimedb)
776
        ),
777
        ?_assertMatch(
1✔
778
            {desc, _, _},
1✔
779
            hocon_schema:field_schema(server(), desc)
780
        ),
781
        ?_assertMatch(
1✔
782
            connector_greptimedb,
1✔
783
            namespace()
784
        )
785
    ].
786
-endif.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc