• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

emqx / emqx-operator / 3753782435

pending completion
3753782435

push

github

GitHub
fix: fix webhook Certificate dnsName error (#532)

3070 of 4128 relevant lines covered (74.37%)

4.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.14
/pkg/handler/handler.go
1
package handler
2

3
import (
4
        "context"
5
        "fmt"
6
        "net/http"
7
        "strings"
8

9
        json "github.com/json-iterator/go"
10

11
        emperror "emperror.dev/errors"
12
        "github.com/banzaicloud/k8s-objectmatcher/patch"
13
        appsv1beta3 "github.com/emqx/emqx-operator/apis/apps/v1beta3"
14
        apiClient "github.com/emqx/emqx-operator/pkg/apiclient"
15

16
        appsv1 "k8s.io/api/apps/v1"
17
        corev1 "k8s.io/api/core/v1"
18
        k8sErrors "k8s.io/apimachinery/pkg/api/errors"
19
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
20
        "k8s.io/apimachinery/pkg/runtime"
21
        "k8s.io/client-go/kubernetes"
22
        "k8s.io/client-go/rest"
23
        ctrl "sigs.k8s.io/controller-runtime"
24
        "sigs.k8s.io/controller-runtime/pkg/client"
25
)
26

27
const (
28
        ManageContainersAnnotation = "apps.emqx.io/manage-containers"
29
)
30

31
type Handler struct {
32
        client.Client
33
        kubernetes.Clientset
34
        rest.Config
35
}
36

37
func (handler *Handler) RequestAPI(obj client.Object, containerName string, method, username, password, apiPort, path string) (*http.Response, []byte, error) {
6✔
38
        podList := &corev1.PodList{}
6✔
39
        if err := handler.Client.List(
6✔
40
                context.TODO(),
6✔
41
                podList,
6✔
42
                client.InNamespace(obj.GetNamespace()),
6✔
43
                client.MatchingLabels(obj.GetLabels()),
6✔
44
        ); err != nil {
6✔
45
                return nil, nil, err
×
46
        }
×
47

48
        if len(podList.Items) == 0 {
12✔
49
                return nil, nil, emperror.Errorf("not found pods")
6✔
50
        }
6✔
51

52
        podName := findReadyEmqxPod(podList, containerName)
6✔
53
        if podName == "" {
12✔
54
                return nil, nil, emperror.Errorf("pods not ready")
6✔
55
        }
6✔
56

57
        stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1)
6✔
58

6✔
59
        apiClient := apiClient.APIClient{
6✔
60
                Username: username,
6✔
61
                Password: password,
6✔
62
                PortForwardOptions: apiClient.PortForwardOptions{
6✔
63
                        Namespace: obj.GetNamespace(),
6✔
64
                        PodName:   podName,
6✔
65
                        PodPorts: []string{
6✔
66
                                fmt.Sprintf(":%s", apiPort),
6✔
67
                        },
6✔
68
                        Clientset:    handler.Clientset,
6✔
69
                        Config:       &handler.Config,
6✔
70
                        ReadyChannel: readyChan,
6✔
71
                        StopChannel:  stopChan,
6✔
72
                },
6✔
73
        }
6✔
74

6✔
75
        return apiClient.Do(method, path)
6✔
76
}
77

78
func (handler *Handler) CreateOrUpdateList(instance client.Object, scheme *runtime.Scheme, resources []client.Object, postFun func(client.Object) error) error {
6✔
79
        for _, resource := range resources {
12✔
80
                if err := ctrl.SetControllerReference(instance, resource, scheme); err != nil {
6✔
81
                        return err
×
82
                }
×
83
                err := handler.CreateOrUpdate(resource, postFun)
6✔
84
                if err != nil {
8✔
85
                        return err
2✔
86
                }
2✔
87
        }
88
        return nil
6✔
89
}
90

91
func (handler *Handler) CreateOrUpdate(obj client.Object, postFun func(client.Object) error) error {
6✔
92
        u := &unstructured.Unstructured{}
6✔
93
        u.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
6✔
94
        err := handler.Client.Get(context.TODO(), client.ObjectKeyFromObject(obj), u)
6✔
95
        if err != nil {
12✔
96
                if k8sErrors.IsNotFound(err) {
12✔
97
                        return handler.Create(obj, postFun)
6✔
98
                }
6✔
99
                return emperror.Wrapf(err, "failed to get %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
100
        }
101

102
        obj.SetResourceVersion(u.GetResourceVersion())
6✔
103
        obj.SetCreationTimestamp(u.GetCreationTimestamp())
6✔
104
        obj.SetManagedFields(u.GetManagedFields())
6✔
105
        annotations := obj.GetAnnotations()
6✔
106
        if annotations == nil {
12✔
107
                annotations = make(map[string]string)
6✔
108
        }
6✔
109
        for key, value := range u.GetAnnotations() {
12✔
110
                if _, present := annotations[key]; !present {
12✔
111
                        annotations[key] = value
6✔
112
                }
6✔
113
        }
114
        obj.SetAnnotations(annotations)
6✔
115

6✔
116
        opts := []patch.CalculateOption{}
6✔
117
        switch resource := obj.(type) {
6✔
118
        case *appsv1.StatefulSet:
6✔
119
                opts = append(
6✔
120
                        opts,
6✔
121
                        patch.IgnoreStatusFields(),
6✔
122
                        patch.IgnoreVolumeClaimTemplateTypeMetaAndStatus(),
6✔
123
                        IgnoreOtherContainers(),
6✔
124
                )
6✔
125
        case *appsv1.Deployment:
6✔
126
                opts = append(
6✔
127
                        opts,
6✔
128
                        IgnoreOtherContainers(),
6✔
129
                )
6✔
130
        case *corev1.Service:
6✔
131
                storageResource := &corev1.Service{}
6✔
132
                err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.UnstructuredContent(), storageResource)
6✔
133
                if err != nil {
6✔
134
                        return err
×
135
                }
×
136
                // Required fields when updating service in k8s 1.21
137
                if storageResource.Spec.ClusterIP != "" {
12✔
138
                        resource.Spec.ClusterIP = storageResource.Spec.ClusterIP
6✔
139
                }
6✔
140
                obj = resource
6✔
141
        }
142

143
        patchResult, err := patch.DefaultPatchMaker.Calculate(u, obj, opts...)
6✔
144
        if err != nil {
6✔
145
                return emperror.Wrapf(err, "failed to calculate patch for %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
146
        }
×
147
        if !patchResult.IsEmpty() {
12✔
148
                return handler.Update(obj, postFun)
6✔
149
        }
6✔
150
        return nil
6✔
151
}
152

153
func (handler *Handler) Create(obj client.Object, postCreated func(client.Object) error) error {
6✔
154
        switch obj.(type) {
6✔
155
        case *appsv1beta3.EmqxBroker:
×
156
        case *appsv1beta3.EmqxEnterprise:
×
157
        default:
6✔
158
                if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(obj); err != nil {
6✔
159
                        return emperror.Wrapf(err, "failed to set last applied annotation for %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
160
                }
×
161
        }
162

163
        if err := handler.Client.Create(context.TODO(), obj); err != nil {
6✔
164
                return emperror.Wrapf(err, "failed to create %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
165
        }
×
166
        return postCreated(obj)
6✔
167
}
168

169
func (handler *Handler) Update(obj client.Object, postUpdated func(client.Object) error) error {
6✔
170
        if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(obj); err != nil {
6✔
171
                return emperror.Wrapf(err, "failed to set last applied annotation for %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
×
172
        }
×
173

174
        if err := handler.Client.Update(context.TODO(), obj); err != nil {
8✔
175
                return emperror.Wrapf(err, "failed to update %s %s", obj.GetObjectKind().GroupVersionKind().Kind, obj.GetName())
2✔
176
        }
2✔
177
        return postUpdated(obj)
6✔
178
}
179

180
func IgnoreOtherContainers() patch.CalculateOption {
6✔
181
        return func(current, modified []byte) ([]byte, []byte, error) {
12✔
182
                current, err := selectManagerContainer(current)
6✔
183
                if err != nil {
6✔
184
                        return []byte{}, []byte{}, emperror.Wrap(err, "could not delete the field from current byte sequence")
×
185
                }
×
186

187
                modified, err = selectManagerContainer(modified)
6✔
188
                if err != nil {
6✔
189
                        return []byte{}, []byte{}, emperror.Wrap(err, "could not delete the field from modified byte sequence")
×
190
                }
×
191

192
                return current, modified, nil
6✔
193
        }
194
}
195

196
func selectManagerContainer(obj []byte) ([]byte, error) {
6✔
197
        var podTemplate corev1.PodTemplateSpec
6✔
198
        var objMap map[string]interface{}
6✔
199
        err := json.Unmarshal(obj, &objMap)
6✔
200
        if err != nil {
6✔
201
                return nil, emperror.Wrap(err, "could not unmarshal json")
×
202
        }
×
203

204
        kind := objMap["kind"].(string)
6✔
205
        switch kind {
6✔
206
        case "Deployment":
6✔
207
                deploy := &appsv1.Deployment{}
6✔
208
                err := json.Unmarshal(obj, deploy)
6✔
209
                if err != nil {
6✔
210
                        return nil, emperror.Wrap(err, "could not unmarshal json")
×
211
                }
×
212
                podTemplate = deploy.Spec.Template
6✔
213
        case "StatefulSet":
6✔
214
                sts := &appsv1.StatefulSet{}
6✔
215
                err := json.Unmarshal(obj, sts)
6✔
216
                if err != nil {
6✔
217
                        return nil, emperror.Wrap(err, "could not unmarshal json")
×
218
                }
×
219
                podTemplate = sts.Spec.Template
6✔
220
        default:
×
221
                return nil, emperror.Wrapf(err, "unsupported kind: %s", kind)
×
222
        }
223

224
        containerNames := podTemplate.Annotations[ManageContainersAnnotation]
6✔
225
        containers := []corev1.Container{}
6✔
226
        for _, container := range podTemplate.Spec.Containers {
12✔
227
                if strings.Contains(containerNames, container.Name) {
12✔
228
                        containers = append(containers, container)
6✔
229
                }
6✔
230
        }
231
        podTemplate.Spec.Containers = containers
6✔
232
        objMap["spec"].(map[string]interface{})["template"] = podTemplate
6✔
233
        return json.ConfigCompatibleWithStandardLibrary.Marshal(objMap)
6✔
234
}
235

236
func findReadyEmqxPod(pods *corev1.PodList, containerName string) string {
6✔
237
        for _, pod := range pods.Items {
12✔
238
                for _, status := range pod.Status.ContainerStatuses {
12✔
239
                        if status.Name == containerName && status.Ready {
12✔
240
                                return pod.Name
6✔
241
                        }
6✔
242
                }
243
        }
244
        return ""
6✔
245
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc