• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx-operator / 6256948632

21 Sep 2023 03:49AM UTC coverage: 73.791%. First build
6256948632

Pull #950

github

Rory-Z
fix: fix the unnecessary updates when replicas = 0

Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com>
Pull Request #950: fix: fix the unnecessary updates when replicas = 0

2 of 12 new or added lines in 1 file covered. (16.67%)

1816 of 2461 relevant lines covered (73.79%)

1.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.05
/controllers/apps/v1beta3/emqx_handler.go
1
/*
2
Copyright 2021.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package apps
18

19
import (
20
        "context"
21
        "encoding/base64"
22
        "fmt"
23
        "net"
24
        "path/filepath"
25
        "reflect"
26
        "regexp"
27
        "sort"
28
        "strconv"
29
        "strings"
30
        "time"
31

32
        emperror "emperror.dev/errors"
33
        appsv1 "k8s.io/api/apps/v1"
34
        corev1 "k8s.io/api/core/v1"
35
        k8sErrors "k8s.io/apimachinery/pkg/api/errors"
36
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37

38
        "k8s.io/apimachinery/pkg/labels"
39
        "k8s.io/apimachinery/pkg/runtime"
40
        "k8s.io/apimachinery/pkg/types"
41
        "k8s.io/apimachinery/pkg/util/intstr"
42
        "k8s.io/client-go/tools/record"
43
        "k8s.io/client-go/util/retry"
44

45
        ctrl "sigs.k8s.io/controller-runtime"
46
        "sigs.k8s.io/controller-runtime/pkg/client"
47
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
48

49
        appsv1beta3 "github.com/emqx/emqx-operator/apis/apps/v1beta3"
50
        "github.com/emqx/emqx-operator/pkg/handler"
51
        json "github.com/json-iterator/go"
52
        "github.com/tidwall/gjson"
53
)
54

55
// Compile-time assertion that *EmqxBrokerReconciler implements reconcile.Reconciler.
var _ reconcile.Reconciler = &EmqxBrokerReconciler{}
56

57
const (
58
        // ReloaderContainerName is the fixed container name "reloader".
        // NOTE(review): presumably the name given to the sidecar built by
        // generateReloaderContainer — confirm against that function.
        ReloaderContainerName = "reloader"
59
)
60

61
// EmqxReconciler holds the dependencies shared by the Emqx reconcile logic
// in this file (see Do).
type EmqxReconciler struct {
62
        // Handler is embedded; Do and its helpers call Get, Status,
        // CreateOrUpdateList and RequestAPI through it.
        handler.Handler
63
        // Scheme is passed to CreateOrUpdateList when creating or updating
        // the generated resources.
        Scheme *runtime.Scheme
64
        // EventRecorder is embedded and used to emit warning events on the
        // reconciled instance.
        record.EventRecorder
65
}
66

67
func (r *EmqxReconciler) Do(ctx context.Context, instance appsv1beta3.Emqx) (ctrl.Result, error) {
1✔
68
        if instance.GetDeletionTimestamp() != nil {
1✔
NEW
69
                return ctrl.Result{}, nil
×
NEW
70
        }
×
71
        if instance.GetReplicas() == nil || *instance.GetReplicas() == 0 {
1✔
NEW
72
                if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
NEW
73
                        _ = r.Get(ctx, client.ObjectKeyFromObject(instance), instance)
×
NEW
74
                        instance = updateEmqxStatus(instance, []appsv1beta3.EmqxNode{})
×
NEW
75
                        return r.Status().Update(ctx, instance)
×
NEW
76
                }); err != nil {
×
NEW
77
                        return ctrl.Result{}, emperror.Wrap(err, "failed to update status")
×
NEW
78
                }
×
NEW
79
                return ctrl.Result{}, nil
×
80
        }
81

82
        var err error
1✔
83
        var emqxNodes []appsv1beta3.EmqxNode
1✔
84
        emqxNodes, err = r.getNodeStatusesByAPI(instance)
1✔
85
        if err != nil {
2✔
86
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetNodeStatues", err.Error())
1✔
87
                condition := appsv1beta3.NewCondition(
1✔
88
                        appsv1beta3.ConditionRunning,
1✔
89
                        corev1.ConditionFalse,
1✔
90
                        "FailedToGetNodeStatues",
1✔
91
                        err.Error(),
1✔
92
                )
1✔
93
                instance.SetCondition(*condition)
1✔
94
                _ = r.Status().Update(ctx, instance)
1✔
95
        }
1✔
96
        if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
97
                _ = r.Get(ctx, client.ObjectKeyFromObject(instance), instance)
1✔
98
                instance = updateEmqxStatus(instance, emqxNodes)
1✔
99
                return r.Status().Update(ctx, instance)
1✔
100
        }); err != nil {
1✔
101
                return ctrl.Result{}, emperror.Wrap(err, "failed to update status")
×
102
        }
×
103

104
        var resources []client.Object
1✔
105
        postFn := func(client.Object) error { return nil }
2✔
106

107
        sts := generateStatefulSetDef(instance)
1✔
108

1✔
109
        storeSts := &appsv1.StatefulSet{}
1✔
110
        if err := r.Get(ctx, client.ObjectKeyFromObject(sts), storeSts); err != nil {
2✔
111
                if !k8sErrors.IsNotFound(err) {
1✔
112
                        return ctrl.Result{}, err
×
113
                }
×
114
        }
115
        // store statefulSet is exit
116
        if storeSts.Spec.PodManagementPolicy != "" {
2✔
117
                sts.Spec.PodManagementPolicy = storeSts.Spec.PodManagementPolicy
1✔
118
        }
1✔
119
        // compatible with 1.2.2
120
        if storeSts.Spec.VolumeClaimTemplates != nil {
2✔
121
                sts.Spec.VolumeClaimTemplates = storeSts.Spec.VolumeClaimTemplates
1✔
122
        }
1✔
123
        // compatible with 1.2.2
124
        if storeSts.Annotations != nil {
2✔
125
                sts.Annotations = storeSts.Annotations
1✔
126
        }
1✔
127

128
        defaultPluginsConfig := generateDefaultPluginsConfig(instance)
1✔
129
        sts = updatePluginsConfigForSts(sts, defaultPluginsConfig)
1✔
130

1✔
131
        if status := instance.GetStatus(); !status.IsPluginInitialized() {
2✔
132
                resources = append(resources, defaultPluginsConfig)
1✔
133

1✔
134
                pluginsList := &appsv1beta3.EmqxPluginList{}
1✔
135
                err = r.Client.List(ctx, pluginsList, client.InNamespace(instance.GetNamespace()))
1✔
136
                if err != nil && !k8sErrors.IsNotFound(err) {
1✔
137
                        return ctrl.Result{}, err
×
138
                }
×
139
                var condition *appsv1beta3.Condition
1✔
140
                pluginResourceList := generateInitPluginList(instance, pluginsList)
1✔
141
                resources = append(resources, pluginResourceList...)
1✔
142

1✔
143
                err = r.CreateOrUpdateList(instance, r.Scheme, resources, postFn)
1✔
144
                if err != nil {
1✔
145
                        r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedCreateOrUpdate", err.Error())
×
146
                        condition = appsv1beta3.NewCondition(
×
147
                                appsv1beta3.ConditionPluginInitialized,
×
148
                                corev1.ConditionFalse,
×
149
                                "PluginInitializeFailed",
×
150
                                err.Error(),
×
151
                        )
×
152
                        instance.SetCondition(*condition)
×
153
                        _ = r.Status().Update(ctx, instance)
×
154
                        return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, err
×
155
                }
×
156
                condition = appsv1beta3.NewCondition(
1✔
157
                        appsv1beta3.ConditionPluginInitialized,
1✔
158
                        corev1.ConditionTrue,
1✔
159
                        "PluginInitializeSuccessfully",
1✔
160
                        "All default plugins initialized",
1✔
161
                )
1✔
162
                instance.SetCondition(*condition)
1✔
163
                _ = r.Status().Update(ctx, instance)
1✔
164
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
1✔
165
        }
166

167
        if acl := generateAcl(instance); acl != nil {
2✔
168
                resources = append(resources, acl)
1✔
169
                sts = updateAclForSts(sts, acl)
1✔
170
        }
1✔
171

172
        if loadedModules := generateLoadedModules(instance); loadedModules != nil {
2✔
173
                resources = append(resources, loadedModules)
1✔
174
                sts = updateLoadedModulesForSts(sts, loadedModules)
1✔
175
        }
1✔
176

177
        if emqxEnterprise, ok := instance.(*appsv1beta3.EmqxEnterprise); ok {
2✔
178
                var license *corev1.Secret
1✔
179
                if emqxEnterprise.GetLicense().SecretName != "" {
1✔
180
                        license = &corev1.Secret{}
×
181
                        if err := r.Client.Get(context.Background(), types.NamespacedName{Name: emqxEnterprise.GetLicense().SecretName, Namespace: emqxEnterprise.GetNamespace()}, license); err != nil {
×
182
                                return ctrl.Result{}, err
×
183
                        }
×
184
                } else {
1✔
185
                        license = generateLicense(emqxEnterprise)
1✔
186
                }
1✔
187

188
                if license != nil {
2✔
189
                        resources = append(resources, license)
1✔
190
                        sts = updateLicenseForsts(sts, license)
1✔
191
                }
1✔
192
        }
193

194
        if status := instance.GetStatus(); status.IsRunning() {
2✔
195
                serviceTemplate := instance.GetServiceTemplate()
1✔
196
                serviceTemplate.MergePorts(r.getListenerPortsByAPI(instance))
1✔
197
                instance.SetServiceTemplate(serviceTemplate)
1✔
198
                svc := generateSvc(instance)
1✔
199
                resources = append(resources, svc)
1✔
200
        }
1✔
201

202
        headlessSvc := generateHeadlessSvc(instance)
1✔
203
        sts.Spec.ServiceName = headlessSvc.Name
1✔
204
        resources = append(resources, headlessSvc)
1✔
205

1✔
206
        resources = append(resources, sts)
1✔
207

1✔
208
        if err := r.CreateOrUpdateList(instance, r.Scheme, resources, postFn); err != nil {
2✔
209
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedCreateOrUpdate", err.Error())
1✔
210
                condition := appsv1beta3.NewCondition(
1✔
211
                        appsv1beta3.ConditionRunning,
1✔
212
                        corev1.ConditionFalse,
1✔
213
                        "FailedCreateOrUpdate",
1✔
214
                        err.Error(),
1✔
215
                )
1✔
216
                instance.SetCondition(*condition)
1✔
217
                _ = r.Status().Update(ctx, instance)
1✔
218
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, err
1✔
219
        }
1✔
220

221
        emqxNodes, err = r.getNodeStatusesByAPI(instance)
1✔
222
        if err != nil {
2✔
223
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetNodeStatues", err.Error())
1✔
224
                condition := appsv1beta3.NewCondition(
1✔
225
                        appsv1beta3.ConditionRunning,
1✔
226
                        corev1.ConditionFalse,
1✔
227
                        "FailedToGetNodeStatues",
1✔
228
                        err.Error(),
1✔
229
                )
1✔
230
                instance.SetCondition(*condition)
1✔
231
                _ = r.Status().Update(ctx, instance)
1✔
232
        }
1✔
233

234
        if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
235
                _ = r.Get(ctx, client.ObjectKeyFromObject(instance), instance)
1✔
236
                instance = updateEmqxStatus(instance, emqxNodes)
1✔
237
                return r.Status().Update(ctx, instance)
1✔
238
        }); err != nil {
2✔
239
                return ctrl.Result{}, emperror.Wrap(err, "failed to update status")
1✔
240
        }
1✔
241

242
        if status := instance.GetStatus(); !status.IsRunning() {
2✔
243
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
1✔
244
        }
1✔
245
        return ctrl.Result{RequeueAfter: time.Duration(20) * time.Second}, nil
1✔
246
}
247

248
func (r *EmqxReconciler) getListenerPortsByAPI(instance appsv1beta3.Emqx) []corev1.ServicePort {
1✔
249
        resp, body, err := r.Handler.RequestAPI(instance, "GET", instance.GetUsername(), instance.GetPassword(), appsv1beta3.DefaultManagementPort, "api/v4/listeners")
1✔
250
        if err != nil {
1✔
251
                return nil
×
252
        }
×
253
        if resp.StatusCode != 200 {
1✔
254
                return nil
×
255
        }
×
256

257
        ports := []corev1.ServicePort{}
1✔
258
        listeners := gjson.GetBytes(body, "data.0.listeners")
1✔
259
        for _, l := range listeners.Array() {
2✔
260
                var name string
1✔
261
                var protocol corev1.Protocol
1✔
262
                var strPort string
1✔
263
                var intPort int
1✔
264

1✔
265
                compile := regexp.MustCompile(".*(udp|dtls|sn).*")
1✔
266
                proto := gjson.Get(l.Raw, "protocol").String()
1✔
267
                if compile.MatchString(proto) {
2✔
268
                        protocol = corev1.ProtocolUDP
1✔
269
                } else {
2✔
270
                        protocol = corev1.ProtocolTCP
1✔
271
                }
1✔
272

273
                listenOn := gjson.Get(l.Raw, "listen_on").String()
1✔
274
                if strings.Contains(listenOn, ":") {
2✔
275
                        _, strPort, err = net.SplitHostPort(listenOn)
1✔
276
                        if err != nil {
1✔
277
                                strPort = listenOn
×
278
                        }
×
279
                } else {
1✔
280
                        strPort = listenOn
1✔
281
                }
1✔
282
                intPort, _ = strconv.Atoi(strPort)
1✔
283

1✔
284
                // Get name by protocol and port from API
1✔
285
                // protocol maybe like mqtt:wss:8084
1✔
286
                // protocol maybe like mqtt:tcp
1✔
287
                // We had to do something with the "protocol" to make it conform to the kubernetes service port name specification
1✔
288
                name = regexp.MustCompile(`:[\d]+`).ReplaceAllString(proto, "")
1✔
289
                name = strings.ReplaceAll(name, ":", "-")
1✔
290
                name = fmt.Sprintf("%s-%s", name, strPort)
1✔
291

1✔
292
                ports = append(ports, corev1.ServicePort{
1✔
293
                        Name:       name,
1✔
294
                        Protocol:   protocol,
1✔
295
                        Port:       int32(intPort),
1✔
296
                        TargetPort: intstr.FromInt(intPort),
1✔
297
                })
1✔
298
        }
299
        return ports
1✔
300
}
301

302
func (r *EmqxReconciler) getNodeStatusesByAPI(instance appsv1beta3.Emqx) ([]appsv1beta3.EmqxNode, error) {
1✔
303
        resp, body, err := r.Handler.RequestAPI(instance, "GET", instance.GetUsername(), instance.GetPassword(), appsv1beta3.DefaultManagementPort, "api/v4/nodes")
1✔
304
        if err != nil {
2✔
305
                return nil, err
1✔
306
        }
1✔
307
        if resp.StatusCode != 200 {
1✔
308
                return nil, fmt.Errorf("failed to get node statuses from API: %s", resp.Status)
×
309
        }
×
310

311
        emqxNodes := []appsv1beta3.EmqxNode{}
1✔
312
        data := gjson.GetBytes(body, "data")
1✔
313
        if err := json.Unmarshal([]byte(data.Raw), &emqxNodes); err != nil {
1✔
314
                return nil, fmt.Errorf("failed to unmarshal node statuses: %v", err)
×
315
        }
×
316
        sort.Slice(emqxNodes, func(i, j int) bool {
2✔
317
                return emqxNodes[i].Node < emqxNodes[j].Node
1✔
318
        })
1✔
319
        return emqxNodes, nil
1✔
320
}
321

322
func generateStatefulSetDef(instance appsv1beta3.Emqx) *appsv1.StatefulSet {
2✔
323
        annotations := instance.GetAnnotations()
2✔
324
        if annotations == nil {
3✔
325
                annotations = make(map[string]string)
1✔
326
        }
1✔
327
        delete(annotations, "kubectl.kubernetes.io/last-applied-configuration")
2✔
328

2✔
329
        podTemplate := corev1.PodTemplateSpec{
2✔
330
                ObjectMeta: metav1.ObjectMeta{
2✔
331
                        Labels:      instance.GetLabels(),
2✔
332
                        Annotations: annotations,
2✔
333
                },
2✔
334
                Spec: corev1.PodSpec{
2✔
335
                        Affinity:         instance.GetAffinity(),
2✔
336
                        Tolerations:      instance.GetToleRations(),
2✔
337
                        NodeName:         instance.GetNodeName(),
2✔
338
                        NodeSelector:     instance.GetNodeSelector(),
2✔
339
                        ImagePullSecrets: instance.GetImagePullSecrets(),
2✔
340
                        SecurityContext:  instance.GetSecurityContext(),
2✔
341
                        InitContainers:   instance.GetInitContainers(),
2✔
342
                        Containers: append(
2✔
343
                                []corev1.Container{
2✔
344
                                        *generateEmqxContainer(instance),
2✔
345
                                        *generateReloaderContainer(instance),
2✔
346
                                },
2✔
347
                                instance.GetExtraContainers()...,
2✔
348
                        ),
2✔
349
                        Volumes: instance.GetExtraVolumes(),
2✔
350
                },
2✔
351
        }
2✔
352

2✔
353
        podAnnotation := podTemplate.ObjectMeta.DeepCopy().Annotations
2✔
354
        podAnnotation[handler.ManageContainersAnnotation] = generateAnnotationByContainers(podTemplate.Spec.Containers)
2✔
355
        podTemplate.Annotations = podAnnotation
2✔
356

2✔
357
        sts := &appsv1.StatefulSet{
2✔
358
                TypeMeta: metav1.TypeMeta{
2✔
359
                        APIVersion: "apps/v1",
2✔
360
                        Kind:       "StatefulSet",
2✔
361
                },
2✔
362
                ObjectMeta: metav1.ObjectMeta{
2✔
363
                        Name:        instance.GetName(),
2✔
364
                        Namespace:   instance.GetNamespace(),
2✔
365
                        Labels:      instance.GetLabels(),
2✔
366
                        Annotations: annotations,
2✔
367
                },
2✔
368
                Spec: appsv1.StatefulSetSpec{
2✔
369
                        Replicas: instance.GetReplicas(),
2✔
370
                        Selector: &metav1.LabelSelector{
2✔
371
                                MatchLabels: instance.GetLabels(),
2✔
372
                        },
2✔
373
                        PodManagementPolicy: appsv1.ParallelPodManagement,
2✔
374
                        Template:            podTemplate,
2✔
375
                },
2✔
376
        }
2✔
377

2✔
378
        sts = generateDataVolume(instance, sts)
2✔
379

2✔
380
        return sts
2✔
381
}
382

383
func generateInitPluginList(instance appsv1beta3.Emqx, existPluginList *appsv1beta3.EmqxPluginList) []client.Object {
2✔
384
        matchedPluginList := []appsv1beta3.EmqxPlugin{}
2✔
385
        for _, existPlugin := range existPluginList.Items {
3✔
386
                selector, _ := labels.ValidatedSelectorFromSet(existPlugin.Spec.Selector)
1✔
387
                if selector.Empty() || !selector.Matches(labels.Set(instance.GetLabels())) {
2✔
388
                        continue
1✔
389
                }
390
                matchedPluginList = append(matchedPluginList, existPlugin)
1✔
391
        }
392

393
        isExistPlugin := func(pluginName string, pluginList []appsv1beta3.EmqxPlugin) bool {
4✔
394
                for _, plugin := range pluginList {
3✔
395
                        if plugin.Spec.PluginName == pluginName {
2✔
396
                                return true
1✔
397
                        }
1✔
398
                }
399
                return false
2✔
400
        }
401

402
        pluginList := []client.Object{}
2✔
403
        // Default plugins
2✔
404
        if !isExistPlugin("emqx_rule_engine", matchedPluginList) {
4✔
405
                emqxRuleEngine := &appsv1beta3.EmqxPlugin{
2✔
406
                        TypeMeta: metav1.TypeMeta{
2✔
407
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
408
                                Kind:       "EmqxPlugin",
2✔
409
                        },
2✔
410
                        ObjectMeta: metav1.ObjectMeta{
2✔
411
                                Name:      fmt.Sprintf("%s-rule-engine", instance.GetName()),
2✔
412
                                Namespace: instance.GetNamespace(),
2✔
413
                                Labels:    instance.GetLabels(),
2✔
414
                        },
2✔
415
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
416
                                PluginName: "emqx_rule_engine",
2✔
417
                                Selector:   instance.GetLabels(),
2✔
418
                                Config:     map[string]string{},
2✔
419
                        },
2✔
420
                }
2✔
421
                pluginList = append(pluginList, emqxRuleEngine)
2✔
422
        }
2✔
423

424
        if !isExistPlugin("emqx_retainer", matchedPluginList) {
4✔
425
                emqxRetainer := &appsv1beta3.EmqxPlugin{
2✔
426
                        TypeMeta: metav1.TypeMeta{
2✔
427
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
428
                                Kind:       "EmqxPlugin",
2✔
429
                        },
2✔
430
                        ObjectMeta: metav1.ObjectMeta{
2✔
431
                                Name:      fmt.Sprintf("%s-retainer", instance.GetName()),
2✔
432
                                Namespace: instance.GetNamespace(),
2✔
433
                                Labels:    instance.GetLabels(),
2✔
434
                        },
2✔
435
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
436
                                PluginName: "emqx_retainer",
2✔
437
                                Selector:   instance.GetLabels(),
2✔
438
                                Config:     map[string]string{},
2✔
439
                        },
2✔
440
                }
2✔
441
                pluginList = append(pluginList, emqxRetainer)
2✔
442
        }
2✔
443

444
        enterprise, ok := instance.(*appsv1beta3.EmqxEnterprise)
2✔
445
        if ok && !isExistPlugin("emqx_modules", matchedPluginList) {
4✔
446
                emqxModules := &appsv1beta3.EmqxPlugin{
2✔
447
                        TypeMeta: metav1.TypeMeta{
2✔
448
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
449
                                Kind:       "EmqxPlugin",
2✔
450
                        },
2✔
451
                        ObjectMeta: metav1.ObjectMeta{
2✔
452
                                Name:      fmt.Sprintf("%s-modules", instance.GetName()),
2✔
453
                                Namespace: instance.GetNamespace(),
2✔
454
                                Labels:    instance.GetLabels(),
2✔
455
                        },
2✔
456
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
457
                                PluginName: "emqx_modules",
2✔
458
                                Selector:   instance.GetLabels(),
2✔
459
                                Config:     map[string]string{},
2✔
460
                        },
2✔
461
                }
2✔
462

2✔
463
                if enterprise.Spec.EmqxTemplate.Modules != nil {
4✔
464
                        emqxModules.Spec.Config = map[string]string{
2✔
465
                                "modules.loaded_file": "/mounted/modules/loaded_modules",
2✔
466
                        }
2✔
467
                }
2✔
468

469
                pluginList = append(pluginList, emqxModules)
2✔
470
        }
471

472
        return pluginList
2✔
473
}
474

475
// generateDefaultPluginsConfig builds the ConfigMap that carries the default
// configuration of every known EMQX plugin.
//
// The ConfigMap copies its labels and namespace from instance and takes its
// name from names.PluginsConfig(). Each Data key is a "<plugin>.conf" entry
// and each value is the content stored under that entry; an empty string
// means the plugin ships with no default settings here.
//
// NOTE(review): several defaults embed sample endpoints and credentials
// (127.0.0.1 addresses, "public", "guest", FAKE_AWS_* keys) — presumably
// placeholders that users are expected to override; confirm against callers.
func generateDefaultPluginsConfig(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
476
        names := appsv1beta3.Names{Object: instance}
2✔
477

2✔
478
        cm := &corev1.ConfigMap{
2✔
479
                TypeMeta: metav1.TypeMeta{
2✔
480
                        APIVersion: "v1",
2✔
481
                        Kind:       "ConfigMap",
2✔
482
                },
2✔
483
                ObjectMeta: metav1.ObjectMeta{
2✔
484
                        Labels:    instance.GetLabels(),
2✔
485
                        Namespace: instance.GetNamespace(),
2✔
486
                        Name:      names.PluginsConfig(),
2✔
487
                },
2✔
488
                // One entry per plugin: key is the plugin's .conf name, value is its default content.
                Data: map[string]string{
2✔
489
                        "emqx_modules.conf":           "",
2✔
490
                        "emqx_management.conf":        "management.listener.http = 8081\n",
2✔
491
                        "emqx_dashboard.conf":         "dashboard.listener.http = 18083\n",
2✔
492
                        "emqx_rule_engine.conf":       "",
2✔
493
                        "emqx_retainer.conf":          "",
2✔
494
                        "emqx_telemetry.conf":         "",
2✔
495
                        "emqx_auth_http.conf":         "auth.http.auth_req.url = http://127.0.0.1:80/mqtt/auth\nauth.http.auth_req.method = post\nauth.http.auth_req.headers.content_type = application/x-www-form-urlencoded\nauth.http.auth_req.params = clientid=%c,username=%u,password=%P\nauth.http.acl_req.url = http://127.0.0.1:80/mqtt/acl\nauth.http.acl_req.method = post\nauth.http.acl_req.headers.content-type = application/x-www-form-urlencoded\nauth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m\nauth.http.timeout = 5s\nauth.http.connect_timeout = 5s\nauth.http.pool_size = 32\nauth.http.enable_pipelining = true\n",
2✔
496
                        "emqx_auth_jwt.conf":          "auth.jwt.secret = emqxsecret\nauth.jwt.from = password\nauth.jwt.verify_claims = off\n",
2✔
497
                        "emqx_auth_ldap.conf":         "auth.ldap.servers = 127.0.0.1\nauth.ldap.port = 389\nauth.ldap.pool = 8\nauth.ldap.bind_dn = cn=root,dc=emqx,dc=io\nauth.ldap.bind_password = public\nauth.ldap.timeout = 30s\nauth.ldap.device_dn = ou=device,dc=emqx,dc=io\nauth.ldap.match_objectclass = mqttUser\nauth.ldap.username.attributetype = uid\nauth.ldap.password.attributetype = userPassword\nauth.ldap.ssl = false\n",
2✔
498
                        "emqx_auth_mnesia.conf":       "",
2✔
499
                        "emqx_auth_mongo.conf":        "auth.mongo.type = single\nauth.mongo.srv_record = false\nauth.mongo.server = 127.0.0.1:27017\nauth.mongo.pool = 8\nauth.mongo.database = mqtt\nauth.mongo.topology.pool_size = 1\nauth.mongo.topology.max_overflow = 0\nauth.mongo.auth_query.password_hash = sha256\nauth.mongo.auth_query.collection = mqtt_user\nauth.mongo.auth_query.password_field = password\nauth.mongo.auth_query.selector = username=%u\nauth.mongo.super_query.collection = mqtt_user\nauth.mongo.super_query.super_field = is_superuser\nauth.mongo.super_query.selector = username=%u\nauth.mongo.acl_query.collection = mqtt_acl\nauth.mongo.acl_query.selector = username=%u\n",
2✔
500
                        "emqx_auth_mysql.conf":        "auth.mysql.server = 127.0.0.1:3306\nauth.mysql.pool = 8\nauth.mysql.database = mqtt\nauth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1\nauth.mysql.password_hash = sha256\nauth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1\nauth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'\n",
2✔
501
                        "emqx_auth_pgsql.conf":        "auth.pgsql.server = 127.0.0.1:5432\nauth.pgsql.pool = 8\nauth.pgsql.username = root\nauth.pgsql.database = mqtt\nauth.pgsql.encoding = utf8\nauth.pgsql.ssl = off\nauth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1\nauth.pgsql.password_hash = sha256\nauth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1\nauth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'\n",
2✔
502
                        "emqx_auth_redis.conf":        "auth.redis.type = single\nauth.redis.server = 127.0.0.1:6379\nauth.redis.pool = 8\nauth.redis.database = 0\nauth.redis.auth_cmd = HMGET mqtt_user:%u password\nauth.redis.password_hash = plain\nauth.redis.super_cmd = HGET mqtt_user:%u is_superuser\nauth.redis.acl_cmd = HGETALL mqtt_acl:%u\n",
2✔
503
                        "emqx_backend_cassa.conf":     "backend.ecql.pool1.nodes = 127.0.0.1:9042\nbackend.ecql.pool1.size = 8\nbackend.ecql.pool1.auto_reconnect = 1\nbackend.ecql.pool1.username = cassandra\nbackend.ecql.pool1.password = cassandra\nbackend.ecql.pool1.keyspace = mqtt\nbackend.ecql.pool1.logger = info\nbackend.cassa.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscription_lookup\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.subscribed.2  = {\"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"cql\": [\"delete from acked where clientid = ${clientid} and topic = ${topic}\"]}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
504
                        "emqx_backend_dynamo.conf":    "backend.dynamo.region = us-west-2\nbackend.dynamo.pool1.server = http://localhost:8000\nbackend.dynamo.pool1.pool_size = 8\nbackend.dynamo.pool1.aws_access_key_id = FAKE_AWS_ACCESS_KEY_ID\nbackend.dynamo.pool1.aws_secret_access_key = FAKE_AWS_SECRET_ACCESS_KEY\nbackend.dynamo.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch_for_queue\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"function\": \"on_acked_delete\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked_for_queue\"}, \"pool\": \"pool1\"}\n",
2✔
505
                        "emqx_backend_influxdb.conf":  "backend.influxdb.udp.pool1.server = 127.0.0.1:8089\nbackend.influxdb.udp.pool1.pool_size = 8\nbackend.influxdb.http.pool1.server = 127.0.0.1:8086\nbackend.influxdb.http.pool1.pool_size = 8\nbackend.influxdb.http.pool1.precision = ms\nbackend.influxdb.http.pool1.database = mydb\nbackend.influxdb.http.pool1.https_enabled = false\nbackend.influxdb.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
506
                        "emqx_backend_mongo.conf":     "backend.mongo.pool1.type = single\nbackend.mongo.pool1.srv_record = false\nbackend.mongo.pool1.server = 127.0.0.1:27017\nbackend.mongo.pool1.c_pool_size = 8\nbackend.mongo.pool1.database = mqtt\nbackend.mongo.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"pool\": \"pool1\", \"offline_opts\": {\"time_range\": \"2h\", \"max_returned_count\": 500}}\nbackend.mongo.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"function\": \"on_acked_delete\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
507
                        "emqx_backend_mysql.conf":     "backend.mysql.pool1.server = 127.0.0.1:3306\nbackend.mysql.pool1.pool_size = 8\nbackend.mysql.pool1.user = root\nbackend.mysql.pool1.password = public\nbackend.mysql.pool1.database = mqtt\nbackend.mysql.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.client.connected.2     = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"sql\": [\"delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}\"]}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
508
                        "emqx_backend_opentsdb.conf":  "backend.opentsdb.pool1.server = 127.0.0.1:4242\nbackend.opentsdb.pool1.pool_size = 8\nbackend.opentsdb.pool1.summary = true\nbackend.opentsdb.pool1.details = false\nbackend.opentsdb.pool1.sync = false\nbackend.opentsdb.pool1.sync_timeout = 0\nbackend.opentsdb.pool1.max_batch_size = 20\nbackend.opentsdb.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
509
                        "emqx_backend_pgsql.conf":     "backend.pgsql.pool1.server = 127.0.0.1:5432\nbackend.pgsql.pool1.pool_size = 8\nbackend.pgsql.pool1.username = root\nbackend.pgsql.pool1.password = public\nbackend.pgsql.pool1.database = mqtt\nbackend.pgsql.pool1.ssl = false\nbackend.pgsql.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.client.connected.2     = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"sql\": \"delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
510
                        "emqx_backend_redis.conf":     "backend.redis.pool1.type = single\nbackend.redis.pool1.server = 127.0.0.1:6379\nbackend.redis.pool1.pool_size = 8\nbackend.redis.pool1.database = 0\nbackend.redis.pool1.channel = mqtt_channel\nbackend.redis.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.1  = {\"topic\": \"queue/#\", \"action\": {\"function\": \"on_message_fetch_for_queue\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.2  = {\"topic\": \"pubsub/#\", \"action\": {\"function\": \"on_message_fetch_for_pubsub\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.3  = {\"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"commands\": [\"DEL mqtt:acked:${clientid}:${topic}\"]}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"expired_time\" : 3600, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"expired_time\" : 3600, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.acked.1       = {\"topic\": \"queue/#\", \"action\": {\"function\": \"on_message_acked_for_queue\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.acked.2       = {\"topic\": \"pubsub/#\", \"action\": {\"function\": \"on_message_acked_for_pubsub\"}, \"pool\": \"pool1\"}\n",
2✔
511
                        "emqx_backend_timescale.conf": "backend.timescale.pool1.server = 127.0.0.1:5432\nbackend.timescale.pool1.pool_size = 8\nbackend.timescale.pool1.username = root\nbackend.timescale.pool1.password = public\nbackend.timescale.pool1.database = mqtt\nbackend.timescale.pool1.ssl = false\nbackend.timescale.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
512
                        "emqx_bridge_kafka.conf":      "bridge.kafka.servers = 127.0.0.1:9092\nbridge.kafka.query_api_versions = true\nbridge.kafka.connection_strategy = per_partition\nbridge.kafka.min_metadata_refresh_interval = 5S\nbridge.kafka.produce = sync\nbridge.kafka.produce.sync_timeout = 3s\nbridge.kafka.sock.sndbuf = 1MB\nbridge.kafka.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.kafka.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.kafka.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.kafka.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.kafka.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.kafka.hook.message.delivered.1    = {\"filter\":\"#\", \"topic\":\"MessageDelivered\"}\nbridge.kafka.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
513
                        "emqx_bridge_mqtt.conf":       "bridge.mqtt.aws.address = 127.0.0.1:1883\nbridge.mqtt.aws.proto_ver = mqttv4\nbridge.mqtt.aws.start_type = manual\nbridge.mqtt.aws.clientid = bridge_aws\nbridge.mqtt.aws.clean_start = true\nbridge.mqtt.aws.username = user\nbridge.mqtt.aws.password = passwd\nbridge.mqtt.aws.forwards = topic1/#,topic2/#\nbridge.mqtt.aws.forward_mountpoint = bridge/aws/${node}/\nbridge.mqtt.aws.ssl = off\nbridge.mqtt.aws.cacertfile = etc/certs/cacert.pem\nbridge.mqtt.aws.certfile = etc/certs/client-cert.pem\nbridge.mqtt.aws.keyfile = etc/certs/client-key.pem\nbridge.mqtt.aws.ciphers = TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_CCM_SHA256,TLS_AES_128_CCM_8_SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\nbridge.mqtt.aws.keepalive = 60s\nbridge.mqtt.aws.tls_versions = tlsv1.3,tlsv1.2,tlsv1.1,tlsv1\nbridge.mqtt.aws.reconnect_interval = 30s\nbridge.mqtt.aws.retry_interval = 20s\nbridge.mqtt.aws.batch_size = 32\nbridge.mqtt.aws.max_inflight_size = 32\nbridge.mqtt.aws.queue.replayq_dir = data/replayq/emqx_aws_bridge/\nbridge.mqtt.aws.queue.replayq_seg_bytes = 10MB\nbridge.mqtt.aws.queue.max_total_size = 
5GB\n",
2✔
514
                        "emqx_bridge_pulsar.conf":     "bridge.pulsar.servers = 127.0.0.1:6650\nbridge.pulsar.produce = sync\nbridge.pulsar.sock.sndbuf = 1MB\nbridge.pulsar.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.pulsar.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.pulsar.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.pulsar.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.pulsar.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.pulsar.hook.message.delivered.1      = {\"filter\":\"#\", \"topic\":\"MessageDelivered\"}\nbridge.pulsar.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
515
                        "emqx_bridge_rabbit.conf":     "bridge.rabbit.1.server = 127.0.0.1:5672\nbridge.rabbit.1.pool_size = 8\nbridge.rabbit.1.username = guest\nbridge.rabbit.1.password = guest\nbridge.rabbit.1.timeout = 5s\nbridge.rabbit.1.virtual_host = /\nbridge.rabbit.1.heartbeat = 30s\nbridge.rabbit.hook.session.subscribed.1 = {\"action\": \"on_session_subscribed\", \"rabbit\": 1, \"exchange\": \"direct:emqx.subscription\"}\nbridge.rabbit.hook.session.unsubscribed.1 = {\"action\": \"on_session_unsubscribed\", \"rabbit\": 1, \"exchange\": \"direct:emqx.subscription\"}\nbridge.rabbit.hook.message.publish.1 = {\"topic\": \"$SYS/#\", \"action\": \"on_message_publish\", \"rabbit\": 1, \"exchange\": \"topic:emqx.$sys\"}\nbridge.rabbit.hook.message.publish.2 = {\"topic\": \"#\", \"action\": \"on_message_publish\", \"rabbit\": 1, \"exchange\": \"topic:emqx.pub\"}\nbridge.rabbit.hook.message.acked.1 = {\"topic\": \"#\", \"action\": \"on_message_acked\", \"rabbit\": 1, \"exchange\": \"topic:emqx.acked\"}\n",
2✔
516
                        "emqx_bridge_rocket.conf":     "bridge.rocket.servers = 127.0.0.1:9876\nbridge.rocket.refresh_topic_route_interval = 5S\nbridge.rocket.produce = sync\nbridge.rocket.sock.sndbuf = 1MB\nbridge.rocket.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.rocket.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.rocket.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.rocket.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.rocket.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.rocket.hook.message.delivered.1    = {\"filter\":\"#\", \"topic\":\"MessageDeliver\"}\nbridge.rocket.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
517
                        "emqx_coap.conf":              "coap.bind.udp.1 = 0.0.0.0:5683\ncoap.enable_stats = off\ncoap.bind.dtls.1 = 0.0.0.0:5684\ncoap.dtls.keyfile = etc/certs/key.pem\ncoap.dtls.certfile = etc/certs/cert.pem\ncoap.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\n",
2✔
518
                        "emqx_conf.conf":              "conf.etc.dir.emqx = etc\nconf.etc.dir.emqx.zones = etc\nconf.etc.dir.emqx.listeners = etc\nconf.etc.dir.emqx.sys_mon = etc\n",
2✔
519
                        "emqx_exhook.conf":            "exhook.server.default.url = http://127.0.0.1:9000\n",
2✔
520
                        "emqx_exproto.conf":           "exproto.server.http.port = 9100\nexproto.server.https.port = 9101\nexproto.server.https.cacertfile = etc/certs/cacert.pem\nexproto.server.https.certfile = etc/certs/cert.pem\nexproto.server.https.keyfile = etc/certs/key.pem\nexproto.listener.protoname = tcp://0.0.0.0:7993\nexproto.listener.protoname.connection_handler_url = http://127.0.0.1:9001\nexproto.listener.protoname.acceptors = 8\nexproto.listener.protoname.max_connections = 1024000\nexproto.listener.protoname.max_conn_rate = 1000\nexproto.listener.protoname.active_n = 100\nexproto.listener.protoname.idle_timeout = 30s\nexproto.listener.protoname.access.1 = allow all\nexproto.listener.protoname.backlog = 1024\nexproto.listener.protoname.send_timeout = 15s\nexproto.listener.protoname.send_timeout_close = on\nexproto.listener.protoname.nodelay = true\nexproto.listener.protoname.reuseaddr = true\n",
2✔
521
                        "emqx_gbt32960.conf":          "gbt32960.frame.max_length = 8192\ngbt32960.proto.retx_interval = 8s\ngbt32960.proto.retx_max_times = 3\ngbt32960.proto.message_queue_len = 10\ngbt32960.listener.tcp = 0.0.0.0:7325\ngbt32960.listener.tcp.acceptors = 8\ngbt32960.listener.tcp.max_connections = 1024000\ngbt32960.listener.tcp.max_conn_rate = 1000\ngbt32960.listener.tcp.idle_timeout = 60s\ngbt32960.listener.tcp.active_n = 100\ngbt32960.listener.tcp.zone = external\ngbt32960.listener.tcp.access.1 = allow all\ngbt32960.listener.tcp.backlog = 1024\ngbt32960.listener.tcp.send_timeout = 15s\ngbt32960.listener.tcp.send_timeout_close = on\ngbt32960.listener.tcp.nodelay = true\ngbt32960.listener.tcp.reuseaddr = true\ngbt32960.listener.ssl = 7326\ngbt32960.listener.ssl.acceptors = 16\ngbt32960.listener.ssl.max_connections = 102400\ngbt32960.listener.ssl.max_conn_rate = 500\ngbt32960.listener.ssl.idle_timeout = 60s\ngbt32960.listener.ssl.active_n = 100\ngbt32960.listener.ssl.zone = external\ngbt32960.listener.ssl.access.1 = allow all\ngbt32960.listener.ssl.handshake_timeout = 15s\ngbt32960.listener.ssl.keyfile = etc/certs/key.pem\ngbt32960.listener.ssl.certfile = etc/certs/cert.pem\ngbt32960.listener.ssl.ciphers = 
ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\ngbt32960.listener.ssl.reuseaddr = true\n",
2✔
522
                        "emqx_jt808.conf":             "jt808.proto.allow_anonymous = true\njt808.proto.dn_topic = jt808/%c/dn\njt808.proto.up_topic = jt808/%c/up\njt808.conn.idle_timeout = 30s\njt808.conn.enable_stats = on\njt808.frame.max_length = 8192\njt808.listener.tcp = 6207\njt808.listener.tcp.acceptors = 4\njt808.listener.tcp.max_clients = 512\n",
2✔
523
                        "emqx_lua_hook.conf":          "",
2✔
524
                        "emqx_lwm2m.conf":             "lwm2m.lifetime_min = 1s\nlwm2m.lifetime_max = 86400s\nlwm2m.mountpoint = lwm2m/%e/\nlwm2m.topics.command = dn/#\nlwm2m.topics.response = up/resp\nlwm2m.topics.notify = up/notify\nlwm2m.topics.register = up/resp\nlwm2m.topics.update = up/resp\nlwm2m.xml_dir =  etc/lwm2m_xml\nlwm2m.bind.udp.1 = 0.0.0.0:5683\nlwm2m.opts.buffer = 1024KB\nlwm2m.opts.recbuf = 1024KB\nlwm2m.opts.sndbuf = 1024KB\nlwm2m.opts.read_packets = 20\nlwm2m.bind.dtls.1 = 0.0.0.0:5684\nlwm2m.dtls.keyfile = etc/certs/key.pem\nlwm2m.dtls.certfile = etc/certs/cert.pem\nlwm2m.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\n",
2✔
525
                        "emqx_prometheus.conf":        "prometheus.push.gateway.server = http://127.0.0.1:9091\nprometheus.interval = 15000\n",
2✔
526
                        "emqx_psk_file.conf":          "psk.file.path = etc/psk.txt\npsk.file.delimiter = :\n",
2✔
527
                        "emqx_recon.conf":             "",
2✔
528
                        "emqx_sasl.conf":              "",
2✔
529
                        "emqx_schema_registry.conf":   "",
2✔
530
                        "emqx_sn.conf":                "mqtt.sn.port = 1884\nmqtt.sn.advertise_duration = 15m\nmqtt.sn.gateway_id = 1\nmqtt.sn.enable_stats = off\nmqtt.sn.enable_qos3 = off\nmqtt.sn.idle_timeout = 30s\nmqtt.sn.predefined.topic.0 = reserved\nmqtt.sn.predefined.topic.1 = /predefined/topic/name/hello\nmqtt.sn.predefined.topic.2 = /predefined/topic/name/nice\nmqtt.sn.username = mqtt_sn_user\nmqtt.sn.password = abc\n",
2✔
531
                        "emqx_stomp.conf":             "stomp.listener = 61613\nstomp.listener.acceptors = 4\nstomp.listener.max_connections = 512\nstomp.default_user.login = guest\nstomp.default_user.passcode = guest\nstomp.allow_anonymous = true\nstomp.frame.max_headers = 10\nstomp.frame.max_header_length = 1024\nstomp.frame.max_body_length = 8192\n",
2✔
532
                        "emqx_tcp.conf":               "tcp.proto.idle_timeout = 15s\ntcp.proto.up_topic = tcp/%c/up\ntcp.proto.dn_topic = tcp/%c/dn\ntcp.proto.max_packet_size = 65535\ntcp.proto.enable_stats = on\ntcp.proto.force_gc_policy = 1000|1MB\ntcp.listener.external = 0.0.0.0:8090\ntcp.listener.external.acceptors = 8\ntcp.listener.external.max_connections = 1024000\ntcp.listener.external.max_conn_rate = 1000\ntcp.listener.external.active_n = 100\ntcp.listener.external.access.1 = allow all\ntcp.listener.external.backlog = 1024\ntcp.listener.external.send_timeout = 15s\ntcp.listener.external.send_timeout_close = on\ntcp.listener.external.nodelay = true\ntcp.listener.external.reuseaddr = true\ntcp.listener.ssl.external = 0.0.0.0:8091\ntcp.listener.ssl.external.acceptors = 8\ntcp.listener.ssl.external.max_connections = 1024000\ntcp.listener.ssl.external.max_conn_rate = 1000\ntcp.listener.ssl.external.active_n = 100\ntcp.listener.ssl.external.access.1 = allow all\ntcp.listener.ssl.external.handshake_timeout = 15s\ntcp.listener.ssl.external.keyfile = etc/certs/key.pem\ntcp.listener.ssl.external.certfile = etc/certs/cert.pem\ntcp.listener.ssl.external.ciphers = 
ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\ntcp.listener.ssl.external.backlog = 1024\ntcp.listener.ssl.external.send_timeout = 15s\ntcp.listener.ssl.external.send_timeout_close = on\ntcp.listener.ssl.external.nodelay = true\ntcp.listener.ssl.external.reuseaddr = true\n",
2✔
533
                        "emqx_web_hook.conf":          "web.hook.url = http://127.0.0.1:80\nweb.hook.headers.content-type = application/json\nweb.hook.body.encoding_of_payload_field = plain\nweb.hook.pool_size = 32\nweb.hook.enable_pipelining = true\n",
2✔
534
                },
2✔
535
        }
2✔
536

2✔
537
        return cm
2✔
538
}
2✔
539

540
func generateHeadlessSvc(instance appsv1beta3.Emqx) *corev1.Service {
2✔
541
        names := appsv1beta3.Names{Object: instance}
2✔
542

2✔
543
        headlessSvc := &corev1.Service{
2✔
544
                TypeMeta: metav1.TypeMeta{
2✔
545
                        APIVersion: "v1",
2✔
546
                        Kind:       "Service",
2✔
547
                },
2✔
548
                ObjectMeta: metav1.ObjectMeta{
2✔
549
                        Labels:    instance.GetLabels(),
2✔
550
                        Name:      names.HeadlessSvc(),
2✔
551
                        Namespace: instance.GetNamespace(),
2✔
552
                },
2✔
553
                Spec: corev1.ServiceSpec{
2✔
554
                        Selector:                 instance.GetLabels(),
2✔
555
                        ClusterIP:                corev1.ClusterIPNone,
2✔
556
                        PublishNotReadyAddresses: true,
2✔
557
                },
2✔
558
        }
2✔
559

2✔
560
        compile := regexp.MustCompile(".*management.*")
2✔
561
        for _, port := range instance.GetServiceTemplate().Spec.Ports {
4✔
562
                if compile.MatchString(port.Name) {
4✔
563
                        // Headless services must not set nodePort
2✔
564
                        headlessSvc.Spec.Ports = append(headlessSvc.Spec.Ports, corev1.ServicePort{
2✔
565
                                Name:        port.Name,
2✔
566
                                Protocol:    port.Protocol,
2✔
567
                                AppProtocol: port.AppProtocol,
2✔
568
                                TargetPort:  port.TargetPort,
2✔
569
                                Port:        port.Port,
2✔
570
                        })
2✔
571
                }
2✔
572
        }
573
        return headlessSvc
2✔
574
}
575

576
func generateSvc(instance appsv1beta3.Emqx) *corev1.Service {
2✔
577
        return &corev1.Service{
2✔
578
                TypeMeta: metav1.TypeMeta{
2✔
579
                        APIVersion: "v1",
2✔
580
                        Kind:       "Service",
2✔
581
                },
2✔
582
                ObjectMeta: instance.GetServiceTemplate().ObjectMeta,
2✔
583
                Spec:       instance.GetServiceTemplate().Spec,
2✔
584
        }
2✔
585
}
2✔
586

587
func generateAcl(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
588
        if len(instance.GetACL()) == 0 {
4✔
589
                return nil
2✔
590
        }
2✔
591
        names := appsv1beta3.Names{Object: instance}
2✔
592

2✔
593
        var aclString string
2✔
594
        for _, rule := range instance.GetACL() {
4✔
595
                aclString += fmt.Sprintf("%s\n", rule)
2✔
596
        }
2✔
597
        cm := &corev1.ConfigMap{
2✔
598
                TypeMeta: metav1.TypeMeta{
2✔
599
                        APIVersion: "v1",
2✔
600
                        Kind:       "ConfigMap",
2✔
601
                },
2✔
602
                ObjectMeta: metav1.ObjectMeta{
2✔
603
                        Labels:    instance.GetLabels(),
2✔
604
                        Namespace: instance.GetNamespace(),
2✔
605
                        Name:      names.ACL(),
2✔
606
                },
2✔
607
                Data: map[string]string{"acl.conf": aclString},
2✔
608
        }
2✔
609
        return cm
2✔
610
}
611

612
func generateLoadedModules(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
613
        names := appsv1beta3.Names{Object: instance}
2✔
614
        var loadedModulesString string
2✔
615
        switch obj := instance.(type) {
2✔
616
        case *appsv1beta3.EmqxBroker:
2✔
617
                modules := &appsv1beta3.EmqxBrokerModuleList{
2✔
618
                        Items: obj.Spec.EmqxTemplate.Modules,
2✔
619
                }
2✔
620
                loadedModulesString = modules.String()
2✔
621
                if loadedModulesString == "" {
3✔
622
                        return nil
1✔
623
                }
1✔
624
        case *appsv1beta3.EmqxEnterprise:
2✔
625
                modules := &appsv1beta3.EmqxEnterpriseModuleList{
2✔
626
                        Items: obj.Spec.EmqxTemplate.Modules,
2✔
627
                }
2✔
628
                // for enterprise, if modules is empty, don't create configmap
2✔
629
                loadedModulesString = modules.String()
2✔
630
                if loadedModulesString == "" {
3✔
631
                        return nil
1✔
632
                }
1✔
633
        }
634

635
        cm := &corev1.ConfigMap{
2✔
636
                TypeMeta: metav1.TypeMeta{
2✔
637
                        APIVersion: "v1",
2✔
638
                        Kind:       "ConfigMap",
2✔
639
                },
2✔
640
                ObjectMeta: metav1.ObjectMeta{
2✔
641
                        Labels:    instance.GetLabels(),
2✔
642
                        Namespace: instance.GetNamespace(),
2✔
643
                        Name:      names.LoadedModules(),
2✔
644
                },
2✔
645
                Data: map[string]string{"loaded_modules": loadedModulesString},
2✔
646
        }
2✔
647

2✔
648
        return cm
2✔
649
}
650

651
func generateLicense(emqxEnterprise *appsv1beta3.EmqxEnterprise) *corev1.Secret {
2✔
652
        names := appsv1beta3.Names{Object: emqxEnterprise}
2✔
653
        license := emqxEnterprise.GetLicense()
2✔
654
        if len(license.Data) == 0 && len(license.StringData) == 0 {
3✔
655
                return nil
1✔
656
        }
1✔
657

658
        secret := &corev1.Secret{
2✔
659
                TypeMeta: metav1.TypeMeta{
2✔
660
                        APIVersion: "v1",
2✔
661
                        Kind:       "Secret",
2✔
662
                },
2✔
663
                ObjectMeta: metav1.ObjectMeta{
2✔
664
                        Labels:    emqxEnterprise.GetLabels(),
2✔
665
                        Namespace: emqxEnterprise.GetNamespace(),
2✔
666
                        Name:      names.License(),
2✔
667
                },
2✔
668
                Type: corev1.SecretTypeOpaque,
2✔
669
                Data: map[string][]byte{"emqx.lic": emqxEnterprise.GetLicense().Data},
2✔
670
        }
2✔
671
        if emqxEnterprise.GetLicense().StringData != "" {
2✔
672
                secret.StringData = map[string]string{"emqx.lic": emqxEnterprise.GetLicense().StringData}
×
673
        }
×
674
        return secret
2✔
675
}
676

677
func generateDataVolume(instance appsv1beta3.Emqx, sts *appsv1.StatefulSet) *appsv1.StatefulSet {
2✔
678
        names := appsv1beta3.Names{Object: instance}
2✔
679
        dataName := names.Data()
2✔
680

2✔
681
        if reflect.ValueOf(instance.GetPersistent()).IsZero() {
4✔
682
                sts.Spec.Template.Spec.Volumes = append(
2✔
683
                        sts.Spec.Template.Spec.Volumes,
2✔
684
                        corev1.Volume{
2✔
685
                                Name: dataName,
2✔
686
                                VolumeSource: corev1.VolumeSource{
2✔
687
                                        EmptyDir: &corev1.EmptyDirVolumeSource{},
2✔
688
                                },
2✔
689
                        },
2✔
690
                )
2✔
691
        } else {
4✔
692
                sts.Spec.VolumeClaimTemplates = append(
2✔
693
                        sts.Spec.VolumeClaimTemplates,
2✔
694
                        generateVolumeClaimTemplate(instance, dataName),
2✔
695
                )
2✔
696
        }
2✔
697

698
        emqxContainerIndex := findContinerIndex(sts, handler.EmqxContainerName)
2✔
699
        sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts = append(
2✔
700
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts,
2✔
701
                corev1.VolumeMount{
2✔
702
                        Name:      dataName,
2✔
703
                        MountPath: "/opt/emqx/data",
2✔
704
                },
2✔
705
        )
2✔
706

2✔
707
        ReloaderContainerIndex := findContinerIndex(sts, ReloaderContainerName)
2✔
708
        sts.Spec.Template.Spec.Containers[ReloaderContainerIndex].VolumeMounts = sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts
2✔
709
        return sts
2✔
710
}
711

712
func generateVolumeClaimTemplate(instance appsv1beta3.Emqx, Name string) corev1.PersistentVolumeClaim {
2✔
713
        template := instance.GetPersistent()
2✔
714
        pvc := corev1.PersistentVolumeClaim{
2✔
715
                ObjectMeta: metav1.ObjectMeta{
2✔
716
                        Name:      Name,
2✔
717
                        Namespace: instance.GetNamespace(),
2✔
718
                },
2✔
719
                Spec: template,
2✔
720
        }
2✔
721
        if pvc.Spec.AccessModes == nil {
3✔
722
                pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
723
        }
1✔
724
        return pvc
2✔
725
}
726

727
func generateReloaderContainer(instance appsv1beta3.Emqx) *corev1.Container {
2✔
728
        return &corev1.Container{
2✔
729
                Name:            ReloaderContainerName,
2✔
730
                Image:           instance.GetReloaderImage(),
2✔
731
                ImagePullPolicy: instance.GetImagePullPolicy(),
2✔
732
                Args: []string{
2✔
733
                        "-u", instance.GetUsername(),
2✔
734
                        "-p", instance.GetPassword(),
2✔
735
                        "-P", appsv1beta3.DefaultManagementPort,
2✔
736
                },
2✔
737
        }
2✔
738
}
2✔
739

740
func generateEmqxContainer(instance appsv1beta3.Emqx) *corev1.Container {
2✔
741
        return &corev1.Container{
2✔
742
                Name:            handler.EmqxContainerName,
2✔
743
                Image:           instance.GetImage(),
2✔
744
                ImagePullPolicy: instance.GetImagePullPolicy(),
2✔
745
                Resources:       instance.GetResource(),
2✔
746
                Env: mergeEnvAndConfig(instance, []corev1.EnvVar{
2✔
747
                        {
2✔
748
                                Name:  "EMQX_MANAGEMENT__DEFAULT_APPLICATION__ID",
2✔
749
                                Value: instance.GetUsername(),
2✔
750
                        },
2✔
751
                        {
2✔
752
                                Name:  "EMQX_DASHBOARD__DEFAULT_USER__LOGIN",
2✔
753
                                Value: instance.GetUsername(),
2✔
754
                        },
2✔
755
                        {
2✔
756
                                Name:  "EMQX_MANAGEMENT__DEFAULT_APPLICATION__SECRET",
2✔
757
                                Value: instance.GetPassword(),
2✔
758
                        },
2✔
759
                        {
2✔
760
                                Name:  "EMQX_DASHBOARD__DEFAULT_USER__PASSWORD",
2✔
761
                                Value: instance.GetPassword(),
2✔
762
                        },
2✔
763
                }...),
2✔
764
                Args:           instance.GetArgs(),
2✔
765
                ReadinessProbe: instance.GetReadinessProbe(),
2✔
766
                LivenessProbe:  instance.GetLivenessProbe(),
2✔
767
                StartupProbe:   instance.GetStartupProbe(),
2✔
768
                VolumeMounts:   instance.GetExtraVolumeMounts(),
2✔
769
        }
2✔
770
}
2✔
771

772
func updateEnvAndVolumeForSts(sts *appsv1.StatefulSet, envVar corev1.EnvVar, volumeMount corev1.VolumeMount, volume corev1.Volume) *appsv1.StatefulSet {
2✔
773
        emqxContainerIndex := findContinerIndex(sts, handler.EmqxContainerName)
2✔
774
        reloaderContainerIndex := findContinerIndex(sts, ReloaderContainerName)
2✔
775

2✔
776
        isNotExistVolume := func(volume corev1.Volume) bool {
4✔
777
                for _, v := range sts.Spec.Template.Spec.Volumes {
3✔
778
                        if v.Name == volume.Name {
1✔
779
                                return false
×
780
                        }
×
781
                }
782
                return true
2✔
783
        }
784

785
        isNotExistVolumeVolumeMount := func(volumeMount corev1.VolumeMount) bool {
4✔
786
                for _, v := range sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts {
3✔
787
                        if v.Name == volumeMount.Name {
1✔
788
                                return false
×
789
                        }
×
790
                }
791
                return true
2✔
792
        }
793

794
        isNotExistEnv := func(envVar corev1.EnvVar) bool {
4✔
795
                for _, v := range sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env {
3✔
796
                        if v.Name == envVar.Name {
1✔
797
                                return false
×
798
                        }
×
799
                }
800
                return true
2✔
801
        }
802

803
        if isNotExistVolume(volume) {
4✔
804
                sts.Spec.Template.Spec.Volumes = append(
2✔
805
                        sts.Spec.Template.Spec.Volumes,
2✔
806
                        volume,
2✔
807
                )
2✔
808
        }
2✔
809

810
        if isNotExistVolumeVolumeMount(volumeMount) {
4✔
811
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts = append(
2✔
812
                        sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts,
2✔
813
                        volumeMount,
2✔
814
                )
2✔
815
        }
2✔
816

817
        if isNotExistEnv(envVar) {
4✔
818
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env = append(
2✔
819
                        sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env,
2✔
820
                        envVar,
2✔
821
                )
2✔
822
        }
2✔
823

824
        sts.Spec.Template.Spec.Containers[reloaderContainerIndex].VolumeMounts = sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts
2✔
825
        sts.Spec.Template.Spec.Containers[reloaderContainerIndex].Env = sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env
2✔
826

2✔
827
        return sts
2✔
828
}
829

830
func mergeEnvAndConfig(instance appsv1beta3.Emqx, extraEnvs ...corev1.EnvVar) []corev1.EnvVar {
2✔
831
        lookup := func(name string, envs []corev1.EnvVar) bool {
4✔
832
                for _, env := range envs {
4✔
833
                        if env.Name == name {
3✔
834
                                return true
1✔
835
                        }
1✔
836
                }
837
                return false
2✔
838
        }
839

840
        envs := append(instance.GetEnv(), extraEnvs...)
2✔
841
        emqxConfig := instance.GetEmqxConfig()
2✔
842

2✔
843
        for k, v := range emqxConfig {
4✔
844
                key := fmt.Sprintf("EMQX_%s", strings.ToUpper(strings.ReplaceAll(k, ".", "__")))
2✔
845
                if !lookup(key, envs) {
4✔
846
                        envs = append(envs, corev1.EnvVar{Name: key, Value: v})
2✔
847
                }
2✔
848
        }
849

850
        sort.Slice(envs, func(i, j int) bool {
4✔
851
                return envs[i].Name < envs[j].Name
2✔
852
        })
2✔
853
        return envs
2✔
854
}
855

856
func findContinerIndex(sts *appsv1.StatefulSet, containerName string) int {
2✔
857
        for k, c := range sts.Spec.Template.Spec.Containers {
4✔
858
                if c.Name == containerName {
4✔
859
                        return k
2✔
860
                }
2✔
861
        }
862
        return -1
×
863
}
864

865
func generateAnnotationByContainers(containers []corev1.Container) string {
2✔
866
        containerNames := []string{}
2✔
867
        for _, c := range containers {
4✔
868
                containerNames = append(containerNames, c.Name)
2✔
869
        }
2✔
870
        return strings.Join(containerNames, ",")
2✔
871
}
872

873
func updateEmqxStatus(instance appsv1beta3.Emqx, emqxNodes []appsv1beta3.EmqxNode) appsv1beta3.Emqx {
2✔
874
        status := instance.GetStatus()
2✔
875
        status.Replicas = *instance.GetReplicas()
2✔
876
        if emqxNodes != nil {
4✔
877
                readyReplicas := int32(0)
2✔
878
                for _, node := range emqxNodes {
4✔
879
                        if node.NodeStatus == "Running" {
4✔
880
                                readyReplicas++
2✔
881
                        }
2✔
882
                }
883
                status.ReadyReplicas = readyReplicas
2✔
884
                status.EmqxNodes = emqxNodes
2✔
885
        }
886

887
        var cond *appsv1beta3.Condition
2✔
888
        if status.Replicas == status.ReadyReplicas {
4✔
889
                cond = appsv1beta3.NewCondition(
2✔
890
                        appsv1beta3.ConditionRunning,
2✔
891
                        corev1.ConditionTrue,
2✔
892
                        "ClusterReady",
2✔
893
                        "All resources are ready",
2✔
894
                )
2✔
895
        } else {
4✔
896
                cond = appsv1beta3.NewCondition(
2✔
897
                        appsv1beta3.ConditionRunning,
2✔
898
                        corev1.ConditionFalse,
2✔
899
                        "ClusterNotReady",
2✔
900
                        "Some nodes are not ready",
2✔
901
                )
2✔
902
        }
2✔
903
        status.SetCondition(*cond)
2✔
904
        instance.SetStatus(status)
2✔
905
        return instance
2✔
906
}
907

908
func updatePluginsConfigForSts(sts *appsv1.StatefulSet, PluginsConfig *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
909
        return updateEnvAndVolumeForSts(sts,
2✔
910
                corev1.EnvVar{
2✔
911
                        Name:  "EMQX_PLUGINS__ETC_DIR",
2✔
912
                        Value: "/mounted/plugins/etc",
2✔
913
                },
2✔
914
                corev1.VolumeMount{
2✔
915
                        Name:      PluginsConfig.Name,
2✔
916
                        MountPath: "/mounted/plugins/etc",
2✔
917
                },
2✔
918
                corev1.Volume{
2✔
919
                        Name: PluginsConfig.Name,
2✔
920
                        VolumeSource: corev1.VolumeSource{
2✔
921
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
922
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
923
                                                Name: PluginsConfig.Name,
2✔
924
                                        },
2✔
925
                                },
2✔
926
                        },
2✔
927
                },
2✔
928
        )
2✔
929
}
2✔
930

931
func updateAclForSts(sts *appsv1.StatefulSet, acl *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
932
        if sts.Spec.Template.Annotations == nil {
3✔
933
                sts.Spec.Template.Annotations = make(map[string]string)
1✔
934
        }
1✔
935
        sts.Spec.Template.Annotations["ACL/Base64EncodeConfig"] = base64.StdEncoding.EncodeToString([]byte(acl.Data["acl.conf"]))
2✔
936
        return updateEnvAndVolumeForSts(sts,
2✔
937
                corev1.EnvVar{
2✔
938
                        Name:  "EMQX_ACL_FILE",
2✔
939
                        Value: "/mounted/acl/acl.conf",
2✔
940
                },
2✔
941
                corev1.VolumeMount{
2✔
942
                        Name:      acl.Name,
2✔
943
                        MountPath: "/mounted/acl",
2✔
944
                },
2✔
945
                corev1.Volume{
2✔
946
                        Name: acl.Name,
2✔
947
                        VolumeSource: corev1.VolumeSource{
2✔
948
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
949
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
950
                                                Name: acl.Name,
2✔
951
                                        },
2✔
952
                                },
2✔
953
                        },
2✔
954
                },
2✔
955
        )
2✔
956
}
957

958
func updateLoadedModulesForSts(sts *appsv1.StatefulSet, loadedModules *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
959
        if sts.Spec.Template.Annotations == nil {
3✔
960
                sts.Spec.Template.Annotations = make(map[string]string)
1✔
961
        }
1✔
962
        sts.Spec.Template.Annotations["LoadedModules/Base64EncodeConfig"] = base64.StdEncoding.EncodeToString([]byte(loadedModules.Data["loaded_modules"]))
2✔
963
        return updateEnvAndVolumeForSts(sts,
2✔
964
                corev1.EnvVar{
2✔
965
                        Name:  "EMQX_MODULES__LOADED_FILE",
2✔
966
                        Value: "/mounted/modules/loaded_modules",
2✔
967
                },
2✔
968
                corev1.VolumeMount{
2✔
969
                        Name:      loadedModules.Name,
2✔
970
                        MountPath: "/mounted/modules",
2✔
971
                },
2✔
972
                corev1.Volume{
2✔
973
                        Name: loadedModules.Name,
2✔
974
                        VolumeSource: corev1.VolumeSource{
2✔
975
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
976
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
977
                                                Name: loadedModules.Name,
2✔
978
                                        },
2✔
979
                                },
2✔
980
                        },
2✔
981
                },
2✔
982
        )
2✔
983
}
984

985
func updateLicenseForsts(sts *appsv1.StatefulSet, license *corev1.Secret) *appsv1.StatefulSet {
2✔
986
        fileName := "emqx.lic"
2✔
987
        for k := range license.Data {
4✔
988
                fileName = k
2✔
989
                break
2✔
990
        }
991

992
        return updateEnvAndVolumeForSts(sts,
2✔
993
                corev1.EnvVar{
2✔
994
                        Name:  "EMQX_LICENSE__FILE",
2✔
995
                        Value: filepath.Join("/mounted/license", fileName),
2✔
996
                },
2✔
997
                corev1.VolumeMount{
2✔
998
                        Name:      license.Name,
2✔
999
                        MountPath: "/mounted/license",
2✔
1000
                        ReadOnly:  true,
2✔
1001
                },
2✔
1002
                corev1.Volume{
2✔
1003
                        Name: license.Name,
2✔
1004
                        VolumeSource: corev1.VolumeSource{
2✔
1005
                                Secret: &corev1.SecretVolumeSource{
2✔
1006
                                        SecretName: license.Name,
2✔
1007
                                },
2✔
1008
                        },
2✔
1009
                },
2✔
1010
        )
2✔
1011
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc