• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 825182036

pending completion
825182036

Pull #126

gitlab-ci

Krzysztof Jaskiewicz
fix: do not index status of the latest device deployment
Pull Request #126: fix: do not index status of the latest device deployment

2801 of 3299 relevant lines covered (84.9%)

15.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.82
/app/indexer/run.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "errors"
20
        "fmt"
21
        "os"
22
        "os/signal"
23
        "strconv"
24
        "time"
25

26
        "golang.org/x/sys/unix"
27

28
        "github.com/mendersoftware/go-lib-micro/config"
29
        "github.com/mendersoftware/go-lib-micro/log"
30

31
        "github.com/mendersoftware/reporting/client/deployments"
32
        "github.com/mendersoftware/reporting/client/deviceauth"
33
        "github.com/mendersoftware/reporting/client/inventory"
34
        "github.com/mendersoftware/reporting/client/nats"
35
        rconfig "github.com/mendersoftware/reporting/config"
36
        "github.com/mendersoftware/reporting/model"
37
        "github.com/mendersoftware/reporting/store"
38
)
39

40
const (
	// jobsChanSize is the buffer capacity of the channel on which
	// InitAndRun receives individual jobs.
	jobsChanSize = 1000
	// shutdownTimeout is the longest InitAndRun waits for the workers to
	// finish after a termination signal has been received.
	shutdownTimeout = 30 * time.Second
)
44

45
// InitAndRun initializes the indexer and runs it
46
func InitAndRun(conf config.Reader, store store.Store, ds store.DataStore, nats nats.Client) error {
1✔
47
        ctx, cancel := context.WithCancel(context.Background())
1✔
48
        defer cancel()
1✔
49
        l := log.FromContext(ctx)
1✔
50

1✔
51
        invClient := inventory.NewClient(
1✔
52
                conf.GetString(rconfig.SettingInventoryAddr),
1✔
53
        )
1✔
54

1✔
55
        devClient := deviceauth.NewClient(
1✔
56
                conf.GetString(rconfig.SettingDeviceAuthAddr),
1✔
57
        )
1✔
58

1✔
59
        deplClient := deployments.NewClient(
1✔
60
                conf.GetString(rconfig.SettingDeploymentsAddr),
1✔
61
        )
1✔
62

1✔
63
        indexer := NewIndexer(store, ds, nats, devClient, invClient, deplClient)
1✔
64
        jobs := make(chan model.Job, jobsChanSize)
1✔
65

1✔
66
        err := indexer.GetJobs(ctx, jobs)
1✔
67
        if err != nil {
1✔
68
                return err
×
69
        }
×
70

71
        intChan := make(chan os.Signal, 1)
1✔
72
        signal.Notify(intChan, unix.SIGINT, unix.SIGTERM)
1✔
73

1✔
74
        batchSize := conf.GetInt(rconfig.SettingReindexBatchSize)
1✔
75
        if batchSize <= 0 {
1✔
76
                return fmt.Errorf(
×
77
                        "%s: must be a positive integer",
×
78
                        rconfig.SettingReindexBatchSize,
×
79
                )
×
80
        }
×
81
        workerConcurrency := conf.GetInt(rconfig.SettingWorkerConcurrency)
1✔
82
        if workerConcurrency <= 0 {
1✔
83
                return fmt.Errorf(
×
84
                        "%s: must be a positive integer",
×
85
                        rconfig.SettingWorkerConcurrency,
×
86
                )
×
87
        }
×
88
        dispatch := make(chan []model.Job)
1✔
89
        jobPool := make(chan []model.Job, workerConcurrency)
1✔
90
        for i := 0; i < workerConcurrency; i++ {
11✔
91
                jobPool <- make([]model.Job, 0, batchSize)
10✔
92
                go workerRoutine(ctx, strconv.Itoa(i+1), indexer, dispatch, jobPool)
10✔
93
        }
10✔
94

95
        maxTimeMs := conf.GetInt(rconfig.SettingReindexMaxTimeMsec)
1✔
96
        tickerTimeout := time.Duration(maxTimeMs) * time.Millisecond
1✔
97
        ticker := time.NewTimer(tickerTimeout)
1✔
98
        jobsList := <-jobPool
1✔
99
        done := ctx.Done()
1✔
100
        for err == nil {
50✔
101
                select {
49✔
102
                case sig := <-intChan:
×
103
                        var workersDone int
×
104
                        l.Warnf("Received signal %s: waiting for workers to finish", sig)
×
105
                        if len(jobsList) > 0 {
×
106
                                _, err = dispatchJobs(ctx, jobsList, dispatch, jobPool)
×
107
                                if err != nil {
×
108
                                        break
×
109
                                }
110
                                workersDone++
×
111
                        }
112
                        close(dispatch)
×
113
                        timeout := time.After(shutdownTimeout)
×
114
                        for workersDone < workerConcurrency {
×
115
                                select {
×
116
                                case <-timeout:
×
117
                                        return errors.New("timeout waiting for workers to finish")
×
118
                                case <-jobPool:
×
119
                                        workersDone++
×
120
                                }
121
                        }
122
                        l.Info("workers finished processing jobs: terminating")
×
123
                        return nil
×
124
                case <-ticker.C:
29✔
125
                        ticker.Reset(tickerTimeout)
29✔
126
                        if len(jobsList) > 0 {
34✔
127
                                jobsList, err = dispatchJobs(ctx, jobsList, dispatch, jobPool)
5✔
128
                        }
5✔
129

130
                case job, open := <-jobs:
19✔
131
                        if !open {
19✔
132
                                return errors.New("Jetstream closed")
×
133
                        }
×
134
                        jobsList = append(jobsList, job)
19✔
135
                        if len(jobsList) >= cap(jobsList) {
19✔
136
                                ticker.Reset(tickerTimeout)
×
137
                                jobsList, err = dispatchJobs(ctx, jobsList, dispatch, jobPool)
×
138
                        }
×
139

140
                case <-done:
×
141
                        err = ctx.Err()
×
142
                }
143
        }
144
        return err
×
145
}
146

147
func dispatchJobs(ctx context.Context,
148
        jobs []model.Job,
149
        dispatch chan<- []model.Job,
150
        jobPool <-chan []model.Job,
151
) (next []model.Job, err error) {
5✔
152
        done := ctx.Done()
5✔
153
        select {
5✔
154
        case <-done:
×
155
                return nil, ctx.Err()
×
156
        case dispatch <- jobs:
5✔
157
        }
158
        select {
5✔
159
        case <-done:
×
160
                return nil, ctx.Err()
×
161
        case next = <-jobPool:
5✔
162
        }
163
        return next[:0], nil
5✔
164
}
165

166
func workerRoutine(
167
        ctx context.Context,
168
        workerName string,
169
        indexer Indexer,
170
        jobQ <-chan []model.Job,
171
        jobPool chan<- []model.Job) {
10✔
172
        l := log.FromContext(ctx)
10✔
173
        l.Data["worker"] = workerName
10✔
174
        l.Infof("Worker %s waiting for jobs", workerName)
10✔
175
        ctx = log.WithContext(ctx, l)
10✔
176
        for jobs := range jobQ {
15✔
177
                l.Infof("processing %d jobs", len(jobs))
5✔
178
                indexer.ProcessJobs(ctx, jobs)
5✔
179
                jobPool <- jobs
5✔
180
        }
5✔
181
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc