• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx-operator / 3693347214

pending completion
3693347214

Pull #520

github

raoxiaoli
docs: add core and replicant doc
Pull Request #520: docs: add core and replicant doc

3016 of 3991 relevant lines covered (75.57%)

1.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.17
/controllers/apps/v2alpha1/emqx_controller.go
1
/*
2
Copyright 2021.
3

4
Licensed under the Apache License, Version 2.0 (the "License");
5
you may not use this file except in compliance with the License.
6
You may obtain a copy of the License at
7

8
    http://www.apache.org/licenses/LICENSE-2.0
9

10
Unless required by applicable law or agreed to in writing, software
11
distributed under the License is distributed on an "AS IS" BASIS,
12
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
See the License for the specific language governing permissions and
14
limitations under the License.
15
*/
16

17
package v2alpha1
18

19
import (
20
        "context"
21
        "fmt"
22
        "strings"
23
        "time"
24

25
        emperror "emperror.dev/errors"
26

27
        corev1 "k8s.io/api/core/v1"
28
        "k8s.io/apimachinery/pkg/runtime"
29
        "k8s.io/apimachinery/pkg/types"
30
        "k8s.io/client-go/tools/record"
31
        ctrl "sigs.k8s.io/controller-runtime"
32
        "sigs.k8s.io/controller-runtime/pkg/client"
33
        "sigs.k8s.io/controller-runtime/pkg/log"
34

35
        appsv2alpha1 "github.com/emqx/emqx-operator/apis/apps/v2alpha1"
36
        "github.com/emqx/emqx-operator/pkg/handler"
37
        appsv1 "k8s.io/api/apps/v1"
38
        k8sErrors "k8s.io/apimachinery/pkg/api/errors"
39
)
40

41
const EMQXContainerName string = "emqx"
42

43
// EMQXReconciler reconciles a EMQX object
44
type EMQXReconciler struct {
45
        handler.Handler
46
        Scheme *runtime.Scheme
47
        record.EventRecorder
48
}
49

50
//+kubebuilder:rbac:groups=apps.emqx.io,resources=emqxes,verbs=get;list;watch;create;update;patch;delete
51
//+kubebuilder:rbac:groups=apps.emqx.io,resources=emqxes/status,verbs=get;update;patch
52
//+kubebuilder:rbac:groups=apps.emqx.io,resources=emqxes/finalizers,verbs=update
53

54
// Reconcile is part of the main kubernetes reconciliation loop which aims to
55
// move the current state of the cluster closer to the desired state.
56
// TODO(user): Modify the Reconcile function to compare the state specified by
57
// the EMQX object against the actual cluster state, and then
58
// perform operations to make the cluster state reflect the state specified by
59
// the user.
60
//
61
// For more details, check Reconcile and its Result here:
62
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
63
func (r *EMQXReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
1✔
64
        _ = log.FromContext(ctx)
1✔
65

1✔
66
        instance := &appsv2alpha1.EMQX{}
1✔
67
        if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
2✔
68
                if k8sErrors.IsNotFound(err) {
2✔
69
                        return ctrl.Result{}, nil
1✔
70
                }
1✔
71
                return ctrl.Result{}, err
×
72
        }
73

74
        // Create Resources
75
        resources, err := r.createResources(instance)
1✔
76
        if err != nil {
1✔
77
                return ctrl.Result{}, err
×
78
        }
×
79
        if err := r.CreateOrUpdateList(instance, r.Scheme, resources, func(client.Object) error { return nil }); err != nil {
2✔
80
                return ctrl.Result{}, err
×
81
        }
×
82

83
        // Update EMQX Custom Resource's status
84
        instance, err = r.updateStatus(instance)
1✔
85
        if err != nil {
1✔
86
                return ctrl.Result{}, err
×
87
        }
×
88
        if err := r.Status().Update(ctx, instance); err != nil {
2✔
89
                return ctrl.Result{}, err
1✔
90
        }
1✔
91

92
        if !instance.Status.IsRunning() {
2✔
93
                return ctrl.Result{RequeueAfter: time.Duration(5) * time.Second}, nil
1✔
94
        }
1✔
95
        return ctrl.Result{RequeueAfter: time.Duration(20) * time.Second}, nil
1✔
96
}
97

98
// SetupWithManager sets up the controller with the Manager.
99
func (r *EMQXReconciler) SetupWithManager(mgr ctrl.Manager) error {
1✔
100
        return ctrl.NewControllerManagedBy(mgr).
1✔
101
                For(&appsv2alpha1.EMQX{}).
1✔
102
                Complete(r)
1✔
103
}
1✔
104

105
func (r *EMQXReconciler) createResources(instance *appsv2alpha1.EMQX) ([]client.Object, error) {
1✔
106
        var resources []client.Object
1✔
107
        nodeCookie := generateNodeCookieSecret(instance)
1✔
108
        bootstrapUser := generateBootstrapUserSecret(instance)
1✔
109
        bootstrapConfig := generateBootstrapConfigMap(instance)
1✔
110
        if instance.Status.IsCreating() {
2✔
111
                resources = append(resources, nodeCookie, bootstrapUser, bootstrapConfig)
1✔
112
        }
1✔
113

114
        dashboardSvc := generateDashboardService(instance)
1✔
115
        headlessSvc := generateHeadlessService(instance)
1✔
116
        sts := generateStatefulSet(instance)
1✔
117
        sts = updateStatefulSetForNodeCookie(sts, nodeCookie)
1✔
118
        sts = updateStatefulSetForBootstrapUser(sts, bootstrapUser)
1✔
119
        sts = updateStatefulSetForBootstrapConfig(sts, bootstrapConfig)
1✔
120
        resources = append(resources, dashboardSvc, headlessSvc, sts)
1✔
121

1✔
122
        if instance.Status.IsRunning() || instance.Status.IsCoreNodesReady() {
2✔
123
                deploy := generateDeployment(instance)
1✔
124
                deploy = updateDeploymentForNodeCookie(deploy, nodeCookie)
1✔
125
                deploy = updateDeploymentForBootstrapConfig(deploy, bootstrapConfig)
1✔
126
                resources = append(resources, deploy)
1✔
127

1✔
128
                listenerPorts, err := r.generateRequestAPI(instance).getAllListenersByAPI(sts)
1✔
129
                if err != nil {
2✔
130
                        r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetListenerPorts", err.Error())
1✔
131
                }
1✔
132

133
                if listenersSvc := generateListenerService(instance, listenerPorts); listenersSvc != nil {
2✔
134
                        resources = append(resources, listenersSvc)
1✔
135
                }
1✔
136
        }
137
        return resources, nil
1✔
138
}
139

140
func (r *EMQXReconciler) updateStatus(instance *appsv2alpha1.EMQX) (*appsv2alpha1.EMQX, error) {
1✔
141
        var emqxNodes []appsv2alpha1.EMQXNode
1✔
142
        var existedSts *appsv1.StatefulSet = &appsv1.StatefulSet{}
1✔
143
        var existedDeploy *appsv1.Deployment = &appsv1.Deployment{}
1✔
144
        var err error
1✔
145

1✔
146
        err = r.Get(context.TODO(), types.NamespacedName{Name: instance.Spec.CoreTemplate.Name, Namespace: instance.Namespace}, existedSts)
1✔
147
        if err != nil {
1✔
148
                if k8sErrors.IsNotFound(err) {
×
149
                        return instance, nil
×
150
                }
×
151
                return nil, emperror.Wrap(err, "failed to get existed statefulSet")
×
152
        }
153

154
        err = r.Get(context.TODO(), types.NamespacedName{Name: instance.Spec.ReplicantTemplate.Name, Namespace: instance.Namespace}, existedDeploy)
1✔
155
        if err != nil && !k8sErrors.IsNotFound(err) {
1✔
156
                return nil, emperror.Wrap(err, "failed to get existed deployment")
×
157
        }
×
158

159
        emqxNodes, err = r.generateRequestAPI(instance).getNodeStatuesByAPI(existedSts)
1✔
160
        if err != nil {
2✔
161
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetNodeStatuses", err.Error())
1✔
162
        }
1✔
163

164
        emqxStatusMachine := newEMQXStatusMachine(instance)
1✔
165
        emqxStatusMachine.CheckNodeCount(emqxNodes)
1✔
166
        emqxStatusMachine.NextStatus(existedSts, existedDeploy)
1✔
167
        return emqxStatusMachine.GetEMQX(), nil
1✔
168
}
169

170
func (r *EMQXReconciler) getBootstrapUser(instance *appsv2alpha1.EMQX) (username, password string, err error) {
1✔
171
        secret := &corev1.Secret{}
1✔
172
        if err = r.Get(context.TODO(), types.NamespacedName{Name: instance.NameOfBootStrapUser(), Namespace: instance.Namespace}, secret); err != nil {
2✔
173
                return "", "", err
1✔
174
        }
1✔
175

176
        data, ok := secret.Data["bootstrap_user"]
1✔
177
        if !ok {
1✔
178
                return "", "", emperror.Errorf("the secret does not contain the bootstrap_user")
×
179
        }
×
180

181
        str := string(data)
1✔
182
        index := strings.Index(str, ":")
1✔
183

1✔
184
        return str[:index], str[index+1:], nil
1✔
185
}
186

187
func (r *EMQXReconciler) generateRequestAPI(instance *appsv2alpha1.EMQX) *requestAPI {
1✔
188
        var username, password, port string
1✔
189
        username, password, err := r.getBootstrapUser(instance)
1✔
190
        if err != nil {
2✔
191
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetBootStrapUserSecret", err.Error())
1✔
192
        }
1✔
193

194
        dashboardPort, err := appsv2alpha1.GetDashboardServicePort(instance)
1✔
195
        if err != nil {
1✔
196
                msg := fmt.Sprintf("Failed to get dashboard service port: %s, use 18083 port", err.Error())
×
197
                r.EventRecorder.Event(instance, corev1.EventTypeWarning, "FailedToGetDashboardServicePort", msg)
×
198
                port = "18083"
×
199
        }
×
200
        if dashboardPort != nil {
2✔
201
                port = dashboardPort.TargetPort.String()
1✔
202
        }
1✔
203
        return &requestAPI{
1✔
204
                Username: username,
1✔
205
                Password: password,
1✔
206
                Port:     port,
1✔
207
                Handler:  r.Handler,
1✔
208
        }
1✔
209
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc