• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx-operator / 3693077438

pending completion
3693077438

Pull #512

github

raoxiaoli
docs: add license task doc
Pull Request #512: docs: add license task doc

3018 of 3991 relevant lines covered (75.62%)

1.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.81
/pkg/handler/handler.go
1
package handler
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "strings"
8

9
        json "github.com/json-iterator/go"
10

11
        emperror "emperror.dev/errors"
12
        "github.com/banzaicloud/k8s-objectmatcher/patch"
13
        appsv1beta3 "github.com/emqx/emqx-operator/apis/apps/v1beta3"
14
        apiClient "github.com/emqx/emqx-operator/pkg/apiclient"
15

16
        appsv1 "k8s.io/api/apps/v1"
17
        corev1 "k8s.io/api/core/v1"
18
        k8sErrors "k8s.io/apimachinery/pkg/api/errors"
19
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/client-go/kubernetes"
22
        "k8s.io/client-go/rest"
23
        ctrl "sigs.k8s.io/controller-runtime"
24
        "sigs.k8s.io/controller-runtime/pkg/client"
25
)
26

27
const (
28
        // ManageContainersAnnotation is the pod-template annotation key read by
        // selectManagerContainer: only containers whose name appears in the
        // annotation's value are kept when objects are prepared for patch calculation.
        ManageContainersAnnotation = "apps.emqx.io/manage-containers"
29
)
30

31
// Handler bundles the Kubernetes clients used by this package's helpers to
// list, get, create and update objects and to reach pods for API requests.
type Handler struct {
32
        // Client is used for the List/Get/Create/Update calls made by the methods below.
        client.Client
33
        // Clientset is handed to the API client's port-forward options in RequestAPI.
        kubernetes.Clientset
34
        // Config is handed (by address) to the API client's port-forward options in RequestAPI.
        rest.Config
35
}
36

37
func (handler *Handler) RequestAPI(obj client.Object, containerName string, method, username, password, apiPort, path string) (*http.Response, []byte, error) {
2✔
38
        podList := &corev1.PodList{}
2✔
39
        if err := handler.Client.List(
2✔
40
                context.TODO(),
2✔
41
                podList,
2✔
42
                client.InNamespace(obj.GetNamespace()),
2✔
43
                client.MatchingLabels(obj.GetLabels()),
2✔
44
        ); err != nil {
2✔
45
                return nil, nil, err
×
46
        }
×
47

48
        if len(podList.Items) == 0 {
4✔
49
                return nil, nil, emperror.Errorf("not found pods")
2✔
50
        }
2✔
51

52
        podName := findReadyEmqxPod(podList, containerName)
2✔
53
        if podName == "" {
4✔
54
                return nil, nil, emperror.Errorf("pods not ready")
2✔
55
        }
2✔
56

57
        stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
2✔
58

2✔
59
        apiClient := apiClient.APIClient{
2✔
60
                Username: username,
2✔
61
                Password: password,
2✔
62
                PortForwardOptions: apiClient.PortForwardOptions{
2✔
63
                        Namespace: obj.GetNamespace(),
2✔
64
                        PodName:   podName,
2✔
65
                        PodPorts: []string{
2✔
66
                                fmt.Sprintf(":%s", apiPort),
2✔
67
                        },
2✔
68
                        Clientset:    handler.Clientset,
2✔
69
                        Config:       &handler.Config,
2✔
70
                        ReadyChannel: readyChan,
2✔
71
                        StopChannel:  stopChan,
2✔
72
                },
2✔
73
        }
2✔
74

2✔
75
        return apiClient.Do(method, path)
2✔
76
}
77

78
func (handler *Handler) CreateOrUpdateList(instance client.Object, scheme *runtime.Scheme, resources []client.Object, postFun func(client.Object) error) error {
2✔
79
        for _, resource := range resources {
4✔
80
                if err := ctrl.SetControllerReference(instance, resource, scheme); err != nil {
2✔
81
                        return err
×
82
                }
×
83
                err := handler.CreateOrUpdate(resource, postFun)
2✔
84
                if err != nil {
2✔
85
                        return err
×
86
                }
×
87
        }
88
        return nil
2✔
89
}
90

91
func (handler *Handler) CreateOrUpdate(obj client.Object, postFun func(client.Object) error) error {
2✔
92
        u := &unstructured.Unstructured{}
2✔
93
        u.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
2✔
94
        err := handler.Client.Get(context.TODO(), client.ObjectKeyFromObject(obj), u)
2✔
95
        if err != nil {
4✔
96
                if k8sErrors.IsNotFound(err) {
4✔
97
                        return handler.Create(obj, postFun)
2✔
98
                }
2✔
99
                return emperror.Wrapf(err, "failed to get %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
100
        }
101

102
        obj.SetResourceVersion(u.GetResourceVersion())
2✔
103
        obj.SetCreationTimestamp(u.GetCreationTimestamp())
2✔
104
        obj.SetManagedFields(u.GetManagedFields())
2✔
105
        annotations := obj.GetAnnotations()
2✔
106
        if annotations == nil {
4✔
107
                annotations = make(map[string]string)
2✔
108
        }
2✔
109
        for key, value := range u.GetAnnotations() {
4✔
110
                if _, present := annotations[key]; !present {
4✔
111
                        annotations[key] = value
2✔
112
                }
2✔
113
        }
114
        obj.SetAnnotations(annotations)
2✔
115

2✔
116
        opts := []patch.CalculateOption{}
2✔
117
        switch resource := obj.(type) {
2✔
118
        case *appsv1.StatefulSet:
2✔
119
                opts = append(
2✔
120
                        opts,
2✔
121
                        patch.IgnoreStatusFields(),
2✔
122
                        patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
2✔
123
                        IgnoreOtherContainers(),
2✔
124
                )
2✔
125
        case *appsv1.Deployment:
1✔
126
                opts = append(
1✔
127
                        opts,
1✔
128
                        IgnoreOtherContainers(),
1✔
129
                )
1✔
130
        case *corev1.Service:
2✔
131
                storageResource := &corev1.Service{}
2✔
132
                err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.UnstructuredContent(), storageResource)
2✔
133
                if err != nil {
2✔
134
                        return err
×
135
                }
×
136
                // Required fields when updating service in k8s 1.21
137
                if storageResource.Spec.ClusterIP != "" {
4✔
138
                        resource.Spec.ClusterIP = storageResource.Spec.ClusterIP
2✔
139
                }
2✔
140
                obj = resource
2✔
141
        }
142

143
        patchResult, err := patch.DefaultPatchMaker.Calculate(u, obj, opts...)
2✔
144
        if err != nil {
2✔
145
                return emperror.Wrapf(err, "failed to calculate patch for %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
146
        }
×
147
        if !patchResult.IsEmpty() {
4✔
148
                return handler.Update(obj, postFun)
2✔
149
        }
2✔
150
        return nil
2✔
151
}
152

153
func (handler *Handler) Create(obj client.Object, postCreated func(client.Object) error) error {
2✔
154
        switch obj.(type) {
2✔
155
        case *appsv1beta3.EmqxBroker:
×
156
        case *appsv1beta3.EmqxEnterprise:
×
157
        default:
2✔
158
                if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(obj); err != nil {
2✔
159
                        return emperror.Wrapf(err, "failed to set last applied annotation for %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
160
                }
×
161
        }
162

163
        if err := handler.Client.Create(context.TODO(), obj); err != nil {
2✔
164
                return emperror.Wrapf(err, "failed to create %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
165
        }
×
166
        return postCreated(obj)
2✔
167
}
168

169
func (handler *Handler) Update(obj client.Object, postUpdated func(client.Object) error) error {
2✔
170
        if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(obj); err != nil {
2✔
171
                return emperror.Wrapf(err, "failed to set last applied annotation for %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
172
        }
×
173

174
        if err := handler.Client.Update(context.TODO(), obj); err != nil {
2✔
175
                return emperror.Wrapf(err, "failed to update %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
176
        }
×
177
        return postUpdated(obj)
2✔
178
}
179

180
func IgnoreOtherContainers() patch.CalculateOption {
3✔
181
        return func(current, modified []byte) ([]byte, []byte, error) {
6✔
182
                current, err := selectManagerContainer(current)
3✔
183
                if err != nil {
3✔
184
                        return []byte{}, []byte{}, emperror.Wrap(err, "could not delete the field from current byte sequence")
×
185
                }
×
186

187
                modified, err = selectManagerContainer(modified)
3✔
188
                if err != nil {
3✔
189
                        return []byte{}, []byte{}, emperror.Wrap(err, "could not delete the field from modified byte sequence")
×
190
                }
×
191

192
                return current, modified, nil
3✔
193
        }
194
}
195

196
func selectManagerContainer(obj []byte) ([]byte, error) {
3✔
197
        var podTemplate corev1.PodTemplateSpec
3✔
198
        var objMap map[string]interface{}
3✔
199
        err := json.Unmarshal(obj, &objMap)
3✔
200
        if err != nil {
3✔
201
                return nil, emperror.Wrap(err, "could not unmarshal json")
×
202
        }
×
203

204
        kind := objMap["kind"].(string)
3✔
205
        switch kind {
3✔
206
        case "Deployment":
2✔
207
                deploy := &appsv1.Deployment{}
2✔
208
                err := json.Unmarshal(obj, deploy)
2✔
209
                if err != nil {
2✔
210
                        return nil, emperror.Wrap(err, "could not unmarshal json")
×
211
                }
×
212
                podTemplate = deploy.Spec.Template
2✔
213
        case "StatefulSet":
3✔
214
                sts := &appsv1.StatefulSet{}
3✔
215
                err := json.Unmarshal(obj, sts)
3✔
216
                if err != nil {
3✔
217
                        return nil, emperror.Wrap(err, "could not unmarshal json")
×
218
                }
×
219
                podTemplate = sts.Spec.Template
3✔
220
        default:
×
221
                return nil, emperror.Wrapf(err, "unsupported kind: %s", kind)
×
222
        }
223

224
        containerNames := podTemplate.Annotations[ManageContainersAnnotation]
3✔
225
        containers := []corev1.Container{}
3✔
226
        for _, container := range podTemplate.Spec.Containers {
6✔
227
                if strings.Contains(containerNames, container.Name) {
6✔
228
                        containers = append(containers, container)
3✔
229
                }
3✔
230
        }
231
        podTemplate.Spec.Containers = containers
3✔
232
        objMap["spec"].(map[string]interface{})["template"] = podTemplate
3✔
233
        return json.ConfigCompatibleWithStandardLibrary.Marshal(objMap)
3✔
234
}
235

236
func findReadyEmqxPod(pods *corev1.PodList, containerName string) string {
2✔
237
        for _, pod := range pods.Items {
4✔
238
                for _, status := range pod.Status.ContainerStatuses {
4✔
239
                        if status.Name == containerName && status.Ready {
4✔
240
                                return pod.Name
2✔
241
                        }
2✔
242
                }
243
        }
244
        return ""
2✔
245
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc