• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx-operator / 6269452303

22 Sep 2023 02:35AM UTC coverage: 73.185%. First build
6269452303

Pull #952

github

Rory-Z
fix: fix didn't delete pod when CR's replicas = 0

Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com>
Pull Request #952: fix: fix didn't delete pod when CR's replicas = 0

9 of 24 new or added lines in 1 file covered. (37.5%)

1815 of 2480 relevant lines covered (73.19%)

1.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.85
/controllers/apps/v1beta3/emqx_handler.go
1
/*
2
Copyright 2021.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package apps
18

19
import (
20
        "context"
21
        "encoding/base64"
22
        "fmt"
23
        "net"
24
        "path/filepath"
25
        "reflect"
26
        "regexp"
27
        "sort"
28
        "strconv"
29
        "strings"
30
        "time"
31

32
        emperror "emperror.dev/errors"
33
        appsv1 "k8s.io/api/apps/v1"
34
        corev1 "k8s.io/api/core/v1"
35
        k8sErrors "k8s.io/apimachinery/pkg/api/errors"
36
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37

38
        "k8s.io/apimachinery/pkg/labels"
39
        "k8s.io/apimachinery/pkg/runtime"
40
        "k8s.io/apimachinery/pkg/types"
41
        "k8s.io/apimachinery/pkg/util/intstr"
42
        "k8s.io/client-go/tools/record"
43
        "k8s.io/client-go/util/retry"
44

45
        ctrl "sigs.k8s.io/controller-runtime"
46
        "sigs.k8s.io/controller-runtime/pkg/client"
47
        "sigs.k8s.io/controller-runtime/pkg/reconcile"
48

49
        appsv1beta3 "github.com/emqx/emqx-operator/apis/apps/v1beta3"
50
        "github.com/emqx/emqx-operator/pkg/handler"
51
        json "github.com/json-iterator/go"
52
        "github.com/tidwall/gjson"
53
)
54

55
var _ reconcile.Reconciler = &EmqxBrokerReconciler{}
56

57
const (
58
        ReloaderContainerName = "reloader"
59
)
60

61
type EmqxReconciler struct {
62
        handler.Handler
63
        Scheme *runtime.Scheme
64
        record.EventRecorder
65
}
66

67
func (r *EmqxReconciler) Do(ctx context.Context, instance appsv1beta3.Emqx) (ctrl.Result, error) {
1✔
68
        if instance.GetDeletionTimestamp() != nil {
1✔
69
                return ctrl.Result{}, nil
×
70
        }
×
71
        if instance.GetReplicas() == nil || *instance.GetReplicas() == 0 {
1✔
NEW
72
                if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
NEW
73
                        storageSts := &appsv1.StatefulSet{}
×
NEW
74
                        if err := r.Get(ctx, client.ObjectKeyFromObject(generateStatefulSetDef(instance)), storageSts); err != nil {
×
NEW
75
                                if k8sErrors.IsNotFound(err) {
×
NEW
76
                                        return nil
×
NEW
77
                                }
×
NEW
78
                                return err
×
79
                        }
NEW
80
                        storageSts.Spec.Replicas = instance.GetReplicas()
×
NEW
81
                        return r.Handler.Update(storageSts, func(o client.Object) error { return nil })
×
NEW
82
                }); err != nil {
×
NEW
83
                        return ctrl.Result{}, emperror.Wrap(err, "failed to update statefulSet")
×
NEW
84
                }
×
85

86
                if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
×
87
                        _ = r.Get(ctx, client.ObjectKeyFromObject(instance), instance)
×
88
                        instance = updateEmqxStatus(instance, []appsv1beta3.EmqxNode{})
×
89
                        return r.Status().Update(ctx, instance)
×
90
                }); err != nil {
×
91
                        return ctrl.Result{}, emperror.Wrap(err, "failed to update status")
×
92
                }
×
93
                return ctrl.Result{}, nil
×
94
        }
95

96
        var err error
1✔
97
        var emqxNodes []appsv1beta3.EmqxNode
1✔
98
        emqxNodes, err = r.getNodeStatusesByAPI(instance)
1✔
99
        if err != nil {
2✔
100
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetNodeStatues", err.Error())
1✔
101
                condition := appsv1beta3.NewCondition(
1✔
102
                        appsv1beta3.ConditionRunning,
1✔
103
                        corev1.ConditionFalse,
1✔
104
                        "FailedToGetNodeStatues",
1✔
105
                        err.Error(),
1✔
106
                )
1✔
107
                instance.SetCondition(*condition)
1✔
108
                _ = r.Status().Update(ctx, instance)
1✔
109
        }
1✔
110
        if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
111
                _ = r.Get(ctx, client.ObjectKeyFromObject(instance), instance)
1✔
112
                instance = updateEmqxStatus(instance, emqxNodes)
1✔
113
                return r.Status().Update(ctx, instance)
1✔
114
        }); err != nil {
1✔
115
                return ctrl.Result{}, emperror.Wrap(err, "failed to update status")
×
116
        }
×
117

118
        var resources []client.Object
1✔
119
        postFn := func(client.Object) error { return nil }
2✔
120

121
        sts := generateStatefulSetDef(instance)
1✔
122

1✔
123
        storeSts := &appsv1.StatefulSet{}
1✔
124
        if err := r.Get(ctx, client.ObjectKeyFromObject(sts), storeSts); err != nil {
2✔
125
                if !k8sErrors.IsNotFound(err) {
1✔
126
                        return ctrl.Result{}, err
×
127
                }
×
128
        }
129
        // store statefulSet is exit
130
        if storeSts.Spec.PodManagementPolicy != "" {
2✔
131
                sts.Spec.PodManagementPolicy = storeSts.Spec.PodManagementPolicy
1✔
132
        }
1✔
133
        // compatible with 1.2.2
134
        if storeSts.Spec.VolumeClaimTemplates != nil {
2✔
135
                sts.Spec.VolumeClaimTemplates = storeSts.Spec.VolumeClaimTemplates
1✔
136
        }
1✔
137
        // compatible with 1.2.2
138
        if storeSts.Annotations != nil {
2✔
139
                sts.Annotations = storeSts.Annotations
1✔
140
        }
1✔
141

142
        defaultPluginsConfig := generateDefaultPluginsConfig(instance)
1✔
143
        sts = updatePluginsConfigForSts(sts, defaultPluginsConfig)
1✔
144

1✔
145
        if status := instance.GetStatus(); !status.IsPluginInitialized() {
2✔
146
                resources = append(resources, defaultPluginsConfig)
1✔
147

1✔
148
                pluginsList := &appsv1beta3.EmqxPluginList{}
1✔
149
                err = r.Client.List(ctx, pluginsList, client.InNamespace(instance.GetNamespace()))
1✔
150
                if err != nil && !k8sErrors.IsNotFound(err) {
1✔
151
                        return ctrl.Result{}, err
×
152
                }
×
153
                var condition *appsv1beta3.Condition
1✔
154
                pluginResourceList := generateInitPluginList(instance, pluginsList)
1✔
155
                resources = append(resources, pluginResourceList...)
1✔
156

1✔
157
                err = r.CreateOrUpdateList(instance, r.Scheme, resources, postFn)
1✔
158
                if err != nil {
1✔
159
                        r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedCreateOrUpdate", err.Error())
×
160
                        condition = appsv1beta3.NewCondition(
×
161
                                appsv1beta3.ConditionPluginInitialized,
×
162
                                corev1.ConditionFalse,
×
163
                                "PluginInitializeFailed",
×
164
                                err.Error(),
×
165
                        )
×
166
                        instance.SetCondition(*condition)
×
167
                        _ = r.Status().Update(ctx, instance)
×
168
                        return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, err
×
169
                }
×
170
                condition = appsv1beta3.NewCondition(
1✔
171
                        appsv1beta3.ConditionPluginInitialized,
1✔
172
                        corev1.ConditionTrue,
1✔
173
                        "PluginInitializeSuccessfully",
1✔
174
                        "All default plugins initialized",
1✔
175
                )
1✔
176
                instance.SetCondition(*condition)
1✔
177
                _ = r.Status().Update(ctx, instance)
1✔
178
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
1✔
179
        }
180

181
        if acl := generateAcl(instance); acl != nil {
2✔
182
                resources = append(resources, acl)
1✔
183
                sts = updateAclForSts(sts, acl)
1✔
184
        }
1✔
185

186
        if loadedModules := generateLoadedModules(instance); loadedModules != nil {
2✔
187
                resources = append(resources, loadedModules)
1✔
188
                sts = updateLoadedModulesForSts(sts, loadedModules)
1✔
189
        }
1✔
190

191
        if emqxEnterprise, ok := instance.(*appsv1beta3.EmqxEnterprise); ok {
2✔
192
                var license *corev1.Secret
1✔
193
                if emqxEnterprise.GetLicense().SecretName != "" {
1✔
194
                        license = &corev1.Secret{}
×
195
                        if err := r.Client.Get(context.Background(), types.NamespacedName{Name: emqxEnterprise.GetLicense().SecretName, Namespace: emqxEnterprise.GetNamespace()}, license); err != nil {
×
196
                                return ctrl.Result{}, err
×
197
                        }
×
198
                } else {
1✔
199
                        license = generateLicense(emqxEnterprise)
1✔
200
                }
1✔
201

202
                if license != nil {
2✔
203
                        resources = append(resources, license)
1✔
204
                        sts = updateLicenseForsts(sts, license)
1✔
205
                }
1✔
206
        }
207

208
        if status := instance.GetStatus(); status.IsRunning() {
2✔
209
                serviceTemplate := instance.GetServiceTemplate()
1✔
210
                serviceTemplate.MergePorts(r.getListenerPortsByAPI(instance))
1✔
211
                instance.SetServiceTemplate(serviceTemplate)
1✔
212
                svc := generateSvc(instance)
1✔
213
                resources = append(resources, svc)
1✔
214
        }
1✔
215

216
        headlessSvc := generateHeadlessSvc(instance)
1✔
217
        sts.Spec.ServiceName = headlessSvc.Name
1✔
218
        resources = append(resources, headlessSvc)
1✔
219

1✔
220
        resources = append(resources, sts)
1✔
221

1✔
222
        if err := r.CreateOrUpdateList(instance, r.Scheme, resources, postFn); err != nil {
2✔
223
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedCreateOrUpdate", err.Error())
1✔
224
                condition := appsv1beta3.NewCondition(
1✔
225
                        appsv1beta3.ConditionRunning,
1✔
226
                        corev1.ConditionFalse,
1✔
227
                        "FailedCreateOrUpdate",
1✔
228
                        err.Error(),
1✔
229
                )
1✔
230
                instance.SetCondition(*condition)
1✔
231
                _ = r.Status().Update(ctx, instance)
1✔
232
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, err
1✔
233
        }
1✔
234

235
        emqxNodes, err = r.getNodeStatusesByAPI(instance)
1✔
236
        if err != nil {
2✔
237
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetNodeStatues", err.Error())
1✔
238
                condition := appsv1beta3.NewCondition(
1✔
239
                        appsv1beta3.ConditionRunning,
1✔
240
                        corev1.ConditionFalse,
1✔
241
                        "FailedToGetNodeStatues",
1✔
242
                        err.Error(),
1✔
243
                )
1✔
244
                instance.SetCondition(*condition)
1✔
245
                _ = r.Status().Update(ctx, instance)
1✔
246
        }
1✔
247

248
        if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
2✔
249
                _ = r.Get(ctx, client.ObjectKeyFromObject(instance), instance)
1✔
250
                instance = updateEmqxStatus(instance, emqxNodes)
1✔
251
                return r.Status().Update(ctx, instance)
1✔
252
        }); err != nil {
1✔
253
                return ctrl.Result{}, emperror.Wrap(err, "failed to update status")
×
254
        }
×
255

256
        if status := instance.GetStatus(); !status.IsRunning() {
2✔
257
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
1✔
258
        }
1✔
259
        return ctrl.Result{RequeueAfter: time.Duration(20) * time.Second}, nil
1✔
260
}
261

262
func (r *EmqxReconciler) getListenerPortsByAPI(instance appsv1beta3.Emqx) []corev1.ServicePort {
1✔
263
        resp, body, err := r.Handler.RequestAPI(instance, "GET", instance.GetUsername(), instance.GetPassword(), appsv1beta3.DefaultManagementPort, "api/v4/listeners")
1✔
264
        if err != nil {
1✔
265
                return nil
×
266
        }
×
267
        if resp.StatusCode != 200 {
1✔
268
                return nil
×
269
        }
×
270

271
        ports := []corev1.ServicePort{}
1✔
272
        listeners := gjson.GetBytes(body, "data.0.listeners")
1✔
273
        for _, l := range listeners.Array() {
2✔
274
                var name string
1✔
275
                var protocol corev1.Protocol
1✔
276
                var strPort string
1✔
277
                var intPort int
1✔
278

1✔
279
                compile := regexp.MustCompile(".*(udp|dtls|sn).*")
1✔
280
                proto := gjson.Get(l.Raw, "protocol").String()
1✔
281
                if compile.MatchString(proto) {
2✔
282
                        protocol = corev1.ProtocolUDP
1✔
283
                } else {
2✔
284
                        protocol = corev1.ProtocolTCP
1✔
285
                }
1✔
286

287
                listenOn := gjson.Get(l.Raw, "listen_on").String()
1✔
288
                if strings.Contains(listenOn, ":") {
2✔
289
                        _, strPort, err = net.SplitHostPort(listenOn)
1✔
290
                        if err != nil {
1✔
291
                                strPort = listenOn
×
292
                        }
×
293
                } else {
1✔
294
                        strPort = listenOn
1✔
295
                }
1✔
296
                intPort, _ = strconv.Atoi(strPort)
1✔
297

1✔
298
                // Get name by protocol and port from API
1✔
299
                // protocol maybe like mqtt:wss:8084
1✔
300
                // protocol maybe like mqtt:tcp
1✔
301
                // We had to do something with the "protocol" to make it conform to the kubernetes service port name specification
1✔
302
                name = regexp.MustCompile(`:[\d]+`).ReplaceAllString(proto, "")
1✔
303
                name = strings.ReplaceAll(name, ":", "-")
1✔
304
                name = fmt.Sprintf("%s-%s", name, strPort)
1✔
305

1✔
306
                ports = append(ports, corev1.ServicePort{
1✔
307
                        Name:       name,
1✔
308
                        Protocol:   protocol,
1✔
309
                        Port:       int32(intPort),
1✔
310
                        TargetPort: intstr.FromInt(intPort),
1✔
311
                })
1✔
312
        }
313
        return ports
1✔
314
}
315

316
func (r *EmqxReconciler) getNodeStatusesByAPI(instance appsv1beta3.Emqx) ([]appsv1beta3.EmqxNode, error) {
1✔
317
        resp, body, err := r.Handler.RequestAPI(instance, "GET", instance.GetUsername(), instance.GetPassword(), appsv1beta3.DefaultManagementPort, "api/v4/nodes")
1✔
318
        if err != nil {
2✔
319
                return nil, err
1✔
320
        }
1✔
321
        if resp.StatusCode != 200 {
1✔
322
                return nil, fmt.Errorf("failed to get node statuses from API: %s", resp.Status)
×
323
        }
×
324

325
        emqxNodes := []appsv1beta3.EmqxNode{}
1✔
326
        data := gjson.GetBytes(body, "data")
1✔
327
        if err := json.Unmarshal([]byte(data.Raw), &emqxNodes); err != nil {
1✔
328
                return nil, fmt.Errorf("failed to unmarshal node statuses: %v", err)
×
329
        }
×
330
        sort.Slice(emqxNodes, func(i, j int) bool {
2✔
331
                return emqxNodes[i].Node < emqxNodes[j].Node
1✔
332
        })
1✔
333
        return emqxNodes, nil
1✔
334
}
335

336
func generateStatefulSetDef(instance appsv1beta3.Emqx) *appsv1.StatefulSet {
2✔
337
        annotations := instance.GetAnnotations()
2✔
338
        if annotations == nil {
3✔
339
                annotations = make(map[string]string)
1✔
340
        }
1✔
341
        delete(annotations, "kubectl.kubernetes.io/last-applied-configuration")
2✔
342

2✔
343
        podTemplate := corev1.PodTemplateSpec{
2✔
344
                ObjectMeta: metav1.ObjectMeta{
2✔
345
                        Labels:      instance.GetLabels(),
2✔
346
                        Annotations: annotations,
2✔
347
                },
2✔
348
                Spec: corev1.PodSpec{
2✔
349
                        Affinity:         instance.GetAffinity(),
2✔
350
                        Tolerations:      instance.GetToleRations(),
2✔
351
                        NodeName:         instance.GetNodeName(),
2✔
352
                        NodeSelector:     instance.GetNodeSelector(),
2✔
353
                        ImagePullSecrets: instance.GetImagePullSecrets(),
2✔
354
                        SecurityContext:  instance.GetSecurityContext(),
2✔
355
                        InitContainers:   instance.GetInitContainers(),
2✔
356
                        Containers: append(
2✔
357
                                []corev1.Container{
2✔
358
                                        *generateEmqxContainer(instance),
2✔
359
                                        *generateReloaderContainer(instance),
2✔
360
                                },
2✔
361
                                instance.GetExtraContainers()...,
2✔
362
                        ),
2✔
363
                        Volumes: instance.GetExtraVolumes(),
2✔
364
                },
2✔
365
        }
2✔
366

2✔
367
        podAnnotation := podTemplate.ObjectMeta.DeepCopy().Annotations
2✔
368
        podAnnotation[handler.ManageContainersAnnotation] = generateAnnotationByContainers(podTemplate.Spec.Containers)
2✔
369
        podTemplate.Annotations = podAnnotation
2✔
370

2✔
371
        sts := &appsv1.StatefulSet{
2✔
372
                TypeMeta: metav1.TypeMeta{
2✔
373
                        APIVersion: "apps/v1",
2✔
374
                        Kind:       "StatefulSet",
2✔
375
                },
2✔
376
                ObjectMeta: metav1.ObjectMeta{
2✔
377
                        Name:        instance.GetName(),
2✔
378
                        Namespace:   instance.GetNamespace(),
2✔
379
                        Labels:      instance.GetLabels(),
2✔
380
                        Annotations: annotations,
2✔
381
                },
2✔
382
                Spec: appsv1.StatefulSetSpec{
2✔
383
                        Replicas: instance.GetReplicas(),
2✔
384
                        Selector: &metav1.LabelSelector{
2✔
385
                                MatchLabels: instance.GetLabels(),
2✔
386
                        },
2✔
387
                        PodManagementPolicy: appsv1.ParallelPodManagement,
2✔
388
                        Template:            podTemplate,
2✔
389
                },
2✔
390
        }
2✔
391

2✔
392
        sts = generateDataVolume(instance, sts)
2✔
393

2✔
394
        return sts
2✔
395
}
396

397
func generateInitPluginList(instance appsv1beta3.Emqx, existPluginList *appsv1beta3.EmqxPluginList) []client.Object {
2✔
398
        matchedPluginList := []appsv1beta3.EmqxPlugin{}
2✔
399
        for _, existPlugin := range existPluginList.Items {
3✔
400
                selector, _ := labels.ValidatedSelectorFromSet(existPlugin.Spec.Selector)
1✔
401
                if selector.Empty() || !selector.Matches(labels.Set(instance.GetLabels())) {
2✔
402
                        continue
1✔
403
                }
404
                matchedPluginList = append(matchedPluginList, existPlugin)
1✔
405
        }
406

407
        isExistPlugin := func(pluginName string, pluginList []appsv1beta3.EmqxPlugin) bool {
4✔
408
                for _, plugin := range pluginList {
3✔
409
                        if plugin.Spec.PluginName == pluginName {
2✔
410
                                return true
1✔
411
                        }
1✔
412
                }
413
                return false
2✔
414
        }
415

416
        pluginList := []client.Object{}
2✔
417
        // Default plugins
2✔
418
        if !isExistPlugin("emqx_rule_engine", matchedPluginList) {
4✔
419
                emqxRuleEngine := &appsv1beta3.EmqxPlugin{
2✔
420
                        TypeMeta: metav1.TypeMeta{
2✔
421
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
422
                                Kind:       "EmqxPlugin",
2✔
423
                        },
2✔
424
                        ObjectMeta: metav1.ObjectMeta{
2✔
425
                                Name:      fmt.Sprintf("%s-rule-engine", instance.GetName()),
2✔
426
                                Namespace: instance.GetNamespace(),
2✔
427
                                Labels:    instance.GetLabels(),
2✔
428
                        },
2✔
429
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
430
                                PluginName: "emqx_rule_engine",
2✔
431
                                Selector:   instance.GetLabels(),
2✔
432
                                Config:     map[string]string{},
2✔
433
                        },
2✔
434
                }
2✔
435
                pluginList = append(pluginList, emqxRuleEngine)
2✔
436
        }
2✔
437

438
        if !isExistPlugin("emqx_retainer", matchedPluginList) {
4✔
439
                emqxRetainer := &appsv1beta3.EmqxPlugin{
2✔
440
                        TypeMeta: metav1.TypeMeta{
2✔
441
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
442
                                Kind:       "EmqxPlugin",
2✔
443
                        },
2✔
444
                        ObjectMeta: metav1.ObjectMeta{
2✔
445
                                Name:      fmt.Sprintf("%s-retainer", instance.GetName()),
2✔
446
                                Namespace: instance.GetNamespace(),
2✔
447
                                Labels:    instance.GetLabels(),
2✔
448
                        },
2✔
449
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
450
                                PluginName: "emqx_retainer",
2✔
451
                                Selector:   instance.GetLabels(),
2✔
452
                                Config:     map[string]string{},
2✔
453
                        },
2✔
454
                }
2✔
455
                pluginList = append(pluginList, emqxRetainer)
2✔
456
        }
2✔
457

458
        enterprise, ok := instance.(*appsv1beta3.EmqxEnterprise)
2✔
459
        if ok && !isExistPlugin("emqx_modules", matchedPluginList) {
4✔
460
                emqxModules := &appsv1beta3.EmqxPlugin{
2✔
461
                        TypeMeta: metav1.TypeMeta{
2✔
462
                                APIVersion: "apps.emqx.io/v1beta3",
2✔
463
                                Kind:       "EmqxPlugin",
2✔
464
                        },
2✔
465
                        ObjectMeta: metav1.ObjectMeta{
2✔
466
                                Name:      fmt.Sprintf("%s-modules", instance.GetName()),
2✔
467
                                Namespace: instance.GetNamespace(),
2✔
468
                                Labels:    instance.GetLabels(),
2✔
469
                        },
2✔
470
                        Spec: appsv1beta3.EmqxPluginSpec{
2✔
471
                                PluginName: "emqx_modules",
2✔
472
                                Selector:   instance.GetLabels(),
2✔
473
                                Config:     map[string]string{},
2✔
474
                        },
2✔
475
                }
2✔
476

2✔
477
                if enterprise.Spec.EmqxTemplate.Modules != nil {
4✔
478
                        emqxModules.Spec.Config = map[string]string{
2✔
479
                                "modules.loaded_file": "/mounted/modules/loaded_modules",
2✔
480
                        }
2✔
481
                }
2✔
482

483
                pluginList = append(pluginList, emqxModules)
2✔
484
        }
485

486
        return pluginList
2✔
487
}
488

489
func generateDefaultPluginsConfig(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
490
        names := appsv1beta3.Names{Object: instance}
2✔
491

2✔
492
        cm := &corev1.ConfigMap{
2✔
493
                TypeMeta: metav1.TypeMeta{
2✔
494
                        APIVersion: "v1",
2✔
495
                        Kind:       "ConfigMap",
2✔
496
                },
2✔
497
                ObjectMeta: metav1.ObjectMeta{
2✔
498
                        Labels:    instance.GetLabels(),
2✔
499
                        Namespace: instance.GetNamespace(),
2✔
500
                        Name:      names.PluginsConfig(),
2✔
501
                },
2✔
502
                Data: map[string]string{
2✔
503
                        "emqx_modules.conf":           "",
2✔
504
                        "emqx_management.conf":        "management.listener.http = 8081\n",
2✔
505
                        "emqx_dashboard.conf":         "dashboard.listener.http = 18083\n",
2✔
506
                        "emqx_rule_engine.conf":       "",
2✔
507
                        "emqx_retainer.conf":          "",
2✔
508
                        "emqx_telemetry.conf":         "",
2✔
509
                        "emqx_auth_http.conf":         "auth.http.auth_req.url = http://127.0.0.1:80/mqtt/auth\nauth.http.auth_req.method = post\nauth.http.auth_req.headers.content_type = application/x-www-form-urlencoded\nauth.http.auth_req.params = clientid=%c,username=%u,password=%P\nauth.http.acl_req.url = http://127.0.0.1:80/mqtt/acl\nauth.http.acl_req.method = post\nauth.http.acl_req.headers.content-type = application/x-www-form-urlencoded\nauth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t,mountpoint=%m\nauth.http.timeout = 5s\nauth.http.connect_timeout = 5s\nauth.http.pool_size = 32\nauth.http.enable_pipelining = true\n",
2✔
510
                        "emqx_auth_jwt.conf":          "auth.jwt.secret = emqxsecret\nauth.jwt.from = password\nauth.jwt.verify_claims = off\n",
2✔
511
                        "emqx_auth_ldap.conf":         "auth.ldap.servers = 127.0.0.1\nauth.ldap.port = 389\nauth.ldap.pool = 8\nauth.ldap.bind_dn = cn=root,dc=emqx,dc=io\nauth.ldap.bind_password = public\nauth.ldap.timeout = 30s\nauth.ldap.device_dn = ou=device,dc=emqx,dc=io\nauth.ldap.match_objectclass = mqttUser\nauth.ldap.username.attributetype = uid\nauth.ldap.password.attributetype = userPassword\nauth.ldap.ssl = false\n",
2✔
512
                        "emqx_auth_mnesia.conf":       "",
2✔
513
                        "emqx_auth_mongo.conf":        "auth.mongo.type = single\nauth.mongo.srv_record = false\nauth.mongo.server = 127.0.0.1:27017\nauth.mongo.pool = 8\nauth.mongo.database = mqtt\nauth.mongo.topology.pool_size = 1\nauth.mongo.topology.max_overflow = 0\nauth.mongo.auth_query.password_hash = sha256\nauth.mongo.auth_query.collection = mqtt_user\nauth.mongo.auth_query.password_field = password\nauth.mongo.auth_query.selector = username=%u\nauth.mongo.super_query.collection = mqtt_user\nauth.mongo.super_query.super_field = is_superuser\nauth.mongo.super_query.selector = username=%u\nauth.mongo.acl_query.collection = mqtt_acl\nauth.mongo.acl_query.selector = username=%u\n",
2✔
514
                        "emqx_auth_mysql.conf":        "auth.mysql.server = 127.0.0.1:3306\nauth.mysql.pool = 8\nauth.mysql.database = mqtt\nauth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1\nauth.mysql.password_hash = sha256\nauth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1\nauth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'\n",
2✔
515
                        "emqx_auth_pgsql.conf":        "auth.pgsql.server = 127.0.0.1:5432\nauth.pgsql.pool = 8\nauth.pgsql.username = root\nauth.pgsql.database = mqtt\nauth.pgsql.encoding = utf8\nauth.pgsql.ssl = off\nauth.pgsql.auth_query = select password from mqtt_user where username = '%u' limit 1\nauth.pgsql.password_hash = sha256\nauth.pgsql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1\nauth.pgsql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'\n",
2✔
516
                        "emqx_auth_redis.conf":        "auth.redis.type = single\nauth.redis.server = 127.0.0.1:6379\nauth.redis.pool = 8\nauth.redis.database = 0\nauth.redis.auth_cmd = HMGET mqtt_user:%u password\nauth.redis.password_hash = plain\nauth.redis.super_cmd = HGET mqtt_user:%u is_superuser\nauth.redis.acl_cmd = HGETALL mqtt_acl:%u\n",
2✔
517
                        "emqx_backend_cassa.conf":     "backend.ecql.pool1.nodes = 127.0.0.1:9042\nbackend.ecql.pool1.size = 8\nbackend.ecql.pool1.auto_reconnect = 1\nbackend.ecql.pool1.username = cassandra\nbackend.ecql.pool1.password = cassandra\nbackend.ecql.pool1.keyspace = mqtt\nbackend.ecql.pool1.logger = info\nbackend.cassa.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscription_lookup\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.subscribed.2  = {\"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"cql\": [\"delete from acked where clientid = ${clientid} and topic = ${topic}\"]}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.cassa.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
518
                        "emqx_backend_dynamo.conf":    "backend.dynamo.region = us-west-2\nbackend.dynamo.pool1.server = http://localhost:8000\nbackend.dynamo.pool1.pool_size = 8\nbackend.dynamo.pool1.aws_access_key_id = FAKE_AWS_ACCESS_KEY_ID\nbackend.dynamo.pool1.aws_secret_access_key = FAKE_AWS_SECRET_ACCESS_KEY\nbackend.dynamo.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch_for_queue\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"function\": \"on_acked_delete\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.dynamo.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked_for_queue\"}, \"pool\": \"pool1\"}\n",
2✔
519
                        "emqx_backend_influxdb.conf":  "backend.influxdb.udp.pool1.server = 127.0.0.1:8089\nbackend.influxdb.udp.pool1.pool_size = 8\nbackend.influxdb.http.pool1.server = 127.0.0.1:8086\nbackend.influxdb.http.pool1.pool_size = 8\nbackend.influxdb.http.pool1.precision = ms\nbackend.influxdb.http.pool1.database = mydb\nbackend.influxdb.http.pool1.https_enabled = false\nbackend.influxdb.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
520
                        "emqx_backend_mongo.conf":     "backend.mongo.pool1.type = single\nbackend.mongo.pool1.srv_record = false\nbackend.mongo.pool1.server = 127.0.0.1:27017\nbackend.mongo.pool1.c_pool_size = 8\nbackend.mongo.pool1.database = mqtt\nbackend.mongo.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"pool\": \"pool1\", \"offline_opts\": {\"time_range\": \"2h\", \"max_returned_count\": 500}}\nbackend.mongo.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"function\": \"on_acked_delete\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.mongo.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
521
                        "emqx_backend_mysql.conf":     "backend.mysql.pool1.server = 127.0.0.1:3306\nbackend.mysql.pool1.pool_size = 8\nbackend.mysql.pool1.user = root\nbackend.mysql.pool1.password = public\nbackend.mysql.pool1.database = mqtt\nbackend.mysql.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.client.connected.2     = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"sql\": [\"delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}\"]}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.mysql.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
522
                        "emqx_backend_opentsdb.conf":  "backend.opentsdb.pool1.server = 127.0.0.1:4242\nbackend.opentsdb.pool1.pool_size = 8\nbackend.opentsdb.pool1.summary = true\nbackend.opentsdb.pool1.details = false\nbackend.opentsdb.pool1.sync = false\nbackend.opentsdb.pool1.sync_timeout = 0\nbackend.opentsdb.pool1.max_batch_size = 20\nbackend.opentsdb.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
523
                        "emqx_backend_pgsql.conf":     "backend.pgsql.pool1.server = 127.0.0.1:5432\nbackend.pgsql.pool1.pool_size = 8\nbackend.pgsql.pool1.username = root\nbackend.pgsql.pool1.password = public\nbackend.pgsql.pool1.database = mqtt\nbackend.pgsql.pool1.ssl = false\nbackend.pgsql.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.client.connected.2     = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.subscribed.1  = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_fetch\"}, \"offline_opts\": {\"max_returned_count\": 500, \"time_range\": \"2h\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.subscribed.2  = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"sql\": \"delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.pgsql.hook.message.acked.1       = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_acked\"}, \"pool\": \"pool1\"}\n",
2✔
524
                        "emqx_backend_redis.conf":     "backend.redis.pool1.type = single\nbackend.redis.pool1.server = 127.0.0.1:6379\nbackend.redis.pool1.pool_size = 8\nbackend.redis.pool1.database = 0\nbackend.redis.pool1.channel = mqtt_channel\nbackend.redis.hook.client.connected.1    = {\"action\": {\"function\": \"on_client_connected\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.client.connected.2    = {\"action\": {\"function\": \"on_subscribe_lookup\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.client.disconnected.1 = {\"action\": {\"function\": \"on_client_disconnected\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.1  = {\"topic\": \"queue/#\", \"action\": {\"function\": \"on_message_fetch_for_queue\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.2  = {\"topic\": \"pubsub/#\", \"action\": {\"function\": \"on_message_fetch_for_pubsub\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.subscribed.3  = {\"action\": {\"function\": \"on_retain_lookup\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.session.unsubscribed.1= {\"topic\": \"#\", \"action\": {\"commands\": [\"DEL mqtt:acked:${clientid}:${topic}\"]}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.1     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"expired_time\" : 3600, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.2     = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_retain\"}, \"expired_time\" : 3600, \"pool\": \"pool1\"}\nbackend.redis.hook.message.publish.3     = {\"topic\": \"#\", \"action\": {\"function\": \"on_retain_delete\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.acked.1       = {\"topic\": \"queue/#\", \"action\": {\"function\": \"on_message_acked_for_queue\"}, \"pool\": \"pool1\"}\nbackend.redis.hook.message.acked.2       = {\"topic\": \"pubsub/#\", \"action\": {\"function\": \"on_message_acked_for_pubsub\"}, \"pool\": \"pool1\"}\n",
2✔
525
                        "emqx_backend_timescale.conf": "backend.timescale.pool1.server = 127.0.0.1:5432\nbackend.timescale.pool1.pool_size = 8\nbackend.timescale.pool1.username = root\nbackend.timescale.pool1.password = public\nbackend.timescale.pool1.database = mqtt\nbackend.timescale.pool1.ssl = false\nbackend.timescale.hook.message.publish.1 = {\"topic\": \"#\", \"action\": {\"function\": \"on_message_publish\"}, \"pool\": \"pool1\"}\n",
2✔
526
                        "emqx_bridge_kafka.conf":      "bridge.kafka.servers = 127.0.0.1:9092\nbridge.kafka.query_api_versions = true\nbridge.kafka.connection_strategy = per_partition\nbridge.kafka.min_metadata_refresh_interval = 5S\nbridge.kafka.produce = sync\nbridge.kafka.produce.sync_timeout = 3s\nbridge.kafka.sock.sndbuf = 1MB\nbridge.kafka.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.kafka.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.kafka.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.kafka.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.kafka.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.kafka.hook.message.delivered.1    = {\"filter\":\"#\", \"topic\":\"MessageDelivered\"}\nbridge.kafka.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
527
                        "emqx_bridge_mqtt.conf":       "bridge.mqtt.aws.address = 127.0.0.1:1883\nbridge.mqtt.aws.proto_ver = mqttv4\nbridge.mqtt.aws.start_type = manual\nbridge.mqtt.aws.clientid = bridge_aws\nbridge.mqtt.aws.clean_start = true\nbridge.mqtt.aws.username = user\nbridge.mqtt.aws.password = passwd\nbridge.mqtt.aws.forwards = topic1/#,topic2/#\nbridge.mqtt.aws.forward_mountpoint = bridge/aws/${node}/\nbridge.mqtt.aws.ssl = off\nbridge.mqtt.aws.cacertfile = etc/certs/cacert.pem\nbridge.mqtt.aws.certfile = etc/certs/client-cert.pem\nbridge.mqtt.aws.keyfile = etc/certs/client-key.pem\nbridge.mqtt.aws.ciphers = TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256,TLS_AES_128_CCM_SHA256,TLS_AES_128_CCM_8_SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\nbridge.mqtt.aws.keepalive = 60s\nbridge.mqtt.aws.tls_versions = tlsv1.3,tlsv1.2,tlsv1.1,tlsv1\nbridge.mqtt.aws.reconnect_interval = 30s\nbridge.mqtt.aws.retry_interval = 20s\nbridge.mqtt.aws.batch_size = 32\nbridge.mqtt.aws.max_inflight_size = 32\nbridge.mqtt.aws.queue.replayq_dir = data/replayq/emqx_aws_bridge/\nbridge.mqtt.aws.queue.replayq_seg_bytes = 10MB\nbridge.mqtt.aws.queue.max_total_size = 5GB\n",
2✔
528
                        "emqx_bridge_pulsar.conf":     "bridge.pulsar.servers = 127.0.0.1:6650\nbridge.pulsar.produce = sync\nbridge.pulsar.sock.sndbuf = 1MB\nbridge.pulsar.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.pulsar.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.pulsar.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.pulsar.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.pulsar.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.pulsar.hook.message.delivered.1      = {\"filter\":\"#\", \"topic\":\"MessageDelivered\"}\nbridge.pulsar.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
529
                        "emqx_bridge_rabbit.conf":     "bridge.rabbit.1.server = 127.0.0.1:5672\nbridge.rabbit.1.pool_size = 8\nbridge.rabbit.1.username = guest\nbridge.rabbit.1.password = guest\nbridge.rabbit.1.timeout = 5s\nbridge.rabbit.1.virtual_host = /\nbridge.rabbit.1.heartbeat = 30s\nbridge.rabbit.hook.session.subscribed.1 = {\"action\": \"on_session_subscribed\", \"rabbit\": 1, \"exchange\": \"direct:emqx.subscription\"}\nbridge.rabbit.hook.session.unsubscribed.1 = {\"action\": \"on_session_unsubscribed\", \"rabbit\": 1, \"exchange\": \"direct:emqx.subscription\"}\nbridge.rabbit.hook.message.publish.1 = {\"topic\": \"$SYS/#\", \"action\": \"on_message_publish\", \"rabbit\": 1, \"exchange\": \"topic:emqx.$sys\"}\nbridge.rabbit.hook.message.publish.2 = {\"topic\": \"#\", \"action\": \"on_message_publish\", \"rabbit\": 1, \"exchange\": \"topic:emqx.pub\"}\nbridge.rabbit.hook.message.acked.1 = {\"topic\": \"#\", \"action\": \"on_message_acked\", \"rabbit\": 1, \"exchange\": \"topic:emqx.acked\"}\n",
2✔
530
                        "emqx_bridge_rocket.conf":     "bridge.rocket.servers = 127.0.0.1:9876\nbridge.rocket.refresh_topic_route_interval = 5S\nbridge.rocket.produce = sync\nbridge.rocket.sock.sndbuf = 1MB\nbridge.rocket.hook.client.connected.1     = {\"topic\":\"ClientConnected\"}\nbridge.rocket.hook.client.disconnected.1  = {\"topic\":\"ClientDisconnected\"}\nbridge.rocket.hook.session.subscribed.1   = {\"filter\":\"#\", \"topic\":\"SessionSubscribed\"}\nbridge.rocket.hook.session.unsubscribed.1 = {\"filter\":\"#\", \"topic\":\"SessionUnsubscribed\"}\nbridge.rocket.hook.message.publish.1      = {\"filter\":\"#\", \"topic\":\"MessagePublish\"}\nbridge.rocket.hook.message.delivered.1    = {\"filter\":\"#\", \"topic\":\"MessageDeliver\"}\nbridge.rocket.hook.message.acked.1        = {\"filter\":\"#\", \"topic\":\"MessageAcked\"}\n",
2✔
531
                        "emqx_coap.conf":              "coap.bind.udp.1 = 0.0.0.0:5683\ncoap.enable_stats = off\ncoap.bind.dtls.1 = 0.0.0.0:5684\ncoap.dtls.keyfile = etc/certs/key.pem\ncoap.dtls.certfile = etc/certs/cert.pem\ncoap.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\n",
2✔
532
                        "emqx_conf.conf":              "conf.etc.dir.emqx = etc\nconf.etc.dir.emqx.zones = etc\nconf.etc.dir.emqx.listeners = etc\nconf.etc.dir.emqx.sys_mon = etc\n",
2✔
533
                        "emqx_exhook.conf":            "exhook.server.default.url = http://127.0.0.1:9000\n",
2✔
534
                        "emqx_exproto.conf":           "exproto.server.http.port = 9100\nexproto.server.https.port = 9101\nexproto.server.https.cacertfile = etc/certs/cacert.pem\nexproto.server.https.certfile = etc/certs/cert.pem\nexproto.server.https.keyfile = etc/certs/key.pem\nexproto.listener.protoname = tcp://0.0.0.0:7993\nexproto.listener.protoname.connection_handler_url = http://127.0.0.1:9001\nexproto.listener.protoname.acceptors = 8\nexproto.listener.protoname.max_connections = 1024000\nexproto.listener.protoname.max_conn_rate = 1000\nexproto.listener.protoname.active_n = 100\nexproto.listener.protoname.idle_timeout = 30s\nexproto.listener.protoname.access.1 = allow all\nexproto.listener.protoname.backlog = 1024\nexproto.listener.protoname.send_timeout = 15s\nexproto.listener.protoname.send_timeout_close = on\nexproto.listener.protoname.nodelay = true\nexproto.listener.protoname.reuseaddr = true\n",
2✔
535
                        "emqx_gbt32960.conf":          "gbt32960.frame.max_length = 8192\ngbt32960.proto.retx_interval = 8s\ngbt32960.proto.retx_max_times = 3\ngbt32960.proto.message_queue_len = 10\ngbt32960.listener.tcp = 0.0.0.0:7325\ngbt32960.listener.tcp.acceptors = 8\ngbt32960.listener.tcp.max_connections = 1024000\ngbt32960.listener.tcp.max_conn_rate = 1000\ngbt32960.listener.tcp.idle_timeout = 60s\ngbt32960.listener.tcp.active_n = 100\ngbt32960.listener.tcp.zone = external\ngbt32960.listener.tcp.access.1 = allow all\ngbt32960.listener.tcp.backlog = 1024\ngbt32960.listener.tcp.send_timeout = 15s\ngbt32960.listener.tcp.send_timeout_close = on\ngbt32960.listener.tcp.nodelay = true\ngbt32960.listener.tcp.reuseaddr = true\ngbt32960.listener.ssl = 7326\ngbt32960.listener.ssl.acceptors = 16\ngbt32960.listener.ssl.max_connections = 102400\ngbt32960.listener.ssl.max_conn_rate = 500\ngbt32960.listener.ssl.idle_timeout = 60s\ngbt32960.listener.ssl.active_n = 100\ngbt32960.listener.ssl.zone = external\ngbt32960.listener.ssl.access.1 = allow all\ngbt32960.listener.ssl.handshake_timeout = 15s\ngbt32960.listener.ssl.keyfile = etc/certs/key.pem\ngbt32960.listener.ssl.certfile = etc/certs/cert.pem\ngbt32960.listener.ssl.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\ngbt32960.listener.ssl.reuseaddr = true\n",
2✔
536
                        "emqx_jt808.conf":             "jt808.proto.allow_anonymous = true\njt808.proto.dn_topic = jt808/%c/dn\njt808.proto.up_topic = jt808/%c/up\njt808.conn.idle_timeout = 30s\njt808.conn.enable_stats = on\njt808.frame.max_length = 8192\njt808.listener.tcp = 6207\njt808.listener.tcp.acceptors = 4\njt808.listener.tcp.max_clients = 512\n",
2✔
537
                        "emqx_lua_hook.conf":          "",
2✔
538
                        "emqx_lwm2m.conf":             "lwm2m.lifetime_min = 1s\nlwm2m.lifetime_max = 86400s\nlwm2m.mountpoint = lwm2m/%e/\nlwm2m.topics.command = dn/#\nlwm2m.topics.response = up/resp\nlwm2m.topics.notify = up/notify\nlwm2m.topics.register = up/resp\nlwm2m.topics.update = up/resp\nlwm2m.xml_dir =  etc/lwm2m_xml\nlwm2m.bind.udp.1 = 0.0.0.0:5683\nlwm2m.opts.buffer = 1024KB\nlwm2m.opts.recbuf = 1024KB\nlwm2m.opts.sndbuf = 1024KB\nlwm2m.opts.read_packets = 20\nlwm2m.bind.dtls.1 = 0.0.0.0:5684\nlwm2m.dtls.keyfile = etc/certs/key.pem\nlwm2m.dtls.certfile = etc/certs/cert.pem\nlwm2m.dtls.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\n",
2✔
539
                        "emqx_prometheus.conf":        "prometheus.push.gateway.server = http://127.0.0.1:9091\nprometheus.interval = 15000\n",
2✔
540
                        "emqx_psk_file.conf":          "psk.file.path = etc/psk.txt\npsk.file.delimiter = :\n",
2✔
541
                        "emqx_recon.conf":             "",
2✔
542
                        "emqx_sasl.conf":              "",
2✔
543
                        "emqx_schema_registry.conf":   "",
2✔
544
                        "emqx_sn.conf":                "mqtt.sn.port = 1884\nmqtt.sn.advertise_duration = 15m\nmqtt.sn.gateway_id = 1\nmqtt.sn.enable_stats = off\nmqtt.sn.enable_qos3 = off\nmqtt.sn.idle_timeout = 30s\nmqtt.sn.predefined.topic.0 = reserved\nmqtt.sn.predefined.topic.1 = /predefined/topic/name/hello\nmqtt.sn.predefined.topic.2 = /predefined/topic/name/nice\nmqtt.sn.username = mqtt_sn_user\nmqtt.sn.password = abc\n",
2✔
545
                        "emqx_stomp.conf":             "stomp.listener = 61613\nstomp.listener.acceptors = 4\nstomp.listener.max_connections = 512\nstomp.default_user.login = guest\nstomp.default_user.passcode = guest\nstomp.allow_anonymous = true\nstomp.frame.max_headers = 10\nstomp.frame.max_header_length = 1024\nstomp.frame.max_body_length = 8192\n",
2✔
546
                        "emqx_tcp.conf":               "tcp.proto.idle_timeout = 15s\ntcp.proto.up_topic = tcp/%c/up\ntcp.proto.dn_topic = tcp/%c/dn\ntcp.proto.max_packet_size = 65535\ntcp.proto.enable_stats = on\ntcp.proto.force_gc_policy = 1000|1MB\ntcp.listener.external = 0.0.0.0:8090\ntcp.listener.external.acceptors = 8\ntcp.listener.external.max_connections = 1024000\ntcp.listener.external.max_conn_rate = 1000\ntcp.listener.external.active_n = 100\ntcp.listener.external.access.1 = allow all\ntcp.listener.external.backlog = 1024\ntcp.listener.external.send_timeout = 15s\ntcp.listener.external.send_timeout_close = on\ntcp.listener.external.nodelay = true\ntcp.listener.external.reuseaddr = true\ntcp.listener.ssl.external = 0.0.0.0:8091\ntcp.listener.ssl.external.acceptors = 8\ntcp.listener.ssl.external.max_connections = 1024000\ntcp.listener.ssl.external.max_conn_rate = 1000\ntcp.listener.ssl.external.active_n = 100\ntcp.listener.ssl.external.access.1 = allow all\ntcp.listener.ssl.external.handshake_timeout = 15s\ntcp.listener.ssl.external.keyfile = etc/certs/key.pem\ntcp.listener.ssl.external.certfile = etc/certs/cert.pem\ntcp.listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA\ntcp.listener.ssl.external.backlog = 1024\ntcp.listener.ssl.external.send_timeout = 15s\ntcp.listener.ssl.external.send_timeout_close = on\ntcp.listener.ssl.external.nodelay = true\ntcp.listener.ssl.external.reuseaddr = true\n",
2✔
547
                        "emqx_web_hook.conf":          "web.hook.url = http://127.0.0.1:80\nweb.hook.headers.content-type = application/json\nweb.hook.body.encoding_of_payload_field = plain\nweb.hook.pool_size = 32\nweb.hook.enable_pipelining = true\n",
2✔
548
                },
2✔
549
        }
2✔
550

2✔
551
        return cm
2✔
552
}
2✔
553

554
func generateHeadlessSvc(instance appsv1beta3.Emqx) *corev1.Service {
2✔
555
        names := appsv1beta3.Names{Object: instance}
2✔
556

2✔
557
        headlessSvc := &corev1.Service{
2✔
558
                TypeMeta: metav1.TypeMeta{
2✔
559
                        APIVersion: "v1",
2✔
560
                        Kind:       "Service",
2✔
561
                },
2✔
562
                ObjectMeta: metav1.ObjectMeta{
2✔
563
                        Labels:    instance.GetLabels(),
2✔
564
                        Name:      names.HeadlessSvc(),
2✔
565
                        Namespace: instance.GetNamespace(),
2✔
566
                },
2✔
567
                Spec: corev1.ServiceSpec{
2✔
568
                        Selector:                 instance.GetLabels(),
2✔
569
                        ClusterIP:                corev1.ClusterIPNone,
2✔
570
                        PublishNotReadyAddresses: true,
2✔
571
                },
2✔
572
        }
2✔
573

2✔
574
        compile := regexp.MustCompile(".*management.*")
2✔
575
        for _, port := range instance.GetServiceTemplate().Spec.Ports {
4✔
576
                if compile.MatchString(port.Name) {
4✔
577
                        // Headless services must not set nodePort
2✔
578
                        headlessSvc.Spec.Ports = append(headlessSvc.Spec.Ports, corev1.ServicePort{
2✔
579
                                Name:        port.Name,
2✔
580
                                Protocol:    port.Protocol,
2✔
581
                                AppProtocol: port.AppProtocol,
2✔
582
                                TargetPort:  port.TargetPort,
2✔
583
                                Port:        port.Port,
2✔
584
                        })
2✔
585
                }
2✔
586
        }
587
        return headlessSvc
2✔
588
}
589

590
func generateSvc(instance appsv1beta3.Emqx) *corev1.Service {
2✔
591
        return &corev1.Service{
2✔
592
                TypeMeta: metav1.TypeMeta{
2✔
593
                        APIVersion: "v1",
2✔
594
                        Kind:       "Service",
2✔
595
                },
2✔
596
                ObjectMeta: instance.GetServiceTemplate().ObjectMeta,
2✔
597
                Spec:       instance.GetServiceTemplate().Spec,
2✔
598
        }
2✔
599
}
2✔
600

601
func generateAcl(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
602
        if len(instance.GetACL()) == 0 {
4✔
603
                return nil
2✔
604
        }
2✔
605
        names := appsv1beta3.Names{Object: instance}
2✔
606

2✔
607
        var aclString string
2✔
608
        for _, rule := range instance.GetACL() {
4✔
609
                aclString += fmt.Sprintf("%s\n", rule)
2✔
610
        }
2✔
611
        cm := &corev1.ConfigMap{
2✔
612
                TypeMeta: metav1.TypeMeta{
2✔
613
                        APIVersion: "v1",
2✔
614
                        Kind:       "ConfigMap",
2✔
615
                },
2✔
616
                ObjectMeta: metav1.ObjectMeta{
2✔
617
                        Labels:    instance.GetLabels(),
2✔
618
                        Namespace: instance.GetNamespace(),
2✔
619
                        Name:      names.ACL(),
2✔
620
                },
2✔
621
                Data: map[string]string{"acl.conf": aclString},
2✔
622
        }
2✔
623
        return cm
2✔
624
}
625

626
func generateLoadedModules(instance appsv1beta3.Emqx) *corev1.ConfigMap {
2✔
627
        names := appsv1beta3.Names{Object: instance}
2✔
628
        var loadedModulesString string
2✔
629
        switch obj := instance.(type) {
2✔
630
        case *appsv1beta3.EmqxBroker:
2✔
631
                modules := &appsv1beta3.EmqxBrokerModuleList{
2✔
632
                        Items: obj.Spec.EmqxTemplate.Modules,
2✔
633
                }
2✔
634
                loadedModulesString = modules.String()
2✔
635
                if loadedModulesString == "" {
3✔
636
                        return nil
1✔
637
                }
1✔
638
        case *appsv1beta3.EmqxEnterprise:
2✔
639
                modules := &appsv1beta3.EmqxEnterpriseModuleList{
2✔
640
                        Items: obj.Spec.EmqxTemplate.Modules,
2✔
641
                }
2✔
642
                // for enterprise, if modules is empty, don't create configmap
2✔
643
                loadedModulesString = modules.String()
2✔
644
                if loadedModulesString == "" {
3✔
645
                        return nil
1✔
646
                }
1✔
647
        }
648

649
        cm := &corev1.ConfigMap{
2✔
650
                TypeMeta: metav1.TypeMeta{
2✔
651
                        APIVersion: "v1",
2✔
652
                        Kind:       "ConfigMap",
2✔
653
                },
2✔
654
                ObjectMeta: metav1.ObjectMeta{
2✔
655
                        Labels:    instance.GetLabels(),
2✔
656
                        Namespace: instance.GetNamespace(),
2✔
657
                        Name:      names.LoadedModules(),
2✔
658
                },
2✔
659
                Data: map[string]string{"loaded_modules": loadedModulesString},
2✔
660
        }
2✔
661

2✔
662
        return cm
2✔
663
}
664

665
func generateLicense(emqxEnterprise *appsv1beta3.EmqxEnterprise) *corev1.Secret {
2✔
666
        names := appsv1beta3.Names{Object: emqxEnterprise}
2✔
667
        license := emqxEnterprise.GetLicense()
2✔
668
        if len(license.Data) == 0 && len(license.StringData) == 0 {
3✔
669
                return nil
1✔
670
        }
1✔
671

672
        secret := &corev1.Secret{
2✔
673
                TypeMeta: metav1.TypeMeta{
2✔
674
                        APIVersion: "v1",
2✔
675
                        Kind:       "Secret",
2✔
676
                },
2✔
677
                ObjectMeta: metav1.ObjectMeta{
2✔
678
                        Labels:    emqxEnterprise.GetLabels(),
2✔
679
                        Namespace: emqxEnterprise.GetNamespace(),
2✔
680
                        Name:      names.License(),
2✔
681
                },
2✔
682
                Type: corev1.SecretTypeOpaque,
2✔
683
                Data: map[string][]byte{"emqx.lic": emqxEnterprise.GetLicense().Data},
2✔
684
        }
2✔
685
        if emqxEnterprise.GetLicense().StringData != "" {
2✔
686
                secret.StringData = map[string]string{"emqx.lic": emqxEnterprise.GetLicense().StringData}
×
687
        }
×
688
        return secret
2✔
689
}
690

691
func generateDataVolume(instance appsv1beta3.Emqx, sts *appsv1.StatefulSet) *appsv1.StatefulSet {
2✔
692
        names := appsv1beta3.Names{Object: instance}
2✔
693
        dataName := names.Data()
2✔
694

2✔
695
        if reflect.ValueOf(instance.GetPersistent()).IsZero() {
4✔
696
                sts.Spec.Template.Spec.Volumes = append(
2✔
697
                        sts.Spec.Template.Spec.Volumes,
2✔
698
                        corev1.Volume{
2✔
699
                                Name: dataName,
2✔
700
                                VolumeSource: corev1.VolumeSource{
2✔
701
                                        EmptyDir: &corev1.EmptyDirVolumeSource{},
2✔
702
                                },
2✔
703
                        },
2✔
704
                )
2✔
705
        } else {
4✔
706
                sts.Spec.VolumeClaimTemplates = append(
2✔
707
                        sts.Spec.VolumeClaimTemplates,
2✔
708
                        generateVolumeClaimTemplate(instance, dataName),
2✔
709
                )
2✔
710
        }
2✔
711

712
        emqxContainerIndex := findContinerIndex(sts, handler.EmqxContainerName)
2✔
713
        sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts = append(
2✔
714
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts,
2✔
715
                corev1.VolumeMount{
2✔
716
                        Name:      dataName,
2✔
717
                        MountPath: "/opt/emqx/data",
2✔
718
                },
2✔
719
        )
2✔
720

2✔
721
        ReloaderContainerIndex := findContinerIndex(sts, ReloaderContainerName)
2✔
722
        sts.Spec.Template.Spec.Containers[ReloaderContainerIndex].VolumeMounts = sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts
2✔
723
        return sts
2✔
724
}
725

726
func generateVolumeClaimTemplate(instance appsv1beta3.Emqx, Name string) corev1.PersistentVolumeClaim {
2✔
727
        template := instance.GetPersistent()
2✔
728
        pvc := corev1.PersistentVolumeClaim{
2✔
729
                ObjectMeta: metav1.ObjectMeta{
2✔
730
                        Name:      Name,
2✔
731
                        Namespace: instance.GetNamespace(),
2✔
732
                },
2✔
733
                Spec: template,
2✔
734
        }
2✔
735
        if pvc.Spec.AccessModes == nil {
3✔
736
                pvc.Spec.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce}
1✔
737
        }
1✔
738
        return pvc
2✔
739
}
740

741
func generateReloaderContainer(instance appsv1beta3.Emqx) *corev1.Container {
2✔
742
        return &corev1.Container{
2✔
743
                Name:            ReloaderContainerName,
2✔
744
                Image:           instance.GetReloaderImage(),
2✔
745
                ImagePullPolicy: instance.GetImagePullPolicy(),
2✔
746
                Args: []string{
2✔
747
                        "-u", instance.GetUsername(),
2✔
748
                        "-p", instance.GetPassword(),
2✔
749
                        "-P", appsv1beta3.DefaultManagementPort,
2✔
750
                },
2✔
751
        }
2✔
752
}
2✔
753

754
func generateEmqxContainer(instance appsv1beta3.Emqx) *corev1.Container {
2✔
755
        return &corev1.Container{
2✔
756
                Name:            handler.EmqxContainerName,
2✔
757
                Image:           instance.GetImage(),
2✔
758
                ImagePullPolicy: instance.GetImagePullPolicy(),
2✔
759
                Resources:       instance.GetResource(),
2✔
760
                Env: mergeEnvAndConfig(instance, []corev1.EnvVar{
2✔
761
                        {
2✔
762
                                Name:  "EMQX_MANAGEMENT__DEFAULT_APPLICATION__ID",
2✔
763
                                Value: instance.GetUsername(),
2✔
764
                        },
2✔
765
                        {
2✔
766
                                Name:  "EMQX_DASHBOARD__DEFAULT_USER__LOGIN",
2✔
767
                                Value: instance.GetUsername(),
2✔
768
                        },
2✔
769
                        {
2✔
770
                                Name:  "EMQX_MANAGEMENT__DEFAULT_APPLICATION__SECRET",
2✔
771
                                Value: instance.GetPassword(),
2✔
772
                        },
2✔
773
                        {
2✔
774
                                Name:  "EMQX_DASHBOARD__DEFAULT_USER__PASSWORD",
2✔
775
                                Value: instance.GetPassword(),
2✔
776
                        },
2✔
777
                }...),
2✔
778
                Args:           instance.GetArgs(),
2✔
779
                ReadinessProbe: instance.GetReadinessProbe(),
2✔
780
                LivenessProbe:  instance.GetLivenessProbe(),
2✔
781
                StartupProbe:   instance.GetStartupProbe(),
2✔
782
                VolumeMounts:   instance.GetExtraVolumeMounts(),
2✔
783
        }
2✔
784
}
2✔
785

786
func updateEnvAndVolumeForSts(sts *appsv1.StatefulSet, envVar corev1.EnvVar, volumeMount corev1.VolumeMount, volume corev1.Volume) *appsv1.StatefulSet {
2✔
787
        emqxContainerIndex := findContinerIndex(sts, handler.EmqxContainerName)
2✔
788
        reloaderContainerIndex := findContinerIndex(sts, ReloaderContainerName)
2✔
789

2✔
790
        isNotExistVolume := func(volume corev1.Volume) bool {
4✔
791
                for _, v := range sts.Spec.Template.Spec.Volumes {
3✔
792
                        if v.Name == volume.Name {
1✔
793
                                return false
×
794
                        }
×
795
                }
796
                return true
2✔
797
        }
798

799
        isNotExistVolumeVolumeMount := func(volumeMount corev1.VolumeMount) bool {
4✔
800
                for _, v := range sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts {
3✔
801
                        if v.Name == volumeMount.Name {
1✔
802
                                return false
×
803
                        }
×
804
                }
805
                return true
2✔
806
        }
807

808
        isNotExistEnv := func(envVar corev1.EnvVar) bool {
4✔
809
                for _, v := range sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env {
3✔
810
                        if v.Name == envVar.Name {
1✔
811
                                return false
×
812
                        }
×
813
                }
814
                return true
2✔
815
        }
816

817
        if isNotExistVolume(volume) {
4✔
818
                sts.Spec.Template.Spec.Volumes = append(
2✔
819
                        sts.Spec.Template.Spec.Volumes,
2✔
820
                        volume,
2✔
821
                )
2✔
822
        }
2✔
823

824
        if isNotExistVolumeVolumeMount(volumeMount) {
4✔
825
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts = append(
2✔
826
                        sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts,
2✔
827
                        volumeMount,
2✔
828
                )
2✔
829
        }
2✔
830

831
        if isNotExistEnv(envVar) {
4✔
832
                sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env = append(
2✔
833
                        sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env,
2✔
834
                        envVar,
2✔
835
                )
2✔
836
        }
2✔
837

838
        sts.Spec.Template.Spec.Containers[reloaderContainerIndex].VolumeMounts = sts.Spec.Template.Spec.Containers[emqxContainerIndex].VolumeMounts
2✔
839
        sts.Spec.Template.Spec.Containers[reloaderContainerIndex].Env = sts.Spec.Template.Spec.Containers[emqxContainerIndex].Env
2✔
840

2✔
841
        return sts
2✔
842
}
843

844
func mergeEnvAndConfig(instance appsv1beta3.Emqx, extraEnvs ...corev1.EnvVar) []corev1.EnvVar {
2✔
845
        lookup := func(name string, envs []corev1.EnvVar) bool {
4✔
846
                for _, env := range envs {
4✔
847
                        if env.Name == name {
3✔
848
                                return true
1✔
849
                        }
1✔
850
                }
851
                return false
2✔
852
        }
853

854
        envs := append(instance.GetEnv(), extraEnvs...)
2✔
855
        emqxConfig := instance.GetEmqxConfig()
2✔
856

2✔
857
        for k, v := range emqxConfig {
4✔
858
                key := fmt.Sprintf("EMQX_%s", strings.ToUpper(strings.ReplaceAll(k, ".", "__")))
2✔
859
                if !lookup(key, envs) {
4✔
860
                        envs = append(envs, corev1.EnvVar{Name: key, Value: v})
2✔
861
                }
2✔
862
        }
863

864
        sort.Slice(envs, func(i, j int) bool {
4✔
865
                return envs[i].Name < envs[j].Name
2✔
866
        })
2✔
867
        return envs
2✔
868
}
869

870
func findContinerIndex(sts *appsv1.StatefulSet, containerName string) int {
2✔
871
        for k, c := range sts.Spec.Template.Spec.Containers {
4✔
872
                if c.Name == containerName {
4✔
873
                        return k
2✔
874
                }
2✔
875
        }
876
        return -1
×
877
}
878

879
func generateAnnotationByContainers(containers []corev1.Container) string {
2✔
880
        containerNames := []string{}
2✔
881
        for _, c := range containers {
4✔
882
                containerNames = append(containerNames, c.Name)
2✔
883
        }
2✔
884
        return strings.Join(containerNames, ",")
2✔
885
}
886

887
func updateEmqxStatus(instance appsv1beta3.Emqx, emqxNodes []appsv1beta3.EmqxNode) appsv1beta3.Emqx {
2✔
888
        status := instance.GetStatus()
2✔
889
        status.Replicas = *instance.GetReplicas()
2✔
890
        if emqxNodes != nil {
4✔
891
                readyReplicas := int32(0)
2✔
892
                for _, node := range emqxNodes {
4✔
893
                        if node.NodeStatus == "Running" {
4✔
894
                                readyReplicas++
2✔
895
                        }
2✔
896
                }
897
                status.ReadyReplicas = readyReplicas
2✔
898
                status.EmqxNodes = emqxNodes
2✔
899
        }
900

901
        var cond *appsv1beta3.Condition
2✔
902
        if status.Replicas == 0 {
2✔
903
                cond = appsv1beta3.NewCondition(
×
904
                        appsv1beta3.ConditionRunning,
×
NEW
905
                        corev1.ConditionFalse,
×
NEW
906
                        "ClusterNotReady",
×
NEW
907
                        "Replicas is 0",
×
908
                )
×
909
        } else if status.Replicas != status.ReadyReplicas {
4✔
910
                cond = appsv1beta3.NewCondition(
2✔
911
                        appsv1beta3.ConditionRunning,
2✔
912
                        corev1.ConditionFalse,
2✔
913
                        "ClusterNotReady",
2✔
914
                        "Some nodes are not ready",
2✔
915
                )
2✔
916
        } else {
4✔
917
                cond = appsv1beta3.NewCondition(
2✔
918
                        appsv1beta3.ConditionRunning,
2✔
919
                        corev1.ConditionTrue,
2✔
920
                        "ClusterReady",
2✔
921
                        "All resources are ready",
2✔
922
                )
2✔
923
        }
2✔
924
        status.SetCondition(*cond)
2✔
925
        instance.SetStatus(status)
2✔
926
        return instance
2✔
927
}
928

929
func updatePluginsConfigForSts(sts *appsv1.StatefulSet, PluginsConfig *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
930
        return updateEnvAndVolumeForSts(sts,
2✔
931
                corev1.EnvVar{
2✔
932
                        Name:  "EMQX_PLUGINS__ETC_DIR",
2✔
933
                        Value: "/mounted/plugins/etc",
2✔
934
                },
2✔
935
                corev1.VolumeMount{
2✔
936
                        Name:      PluginsConfig.Name,
2✔
937
                        MountPath: "/mounted/plugins/etc",
2✔
938
                },
2✔
939
                corev1.Volume{
2✔
940
                        Name: PluginsConfig.Name,
2✔
941
                        VolumeSource: corev1.VolumeSource{
2✔
942
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
943
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
944
                                                Name: PluginsConfig.Name,
2✔
945
                                        },
2✔
946
                                },
2✔
947
                        },
2✔
948
                },
2✔
949
        )
2✔
950
}
2✔
951

952
func updateAclForSts(sts *appsv1.StatefulSet, acl *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
953
        if sts.Spec.Template.Annotations == nil {
3✔
954
                sts.Spec.Template.Annotations = make(map[string]string)
1✔
955
        }
1✔
956
        sts.Spec.Template.Annotations["ACL/Base64EncodeConfig"] = base64.StdEncoding.EncodeToString([]byte(acl.Data["acl.conf"]))
2✔
957
        return updateEnvAndVolumeForSts(sts,
2✔
958
                corev1.EnvVar{
2✔
959
                        Name:  "EMQX_ACL_FILE",
2✔
960
                        Value: "/mounted/acl/acl.conf",
2✔
961
                },
2✔
962
                corev1.VolumeMount{
2✔
963
                        Name:      acl.Name,
2✔
964
                        MountPath: "/mounted/acl",
2✔
965
                },
2✔
966
                corev1.Volume{
2✔
967
                        Name: acl.Name,
2✔
968
                        VolumeSource: corev1.VolumeSource{
2✔
969
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
970
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
971
                                                Name: acl.Name,
2✔
972
                                        },
2✔
973
                                },
2✔
974
                        },
2✔
975
                },
2✔
976
        )
2✔
977
}
978

979
func updateLoadedModulesForSts(sts *appsv1.StatefulSet, loadedModules *corev1.ConfigMap) *appsv1.StatefulSet {
2✔
980
        if sts.Spec.Template.Annotations == nil {
3✔
981
                sts.Spec.Template.Annotations = make(map[string]string)
1✔
982
        }
1✔
983
        sts.Spec.Template.Annotations["LoadedModules/Base64EncodeConfig"] = base64.StdEncoding.EncodeToString([]byte(loadedModules.Data["loaded_modules"]))
2✔
984
        return updateEnvAndVolumeForSts(sts,
2✔
985
                corev1.EnvVar{
2✔
986
                        Name:  "EMQX_MODULES__LOADED_FILE",
2✔
987
                        Value: "/mounted/modules/loaded_modules",
2✔
988
                },
2✔
989
                corev1.VolumeMount{
2✔
990
                        Name:      loadedModules.Name,
2✔
991
                        MountPath: "/mounted/modules",
2✔
992
                },
2✔
993
                corev1.Volume{
2✔
994
                        Name: loadedModules.Name,
2✔
995
                        VolumeSource: corev1.VolumeSource{
2✔
996
                                ConfigMap: &corev1.ConfigMapVolumeSource{
2✔
997
                                        LocalObjectReference: corev1.LocalObjectReference{
2✔
998
                                                Name: loadedModules.Name,
2✔
999
                                        },
2✔
1000
                                },
2✔
1001
                        },
2✔
1002
                },
2✔
1003
        )
2✔
1004
}
1005

1006
func updateLicenseForsts(sts *appsv1.StatefulSet, license *corev1.Secret) *appsv1.StatefulSet {
2✔
1007
        fileName := "emqx.lic"
2✔
1008
        for k := range license.Data {
4✔
1009
                fileName = k
2✔
1010
                break
2✔
1011
        }
1012

1013
        return updateEnvAndVolumeForSts(sts,
2✔
1014
                corev1.EnvVar{
2✔
1015
                        Name:  "EMQX_LICENSE__FILE",
2✔
1016
                        Value: filepath.Join("/mounted/license", fileName),
2✔
1017
                },
2✔
1018
                corev1.VolumeMount{
2✔
1019
                        Name:      license.Name,
2✔
1020
                        MountPath: "/mounted/license",
2✔
1021
                        ReadOnly:  true,
2✔
1022
                },
2✔
1023
                corev1.Volume{
2✔
1024
                        Name: license.Name,
2✔
1025
                        VolumeSource: corev1.VolumeSource{
2✔
1026
                                Secret: &corev1.SecretVolumeSource{
2✔
1027
                                        SecretName: license.Name,
2✔
1028
                                },
2✔
1029
                        },
2✔
1030
                },
2✔
1031
        )
2✔
1032
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc