• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8628139215

10 Apr 2024 08:18AM UTC coverage: 62.44% (-0.05%) from 62.489%
8628139215

push

github

web-flow
Merge pull request #12851 from zmstone/0327-feat-add-emqx_variform

emqx_variform for string substitution and transform

206 of 238 new or added lines in 3 files covered. (86.55%)

28 existing lines in 16 files now uncovered.

34895 of 55886 relevant lines covered (62.44%)

6585.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.09
/apps/emqx_ft/src/emqx_ft_storage_fs.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% Filesystem storage backend
18
%%
19
%% NOTE
20
%% If you plan to change storage layout please consult `emqx_ft_storage_fs_gc`
21
%% to see how much it would break or impair GC.
22

23
-module(emqx_ft_storage_fs).
24

25
-behaviour(emqx_ft_storage).
26

27
-include_lib("emqx/include/logger.hrl").
28
-include_lib("snabbkaffe/include/trace.hrl").
29

30
-export([child_spec/1]).
31

32
% Segments-related API
33
-export([store_filemeta/3]).
34
-export([store_segment/3]).
35
-export([read_filemeta/2]).
36
-export([list/3]).
37
-export([pread/5]).
38
-export([lookup_local_assembler/1]).
39
-export([assemble/4]).
40

41
-export([transfers/1]).
42

43
% GC API
44
% TODO: This is quickly becoming hairy.
45
-export([get_root/1]).
46
-export([get_subdir/2]).
47
-export([get_subdir/3]).
48

49
-export([files/2]).
50

51
-export([start/1]).
52
-export([stop/1]).
53
-export([update_config/2]).
54

55
-export_type([storage/0]).
56
-export_type([filefrag/1]).
57
-export_type([filefrag/0]).
58
-export_type([transferinfo/0]).
59
-export_type([segmentinfo/0]).
60

61
-export_type([file_error/0]).
62

63
-type transfer() :: emqx_ft:transfer().
64
-type offset() :: emqx_ft:offset().
65
-type filemeta() :: emqx_ft:filemeta().
66
-type segment() :: emqx_ft:segment().
67

68
-type segmentinfo() :: #{
69
    offset := offset(),
70
    size := _Bytes :: non_neg_integer()
71
}.
72

73
-type transferinfo() :: #{
74
    filemeta => filemeta()
75
}.
76

77
% TODO naming
78
-type filefrag(T) :: #{
79
    path := file:name(),
80
    timestamp := emqx_utils_calendar:epoch_second(),
81
    size := _Bytes :: non_neg_integer(),
82
    fragment := T
83
}.
84

85
-type filefrag() :: filefrag(
86
    {filemeta, filemeta()}
87
    | {segment, segmentinfo()}
88
).
89

90
-define(FRAGDIR, frags).
91
-define(TEMPDIR, tmp).
92
-define(MANIFEST, "MANIFEST.json").
93
-define(SEGMENT, "SEG").
94

95
-type segments() :: #{
96
    root := file:name(),
97
    gc := #{
98
        interval := non_neg_integer(),
99
        maximum_segments_ttl := non_neg_integer(),
100
        minimum_segments_ttl := non_neg_integer()
101
    }
102
}.
103

104
-type storage() :: #{
105
    type := 'local',
106
    enable := true,
107
    segments := segments(),
108
    exporter := emqx_ft_storage_exporter:exporter_conf()
109
}.
110

111
-type file_error() ::
112
    file:posix()
113
    %% Filename is incompatible with the backing filesystem.
114
    | badarg
115
    %% System limit (e.g. number of ports) reached.
116
    | system_limit.
117

118
%% Child specifications of the processes this storage backend relies on.
%% Currently a single permanent `emqx_ft_storage_fs_gc` worker, started with
%% the storage configuration as its only argument.
-spec child_spec(storage()) ->
    [supervisor:child_spec()].
child_spec(Storage) ->
    GCSpec = #{
        id => emqx_ft_storage_fs_gc,
        start => {emqx_ft_storage_fs_gc, start_link, [Storage]},
        restart => permanent
    },
    [GCSpec].
129

130
%% Store the transfer manifest (file metadata) in the backing filesystem.
%% Atomic operation.
%%
%% Returns:
%%  * `ok` when the manifest was written, or when an identical manifest is
%%    already stored (its timestamps are refreshed in that case);
%%  * `{error, filemeta_conflict}` when a different manifest is already stored;
%%  * `{error, Reason}` for any other failure to read or write the manifest.
-spec store_filemeta(storage(), transfer(), filemeta()) ->
    % Quota? Some lower level errors?
    ok | {error, filemeta_conflict} | {error, file_error()}.
store_filemeta(Storage, Transfer, Meta) ->
    Filepath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), ?MANIFEST),
    case read_file(Filepath, fun decode_filemeta/1) of
        % `Meta` is already bound: this clause matches only an identical manifest.
        {ok, Meta} ->
            _ = touch_file(Filepath),
            ok;
        {ok, Conflict} ->
            ?SLOG(warning, #{
                msg => "filemeta_conflict", transfer => Transfer, new => Meta, old => Conflict
            }),
            % TODO
            % We won't see conflicts in case of concurrent `store_filemeta`
            % requests. It's rather odd scenario so it's fine not to worry
            % about it too much now.
            {error, filemeta_conflict};
        % A missing or unreadable manifest is simply (over)written.
        {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
            write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta));
        {error, _} = Error ->
            Error
    end.
155

156
%% Persist one segment of a transfer in the fragments directory.
%% Atomic operation.
%% The file name is derived from the segment's offset and content size.
-spec store_segment(storage(), transfer(), segment()) ->
    % Where does the checksum get verified? Upper level probably.
    % Quota? Some lower level errors?
    ok | {error, file_error()}.
store_segment(Storage, Transfer, {_Offset, Content} = Segment) ->
    SegmentName = mk_segment_filename(Segment),
    SegmentPath = mk_filepath(Storage, Transfer, get_subdirs_for(fragment), SegmentName),
    write_file_atomic(Storage, Transfer, SegmentPath, Content).
166

167
%% Read and decode the manifest stored for the given transfer.
-spec read_filemeta(storage(), transfer()) ->
    {ok, filemeta()} | {error, corrupted} | {error, file_error()}.
read_filemeta(Storage, Transfer) ->
    FragSubdirs = get_subdirs_for(fragment),
    ManifestPath = mk_filepath(Storage, Transfer, FragSubdirs, ?MANIFEST),
    read_file(ManifestPath, fun decode_filemeta/1).
172

173
%% List the fragments (manifest and segments) stored locally for a transfer.
%% A missing fragments directory is reported as an empty list, not an error.
-spec list(storage(), transfer(), _What :: fragment) ->
    % Some lower level errors? {error, notfound}?
    % Result will contain zero or only one filemeta.
    {ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]}
    | {error, file_error()}.
list(Storage, Transfer, fragment = What) ->
    FragDir = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
    case file:list_dir(FragDir) of
        {error, enoent} ->
            {ok, []};
        {error, _} = ListError ->
            ListError;
        {ok, Entries} ->
            % TODO
            % In case of `What = result` there might be more than one file (though
            % extremely bad luck is needed for that, e.g. concurrent assemblers with
            % different filemetas from different nodes). This might be unexpected for a
            % client given the current protocol, yet might be helpful in the future.
            {ok, filtermap_files(fun mk_filefrag/2, FragDir, Entries)}
    end.
193

194
%% Read up to `Size` bytes at `Offset` from the file backing a fragment.
%% The file is opened in raw binary mode and closed again before returning.
%% Reading at or past the end of the file yields `{error, eof}`.
-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
    {ok, _Content :: iodata()} | {error, eof} | {error, file_error()}.
pread(_Storage, _Transfer, Frag, Offset, Size) ->
    case file:open(maps:get(path, Frag), [read, raw, binary]) of
        {error, _} = OpenError ->
            OpenError;
        {ok, Fd} ->
            % NOTE
            % Reading empty file is always `eof`.
            Outcome = file:pread(Fd, Offset, Size),
            ok = file:close(Fd),
            case Outcome of
                eof -> {error, eof};
                {ok, _Content} = Success -> Success;
                {error, _} = ReadError -> ReadError
            end
    end.
215

216
%% Kick off (or join) assembly of a transfer.
%% Sources are consulted in order: an assembler already running locally, one
%% running on another node, an already exported result, and finally a newly
%% ensured local assembler. The first source that does not answer
%% `{error, not_found}` decides the result.
-spec assemble(storage(), transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
    {async, _Assembler :: pid()} | ok | {error, _TODO}.
assemble(Storage, Transfer, Size, Opts) ->
    lookup_assembler([
        fun() -> lookup_local_assembler(Transfer) end,
        fun() -> lookup_remote_assembler(Transfer) end,
        fun() -> check_if_already_exported(Storage, Transfer) end,
        fun() -> ensure_local_assembler(Storage, Transfer, Size, Opts) end
    ]).
226

227
%%
228

229
%% List exported files matching `Query`; delegates to the storage exporter.
files(Storage, Query) ->
    emqx_ft_storage_exporter:list(Storage, Query).
231

232
%%
233

234
%% Apply a storage configuration change: reset the GC process with the new
%% configuration, then let the exporter handle the old -> new transition.
update_config(StorageOld, StorageNew) ->
    % NOTE: resetting GC restarts its timer, so frequent config changes
    % would postpone GC indefinitely.
    ok = emqx_ft_storage_fs_gc:reset(StorageNew),
    emqx_ft_storage_exporter:update_config(StorageOld, StorageNew).
238

239
%% Start the storage backend: start every child from `child_spec/1` under
%% `emqx_ft_sup`, then initialise the exporter with the storage configuration.
%% Crashes (badmatch) if any child fails to start.
start(Storage) ->
    _ = [
        {ok, _Child} = supervisor:start_child(emqx_ft_sup, Spec)
     || Spec <- child_spec(Storage)
    ],
    ok = emqx_ft_storage_exporter:update_config(undefined, Storage),
    ok.
248

249
%% Stop the storage backend: tear down the exporter configuration first, then
%% terminate and delete every child from `child_spec/1` under `emqx_ft_sup`.
stop(Storage) ->
    ok = emqx_ft_storage_exporter:update_config(Storage, undefined),
    _ = [
        begin
            % Termination result is ignored; deletion must succeed.
            _ = supervisor:terminate_child(emqx_ft_sup, ChildId),
            ok = supervisor:delete_child(emqx_ft_sup, ChildId)
        end
     || #{id := ChildId} <- child_spec(Storage)
    ],
    ok.
259

260
%%
261

262
%% Evaluate lookup sources (zero-arity funs) in order and return the first
%% result other than `{error, not_found}`. The last source's result is
%% returned as is, whatever it may be.
lookup_assembler([Only]) ->
    Only();
lookup_assembler([Next | Remaining]) ->
    Outcome = Next(),
    if
        Outcome =:= {error, not_found} -> lookup_assembler(Remaining);
        true -> Outcome
    end.
269

270
%% Check whether the transfer has already been exported.
%% If the exporter lists at least one file for it, trigger a GC run for the
%% transfer on this node and return `ok`; otherwise `{error, not_found}`.
check_if_already_exported(Storage, Transfer) ->
    Exported = files(Storage, #{transfer => Transfer}),
    case Exported of
        {ok, #{items := [_ | _]}} ->
            % NOTE: we don't know coverage here, let's just clean up locally.
            _ = emqx_ft_storage_fs_gc:collect(Storage, Transfer, [node()]),
            ok;
        _ ->
            {error, not_found}
    end.
279

280
%% Look for an assembler of the given transfer on this node.
%% Returns `{async, Pid}` when `emqx_ft_assembler:where/1` yields a pid,
%% `{error, not_found}` otherwise.
lookup_local_assembler(Transfer) ->
    MaybePid = emqx_ft_assembler:where(Transfer),
    case is_pid(MaybePid) of
        true -> {async, MaybePid};
        false -> {error, not_found}
    end.
285

286
%% Ask every other running node whether it hosts an assembler for the
%% transfer. Only `{ok, {async, Pid}}` replies count; any other reply is
%% ignored. The first assembler found wins.
lookup_remote_assembler(Transfer) ->
    Peers = emqx:running_nodes() -- [node()],
    Replies = emqx_ft_storage_fs_proto_v1:list_assemblers(Peers, Transfer),
    case [Pid || {ok, {async, Pid}} <- Replies] of
        [First | _] -> {async, First};
        _ -> {error, not_found}
    end.
299

300
%% Make sure an assembler for the transfer runs under the local assembler
%% supervisor. Crashes (badmatch) if the supervisor does not return `{ok, Pid}`.
ensure_local_assembler(Storage, Transfer, Size, Opts) ->
    {ok, AssemblerPid} = emqx_ft_assembler_sup:ensure_child(Storage, Transfer, Size, Opts),
    {async, AssemblerPid}.
303

304
%% Enumerate all transfers found under the storage root, keyed by transfer.
%% Each first-level directory is treated as a (escaped) client id.
-spec transfers(storage()) ->
    {ok, #{transfer() => transferinfo()}}.
transfers(Storage) ->
    % TODO `Continuation`
    % There might be millions of transfers on the node, we need a protocol and
    % storage schema to iterate through them effectively.
    ClientIds = try_list_dir(get_root(Storage)),
    Collect = fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end,
    {ok, lists:foldl(Collect, #{}, ClientIds)}.
317

318
%% Accumulate transfer info for every file id directory of one client.
%% `ClientId` is the directory name as found on disk (still escaped); it is
%% unescaped only when building the transfer key.
%% If the client directory cannot be listed, a warning carrying the failure
%% reason is emitted and the accumulator is returned unchanged.
transfers(Storage, ClientId, AccIn) ->
    Dirname = filename:join(get_root(Storage), ClientId),
    case file:list_dir(Dirname) of
        {ok, FileIds} ->
            lists:foldl(
                fun(FileId, Acc) ->
                    Transfer = dirnames_to_transfer(ClientId, FileId),
                    read_transferinfo(Storage, Transfer, Acc)
                end,
                AccIn,
                FileIds
            );
        {error, Reason} ->
            % Report why listing failed instead of dropping the reason.
            ?tp(warning, "list_dir_failed", #{
                storage => Storage,
                directory => Dirname,
                reason => Reason
            }),
            AccIn
    end.
337

338
%% Add info about one transfer to the accumulator.
%% A transfer without a manifest (`enoent`) is recorded with empty info; any
%% other read failure is reported and the transfer is left out.
read_transferinfo(Storage, Transfer, Acc) ->
    case read_filemeta(Storage, Transfer) of
        {error, enoent} ->
            Acc#{Transfer => #{}};
        {error, Reason} ->
            ?tp(warning, "read_transferinfo_failed", #{
                storage => Storage,
                transfer => Transfer,
                reason => Reason
            }),
            Acc;
        {ok, Filemeta} ->
            Info = #{filemeta => Filemeta},
            Acc#{Transfer => Info}
    end.
352

353
%% Root directory for segments: the configured `segments.root` if present,
%% otherwise `<data_dir>/file_transfer/segments`.
-spec get_root(storage()) ->
    file:name().
get_root(Storage) ->
    case emqx_utils_maps:deep_find([segments, root], Storage) of
        {not_found, _, _} ->
            filename:join([emqx:data_dir(), file_transfer, segments]);
        {ok, ConfiguredRoot} ->
            ConfiguredRoot
    end.
362

363
%% Top-level directory of a transfer (no fragment / temporary subdirectory).
-spec get_subdir(storage(), transfer()) ->
    file:name().
get_subdir(Storage, Transfer) ->
    NoSubdirs = [],
    mk_filedir(Storage, Transfer, NoSubdirs).
367

368
%% Directory of a transfer holding either its fragments or its temporary files.
-spec get_subdir(storage(), transfer(), fragment | temporary) ->
    file:name().
get_subdir(Storage, Transfer, What) ->
    SubDirs = get_subdirs_for(What),
    mk_filedir(Storage, Transfer, SubDirs).
372

373
%% Path components, relative to a transfer's directory, for each kind of
%% stored data.
get_subdirs_for(fragment) -> [?FRAGDIR];
get_subdirs_for(temporary) -> [?TEMPDIR].
377

378
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
379

380
%% Serialize file metadata to JSON, wrapped in the versioned manifest prelude
%% `["filemeta", 1, EncodedMeta]`.
encode_filemeta(Meta) ->
    EncodedMeta = emqx_ft:encode_filemeta(Meta),
    emqx_utils_json:encode(?PRELUDE(_Vsn = 1, EncodedMeta)).
382

383
%% Parse a manifest binary back into file metadata.
%% Raises (badmatch) unless the JSON is the version 1 prelude, and raises
%% `error(Reason)` if the embedded metadata cannot be decoded.
decode_filemeta(Binary) when is_binary(Binary) ->
    ?PRELUDE(_Vsn = 1, Map) = emqx_utils_json:decode(Binary, [return_maps]),
    case emqx_ft:decode_filemeta(Map) of
        {error, Reason} -> error(Reason);
        {ok, Meta} -> Meta
    end.
391

392
%% File name of a segment: `<SEGMENT>.<Offset>.<ContentSizeInBytes>`.
mk_segment_filename({Offset, Content}) ->
    Size = byte_size(Content),
    lists:concat([?SEGMENT, ".", Offset, ".", Size]).
394

395
%% Recover segment offset and size from a file name produced by
%% `mk_segment_filename/1`. Names of any other shape yield `{error, invalid}`.
break_segment_filename(Filename) ->
    Pattern = "^" ?SEGMENT "[.]([0-9]+)[.]([0-9]+)$",
    case re:run(Filename, Pattern, [{capture, all_but_first, list}]) of
        nomatch ->
            {error, invalid};
        {match, [OffsetStr, SizeStr]} ->
            {ok, #{offset => list_to_integer(OffsetStr), size => list_to_integer(SizeStr)}}
    end.
404

405
%% Directory for a transfer: `<root>/<escaped client id>/<escaped file id>`,
%% followed by any extra `SubDirs` components.
mk_filedir(Storage, {ClientId, FileId}, SubDirs) ->
    Root = get_root(Storage),
    ClientDir = emqx_ft_fs_util:escape_filename(ClientId),
    FileDir = emqx_ft_fs_util:escape_filename(FileId),
    filename:join([Root, ClientDir, FileDir | SubDirs]).
412

413
%% Turn on-disk (escaped) directory names back into a transfer tuple.
dirnames_to_transfer(ClientId, FileId) ->
    ClientIdRaw = emqx_ft_fs_util:unescape_filename(ClientId),
    FileIdRaw = emqx_ft_fs_util:unescape_filename(FileId),
    {ClientIdRaw, FileIdRaw}.
415

416
%% Full path of `Filename` inside a transfer's (sub)directory.
mk_filepath(Storage, Transfer, SubDirs, Filename) ->
    Dir = mk_filedir(Storage, Transfer, SubDirs),
    filename:join(Dir, Filename).
418

419
%% List a directory, treating any failure as an empty directory.
try_list_dir(Dirname) ->
    case file:list_dir(Dirname) of
        {error, _} -> [];
        {ok, Entries} -> Entries
    end.
424

425
-include_lib("kernel/include/file.hrl").
426

427
%% Read a file and run its contents through `DecodeFun`; delegates to
%% `emqx_ft_fs_util:read_decode_file/2`.
read_file(Filepath, DecodeFun) ->
    emqx_ft_fs_util:read_decode_file(Filepath, DecodeFun).
429

430
%% Write `Content` to `Filepath` atomically.
%% The content is first written to a temporary file in the transfer's
%% temporary directory, which is then renamed onto the destination.
%% Returns `ok`, or `{error, Reason}` from the first step that failed.
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
    TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
    Result = emqx_utils:pipeline(
        [
            fun filelib:ensure_dir/1,
            fun write_contents/2,
            fun(_) -> mv_temp_file(TempFilepath, Filepath) end
        ],
        TempFilepath,
        Content
    ),
    % Best-effort cleanup on every outcome. After a successful rename the
    % temporary file is normally gone already; after a failed write or rename
    % this removes the leftover (possibly partial) temporary file instead of
    % leaving it behind.
    _ = file:delete(TempFilepath),
    case Result of
        {ok, _, _} ->
            ok;
        {error, Reason, _} ->
            {error, Reason}
    end.
448

449
%% Path of a temporary file for `Filename` inside the transfer's temporary
%% directory; the temporary name comes from `emqx_ft_fs_util:mk_temp_filename/1`.
mk_temp_filepath(Storage, Transfer, Filename) ->
    TempName = emqx_ft_fs_util:mk_temp_filename(Filename),
    TempDir = get_subdir(Storage, Transfer, temporary),
    filename:join(TempDir, TempName).
452

453
%% Write `Content` to `Filepath`; a plain `file:write_file/2`.
write_contents(Filepath, Content) ->
    file:write_file(Filepath, Content).
455

456
%% Move a temporary file to its final location, creating the destination's
%% parent directories first. A failure to create them is not checked here; the
%% result of the rename is what gets returned.
mv_temp_file(TempFilepath, Filepath) ->
    _ = filelib:ensure_dir(Filepath),
    file:rename(TempFilepath, Filepath).
459

460
%% Set both the access and the modification time of a file to the current
%% local time. Returns the result of `file:change_time/3`.
touch_file(Filepath) ->
    Now = erlang:localtime(),
    % `file:change_time/3` takes the access time before the modification time.
    file:change_time(Filepath, _Atime = Now, _Mtime = Now).
463

464
%% `lists:filtermap/2` over file names, passing the directory to `Fun` as well.
filtermap_files(Fun, Dirname, Filenames) ->
    InDir = fun(Filename) -> Fun(Dirname, Filename) end,
    lists:filtermap(InDir, Filenames).
466

467
%% Classify a file found in a fragments directory, in `lists:filtermap/2`
%% terms: the manifest and segment files become fragments, anything else is
%% reported as a rogue file and skipped.
mk_filefrag(Dirname, ?MANIFEST = Filename) ->
    mk_filefrag(Dirname, Filename, filemeta, fun read_frag_filemeta/2);
mk_filefrag(Dirname, ?SEGMENT ++ _ = Filename) ->
    mk_filefrag(Dirname, Filename, segment, fun read_frag_segmentinfo/2);
mk_filefrag(_Dirname, _Filename) ->
    ?tp(warning, "rogue_file_found", #{
        directory => _Dirname,
        filename => _Filename
    }),
    false.
477

478
%% Build a fragment description for one file, in `lists:filtermap/2` terms.
%% `Fun(Filename, Filepath)` supplies the fragment-specific part, which is
%% tagged with `Tag`. Returns `{true, Filefrag}` on success. If the file cannot
%% be stat'ed (e.g. it was removed after the directory was listed) or `Fun`
%% fails, a warning is emitted and `false` is returned so the file is skipped
%% instead of crashing the whole listing.
%% NOTE(review): `timestamp` is taken verbatim from `#file_info.mtime` as
%% returned by `file:read_file_info/1` — confirm this matches what consumers
%% of `filefrag()` expect.
mk_filefrag(Dirname, Filename, Tag, Fun) ->
    Filepath = filename:join(Dirname, Filename),
    Outcome =
        case file:read_file_info(Filepath) of
            {ok, ReadInfo} ->
                case Fun(Filename, Filepath) of
                    {ok, Parsed} -> {ok, ReadInfo, Parsed};
                    {error, _} = FragError -> FragError
                end;
            {error, _} = InfoError ->
                InfoError
        end,
    case Outcome of
        {ok, Fileinfo, Frag} ->
            {true, #{
                path => Filepath,
                timestamp => Fileinfo#file_info.mtime,
                size => Fileinfo#file_info.size,
                fragment => {Tag, Frag}
            }};
        {error, _Reason} ->
            ?tp(warning, "mk_filefrag_failed", #{
                directory => Dirname,
                filename => Filename,
                type => Tag,
                reason => _Reason
            }),
            false
    end.
499

500
%% Fragment reader for the manifest: decode the file metadata stored at `Filepath`.
read_frag_filemeta(_Filename, Filepath) ->
    Decoder = fun decode_filemeta/1,
    read_file(Filepath, Decoder).
502

503
%% Fragment reader for segments: segment info is derived from the file name
%% alone; the file itself is not read.
read_frag_segmentinfo(Filename, _Filepath) ->
    break_segment_filename(Filename).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc