• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx / 8613439193

09 Apr 2024 09:25AM UTC coverage: 62.491% (-0.1%) from 62.636%
8613439193

push

github

web-flow
Merge pull request #12854 from id/0409-update-codeowners

chore: update codeowners

34606 of 55378 relevant lines covered (62.49%)

6551.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.24
/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
1
%%--------------------------------------------------------------------
2
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
3
%%
4
%% Licensed under the Apache License, Version 2.0 (the "License");
5
%% you may not use this file except in compliance with the License.
6
%% You may obtain a copy of the License at
7
%%
8
%%     http://www.apache.org/licenses/LICENSE-2.0
9
%%
10
%% Unless required by applicable law or agreed to in writing, software
11
%% distributed under the License is distributed on an "AS IS" BASIS,
12
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
%% See the License for the specific language governing permissions and
14
%% limitations under the License.
15
%%--------------------------------------------------------------------
16

17
%% @doc Metadata storage for the builtin sharded database.
18
%%
19
%% Currently metadata is stored in mria; that's not ideal, but
20
%% eventually we'll replace it, so it's important not to leak
21
%% implementation details from this module.
22
-module(emqx_ds_replication_layer_meta).
23

24
-compile(inline).
25

26
-behaviour(gen_server).
27

28
%% API:
29
-export([
30
    shards/1,
31
    my_shards/1,
32
    shard_info/2,
33
    allocate_shards/1,
34
    replica_set/2,
35
    sites/0,
36
    node/1,
37
    this_site/0,
38
    print_status/0
39
]).
40

41
%% DB API:
42
-export([
43
    open_db/2,
44
    db_config/1,
45
    update_db_config/2,
46
    drop_db/1
47
]).
48

49
%% Site / shard allocation:
50
-export([
51
    join_db_site/2,
52
    leave_db_site/2,
53
    assign_db_sites/2,
54
    replica_set_transitions/2,
55
    update_replica_set/3,
56
    db_sites/1,
57
    target_set/2
58
]).
59

60
%% Subscriptions to changes:
61
-export([
62
    subscribe/2,
63
    unsubscribe/1
64
]).
65

66
%% gen_server
67
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
68

69
%% internal exports:
70
-export([
71
    open_db_trans/2,
72
    allocate_shards_trans/1,
73
    assign_db_sites_trans/2,
74
    modify_db_sites_trans/2,
75
    update_replica_set_trans/3,
76
    update_db_config_trans/2,
77
    drop_db_trans/1,
78
    claim_site/2,
79
    n_shards/1
80
]).
81

82
-export_type([
83
    site/0,
84
    transition/0,
85
    subscription_event/0,
86
    update_cluster_result/0
87
]).
88

89
-include_lib("stdlib/include/qlc.hrl").
90
-include_lib("stdlib/include/ms_transform.hrl").
91

92
%%================================================================================
93
%% Type declarations
94
%%================================================================================
95

96
-define(SERVER, ?MODULE).
97

98
-define(SHARD, emqx_ds_builtin_metadata_shard).
99
%% DS database metadata:
100
-define(META_TAB, emqx_ds_builtin_metadata_tab).
101
%% Mapping from Site to the actual Erlang node:
102
-define(NODE_TAB, emqx_ds_builtin_node_tab).
103
%% Shard metadata:
104
-define(SHARD_TAB, emqx_ds_builtin_shard_tab).
105

106
-record(?META_TAB, {
107
    db :: emqx_ds:db(),
108
    db_props :: emqx_ds_replication_layer:builtin_db_opts()
109
}).
110

111
-record(?NODE_TAB, {
112
    site :: site(),
113
    node :: node(),
114
    misc = #{} :: map()
115
}).
116

117
-record(?SHARD_TAB, {
118
    shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()},
119
    %% Sites that currently contain the data:
120
    replica_set :: [site()],
121
    %% Sites that should contain the data when the cluster is in the
122
    %% stable state (no nodes are being added or removed from it):
123
    target_set :: [site()] | undefined,
124
    misc = #{} :: map()
125
}).
126

127
%% Persistent ID of the node (independent from the IP/FQDN):
128
-type site() :: binary().
129

130
%% Membership transition of shard's replica set:
131
-type transition() :: {add | del, site()}.
132

133
-type update_cluster_result() ::
134
    ok
135
    | {error, {nonexistent_db, emqx_ds:db()}}
136
    | {error, {nonexistent_sites, [site()]}}
137
    | {error, {too_few_sites, [site()]}}
138
    | {error, _}.
139

140
%% Subject of the subscription:
141
-type subject() :: emqx_ds:db().
142

143
%% Event for the subscription:
144
-type subscription_event() ::
145
    {changed, {shard, emqx_ds:db(), emqx_ds_replication_layer:shard_id()}}.
146

147
%% Persistent term key:
148
-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
149

150
%% Make Dialyzer happy
151
-define(NODE_PAT(),
152
    %% Equivalent of `#?NODE_TAB{_ = '_'}`:
153
    erlang:make_tuple(record_info(size, ?NODE_TAB), '_')
154
).
155

156
-define(SHARD_PAT(SHARD),
157
    %% Equivalent of `#?SHARD_TAB{shard = SHARD, _ = '_'}`
158
    erlang:make_tuple(record_info(size, ?SHARD_TAB), '_', [{#?SHARD_TAB.shard, SHARD}])
159
).
160

161
%%================================================================================
162
%% API functions
163
%%================================================================================
164

165
%% @doc Print a human-readable status report to the standard output:
%% the ID of this site, every known site with its node and up/down status
%% (a site is `up' when its node is the local node or a connected one),
%% and every shard together with its current replica set.
-spec print_status() -> ok.
print_status() ->
    io:format("THIS SITE:~n~s~n", [this_site()]),
    io:format("~nSITES:~n", []),
    LiveNodes = [node() | nodes()],
    PrintSite = fun(#?NODE_TAB{site = Site, node = Node}) ->
        Status =
            case lists:member(Node, LiveNodes) of
                true -> up;
                false -> down
            end,
        io:format("~s    ~p    ~p~n", [Site, Node, Status])
    end,
    lists:foreach(PrintSite, eval_qlc(mnesia:table(?NODE_TAB))),
    io:format(
        "~nSHARDS:~nId                             Replicas~n", []
    ),
    PrintShard = fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
        %% Pad both columns so that the table stays aligned:
        ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
        ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40),
        io:format("~s ~s~n", [ShardStr, ReplicasStr])
    end,
    lists:foreach(PrintShard, eval_qlc(mnesia:table(?SHARD_TAB))).
192

193
%% @doc Return the ID of the local site.
%% The value is read from the persistent term stored by `ensure_site/0';
%% the call fails if the term has not been set.
-spec this_site() -> site().
this_site() ->
    Site = persistent_term:get(?emqx_ds_builtin_site),
    Site.
785✔
196

197
%% @doc Return the configured number of shards of the DB.
%% Performs a dirty read of the metadata table and fails with `badmatch'
%% unless exactly one metadata record carrying `n_shards' exists for the DB.
-spec n_shards(emqx_ds:db()) -> pos_integer().
n_shards(DB) ->
    Recs = mnesia:dirty_read(?META_TAB, DB),
    [#?META_TAB{db_props = #{n_shards := Count}}] = Recs,
    Count.
1✔
201

202
%% @doc Start the metadata server and register it locally as `?SERVER'.
-spec start_link() -> {ok, pid()}.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
23✔
205

206
%% @doc List the IDs of all shards recorded for the DB (dirty read).
-spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
shards(DB) ->
    Pattern = ?SHARD_PAT({DB, '_'}),
    ShardRecs = mnesia:dirty_match_object(?SHARD_TAB, Pattern),
    [Id || #?SHARD_TAB{shard = {_DB, Id}} <- ShardRecs].
1,290✔
210

211
%% @doc Return information about a shard, or `undefined' if the shard is
%% not present in the shard table (dirty read).
%% Every site of the replica set is currently reported with status `up'.
-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    #{replica_set := #{site() => #{status => up | joining}}}
    | undefined.
shard_info(DB, Shard) ->
    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
        [#?SHARD_TAB{replica_set = Sites}] ->
            %% TODO: report the actual replica status instead of a constant.
            PerSite = [{Site, #{status => up}} || Site <- Sites],
            #{replica_set => maps:from_list(PerSite)};
        [] ->
            undefined
    end.
229

230
%% @doc List the IDs of the DB shards whose current replica set contains
%% the local site (dirty read).
-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
my_shards(DB) ->
    Self = this_site(),
    ShardRecs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
    [
        Id
     || #?SHARD_TAB{shard = {_DB, Id}, replica_set = Replicas} <- ShardRecs,
        lists:member(Self, Replicas)
    ].
23✔
235

236
%% @doc Allocate shards of the DB to the known sites.
%% Runs `allocate_shards_trans/1' in a transaction. Returns the list of
%% shard IDs both when the shards have just been allocated and when they
%% had already been allocated earlier. Returns an error map when fewer
%% sites are known than the DB configuration requires.
%% Any other abort reason is not handled here and fails with `case_clause'.
-spec allocate_shards(emqx_ds:db()) ->
    {ok, [emqx_ds_replication_layer:shard_id()]} | {error, map()}.
allocate_shards(DB) ->
    case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/1, [DB]) of
        {atomic, Shards} ->
            {ok, Shards};
        {aborted, {shards_already_allocated, Shards}} ->
            %% Not an error: the allocation is simply reused.
            {ok, Shards};
        {aborted, {insufficient_sites_online, Needed, Sites}} ->
            {error, #{reason => insufficient_sites_online, needed => Needed, sites => Sites}}
    end.
245

246
%% @doc List the IDs of all sites recorded in the node table.
-spec sites() -> [site()].
sites() ->
    Query = qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)]),
    eval_qlc(Query).
×
249

250
%% @doc Return the Erlang node recorded for the site, or `undefined' if
%% the site is not present in the node table (dirty read).
-spec node(site()) -> node() | undefined.
node(Site) ->
    case mnesia:dirty_read(?NODE_TAB, Site) of
        [] ->
            undefined;
        [#?NODE_TAB{node = Node}] ->
            Node
    end.
258

259
%%===============================================================================
260
%% DB API
261
%%===============================================================================
262

263
%% @doc Return the stored options of the DB (dirty read).
%% An empty map is returned when there is no metadata record for the DB.
-spec db_config(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
db_config(DB) ->
    case mnesia:dirty_read(?META_TAB, DB) of
        [] ->
            #{};
        [#?META_TAB{db_props = Props}] ->
            Props
    end.
271

272
%% @doc Open the DB: run `open_db_trans/2' in a transaction, which stores
%% `DefaultOpts' if the DB has no metadata yet and returns the options
%% that are in effect.
-spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts().
open_db(DB, DefaultOpts) ->
    Args = [DB, DefaultOpts],
    transaction(fun ?MODULE:open_db_trans/2, Args).
23✔
276

277
%% @doc Update the stored options of an existing DB.
%% Runs `update_db_config_trans/2' in a transaction and returns the
%% effective options. If the transaction is aborted, `{error, Reason}' is
%% returned; in particular `{error, {nonexistent_db, DB}}' when the DB has
%% no metadata record.
-spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts()
    | {error, {nonexistent_db, emqx_ds:db()}}
    | {error, _}.
update_db_config(DB, UpdateOpts) ->
    transaction(fun ?MODULE:update_db_config_trans/2, [DB, UpdateOpts]).
×
281

282
%% @doc Drop the metadata of the DB by running `drop_db_trans/1' in a
%% transaction.
-spec drop_db(emqx_ds:db()) -> ok.
drop_db(DB) ->
    Args = [DB],
    transaction(fun ?MODULE:drop_db_trans/1, Args).
×
285

286
%%===============================================================================
287
%% Site / shard allocation API
288
%%===============================================================================
289

290
%% @doc Add a site to the set of sites the DB is replicated across.
%% Implemented as a single `{add, Site}' modification applied in a
%% transaction.
-spec join_db_site(emqx_ds:db(), site()) -> update_cluster_result().
join_db_site(DB, Site) ->
    Modifications = [{add, Site}],
    transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, Modifications]).
×
294

295
%% @doc Remove a site from the set of sites the DB is replicated across.
%% Implemented as a single `{del, Site}' modification applied in a
%% transaction.
-spec leave_db_site(emqx_ds:db(), site()) -> update_cluster_result().
leave_db_site(DB, Site) ->
    Modifications = [{del, Site}],
    transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, Modifications]).
×
299

300
%% @doc Replace the set of sites the DB is replicated across with `Sites'.
%% Runs `assign_db_sites_trans/2' in a transaction.
-spec assign_db_sites(emqx_ds:db(), [site()]) -> update_cluster_result().
assign_db_sites(DB, Sites) ->
    Args = [DB, Sites],
    transaction(fun ?MODULE:assign_db_sites_trans/2, Args).
×
304

305
%% @doc List the sites that currently hold replicas of any shard of the DB
%% (sorted, without duplicates). Uses a dirty read of the shard table.
-spec db_sites(emqx_ds:db()) -> [site()].
db_sites(DB) ->
    Pattern = ?SHARD_PAT({DB, '_'}),
    list_db_sites(mnesia:dirty_match_object(?SHARD_TAB, Pattern)).
×
310

311
%% @doc List the membership transitions needed to bring the replica set of
%% a DB shard in line with its target set.
%% Returns `undefined' if the shard is not present in the shard table.
-spec replica_set_transitions(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [transition()] | undefined.
replica_set_transitions(DB, Shard) ->
    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
        [] ->
            undefined;
        [#?SHARD_TAB{target_set = Target, replica_set = Current}] ->
            compute_transitions(Target, Current)
    end.
322

323
%% @doc Update the set of replication sites for a shard.
%% To be called after a `transition()` has been conducted successfully.
%% Returns `{error, Reason}' if the transaction is aborted, e.g. with
%% `{nonexistent_shard, {DB, Shard}}' when the shard is not known.
-spec update_replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()) ->
    ok | {error, _}.
update_replica_set(DB, Shard, Trans) ->
    case mria:transaction(?SHARD, fun ?MODULE:update_replica_set_trans/3, [DB, Shard, Trans]) of
        {atomic, ok} ->
            ok;
        {aborted, Reason} ->
            {error, Reason}
    end.
333

334
%% @doc Return the sites that currently replicate the shard, or
%% `undefined' if the shard is not present in the shard table (dirty read).
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [site()] | undefined.
replica_set(DB, Shard) ->
    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
        [] ->
            undefined;
        [#?SHARD_TAB{replica_set = Sites}] ->
            Sites
    end.
344

345
%% @doc Return the target set of replication sites of a DB shard.
%% The target set is rewritten whenever the set of replication sites of
%% the DB changes, see `join_db_site/2`, `leave_db_site/2`,
%% `assign_db_sites/2`.
%% Returns `undefined' if the shard is unknown or has no target set.
-spec target_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
    [site()] | undefined.
target_set(DB, Shard) ->
    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
        [] ->
            undefined;
        [#?SHARD_TAB{target_set = Sites}] ->
            Sites
    end.
357

358
%%================================================================================
359

360
%% @doc Subscribe `Pid' to change events about `Subject'.
%% Synchronous call to the metadata server, without a timeout.
subscribe(Pid, Subject) ->
    Request = {subscribe, Pid, Subject},
    gen_server:call(?SERVER, Request, infinity).
23✔
362

363
%% @doc Cancel the subscription of `Pid'.
%% Synchronous call to the metadata server, without a timeout.
unsubscribe(Pid) ->
    Request = {unsubscribe, Pid},
    gen_server:call(?SERVER, Request, infinity).
16✔
365

366
%%================================================================================
367
%% behavior callbacks
368
%%================================================================================
369

370
-record(s, {
371
    subs = #{} :: #{pid() => {subject(), _Monitor :: reference()}}
372
}).
373

374
%% @doc gen_server callback. Creates the metadata tables, claims the site
%% ID for this node and subscribes to write events of the shard table.
init([]) ->
    process_flag(trap_exit, true),
    logger:set_process_metadata(#{domain => [ds, meta]}),
    ensure_tables(),
    ensure_site(),
    %% Table events arrive in `handle_info/2':
    {ok, _Node} = mnesia:subscribe({table, ?SHARD_TAB, simple}),
    {ok, #s{}}.
23✔
382

383
%% @doc gen_server callback. Handles `subscribe' and `unsubscribe'
%% requests; any other call is answered with `{error, unknown_call}'.
handle_call({subscribe, Pid, Subject}, _From, State0) ->
    State = handle_subscribe(Pid, Subject, State0),
    {reply, ok, State};
handle_call({unsubscribe, Pid}, _From, State0) ->
    State = handle_unsubscribe(Pid, State0),
    {reply, ok, State};
handle_call(_Unknown, _From, State) ->
    {reply, {error, unknown_call}, State}.
×
389

390
%% @doc gen_server callback. Casts are not used; they are ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.
×
392

393
%% @doc gen_server callback.
%% A write to the shard table is forwarded to the subscribers of the DB;
%% a `DOWN' message drops the subscription of the terminated process;
%% everything else is ignored.
handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _ActivityId}}, State) ->
    Event = {shard, DB, Shard},
    ok = notify_subscribers(DB, Event, State),
    {noreply, State};
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
    {noreply, handle_unsubscribe(Pid, State)};
handle_info(_Other, State) ->
    {noreply, State}.
×
400

401
%% @doc gen_server callback. Erases the cached site ID from the
%% persistent term storage.
terminate(_Reason, #s{}) ->
    _ = persistent_term:erase(?emqx_ds_builtin_site),
    ok.
16✔
404

405
%%================================================================================
406
%% Internal exports
407
%%================================================================================
408

409
%% @doc Transaction body of `open_db/2'.
%% Takes a write lock on the metadata record of the DB. If the record
%% exists, its options are returned unchanged; otherwise `CreateOpts' are
%% stored and returned.
-spec open_db_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts().
open_db_trans(DB, CreateOpts) ->
    case mnesia:wread({?META_TAB, DB}) of
        [#?META_TAB{db_props = StoredOpts}] ->
            StoredOpts;
        [] ->
            mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
            CreateOpts
    end.
419

420
%% @doc Transaction body of `allocate_shards/1'.
%% Aborts with `{insufficient_sites_online, NSites, Nodes}' when fewer
%% sites are known than `n_sites' requires, and with
%% `{shards_already_allocated, Shards}' when shard records for the DB
%% already exist. Otherwise writes one record per shard and returns the
%% list of shard IDs.
-spec allocate_shards_trans(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
allocate_shards_trans(DB) ->
    Opts = #{n_shards := NShards, n_sites := NSites} = db_config_trans(DB),
    Nodes = mnesia:match_object(?NODE_TAB, ?NODE_PAT(), read),
    length(Nodes) >= NSites orelse
        mnesia:abort({insufficient_sites_online, NSites, Nodes}),
    case mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write) of
        [] ->
            ok;
        Existing ->
            Allocated = [Id || #?SHARD_TAB{shard = {_DB, Id}} <- Existing],
            mnesia:abort({shards_already_allocated, Allocated})
    end,
    Sites = [Site || #?NODE_TAB{site = Site} <- Nodes],
    Allocation = compute_allocation(gen_shards(NShards), Sites, Opts),
    StoreShard = fun({Shard, ReplicaSet}) ->
        ok = mnesia:write(#?SHARD_TAB{shard = {DB, Shard}, replica_set = ReplicaSet}),
        Shard
    end,
    lists:map(StoreShard, Allocation).
451

452
%% @doc Transaction body of `assign_db_sites/2'.
%% Recomputes the allocation of the DB shards over `Sites' and stores it
%% as the target set of every shard; current replica sets are untouched.
%% Aborts with `{too_few_sites, Sites}' when `Sites' is empty, with
%% `{nonexistent_sites, Unknown}' when some sites are not present in the
%% node table, and (via `db_config_trans/1') with `{nonexistent_db, DB}'.
-spec assign_db_sites_trans(emqx_ds:db(), [site()]) -> ok.
assign_db_sites_trans(DB, Sites) ->
    Opts = db_config_trans(DB),
    case [S || S <- Sites, mnesia:read(?NODE_TAB, S, read) == []] of
        %% Emptiness is checked by matching instead of an O(n) `length/1':
        [] when Sites =:= [] ->
            mnesia:abort({too_few_sites, Sites});
        [] ->
            ok;
        NonexistentSites ->
            mnesia:abort({nonexistent_sites, NonexistentSites})
    end,
    %% TODO
    %% Optimize reallocation. The goals are:
    %% 1. Minimize the number of membership transitions.
    %% 2. Ensure that sites are responsible for roughly the same number of shards.
    Shards = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
    Reallocation = compute_allocation(Shards, Sites, Opts),
    lists:foreach(
        fun({Record, ReplicaSet}) ->
            ok = mnesia:write(Record#?SHARD_TAB{target_set = ReplicaSet})
        end,
        Reallocation
    ).
475

476
%% @doc Transaction body of `join_db_site/2' and `leave_db_site/2'.
%% Applies the modifications to the sorted list of target sites of the
%% DB; shards are reassigned only if the resulting list differs from the
%% original one.
-spec modify_db_sites_trans(emqx_ds:db(), [transition()]) -> ok.
modify_db_sites_trans(DB, Modifications) ->
    ShardRecs = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
    Before = list_db_target_sites(ShardRecs),
    case lists:foldl(fun apply_transition/2, Before, Modifications) of
        Before ->
            %% Nothing changed, keep the current assignment.
            ok;
        After ->
            assign_db_sites_trans(DB, After)
    end.
487

488
%% @doc Transaction body of `update_replica_set/3'.
%% Applies the transition to the replica set of the shard. The target set
%% is reset to `undefined' once the replica set contains exactly the
%% sites of the target set; otherwise the (sorted) target set is kept.
%% Aborts with `{nonexistent_shard, {DB, Shard}}' if the shard is unknown.
-spec update_replica_set_trans(
    emqx_ds:db(), emqx_ds_replication_layer:shard_id(), transition()
) -> ok.
update_replica_set_trans(DB, Shard, Trans) ->
    case mnesia:read(?SHARD_TAB, {DB, Shard}, write) of
        [Record = #?SHARD_TAB{replica_set = ReplicaSet0, target_set = TargetSet0}] ->
            %% NOTE
            %% It's possible to complete a transition that's no longer planned. We
            %% should anticipate that we may stray _away_ from the target set.
            TargetSet1 = lists:usort(emqx_maybe:define(TargetSet0, ReplicaSet0)),
            ReplicaSet = apply_transition(Trans, ReplicaSet0),
            %% Compare as sets: the stored replica set is not guaranteed to
            %% be sorted, and a `del' transition keeps the original order.
            TargetSet =
                case lists:usort(ReplicaSet) of
                    TargetSet1 -> undefined;
                    _Diverged -> TargetSet1
                end,
            mnesia:write(Record#?SHARD_TAB{replica_set = ReplicaSet, target_set = TargetSet});
        [] ->
            mnesia:abort({nonexistent_shard, {DB, Shard}})
    end.
506

507
%% @doc Transaction body of `update_db_config/2'.
%% Merges `UpdateOpts' into the stored options and writes the result
%% back. Since this is an update and not a reopen, `n_shards', `n_sites'
%% and `replication_factor' given in `UpdateOpts' are ignored, so the
%% stored values of these keys are kept.
-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
    emqx_ds_replication_layer:builtin_db_opts().
update_db_config_trans(DB, UpdateOpts) ->
    StoredOpts = db_config_trans(DB, write),
    Frozen = [n_shards, n_sites, replication_factor],
    NewOpts = maps:merge(StoredOpts, maps:without(Frozen, UpdateOpts)),
    ok = mnesia:write(#?META_TAB{db = DB, db_props = NewOpts}),
    NewOpts.
×
521

522
%% @doc Read the options of the DB inside a transaction, taking a read
%% lock. Aborts with `{nonexistent_db, DB}' if the DB has no metadata.
-spec db_config_trans(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
db_config_trans(DB) ->
    LockType = read,
    db_config_trans(DB, LockType).
23✔
525

526
%% @doc Read the options of the DB inside a transaction, taking a lock of
%% the given type. Aborts the transaction with `{nonexistent_db, DB}' if
%% the DB has no metadata record.
db_config_trans(DB, LockType) ->
    case mnesia:read(?META_TAB, DB, LockType) of
        [] ->
            mnesia:abort({nonexistent_db, DB});
        [#?META_TAB{db_props = Props}] ->
            Props
    end.
533

534
%% @doc Transaction body of `drop_db/1'.
%% Deletes the metadata record of the DB and all of its shard records.
-spec drop_db_trans(emqx_ds:db()) -> ok.
drop_db_trans(DB) ->
    mnesia:delete({?META_TAB, DB}),
    %% Shard records are keyed by `{DB, ShardId}', not by the bare shard ID,
    %% so the key is taken from the record itself. Records are read within
    %% the transaction (with a write lock) rather than with a dirty read.
    ShardRecs = mnesia:match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'}), write),
    lists:foreach(
        fun(#?SHARD_TAB{shard = Key}) ->
            mnesia:delete({?SHARD_TAB, Key})
        end,
        ShardRecs
    ),
    ok.
×
539

540
%% @doc Record that `Site' is served by `Node'. Overwrites an existing
%% record of the site, if any. Must be called inside a transaction.
-spec claim_site(site(), node()) -> ok.
claim_site(Site, Node) ->
    Claim = #?NODE_TAB{site = Site, node = Node},
    mnesia:write(Claim).
23✔
543

544
%%================================================================================
545
%% Internal functions
546
%%================================================================================
547

548
%% @doc Create the metadata, node and shard tables (in this order) as
%% `disc_copies' ordered sets in the `?SHARD' rlog shard, then wait until
%% all three are available.
ensure_tables() ->
    Tables = [
        {?META_TAB, record_info(fields, ?META_TAB)},
        {?NODE_TAB, record_info(fields, ?NODE_TAB)},
        {?SHARD_TAB, record_info(fields, ?SHARD_TAB)}
    ],
    lists:foreach(
        fun({Tab, Fields}) ->
            %% Record name and table name coincide for all three tables.
            ok = mria:create_table(Tab, [
                {rlog_shard, ?SHARD},
                {type, ordered_set},
                {storage, disc_copies},
                {record_name, Tab},
                {attributes, Fields}
            ])
        end,
        Tables
    ),
    ok = mria:wait_for_tables([Tab || {Tab, _Fields} <- Tables]).
23✔
571

572
%% @doc Make sure this node has a site ID and publish it.
%% The ID is read from `emqx_ds_builtin_site.eterm' in the DS base
%% directory. If the file cannot be consulted or does not contain exactly
%% one term, a new random ID is generated and written to that file.
%% The ID is then claimed for the local node in the node table and cached
%% in a persistent term.
ensure_site() ->
    Filename = filename:join(emqx_ds:base_dir(), "emqx_ds_builtin_site.eterm"),
    case file:consult(Filename) of
        {ok, [Site]} ->
            ok;
        _ ->
            Site = binary:encode_hex(crypto:strong_rand_bytes(8)),
            logger:notice("Creating a new site with ID=~s", [Site]),
            ok = filelib:ensure_dir(Filename),
            %% Fail loudly if the ID cannot be persisted: an ID that is not
            %% stored on disk would be replaced by a new one after restart.
            ok = file:write_file(Filename, io_lib:format("~p.", [Site]))
    end,
    {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]),
    persistent_term:put(?emqx_ds_builtin_site, Site),
    ok.
23✔
588

589
%% @doc Return the sorted, duplicate-free list of sites found in the
%% replica sets of the given shard records.
-spec list_db_sites([_Shard]) -> [site()].
list_db_sites(Shards) ->
    SitesOf = fun get_shard_sites/1,
    flatmap_sorted_set(SitesOf, Shards).
×
593

594
%% @doc Return the sorted, duplicate-free list of sites found in the
%% target sets of the given shard records (see `get_shard_target_sites/1').
-spec list_db_target_sites([_Shard]) -> [site()].
list_db_target_sites(Shards) ->
    TargetSitesOf = fun get_shard_target_sites/1,
    flatmap_sorted_set(TargetSitesOf, Shards).
×
597

598
%% @doc Return the current replica set stored in a shard record.
-spec get_shard_sites(_Shard) -> [site()].
get_shard_sites(#?SHARD_TAB{replica_set = Sites}) ->
    Sites.
×
601

602
%% @doc Return the target set stored in a shard record. When no target set
%% is defined, the current replica set is returned instead.
-spec get_shard_target_sites(_Shard) -> [site()].
get_shard_target_sites(#?SHARD_TAB{target_set = undefined, replica_set = Current}) ->
    Current;
get_shard_target_sites(#?SHARD_TAB{target_set = Target}) when is_list(Target) ->
    Target.
×
607

608
%% @doc Assign a replica set to every shard.
%% Shards and sites are sorted first. Each shard takes the next
%% `min(length(Sites), replication_factor)' sites from an endlessly
%% repeated sequence of the sorted sites; the starting position advances
%% by one site per shard.
-spec compute_allocation([Shard], [Site], emqx_ds_replication_layer:builtin_db_opts()) ->
    [{Shard, [Site, ...]}].
compute_allocation(Shards, Sites, Opts) ->
    NReplicas = min(length(Sites), maps:get(replication_factor, Opts)),
    SiteCycle = emqx_utils_stream:repeat(emqx_utils_stream:list(lists:sort(Sites))),
    Assign = fun(Shard, Cycle) ->
        {ReplicaSet, _} = emqx_utils_stream:consume(NReplicas, Cycle),
        %% The next shard starts one site further:
        {_, Shifted} = emqx_utils_stream:consume(1, Cycle),
        {{Shard, ReplicaSet}, Shifted}
    end,
    {Allocation, _CycleRest} = lists:mapfoldl(Assign, SiteCycle, lists:sort(Shards)),
    Allocation.
18✔
626

627
%% @doc Compute the transitions leading from `ReplicaSet' to `TargetSet'.
%% There is nothing to do when the target set is undefined. Otherwise
%% additions and deletions are interleaved, starting with an addition.
compute_transitions(undefined, _ReplicaSet) ->
    [];
compute_transitions(TargetSet, ReplicaSet) ->
    ToAdd = [{add, Site} || Site <- TargetSet -- ReplicaSet],
    ToDel = [{del, Site} || Site <- ReplicaSet -- TargetSet],
    intersperse(ToAdd, ToDel).
×
633

634
%% @doc Apply a transition to a list of sites.
%% `add' returns the sorted, duplicate-free list including the new site;
%% `del' removes the first occurrence of the site and keeps the order of
%% the remaining elements.
-spec apply_transition(transition(), [site()]) -> [site()].
apply_transition({add, Site}, Current) ->
    Extended = [Site | Current],
    lists:usort(Extended);
apply_transition({del, Site}, Current) ->
    lists:delete(Site, Current).
×
640

641
%% @doc Generate shard IDs: the integers `0..NShards-1' rendered as
%% binaries.
gen_shards(NShards) ->
    Indices = lists:seq(0, NShards - 1),
    lists:map(fun erlang:integer_to_binary/1, Indices).
18✔
643

644
%% @doc Evaluate a QLC query. Inside a transaction the query is evaluated
%% directly; otherwise it is wrapped in a read-only transaction on the
%% `?SHARD' rlog shard.
eval_qlc(Query) ->
    case mnesia:is_transaction() of
        false ->
            {atomic, Result} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Query) end),
            Result;
        true ->
            qlc:eval(Query)
    end.
652

653
%% @doc Run `Fun' with `Args' in a transaction on the `?SHARD' rlog shard.
%% Returns the bare result of the function on success and
%% `{error, Reason}' if the transaction is aborted.
transaction(Fun, Args) ->
    case mria:transaction(?SHARD, Fun, Args) of
        {aborted, Reason} ->
            {error, Reason};
        {atomic, Result} ->
            Result
    end.
660

661
%%====================================================================
662

663
%% @doc Register `Pid' as a subscriber of `Subject' and start monitoring
%% it. A process that is already subscribed keeps its existing
%% subscription, the state is returned unchanged.
handle_subscribe(Pid, Subject, State = #s{subs = Subs}) ->
    case maps:is_key(Pid, Subs) of
        true ->
            State;
        false ->
            MRef = erlang:monitor(process, Pid),
            State#s{subs = Subs#{Pid => {Subject, MRef}}}
    end.
672

673
%% @doc Drop the subscription of `Pid', if any, and stop monitoring the
%% process (flushing a pending `DOWN' message).
handle_unsubscribe(Pid, State = #s{subs = Subs}) ->
    case maps:take(Pid, Subs) of
        error ->
            State;
        {{_Subject, MRef}, Remaining} ->
            _ = erlang:demonitor(MRef, [flush]),
            State#s{subs = Remaining}
    end.
681

682
%% @doc Send `{changed, Event}' to every process subscribed to
%% `EventSubject'. Subscribers of other subjects are skipped.
notify_subscribers(EventSubject, Event, #s{subs = Subs}) ->
    Notify = fun
        (Pid, {Subject, _MRef}) when Subject == EventSubject ->
            Pid ! {changed, Event};
        (_Pid, {_OtherSubject, _MRef}) ->
            false
    end,
    maps:foreach(Notify, Subs).
690

691
%%====================================================================
692

693
%% @doc Interleave the elements of two lists, starting with the first
%% list. Once one of the lists is exhausted, the rest of the other one is
%% appended as is.
%% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5].
-spec intersperse([X], [Y]) -> [X | Y].
intersperse(L1, L2) ->
    intersperse_acc(L1, L2, []).

%% Accumulator-based worker of `intersperse/2': takes the head of the
%% first list and swaps the lists on every step.
intersperse_acc(Rest, [], Acc) ->
    lists:reverse(Acc, Rest);
intersperse_acc([], Rest, Acc) ->
    lists:reverse(Acc, Rest);
intersperse_acc([Head | Tail], Other, Acc) ->
    intersperse_acc(Other, Tail, [Head | Acc]).
×
702

703
%% @doc Apply `Fun' to every element of the list, treat each result as a
%% set and return the union of all these sets as a sorted list without
%% duplicates.
-spec flatmap_sorted_set(fun((X) -> [Y]), [X]) -> [Y].
flatmap_sorted_set(Fun, L) ->
    Merge = fun(Elem, Acc) ->
        ordsets:union(ordsets:from_list(Fun(Elem)), Acc)
    end,
    Union = lists:foldl(Merge, ordsets:new(), L),
    ordsets:to_list(Union).
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc