• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx-operator / 3692206716

pending completion
3692206716

push

github

Rory
docs: fix some minor mistakes

3048 of 4104 relevant lines covered (74.27%)

4.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.49
/controllers/apps/v1beta3/emqx_handler.go
1
/*
2
Copyright 2021.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package v1beta3
18

19
import (
20
        "context"
21
        "encoding/base64"
22
        "fmt"
23
        "net"
24
        "path/filepath"
25
        "reflect"
26
        "regexp"
27
        "sort"
28
        "strconv"
29
        "strings"
30
        "time"
31

32
        emperror "emperror.dev/errors"
33

34
        appsv1 "k8s.io/api/apps/v1"
35
        corev1 "k8s.io/api/core/v1"
36
        k8sErrors "k8s.io/apimachinery/pkg/api/errors"
37
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38
        "k8s.io/apimachinery/pkg/labels"
39
        "k8s.io/apimachinery/pkg/runtime"
40
        "k8s.io/apimachinery/pkg/types"
41
        "k8s.io/apimachinery/pkg/util/intstr"
42
        "k8s.io/client-go/tools/record"
43

44
        ctrl "sigs.k8s.io/controller-runtime"
45
        "sigs.k8s.io/controller-runtime/pkg/client"
46
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
47

48
        appsv1beta3 "github.com/emqx/emqx-operator/apis/apps/v1beta3"
49
        "github.com/emqx/emqx-operator/pkg/handler"
50
        json "github.com/json-iterator/go"
51
        "github.com/tidwall/gjson"
52
)
53

54
// Compile-time assertion that *EmqxBrokerReconciler satisfies reconcile.Reconciler.
var _ reconcile.Reconciler = &EmqxBrokerReconciler{}
55

56
// Container names and images shared by the workload generators in this package.
const (
	// EmqxContainerName is the name of the main EMQX container; it is also the
	// container name handed to Handler.RequestAPI when calling the EMQX
	// management API.
	EmqxContainerName = "emqx"

	// ReloaderContainerName is the name of the reloader sidecar container.
	ReloaderContainerName = "reloader"

	// ReloaderContainerImage is the image reference for the reloader sidecar.
	ReloaderContainerImage = "emqx/emqx-operator-reloader:0.0.2"
)
61

62
type EmqxReconciler struct {
63
        handler.Handler
64
        Scheme *runtime.Scheme
65
        record.EventRecorder
66
}
67

68
func (r *EmqxReconciler) Do(ctx context.Context, instance appsv1beta3.Emqx) (ctrl.Result, error) {
6✔
69
        var resources []client.Object
6✔
70
        var err error
6✔
71
        postFn := func(client.Object) error { return nil }
12✔
72

73
        sts := generateStatefulSetDef(instance)
6✔
74

6✔
75
        existedSts := &appsv1.StatefulSet{}
6✔
76
        if err := r.Get(ctx, client.ObjectKeyFromObject(sts), existedSts); err != nil {
12✔
77
                if !k8sErrors.IsNotFound(err) {
6✔
78
                        return ctrl.Result{}, err
×
79
                }
×
80
        }
81
        // store statefulSet is exit
82
        if existedSts.Spec.PodManagementPolicy != "" {
12✔
83
                sts.Spec.PodManagementPolicy = existedSts.Spec.PodManagementPolicy
6✔
84
        }
6✔
85
        // compatible with 1.2.2
86
        if existedSts.Spec.VolumeClaimTemplates != nil {
6✔
87
                sts.Spec.VolumeClaimTemplates = existedSts.Spec.VolumeClaimTemplates
×
88
        }
×
89
        // compatible with 1.2.2
90
        if existedSts.Annotations != nil {
12✔
91
                sts.Annotations = existedSts.Annotations
6✔
92
        }
6✔
93

94
        defaultPluginsConfig := generateDefaultPluginsConfig(instance)
6✔
95
        sts = updatePluginsConfigForSts(sts, defaultPluginsConfig)
6✔
96

6✔
97
        if status := instance.GetStatus(); !status.IsPluginInitialized() {
12✔
98
                resources = append(resources, defaultPluginsConfig)
6✔
99

6✔
100
                pluginsList := &appsv1beta3.EmqxPluginList{}
6✔
101
                err = r.Client.List(ctx, pluginsList, client.InNamespace(instance.GetNamespace()))
6✔
102
                if err != nil && !k8sErrors.IsNotFound(err) {
6✔
103
                        return ctrl.Result{}, err
×
104
                }
×
105
                var condition *appsv1beta3.Condition
6✔
106
                pluginResourceList := generateInitPluginList(instance, pluginsList)
6✔
107
                resources = append(resources, pluginResourceList...)
6✔
108

6✔
109
                err = r.CreateOrUpdateList(instance, r.Scheme, resources, postFn)
6✔
110
                if err != nil {
6✔
111
                        r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedCreateOrUpdate", err.Error())
×
112
                        condition = appsv1beta3.NewCondition(
×
113
                                appsv1beta3.ConditionPluginInitialized,
×
114
                                corev1.ConditionFalse,
×
115
                                "PluginInitializeFailed",
×
116
                                err.Error(),
×
117
                        )
×
118
                        instance.SetCondition(*condition)
×
119
                        _ = r.Status().Update(ctx, instance)
×
120
                        return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, err
×
121
                }
×
122
                condition = appsv1beta3.NewCondition(
6✔
123
                        appsv1beta3.ConditionPluginInitialized,
6✔
124
                        corev1.ConditionTrue,
6✔
125
                        "PluginInitializeSuccessfully",
6✔
126
                        "All default plugins initialized",
6✔
127
                )
6✔
128
                instance.SetCondition(*condition)
6✔
129
                _ = r.Status().Update(ctx, instance)
6✔
130
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
6✔
131
        }
132

133
        if acl := generateAcl(instance); acl != nil {
6✔
134
                resources = append(resources, acl)
×
135
                sts = updateAclForSts(sts, acl)
×
136
        }
×
137

138
        if loadedModules := generateLoadedModules(instance); loadedModules != nil {
12✔
139
                resources = append(resources, loadedModules)
6✔
140
                sts = updateLoadedModulesForSts(sts, loadedModules)
6✔
141
        }
6✔
142

143
        if emqxEnterprise, ok := instance.(*appsv1beta3.EmqxEnterprise); ok {
12✔
144
                var license *corev1.Secret
6✔
145
                if emqxEnterprise.GetLicense().SecretName != "" {
6✔
146
                        license = &corev1.Secret{}
×
147
                        if err := r.Client.Get(context.Background(), types.NamespacedName{Name: emqxEnterprise.GetLicense().SecretName, Namespace: emqxEnterprise.GetNamespace()}, license); err != nil {
×
148
                                return ctrl.Result{}, err
×
149
                        }
×
150
                } else {
6✔
151
                        license = generateLicense(emqxEnterprise)
6✔
152
                }
6✔
153

154
                if license != nil {
6✔
155
                        resources = append(resources, license)
×
156
                        sts = updateLicenseForsts(sts, license)
×
157
                }
×
158
        }
159

160
        if status := instance.GetStatus(); status.IsRunning() {
12✔
161
                serviceTemplate := instance.GetServiceTemplate()
6✔
162
                ports, _ := r.getListenerPortsByAPI(instance)
6✔
163
                serviceTemplate.MergePorts(ports)
6✔
164
                instance.SetServiceTemplate(serviceTemplate)
6✔
165
        }
6✔
166

167
        headlessSvc, svc := generateSvc(instance)
6✔
168
        sts.Spec.ServiceName = headlessSvc.Name
6✔
169
        resources = append(resources, headlessSvc, svc)
6✔
170

6✔
171
        resources = append(resources, sts)
6✔
172

6✔
173
        if err := r.CreateOrUpdateList(instance, r.Scheme, resources, postFn); err != nil {
9✔
174
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedCreateOrUpdate", err.Error())
3✔
175
                condition := appsv1beta3.NewCondition(
3✔
176
                        appsv1beta3.ConditionRunning,
3✔
177
                        corev1.ConditionFalse,
3✔
178
                        "FailedCreateOrUpdate",
3✔
179
                        err.Error(),
3✔
180
                )
3✔
181
                instance.SetCondition(*condition)
3✔
182
                _ = r.Status().Update(ctx, instance)
3✔
183
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, err
3✔
184
        }
3✔
185

186
        emqxNodes, err := r.getNodeStatusesByAPI(instance)
6✔
187
        if err != nil {
12✔
188
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetNodeStatues", err.Error())
6✔
189
                condition := appsv1beta3.NewCondition(
6✔
190
                        appsv1beta3.ConditionRunning,
6✔
191
                        corev1.ConditionFalse,
6✔
192
                        "FailedToGetNodeStatues",
6✔
193
                        err.Error(),
6✔
194
                )
6✔
195
                instance.SetCondition(*condition)
6✔
196
                _ = r.Status().Update(ctx, instance)
6✔
197
        }
6✔
198

199
        instance = updateEmqxStatus(instance, emqxNodes)
6✔
200
        if err = r.Status().Update(ctx, instance); err != nil {
6✔
201
                return ctrl.Result{}, err
×
202
        }
×
203

204
        if status := instance.GetStatus(); !status.IsRunning() {
12✔
205
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
6✔
206
        }
6✔
207
        return ctrl.Result{RequeueAfter: time.Duration(20) * time.Second}, nil
6✔
208
}
209

210
func (r *EmqxReconciler) getListenerPortsByAPI(instance appsv1beta3.Emqx) ([]corev1.ServicePort, error) {
6✔
211
        type emqxListener struct {
6✔
212
                Protocol string `json:"protocol"`
6✔
213
                ListenOn string `json:"listen_on"`
6✔
214
        }
6✔
215

6✔
216
        type emqxListeners struct {
6✔
217
                Node      string         `json:"node"`
6✔
218
                Listeners []emqxListener `json:"listeners"`
6✔
219
        }
6✔
220

6✔
221
        intersection := func(listeners1 []emqxListener, listeners2 []emqxListener) []emqxListener {
6✔
222
                hSection := map[string]struct{}{}
×
223
                listeners := make([]emqxListener, 0)
×
224
                for _, listener := range listeners1 {
×
225
                        hSection[listener.ListenOn] = struct{}{}
×
226
                }
×
227
                for _, listener := range listeners2 {
×
228
                        _, ok := hSection[listener.ListenOn]
×
229
                        if ok {
×
230
                                listeners = append(listeners, listener)
×
231
                                delete(hSection, listener.ListenOn)
×
232
                        }
×
233
                }
234
                return listeners
×
235
        }
236

237
        resp, body, err := r.Handler.RequestAPI(instance, EmqxContainerName, "GET", instance.GetUsername(), instance.GetPassword(), appsv1beta3.DefaultManagementPort, "api/v4/listeners")
6✔
238
        if err != nil {
6✔
239
                return nil, err
×
240
        }
×
241
        if resp.StatusCode != 200 {
6✔
242
                return nil, err
×
243
        }
×
244

245
        listenerList := []emqxListeners{}
6✔
246
        data := gjson.GetBytes(body, "data")
6✔
247
        if err := json.Unmarshal([]byte(data.Raw), &listenerList); err != nil {
6✔
248
                return nil, emperror.Wrap(err, "failed to unmarshal node statuses")
×
249
        }
×
250

251
        var listeners []emqxListener
6✔
252
        if len(listenerList) == 1 {
12✔
253
                listeners = listenerList[0].Listeners
6✔
254
        } else {
6✔
255
                for i := 0; i < len(listenerList)-1; i++ {
×
256
                        listeners = intersection(listenerList[i].Listeners, listenerList[i+1].Listeners)
×
257
                }
×
258
        }
259

260
        ports := []corev1.ServicePort{}
6✔
261
        for _, l := range listeners {
12✔
262
                var name string
6✔
263
                var protocol corev1.Protocol
6✔
264
                var strPort string
6✔
265
                var intPort int
6✔
266

6✔
267
                compile := regexp.MustCompile(".*(udp|dtls|sn).*")
6✔
268
                if compile.MatchString(l.Protocol) {
12✔
269
                        protocol = corev1.ProtocolUDP
6✔
270
                } else {
12✔
271
                        protocol = corev1.ProtocolTCP
6✔
272
                }
6✔
273

274
                if strings.Contains(l.ListenOn, ":") {
12✔
275
                        _, strPort, err = net.SplitHostPort(l.ListenOn)
6✔
276
                        if err != nil {
6✔
277
                                strPort = l.ListenOn
×
278
                        }
×
279
                } else {
6✔
280
                        strPort = l.ListenOn
6✔
281
                }
6✔
282
                intPort, _ = strconv.Atoi(strPort)
6✔
283

6✔
284
                // Get name by protocol and port from API
6✔
285
                // protocol maybe like mqtt:wss:8084
6✔
286
                // protocol maybe like mqtt:tcp
6✔
287
                // We had to do something with the "protocol" to make it conform to the kubernetes service port name specification
6✔
288
                name = regexp.MustCompile(`:[\d]+`).ReplaceAllString(l.Protocol, "")
6✔
289
                name = strings.ReplaceAll(name, ":", "-")
6✔
290
                name = fmt.Sprintf("%s-%s", name, strPort)
6✔
291

6✔
292
                ports = append(ports, corev1.ServicePort{
6✔
293
                        Name:       name,
6✔
294
                        Protocol:   protocol,
6✔
295
                        Port:       int32(intPort),
6✔
296
                        TargetPort: intstr.FromInt(intPort),
6✔
297
                })
6✔
298
        }
299
        return ports, nil
6✔
300
}
301

302
func (r *EmqxReconciler) getNodeStatusesByAPI(instance appsv1beta3.Emqx) ([]appsv1beta3.EmqxNode, error) {
6✔
303
        resp, body, err := r.Handler.RequestAPI(instance, EmqxContainerName, "GET", instance.GetUsername(), instance.GetPassword(), appsv1beta3.DefaultManagementPort, "api/v4/nodes")
6✔
304
        if err != nil {
12✔
305
                return nil, err
6✔
306
        }
6✔
307
        if resp.StatusCode != 200 {
7✔
308
                return nil, emperror.Errorf("failed to get node statuses from API: %s", resp.Status)
1✔
309
        }
1✔
310

311
        emqxNodes := []appsv1beta3.EmqxNode{}
6✔
312
        data := gjson.GetBytes(body, "data")
6✔
313
        if err := json.Unmarshal([]byte(data.Raw), &emqxNodes); err != nil {
6✔
314
                return nil, emperror.Wrap(err, "failed to unmarshal node statuses")
×
315
        }
×
316
        return emqxNodes, nil
6✔
317
}
318

319
func generateStatefulSetDef(instance appsv1beta3.Emqx) *appsv1.StatefulSet {
6✔
320
        annotations := instance.GetAnnotations()
6✔
321
        if annotations == nil {
12✔
322
                annotations = make(map[string]string)
6✔
323
        }
6✔
324
        delete(annotations, "kubectl.kubernetes.io/last-applied-configuration")
6✔
325

6✔
326
        podTemplate := corev1.PodTemplateSpec{
6✔
327
                ObjectMeta: metav1.ObjectMeta{
6✔
328
                        Labels:      instance.GetLabels(),
6✔
329
                        Annotations: annotations,
6✔
330
                },
6✔
331
                Spec: corev1.PodSpec{
6✔
332
                        Affinity:         instance.GetAffinity(),
6✔
333
                        Tolerations:      instance.GetToleRations(),
6✔
334
                        NodeName:         instance.GetNodeName(),
6✔
335
                        NodeSelector:     instance.GetNodeSelector(),
6✔
336
                        ImagePullSecrets: instance.GetImagePullSecrets(),
6✔
337
                        SecurityContext:  instance.GetSecurityContext(),
6✔
338
                        InitContainers:   instance.GetInitContainers(),
6✔
339
                        Containers: append(
6✔
340
                                []corev1.Container{
6✔
341
                                        *generateEmqxContainer(instance),
6✔
342
                                        *generateReloaderContainer(instance),
6✔
343
                                },
6✔
344
                                instance.GetExtraContainers()...,
6✔
345
                        ),
6✔
346
                        Volumes: instance.GetExtraVolumes(),
6✔
347
                },
6✔
348
        }
6✔
349

6✔
350
        podAnnotation := podTemplate.ObjectMeta.DeepCopy().Annotations
6✔
351
        podAnnotation[handler.ManageContainersAnnotation] = generateAnnotationByContainers(podTemplate.Spec.Containers)
6✔
352
        podTemplate.Annotations = podAnnotation
6✔
353

6✔
354
        sts := &appsv1.StatefulSet{
6✔
355
                TypeMeta: metav1.TypeMeta{
6✔
356
                        APIVersion: "apps/v1",
6✔
357
                        Kind:       "StatefulSet",
6✔
358
                },
6✔
359
                ObjectMeta: metav1.ObjectMeta{
6✔
360
                        Name:        instance.GetName(),
6✔
361
                        Namespace:   instance.GetNamespace(),
6✔
362
                        Labels:      instance.GetLabels(),
6✔
363
                        Annotations: annotations,
6✔
364
                },
6✔
365
                Spec: appsv1.StatefulSetSpec{
6✔
366
                        Replicas: instance.GetReplicas(),
6✔
367
                        Selector: &metav1.LabelSelector{
6✔
368
                                MatchLabels: instance.GetLabels(),
6✔
369
                        },
6✔
370
                        PodManagementPolicy: appsv1.ParallelPodManagement,
6✔
371
                        Template:            podTemplate,
6✔
372
                },
6✔
373
        }
6✔
374

6✔
375
        sts = generateDataVolume(instance, sts)
6✔
376

6✔
377
        return sts
6✔
378
}
379

380
func generateInitPluginList(instance appsv1beta3.Emqx, existPluginList *appsv1beta3.EmqxPluginList) []client.Object {
6✔
381
        matchedPluginList := []appsv1beta3.EmqxPlugin{}
6✔
382
        for _, existPlugin := range existPluginList.Items {
12✔
383
                selector, _ := labels.ValidatedSelectorFromSet(existPlugin.Spec.Selector)
6✔
384
                if selector.Empty() || !selector.Matches(labels.Set(instance.GetLabels())) {
12✔
385
                        continue
6✔
386
                }
387
                matchedPluginList = append(matchedPluginList, existPlugin)
6✔
388
        }
389

390
        isExistPlugin := func(pluginName string, pluginList []appsv1beta3.EmqxPlugin) bool {
12✔
391
                for _, plugin := range pluginList {
12✔
392
                        if plugin.Spec.PluginName == pluginName {
12✔
393
                                return true
6✔
394
                        }
6✔
395
                }
396
                return false
6✔
397
        }
398

399
        pluginList := []client.Object{}
6✔
400
        // Default plugins
6✔
401
        if !isExistPlugin("emqx_rule_engine", matchedPluginList) {
12✔
402
                emqxRuleEngine := &appsv1beta3.EmqxPlugin{
6✔
403
                        TypeMeta: metav1.TypeMeta{
6✔
404
                                APIVersion: "apps.emqx.io/v1beta3",
6✔
405
                                Kind:       "EmqxPlugin",
6✔
406
                        },
6✔
407
                        ObjectMeta: metav1.ObjectMeta{
6✔
408
                                Name:      fmt.Sprintf("%s-rule-engine", instance.GetName()),
6✔
409
                                Namespace: instance.GetNamespace(),
6✔
410
                                Labels:    instance.GetLabels(),
6✔
411
                        },
6✔
412
                        Spec: appsv1beta3.EmqxPluginSpec{
6✔
413
                                PluginName: "emqx_rule_engine",
6✔
414
                                Selector:   instance.GetLabels(),
6✔
415
                                Config:     map[string]string{},
6✔
416
                        },
6✔
417
                }
6✔
418
                pluginList = append(pluginList, emqxRuleEngine)
6✔
419
        }
6✔
420

421
        if !isExistPlugin("emqx_retainer", matchedPluginList) {
12✔
422
                emqxRetainer := &appsv1beta3.EmqxPlugin{
6✔
423
                        TypeMeta: metav1.TypeMeta{
6✔
424
                                APIVersion: "apps.emqx.io/v1beta3",
6✔
425
                                Kind:       "EmqxPlugin",
6✔
426
                        },
6✔
427
                        ObjectMeta: metav1.ObjectMeta{
6✔
428
                                Name:      fmt.Sprintf("%s-retainer", instance.GetName()),
6✔
429
                                Namespace: instance.GetNamespace(),
6✔
430
                                Labels:    instance.GetLabels(),
6✔
431
                        },
6✔
432
                        Spec: appsv1beta3.EmqxPluginSpec{
6✔
433
                                PluginName: "emqx_retainer",
6✔
434
                                Selector:   instance.GetLabels(),
6✔
435
                                Config:     map[string]string{},
6✔
436
                        },
6✔
437
                }
6✔
438
                pluginList = append(pluginList, emqxRetainer)
6✔
439
        }
6✔
440

441
        enterprise, ok := instance.(*appsv1beta3.EmqxEnterprise)
6✔
442
        if ok && !isExistPlugin("emqx_modules", matchedPluginList) {
12✔
443
                emqxModules := &appsv1beta3.EmqxPlugin{
6✔
444
                        TypeMeta: metav1.TypeMeta{
6✔
445
                                APIVersion: "apps.emqx.io/v1beta3",
6✔
446
                                Kind:       "EmqxPlugin",
6✔
447
                        },
6✔
448
                        ObjectMeta: metav1.ObjectMeta{
6✔
449
                                Name:      fmt.Sprintf("%s-modules", instance.GetName()),
6✔
450
                                Namespace: instance.GetNamespace(),
6✔
451
                                Labels:    instance.GetLabels(),
6✔
452
                        },
6✔
453
                        Spec: appsv1beta3.EmqxPluginSpec{
6✔
454
                                PluginName: "emqx_modules",
6✔
455
                                Selector:   instance.GetLabels(),
6✔
456
                                Config:     map[string]string{},
6✔
457
                        },
6✔
458
                }
6✔
459

6✔
460
                if enterprise.Spec.EmqxTemplate.Modules != nil {
12✔
461
                        emqxModules.Spec.Config = map[string]string{
6✔
462
                                "modules.loaded_file": "/mounted/modules/loaded_modules",
6✔
463
                        }
6✔
464
                }
6✔
465

466
                pluginList = append(pluginList, emqxModules)
6✔
467
        }
468

469
        return pluginList
6✔
470
}
471

472
func generateDefaultPluginsConfig(instance appsv1beta3.Emqx) *corev1.ConfigMap {
6✔
473
        names := appsv1beta3.Names{Object: instance}
6✔
474

6✔
475
        cm := &corev1.ConfigMap{
6✔
476
                TypeMeta: metav1.TypeMeta{
6✔
477
                        APIVersion: "v1",
6✔
478
                        Kind:       "ConfigMap",
6✔
479
                },
6✔
480
                ObjectMeta: metav1.ObjectMeta{
6✔
481
                        Labels:    instance.GetLabels(),
6✔
482
                        Namespace: instance.GetNamespace(),
6✔
483
                        Name:      names.PluginsConfig(),
6✔
484
                },
6✔
485
                Data: map[string]string{
6✔
486
                        "emqx_modules.conf":           "",
6✔
487
                        "emqx_management.conf":        "management.listener.http = 8081\n",
6✔
488
                        "emqx_dashboard.conf":         "dashboard.listener.http = 18083\n",
6✔
489
                        "emqx_rule_engine.conf":       "",
6✔
490
                        "emqx_retainer.conf":          "",
6✔
491
                        "emqx_telemetry.conf":         "",
6✔
492
                        "emqx_auth_http.conf":         "auth.http.auth_req.url = http://127.0.0.1:80/mqtt/auth\nauth.http.auth_req.method = post\nauth.http.auth_req.headers.content_type = application/x-www-form-urlencoded\nauth.http.auth_req.params = clientid=%c,username=%u,password=%P\nauth.http.acl_req.url = http://127.0.0.1:80/mqtt/acl\nauth.http.acl_req.method = post\nauth.http.acl_req.headers.content-type = application/x-www-form-urlencoded\nauth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m\nauth.http.timeout = 5s\nauth.http.connect_timeout = 5s\nauth.http.pool_size = 32\nauth.http.enable_pipelining = true\n",
6✔
493
                        "emqx_auth_jwt.conf":          "auth.jwt.secret = emqxsecret\nauth.jwt.from = password\nauth.jwt.verify_claims = off\n",
6✔
494
                        "emqx_auth_ldap.conf":         "auth.ldap.servers = 127.0.0.1\nauth.ldap.port = 389\nauth.ldap.pool = 8\nauth.ldap.bind_dn = cn=root,dc=emqx,dc=io\nauth.ldap.bind_password = public\nauth.ldap.timeout = 30s\nauth.ldap.device_dn = ou=device,dc=emqx,dc=io\nauth.ldap.match_objectclass = mqttUser\nauth.ldap.username.attributetype = uid\nauth.ldap.password.attributetype = userPassword\nauth.ldap.ssl = false\n",
6✔
495
                        "emqx_auth_mnesia.conf":       "",
6✔
496
                        "emqx_auth_mongo.conf":        "auth.mongo.type = single\nauth.mongo.srv_record = false\nauth.mongo.server = 127.0.0.1:27017\nauth.mongo.pool = 8\nauth.mongo.database = mqtt\nauth.mongo.topology.pool_size = 1\nauth.mongo.topology.max_overflow = 0\nauth.mongo.auth_query.password_hash = sha256\nauth.mongo.auth_query.collection = mqtt_user\nauth.mongo.auth_query.password_field = password\nauth.mongo.auth_query.selector = username=%u\nauth.mongo.super_query.collection = mqtt_user\nauth.mongo.super_query.super_field = is_superuser\nauth.mongo.super_query.selector = username=%u\nauth.mongo.acl_query.collection = mqtt_acl\nauth.mongo.acl_query.selector = username=%u\n",
6✔
497
                        "emqx_auth_mysql.conf":        "auth.mysql.server = 127.0.0.1:3306\nauth.mysql.pool = 8\nauth.mysql.database = mqtt\nauth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1\nauth.mysql.password_hash = sha256\nauth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1\nauth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'\n",
6✔
498
                        "emqx_auth_pgsql.conf":        "auth.pgsql.server = 127.0.0.1:5432\nauth.pgsql.pool = 8\nauth.pgsql.username = root\nauth.pgsql.database = mqtt\nauth.pgsql.encoding = utf8\nauth.pgsql.ssl = off\nauth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1\nauth.pgsql.password_hash = sha256\nauth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1\nauth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'\n",
6✔
499
                        "emqx_auth_redis.conf":        "auth.redis.type = single\nauth.redis.server = 127.0.0.1:6379\nauth.redis.pool = 8\nauth.redis.database = 0\nauth.redis.auth_cmd = HMGET mqtt_user:%u password\nauth.redis.password_hash = plain\nauth.redis.super_cmd = HGET mqtt_user:%u is_superuser\nauth.redis.acl_cmd = HGETALL mqtt_acl:%u\n",
6✔
500
                        "emqx_backend_cassa.conf":     "backend.ecql.pool1.nodes = 127.0.0.1:9042\nbackend.ecql.pool1.size = 8\nbackend.ecql.pool1.auto_reconnect = 1\nbackend.ecql.pool1.username = cassandra\nbackend.ecql.pool1.password = cassandra\nbackend.ecql.pool1.keyspace = mqtt\nbackend.ecql.pool1.logger = info\nbackend.cassa.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscription_lookup\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.subscribed.2  = {\"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"cql\": [\"delete from acked where clientid = ${clientid} and topic = ${topic}\"]}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
6✔
501
                        "emqx_backend_dynamo.conf":    "backend.dynamo.region = us-west-2\nbackend.dynamo.pool1.server = http://localhost:8000\nbackend.dynamo.pool1.pool_size = 8\nbackend.dynamo.pool1.aws_access_key_id = FAKE_AWS_ACCESS_KEY_ID\nbackend.dynamo.pool1.aws_secret_access_key = FAKE_AWS_SECRET_ACCESS_KEY\nbackend.dynamo.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch_for_queue\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"function\": \"on_acked_delete\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked_for_queue\"}, \"pool\": \"pool1\"}\n",
6✔
502
                        "emqx_backend_influxdb.conf":  "backend.influxdb.udp.pool1.server = 127.0.0.1:8089\nbackend.influxdb.udp.pool1.pool_size = 8\nbackend.influxdb.http.pool1.server = 127.0.0.1:8086\nbackend.influxdb.http.pool1.pool_size = 8\nbackend.influxdb.http.pool1.precision = ms\nbackend.influxdb.http.pool1.database = mydb\nbackend.influxdb.http.pool1.https_enabled = false\nbackend.influxdb.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
6✔
503
                        "emqx_backend_mongo.conf":     "backend.mongo.pool1.type = single\nbackend.mongo.pool1.srv_record = false\nbackend.mongo.pool1.server = 127.0.0.1:27017\nbackend.mongo.pool1.c_pool_size = 8\nbackend.mongo.pool1.database = mqtt\nbackend.mongo.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"pool\": \"pool1\", \"offline_opts\": {\"time_range\": \"2h\", \"max_returned_count\": 500}}\nbackend.mongo.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"function\": \"on_acked_delete\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
6✔
504
                        "emqx_backend_mysql.conf":     "backend.mysql.pool1.server = 127.0.0.1:3306\nbackend.mysql.pool1.pool_size = 8\nbackend.mysql.pool1.user = root\nbackend.mysql.pool1.password = public\nbackend.mysql.pool1.database = mqtt\nbackend.mysql.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.client.connected.2     = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"sql\": [\"delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}\"]}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
6✔
505
                        "emqx_backend_opentsdb.conf":  "backend.opentsdb.pool1.server = 127.0.0.1:4242\nbackend.opentsdb.pool1.pool_size = 8\nbackend.opentsdb.pool1.summary = true\nbackend.opentsdb.pool1.details = false\nbackend.opentsdb.pool1.sync = false\nbackend.opentsdb.pool1.sync_timeout = 0\nbackend.opentsdb.pool1.max_batch_size = 20\nbackend.opentsdb.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
6✔
506
                        "emqx_backend_pgsql.conf":     "backend.pgsql.pool1.server = 127.0.0.1:5432\nbackend.pgsql.pool1.pool_size = 8\nbackend.pgsql.pool1.username = root\nbackend.pgsql.pool1.password = public\nbackend.pgsql.pool1.database = mqtt\nbackend.pgsql.pool1.ssl = false\nbackend.pgsql.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.client.connected.2     = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"sql\": \"delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
6✔
507
                        "emqx_backend_redis.conf":     "backend.redis.pool1.type = single\nbackend.redis.pool1.server = 127.0.0.1:6379\nbackend.redis.pool1.pool_size = 8\nbackend.redis.pool1.database = 0\nbackend.redis.pool1.channel = mqtt_channel\nbackend.redis.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.1  = {\"topic\": \"queue/#\", \"action\": {\"function\": \"on_message_fetch_for_queue\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.2  = {\"topic\": \"pubsub/#\", \"action\": {\"function\": \"on_message_fetch_for_pubsub\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.3  = {\"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"commands\": [\"DEL mqtt:acked:${clientid}:${topic}\"]}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"expired_time\" : 3600, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"expired_time\" : 3600, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.acked.1       = {\"topic\": \"queue/#\", \"action\": {\"function\": \"on_message_acked_for_queue\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.acked.2       = {\"topic\": \"pubsub/#\", \"action\": {\"function\": \"on_message_acked_for_pubsub\"}, \"pool\": \"pool1\"}\n",
6✔
508
                        "emqx_backend_timescale.conf": "backend.timescale.pool1.server = 127.0.0.1:5432\nbackend.timescale.pool1.pool_size = 8\nbackend.timescale.pool1.username = root\nbackend.timescale.pool1.password = public\nbackend.timescale.pool1.database = mqtt\nbackend.timescale.pool1.ssl = false\nbackend.timescale.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
6✔
509
                        "emqx_bridge_kafka.conf":      "bridge.kafka.servers = 127.0.0.1:9092\nbridge.kafka.query_api_versions = true\nbridge.kafka.connection_strategy = per_partition\nbridge.kafka.min_metadata_refresh_interval = 5S\nbridge.kafka.produce = sync\nbridge.kafka.produce.sync_timeout = 3s\nbridge.kafka.sock.sndbuf = 1MB\nbridge.kafka.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.kafka.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.kafka.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.kafka.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.kafka.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.kafka.hook.message.delivered.1    = {\"filter\":\"#\", \"topic\":\"MessageDelivered\"}\nbridge.kafka.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
6✔
510
                        "emqx_bridge_mqtt.conf":       "bridge.mqtt.aws.address = 127.0.0.1:1883\nbridge.mqtt.aws.proto_ver = mqttv4\nbridge.mqtt.aws.start_type = manual\nbridge.mqtt.aws.clientid = bridge_aws\nbridge.mqtt.aws.clean_start = true\nbridge.mqtt.aws.username = user\nbridge.mqtt.aws.password = passwd\nbridge.mqtt.aws.forwards = topic1/#,topic2/#\nbridge.mqtt.aws.forward_mountpoint = bridge/aws/${node}/\nbridge.mqtt.aws.ssl = off\nbridge.mqtt.aws.cacertfile = etc/certs/cacert.pem\nbridge.mqtt.aws.certfile = etc/certs/client-cert.pem\nbridge.mqtt.aws.keyfile = etc/certs/client-key.pem\nbridge.mqtt.aws.ciphers = TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_CCM_SHA256,TLS_AES_128_CCM_8_SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\nbridge.mqtt.aws.keepalive = 60s\nbridge.mqtt.aws.tls_versions = tlsv1.3,tlsv1.2,tlsv1.1,tlsv1\nbridge.mqtt.aws.reconnect_interval = 30s\nbridge.mqtt.aws.retry_interval = 20s\nbridge.mqtt.aws.batch_size = 32\nbridge.mqtt.aws.max_inflight_size = 32\nbridge.mqtt.aws.queue.replayq_dir = data/replayq/emqx_aws_bridge/\nbridge.mqtt.aws.queue.replayq_seg_bytes = 10MB\nbridge.mqtt.aws.queue.max_total_size = 
5GB\n",
6✔
511
                        "emqx_bridge_pulsar.conf":     "bridge.pulsar.servers = 127.0.0.1:6650\nbridge.pulsar.produce = sync\nbridge.pulsar.sock.sndbuf = 1MB\nbridge.pulsar.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.pulsar.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.pulsar.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.pulsar.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.pulsar.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.pulsar.hook.message.delivered.1      = {\"filter\":\"#\", \"topic\":\"MessageDelivered\"}\nbridge.pulsar.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
6✔
512
                        "emqx_bridge_rabbit.conf":     "bridge.rabbit.1.server = 127.0.0.1:5672\nbridge.rabbit.1.pool_size = 8\nbridge.rabbit.1.username = guest\nbridge.rabbit.1.password = guest\nbridge.rabbit.1.timeout = 5s\nbridge.rabbit.1.virtual_host = /\nbridge.rabbit.1.heartbeat = 30s\nbridge.rabbit.hook.session.subscribed.1 = {\"action\": \"on_session_subscribed\", \"rabbit\": 1, \"exchange\": \"direct:emqx.subscription\"}\nbridge.rabbit.hook.session.unsubscribed.1 = {\"action\": \"on_session_unsubscribed\", \"rabbit\": 1, \"exchange\": \"direct:emqx.subscription\"}\nbridge.rabbit.hook.message.publish.1 = {\"topic\": \"$SYS/#\", \"action\": \"on_message_publish\", \"rabbit\": 1, \"exchange\": \"topic:emqx.$sys\"}\nbridge.rabbit.hook.message.publish.2 = {\"topic\": \"#\", \"action\": \"on_message_publish\", \"rabbit\": 1, \"exchange\": \"topic:emqx.pub\"}\nbridge.rabbit.hook.message.acked.1 = {\"topic\": \"#\", \"action\": \"on_message_acked\", \"rabbit\": 1, \"exchange\": \"topic:emqx.acked\"}\n",
6✔
513
                        "emqx_bridge_rocket.conf":     "bridge.rocket.servers = 127.0.0.1:9876\nbridge.rocket.refresh_topic_route_interval = 5S\nbridge.rocket.produce = sync\nbridge.rocket.sock.sndbuf = 1MB\nbridge.rocket.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.rocket.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.rocket.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.rocket.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.rocket.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.rocket.hook.message.delivered.1    = {\"filter\":\"#\", \"topic\":\"MessageDeliver\"}\nbridge.rocket.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
6✔
514
                        "emqx_coap.conf":              "coap.bind.udp.1 = 0.0.0.0:5683\ncoap.enable_stats = off\ncoap.bind.dtls.1 = 0.0.0.0:5684\ncoap.dtls.keyfile = etc/certs/key.pem\ncoap.dtls.certfile = etc/certs/cert.pem\ncoap.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\n",
6✔
515
                        "emqx_conf.conf":              "conf.etc.dir.emqx = etc\nconf.etc.dir.emqx.zones = etc\nconf.etc.dir.emqx.listeners = etc\nconf.etc.dir.emqx.sys_mon = etc\n",
6✔
516
                        "emqx_exhook.conf":            "exhook.server.default.url = http://127.0.0.1:9000\n",
6✔
517
                        "emqx_exproto.conf":           "exproto.server.http.port = 9100\nexproto.server.https.port = 9101\nexproto.server.https.cacertfile = etc/certs/cacert.pem\nexproto.server.https.certfile = etc/certs/cert.pem\nexproto.server.https.keyfile = etc/certs/key.pem\nexproto.listener.protoname = tcp://0.0.0.0:7993\nexproto.listener.protoname.connection_handler_url = http://127.0.0.1:9001\nexproto.listener.protoname.acceptors = 8\nexproto.listener.protoname.max_connections = 1024000\nexproto.listener.protoname.max_conn_rate = 1000\nexproto.listener.protoname.active_n = 100\nexproto.listener.protoname.idle_timeout = 30s\nexproto.listener.protoname.access.1 = allow all\nexproto.listener.protoname.backlog = 1024\nexproto.listener.protoname.send_timeout = 15s\nexproto.listener.protoname.send_timeout_close = on\nexproto.listener.protoname.nodelay = true\nexproto.listener.protoname.reuseaddr = true\n",
6✔
518
                        "emqx_gbt32960.conf":          "gbt32960.frame.max_length = 8192\ngbt32960.proto.retx_interval = 8s\ngbt32960.proto.retx_max_times = 3\ngbt32960.proto.message_queue_len = 10\ngbt32960.listener.tcp = 0.0.0.0:7325\ngbt32960.listener.tcp.acceptors = 8\ngbt32960.listener.tcp.max_connections = 1024000\ngbt32960.listener.tcp.max_conn_rate = 1000\ngbt32960.listener.tcp.idle_timeout = 60s\ngbt32960.listener.tcp.active_n = 100\ngbt32960.listener.tcp.zone = external\ngbt32960.listener.tcp.access.1 = allow all\ngbt32960.listener.tcp.backlog = 1024\ngbt32960.listener.tcp.send_timeout = 15s\ngbt32960.listener.tcp.send_timeout_close = on\ngbt32960.listener.tcp.nodelay = true\ngbt32960.listener.tcp.reuseaddr = true\ngbt32960.listener.ssl = 7326\ngbt32960.listener.ssl.acceptors = 16\ngbt32960.listener.ssl.max_connections = 102400\ngbt32960.listener.ssl.max_conn_rate = 500\ngbt32960.listener.ssl.idle_timeout = 60s\ngbt32960.listener.ssl.active_n = 100\ngbt32960.listener.ssl.zone = external\ngbt32960.listener.ssl.access.1 = allow all\ngbt32960.listener.ssl.handshake_timeout = 15s\ngbt32960.listener.ssl.keyfile = etc/certs/key.pem\ngbt32960.listener.ssl.certfile = etc/certs/cert.pem\ngbt32960.listener.ssl.ciphers = 
ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\ngbt32960.listener.ssl.reuseaddr = true\n",
6✔
519
                        "emqx_jt808.conf":             "jt808.proto.allow_anonymous = true\njt808.proto.dn_topic = jt808/%c/dn\njt808.proto.up_topic = jt808/%c/up\njt808.conn.idle_timeout = 30s\njt808.conn.enable_stats = on\njt808.frame.max_length = 8192\njt808.listener.tcp = 6207\njt808.listener.tcp.acceptors = 4\njt808.listener.tcp.max_clients = 512\n",
6✔
520
                        "emqx_lua_hook.conf":          "",
6✔
521
                        "emqx_lwm2m.conf":             "lwm2m.lifetime_min = 1s\nlwm2m.lifetime_max = 86400s\nlwm2m.mountpoint = lwm2m/%e/\nlwm2m.topics.command = dn/#\nlwm2m.topics.response = up/resp\nlwm2m.topics.notify = up/notify\nlwm2m.topics.register = up/resp\nlwm2m.topics.update = up/resp\nlwm2m.xml_dir =  etc/lwm2m_xml\nlwm2m.bind.udp.1 = 0.0.0.0:5683\nlwm2m.opts.buffer = 1024KB\nlwm2m.opts.recbuf = 1024KB\nlwm2m.opts.sndbuf = 1024KB\nlwm2m.opts.read_packets = 20\nlwm2m.bind.dtls.1 = 0.0.0.0:5684\nlwm2m.dtls.keyfile = etc/certs/key.pem\nlwm2m.dtls.certfile = etc/certs/cert.pem\nlwm2m.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\n",
6✔
522
                        "emqx_prometheus.conf":        "prometheus.push.gateway.server = http://127.0.0.1:9091\nprometheus.interval = 15000\n",
6✔
523
                        "emqx_psk_file.conf":          "psk.file.path = etc/psk.txt\npsk.file.delimiter = :\n",
6✔
524
                        "emqx_recon.conf":             "",
6✔
525
                        "emqx_sasl.conf":              "",
6✔
526
                        "emqx_schema_registry.conf":   "",
6✔
527
                        "emqx_sn.conf":                "mqtt.sn.port = 1884\nmqtt.sn.advertise_duration = 15m\nmqtt.sn.gateway_id = 1\nmqtt.sn.enable_stats = off\nmqtt.sn.enable_qos3 = off\nmqtt.sn.idle_timeout = 30s\nmqtt.sn.predefined.topic.0 = reserved\nmqtt.sn.predefined.topic.1 = /predefined/topic/name/hello\nmqtt.sn.predefined.topic.2 = /predefined/topic/name/nice\nmqtt.sn.username = mqtt_sn_user\nmqtt.sn.password = abc\n",
6✔
528
                        "emqx_stomp.conf":             "stomp.listener = 61613\nstomp.listener.acceptors = 4\nstomp.listener.max_connections = 512\nstomp.default_user.login = guest\nstomp.default_user.passcode = guest\nstomp.allow_anonymous = true\nstomp.frame.max_headers = 10\nstomp.frame.max_header_length = 1024\nstomp.frame.max_body_length = 8192\n",
6✔
529
                        "emqx_tcp.conf":               "tcp.proto.idle_timeout = 15s\ntcp.proto.up_topic = tcp/%c/up\ntcp.proto.dn_topic = tcp/%c/dn\ntcp.proto.max_packet_size = 65535\ntcp.proto.enable_stats = on\ntcp.proto.force_gc_policy = 1000|1MB\ntcp.listener.external = 0.0.0.0:8090\ntcp.listener.external.acceptors = 8\ntcp.listener.external.max_connections = 1024000\ntcp.listener.external.max_conn_rate = 1000\ntcp.listener.external.active_n = 100\ntcp.listener.external.access.1 = allow all\ntcp.listener.external.backlog = 1024\ntcp.listener.external.send_timeout = 15s\ntcp.listener.external.send_timeout_close = on\ntcp.listener.external.nodelay = true\ntcp.listener.external.reuseaddr = true\ntcp.listener.ssl.external = 0.0.0.0:8091\ntcp.listener.ssl.external.acceptors = 8\ntcp.listener.ssl.external.max_connections = 1024000\ntcp.listener.ssl.external.max_conn_rate = 1000\ntcp.listener.ssl.external.active_n = 100\ntcp.listener.ssl.external.access.1 = allow all\ntcp.listener.ssl.external.handshake_timeout = 15s\ntcp.listener.ssl.external.keyfile = etc/certs/key.pem\ntcp.listener.ssl.external.certfile = etc/certs/cert.pem\ntcp.listener.ssl.external.ciphers = 
ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\ntcp.listener.ssl.external.backlog = 1024\ntcp.listener.ssl.external.send_timeout = 15s\ntcp.listener.ssl.external.send_timeout_close = on\ntcp.listener.ssl.external.nodelay = true\ntcp.listener.ssl.external.reuseaddr = true\n",
6✔
530
                        "emqx_web_hook.conf":          "web.hook.url = http://127.0.0.1:80\nweb.hook.headers.content-type = application/json\nweb.hook.body.encoding_of_payload_field = plain\nweb.hook.pool_size = 32\nweb.hook.enable_pipelining = true\n",
6✔
531
                },
6✔
532
        }
6✔
533

6✔
534
        return cm
6✔
535
}
6✔
536

537
func generateSvc(instance appsv1beta3.Emqx) (headlessSvc, svc *corev1.Service) {
6✔
538
        names := appsv1beta3.Names{Object: instance}
6✔
539
        serviceTemplate := instance.GetServiceTemplate()
6✔
540

6✔
541
        svc = &corev1.Service{
6✔
542
                TypeMeta: metav1.TypeMeta{
6✔
543
                        APIVersion: "v1",
6✔
544
                        Kind:       "Service",
6✔
545
                },
6✔
546
                ObjectMeta: serviceTemplate.ObjectMeta,
6✔
547
                Spec:       serviceTemplate.Spec,
6✔
548
        }
6✔
549

6✔
550
        headlessSvc = &corev1.Service{
6✔
551
                TypeMeta: metav1.TypeMeta{
6✔
552
                        APIVersion: "v1",
6✔
553
                        Kind:       "Service",
6✔
554
                },
6✔
555
                ObjectMeta: metav1.ObjectMeta{
6✔
556
                        Labels:    instance.GetLabels(),
6✔
557
                        Name:      names.HeadlessSvc(),
6✔
558
                        Namespace: instance.GetNamespace(),
6✔
559
                },
6✔
560
                Spec: corev1.ServiceSpec{
6✔
561
                        Selector:                 instance.GetLabels(),
6✔
562
                        ClusterIP:                corev1.ClusterIPNone,
6✔
563
                        PublishNotReadyAddresses: true,
6✔
564
                },
6✔
565
        }
6✔
566

6✔
567
        compile := regexp.MustCompile(".*management.*")
6✔
568
        for _, port := range svc.Spec.Ports {
12✔
569
                if compile.MatchString(port.Name) {
12✔
570
                        // Headless services must not set nodePort
6✔
571
                        headlessSvc.Spec.Ports = append(headlessSvc.Spec.Ports, corev1.ServicePort{
6✔
572
                                Name:        port.Name,
6✔
573
                                Protocol:    port.Protocol,
6✔
574
                                AppProtocol: port.AppProtocol,
6✔
575
                                TargetPort:  port.TargetPort,
6✔
576
                                Port:        port.Port,
6✔
577
                        })
6✔
578
                }
6✔
579
        }
580

581
        return headlessSvc, svc
6✔
582
}
583

584
func generateAcl(instance appsv1beta3.Emqx) *corev1.ConfigMap {
6✔
585
        if len(instance.GetACL()) == 0 {
12✔
586
                return nil
6✔
587
        }
6✔
588
        names := appsv1beta3.Names{Object: instance}
6✔
589

6✔
590
        var aclString string
6✔
591
        for _, rule := range instance.GetACL() {
12✔
592
                aclString += fmt.Sprintf("%s\n", rule)
6✔
593
        }
6✔
594
        cm := &corev1.ConfigMap{
6✔
595
                TypeMeta: metav1.TypeMeta{
6✔
596
                        APIVersion: "v1",
6✔
597
                        Kind:       "ConfigMap",
6✔
598
                },
6✔
599
                ObjectMeta: metav1.ObjectMeta{
6✔
600
                        Labels:    instance.GetLabels(),
6✔
601
                        Namespace: instance.GetNamespace(),
6✔
602
                        Name:      names.ACL(),
6✔
603
                },
6✔
604
                Data: map[string]string{"acl.conf": aclString},
6✔
605
        }
6✔
606
        return cm
6✔
607
}
608

609
func generateLoadedModules(instance appsv1beta3.Emqx) *corev1.ConfigMap {
6✔
610
        names := appsv1beta3.Names{Object: instance}
6✔
611
        var loadedModulesString string
6✔
612
        switch obj := instance.(type) {
6✔
613
        case *appsv1beta3.EmqxBroker:
6✔
614
                modules := &appsv1beta3.EmqxBrokerModuleList{
6✔
615
                        Items: obj.Spec.EmqxTemplate.Modules,
6✔
616
                }
6✔
617
                loadedModulesString = modules.String()
6✔
618
                if loadedModulesString == "" {
12✔
619
                        return nil
6✔
620
                }
6✔
621
        case *appsv1beta3.EmqxEnterprise:
6✔
622
                modules := &appsv1beta3.EmqxEnterpriseModuleList{
6✔
623
                        Items: obj.Spec.EmqxTemplate.Modules,
6✔
624
                }
6✔
625
                // for enterprise, if modules is empty, don't create configmap
6✔
626
                loadedModulesString = modules.String()
6✔
627
                if loadedModulesString == "" {
12✔
628
                        return nil
6✔
629
                }
6✔
630
        }
631

632
        cm := &corev1.ConfigMap{
6✔
633
                TypeMeta: metav1.TypeMeta{
6✔
634
                        APIVersion: "v1",
6✔
635
                        Kind:       "ConfigMap",
6✔
636
                },
6✔
637
                ObjectMeta: metav1.ObjectMeta{
6✔
638
                        Labels:    instance.GetLabels(),
6✔
639
                        Namespace: instance.GetNamespace(),
6✔
640
                        Name:      names.LoadedModules(),
6✔
641
                },
6✔
642
                Data: map[string]string{"loaded_modules": loadedModulesString},
6✔
643
        }
6✔
644

6✔
645
        return cm
6✔
646
}
647

648
func generateLicense(emqxEnterprise *appsv1beta3.EmqxEnterprise) *corev1.Secret {
6✔
649
        names := appsv1beta3.Names{Object: emqxEnterprise}
6✔
650
        license := emqxEnterprise.GetLicense()
6✔
651
        if len(license.Data) == 0 && len(license.StringData) == 0 {
12✔
652
                return nil
6✔
653
        }
6✔
654

655
        secret := &corev1.Secret{
6✔
656
                TypeMeta: metav1.TypeMeta{
6✔
657
                        APIVersion: "v1",
6✔
658
                        Kind:       "Secret",
6✔
659
                },
6✔
660
                ObjectMeta: metav1.ObjectMeta{
6✔
661
                        Labels:    emqxEnterprise.GetLabels(),
6✔
662
                        Namespace: emqxEnterprise.GetNamespace(),
6✔
663
                        Name:      names.License(),
6✔
664
                },
6✔
665
                Type: corev1.SecretTypeOpaque,
6✔
666
                Data: map[string][]byte{"emqx.lic": emqxEnterprise.GetLicense().Data},
6✔
667
        }
6✔
668
        if emqxEnterprise.GetLicense().StringData != "" {
6✔
669
                secret.StringData = map[string]string{"emqx.lic": emqxEnterprise.GetLicense().StringData}
×
670
        }
×
671
        return secret
6✔
672
}
673

674
func generateDataVolume(instance appsv1beta3.Emqx, sts *appsv1.StatefulSet) *appsv1.StatefulSet {
6✔
675
        names := appsv1beta3.Names{Object: instance}
6✔
676
        dataName := names.Data()
6✔
677

6✔
678
        if reflect.ValueOf(instance.GetPersistent()).IsZero() {
12✔
679
                sts.Spec.Template.Spec.Volumes = append(
6✔
680
                        sts.Spec.Template.Spec.Volumes,
6✔
681
                        corev1.Volume{
6✔
682
                                Name: dataName,
6✔
683
                                VolumeSource: corev1.VolumeSource{
6✔
684
                                        EmptyDir: &corev1.EmptyDirVolumeSource{},
6✔
685
                                },
6✔
686
                        },
6✔
687
                )
6✔
688
        } else {
12✔
689
                sts.Spec.VolumeClaimTemplates = append(
6✔
690
                        sts.Spec.VolumeClaimTemplates,
6✔
691
                        generateVolumeClaimTemplate(instance, dataName),
6✔
692
                )
6✔
693
        }
6✔
694

695
        emqxContainerIndex := findContinerIndex(sts, EmqxContainerName)
6✔
696
        sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts = append(
6✔
697
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts,
6✔
698
                corev1.VolumeMount{
6✔
699
                        Name:      dataName,
6✔
700
                        MountPath: "/opt/emqx/data",
6✔
701
                },
6✔
702
        )
6✔
703

6✔
704
        ReloaderContainerIndex := findContinerIndex(sts, ReloaderContainerName)
6✔
705
        sts.Spec.Template.Spec.Containers[ReloaderContainerIndex].VolumeMounts = sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts
6✔
706
        return sts
6✔
707
}
708

709
func generateVolumeClaimTemplate(instance appsv1beta3.Emqx, Name string) corev1.PersistentVolumeClaim {
6✔
710
        template := instance.GetPersistent()
6✔
711
        pvc := corev1.PersistentVolumeClaim{
6✔
712
                ObjectMeta: metav1.ObjectMeta{
6✔
713
                        Name:      Name,
6✔
714
                        Namespace: instance.GetNamespace(),
6✔
715
                },
6✔
716
                Spec: template,
6✔
717
        }
6✔
718
        if pvc.Spec.AccessModes == nil {
12✔
719
                pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
6✔
720
        }
6✔
721
        return pvc
6✔
722
}
723

724
func generateReloaderContainer(instance appsv1beta3.Emqx) *corev1.Container {
6✔
725
        image := ReloaderContainerImage
6✔
726
        if len(instance.GetRegistry()) != 0 {
12✔
727
                image = fmt.Sprintf("%s/%s", instance.GetRegistry(), image)
6✔
728
        }
6✔
729
        return &corev1.Container{
6✔
730
                Name:            ReloaderContainerName,
6✔
731
                Image:           image,
6✔
732
                ImagePullPolicy: instance.GetImagePullPolicy(),
6✔
733
                Args: []string{
6✔
734
                        "-u", instance.GetUsername(),
6✔
735
                        "-p", instance.GetPassword(),
6✔
736
                        "-P", appsv1beta3.DefaultManagementPort,
6✔
737
                },
6✔
738
        }
6✔
739
}
740

741
func generateEmqxContainer(instance appsv1beta3.Emqx) *corev1.Container {
6✔
742
        image := instance.GetImage()
6✔
743
        if len(instance.GetRegistry()) != 0 {
12✔
744
                image = fmt.Sprintf("%s/%s", instance.GetRegistry(), image)
6✔
745
        }
6✔
746
        return &corev1.Container{
6✔
747
                Name:            EmqxContainerName,
6✔
748
                Image:           image,
6✔
749
                ImagePullPolicy: instance.GetImagePullPolicy(),
6✔
750
                Resources:       instance.GetResource(),
6✔
751
                Env: mergeEnvAndConfig(instance, []corev1.EnvVar{
6✔
752
                        {
6✔
753
                                Name:  "EMQX_MANAGEMENT__DEFAULT_APPLICATION__ID",
6✔
754
                                Value: instance.GetUsername(),
6✔
755
                        },
6✔
756
                        {
6✔
757
                                Name:  "EMQX_DASHBOARD__DEFAULT_USER__LOGIN",
6✔
758
                                Value: instance.GetUsername(),
6✔
759
                        },
6✔
760
                        {
6✔
761
                                Name:  "EMQX_MANAGEMENT__DEFAULT_APPLICATION__SECRET",
6✔
762
                                Value: instance.GetPassword(),
6✔
763
                        },
6✔
764
                        {
6✔
765
                                Name:  "EMQX_DASHBOARD__DEFAULT_USER__PASSWORD",
6✔
766
                                Value: instance.GetPassword(),
6✔
767
                        },
6✔
768
                }...),
6✔
769
                Args:           instance.GetArgs(),
6✔
770
                ReadinessProbe: instance.GetReadinessProbe(),
6✔
771
                LivenessProbe:  instance.GetLivenessProbe(),
6✔
772
                StartupProbe:   instance.GetStartupProbe(),
6✔
773
                VolumeMounts:   instance.GetExtraVolumeMounts(),
6✔
774
        }
6✔
775
}
776

777
func updateEnvAndVolumeForSts(sts *appsv1.StatefulSet, envVar corev1.EnvVar, volumeMount corev1.VolumeMount, volume corev1.Volume) *appsv1.StatefulSet {
6✔
778
        emqxContainerIndex := findContinerIndex(sts, EmqxContainerName)
6✔
779
        reloaderContainerIndex := findContinerIndex(sts, ReloaderContainerName)
6✔
780

6✔
781
        isNotExistVolume := func(volume corev1.Volume) bool {
12✔
782
                for _, v := range sts.Spec.Template.Spec.Volumes {
12✔
783
                        if v.Name == volume.Name {
6✔
784
                                return false
×
785
                        }
×
786
                }
787
                return true
6✔
788
        }
789

790
        isNotExistVolumeVolumeMount := func(volumeMount corev1.VolumeMount) bool {
12✔
791
                for _, v := range sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts {
12✔
792
                        if v.Name == volumeMount.Name {
6✔
793
                                return false
×
794
                        }
×
795
                }
796
                return true
6✔
797
        }
798

799
        isNotExistEnv := func(envVar corev1.EnvVar) bool {
12✔
800
                for _, v := range sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env {
12✔
801
                        if v.Name == envVar.Name {
6✔
802
                                return false
×
803
                        }
×
804
                }
805
                return true
6✔
806
        }
807

808
        if isNotExistVolume(volume) {
12✔
809
                sts.Spec.Template.Spec.Volumes = append(
6✔
810
                        sts.Spec.Template.Spec.Volumes,
6✔
811
                        volume,
6✔
812
                )
6✔
813
        }
6✔
814

815
        if isNotExistVolumeVolumeMount(volumeMount) {
12✔
816
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts = append(
6✔
817
                        sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts,
6✔
818
                        volumeMount,
6✔
819
                )
6✔
820
        }
6✔
821

822
        if isNotExistEnv(envVar) {
12✔
823
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env = append(
6✔
824
                        sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env,
6✔
825
                        envVar,
6✔
826
                )
6✔
827
        }
6✔
828

829
        sts.Spec.Template.Spec.Containers[reloaderContainerIndex].VolumeMounts = sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts
6✔
830
        sts.Spec.Template.Spec.Containers[reloaderContainerIndex].Env = sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env
6✔
831

6✔
832
        return sts
6✔
833
}
834

835
func mergeEnvAndConfig(instance appsv1beta3.Emqx, extraEnvs ...corev1.EnvVar) []corev1.EnvVar {
6✔
836
        lookup := func(name string, envs []corev1.EnvVar) bool {
12✔
837
                for _, env := range envs {
12✔
838
                        if env.Name == name {
12✔
839
                                return true
6✔
840
                        }
6✔
841
                }
842
                return false
6✔
843
        }
844

845
        envs := append(instance.GetEnv(), extraEnvs...)
6✔
846
        emqxConfig := instance.GetEmqxConfig()
6✔
847

6✔
848
        for k, v := range emqxConfig {
12✔
849
                key := fmt.Sprintf("EMQX_%s", strings.ToUpper(strings.ReplaceAll(k, ".", "__")))
6✔
850
                if !lookup(key, envs) {
12✔
851
                        envs = append(envs, corev1.EnvVar{Name: key, Value: v})
6✔
852
                }
6✔
853
        }
854

855
        sort.Slice(envs, func(i, j int) bool {
12✔
856
                return envs[i].Name < envs[j].Name
6✔
857
        })
6✔
858
        return envs
6✔
859
}
860

861
func findContinerIndex(sts *appsv1.StatefulSet, containerName string) int {
6✔
862
        for k, c := range sts.Spec.Template.Spec.Containers {
12✔
863
                if c.Name == containerName {
12✔
864
                        return k
6✔
865
                }
6✔
866
        }
867
        return -1
×
868
}
869

870
func generateAnnotationByContainers(containers []corev1.Container) string {
6✔
871
        containerNames := []string{}
6✔
872
        for _, c := range containers {
12✔
873
                containerNames = append(containerNames, c.Name)
6✔
874
        }
6✔
875
        return strings.Join(containerNames, ",")
6✔
876
}
877

878
func updateEmqxStatus(instance appsv1beta3.Emqx, emqxNodes []appsv1beta3.EmqxNode) appsv1beta3.Emqx {
6✔
879
        status := instance.GetStatus()
6✔
880
        status.Replicas = *instance.GetReplicas()
6✔
881
        if emqxNodes != nil {
12✔
882
                readyReplicas := int32(0)
6✔
883
                for _, node := range emqxNodes {
12✔
884
                        if node.NodeStatus == "Running" {
12✔
885
                                readyReplicas++
6✔
886
                        }
6✔
887
                }
888
                status.ReadyReplicas = readyReplicas
6✔
889
                status.EmqxNodes = emqxNodes
6✔
890
        }
891

892
        var cond *appsv1beta3.Condition
6✔
893
        if status.Replicas == status.ReadyReplicas {
12✔
894
                cond = appsv1beta3.NewCondition(
6✔
895
                        appsv1beta3.ConditionRunning,
6✔
896
                        corev1.ConditionTrue,
6✔
897
                        "ClusterReady",
6✔
898
                        "All resources are ready",
6✔
899
                )
6✔
900
        } else {
12✔
901
                cond = appsv1beta3.NewCondition(
6✔
902
                        appsv1beta3.ConditionRunning,
6✔
903
                        corev1.ConditionFalse,
6✔
904
                        "ClusterNotReady",
6✔
905
                        "Some nodes are not ready",
6✔
906
                )
6✔
907
        }
6✔
908
        status.SetCondition(*cond)
6✔
909
        instance.SetStatus(status)
6✔
910
        return instance
6✔
911
}
912

913
func updatePluginsConfigForSts(sts *appsv1.StatefulSet, PluginsConfig *corev1.ConfigMap) *appsv1.StatefulSet {
6✔
914
        return updateEnvAndVolumeForSts(sts,
6✔
915
                corev1.EnvVar{
6✔
916
                        Name:  "EMQX_PLUGINS__ETC_DIR",
6✔
917
                        Value: "/mounted/plugins/etc",
6✔
918
                },
6✔
919
                corev1.VolumeMount{
6✔
920
                        Name:      PluginsConfig.Name,
6✔
921
                        MountPath: "/mounted/plugins/etc",
6✔
922
                },
6✔
923
                corev1.Volume{
6✔
924
                        Name: PluginsConfig.Name,
6✔
925
                        VolumeSource: corev1.VolumeSource{
6✔
926
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
927
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
928
                                                Name: PluginsConfig.Name,
6✔
929
                                        },
6✔
930
                                },
6✔
931
                        },
6✔
932
                },
6✔
933
        )
6✔
934
}
6✔
935

936
func updateAclForSts(sts *appsv1.StatefulSet, acl *corev1.ConfigMap) *appsv1.StatefulSet {
6✔
937
        if sts.Spec.Template.Annotations == nil {
12✔
938
                sts.Spec.Template.Annotations = make(map[string]string)
6✔
939
        }
6✔
940
        sts.Spec.Template.Annotations["ACL/Base64EncodeConfig"] = base64.StdEncoding.EncodeToString([]byte(acl.Data["acl.conf"]))
6✔
941
        return updateEnvAndVolumeForSts(sts,
6✔
942
                corev1.EnvVar{
6✔
943
                        Name:  "EMQX_ACL_FILE",
6✔
944
                        Value: "/mounted/acl/acl.conf",
6✔
945
                },
6✔
946
                corev1.VolumeMount{
6✔
947
                        Name:      acl.Name,
6✔
948
                        MountPath: "/mounted/acl",
6✔
949
                },
6✔
950
                corev1.Volume{
6✔
951
                        Name: acl.Name,
6✔
952
                        VolumeSource: corev1.VolumeSource{
6✔
953
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
954
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
955
                                                Name: acl.Name,
6✔
956
                                        },
6✔
957
                                },
6✔
958
                        },
6✔
959
                },
6✔
960
        )
6✔
961
}
962

963
func updateLoadedModulesForSts(sts *appsv1.StatefulSet, loadedModules *corev1.ConfigMap) *appsv1.StatefulSet {
6✔
964
        if sts.Spec.Template.Annotations == nil {
12✔
965
                sts.Spec.Template.Annotations = make(map[string]string)
6✔
966
        }
6✔
967
        sts.Spec.Template.Annotations["LoadedModules/Base64EncodeConfig"] = base64.StdEncoding.EncodeToString([]byte(loadedModules.Data["loaded_modules"]))
6✔
968
        return updateEnvAndVolumeForSts(sts,
6✔
969
                corev1.EnvVar{
6✔
970
                        Name:  "EMQX_MODULES__LOADED_FILE",
6✔
971
                        Value: "/mounted/modules/loaded_modules",
6✔
972
                },
6✔
973
                corev1.VolumeMount{
6✔
974
                        Name:      loadedModules.Name,
6✔
975
                        MountPath: "/mounted/modules",
6✔
976
                },
6✔
977
                corev1.Volume{
6✔
978
                        Name: loadedModules.Name,
6✔
979
                        VolumeSource: corev1.VolumeSource{
6✔
980
                                ConfigMap: &corev1.ConfigMapVolumeSource{
6✔
981
                                        LocalObjectReference: corev1.LocalObjectReference{
6✔
982
                                                Name: loadedModules.Name,
6✔
983
                                        },
6✔
984
                                },
6✔
985
                        },
6✔
986
                },
6✔
987
        )
6✔
988
}
989

990
func updateLicenseForsts(sts *appsv1.StatefulSet, license *corev1.Secret) *appsv1.StatefulSet {
6✔
991
        fileName := "emqx.lic"
6✔
992
        for k := range license.Data {
12✔
993
                fileName = k
6✔
994
                break
6✔
995
        }
996

997
        return updateEnvAndVolumeForSts(sts,
6✔
998
                corev1.EnvVar{
6✔
999
                        Name:  "EMQX_LICENSE__FILE",
6✔
1000
                        Value: filepath.Join("/mounted/license", fileName),
6✔
1001
                },
6✔
1002
                corev1.VolumeMount{
6✔
1003
                        Name:      license.Name,
6✔
1004
                        MountPath: "/mounted/license",
6✔
1005
                        ReadOnly:  true,
6✔
1006
                },
6✔
1007
                corev1.Volume{
6✔
1008
                        Name: license.Name,
6✔
1009
                        VolumeSource: corev1.VolumeSource{
6✔
1010
                                Secret: &corev1.SecretVolumeSource{
6✔
1011
                                        SecretName: license.Name,
6✔
1012
                                },
6✔
1013
                        },
6✔
1014
                },
6✔
1015
        )
6✔
1016
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc