• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mendersoftware / reporting / 812972904

pending completion
812972904

push

gitlab-ci

GitHub
Merge pull request #118 from mendersoftware/master

20 of 26 new or added lines in 1 file covered. (76.92%)

2 existing lines in 1 file now uncovered.

2792 of 3259 relevant lines covered (85.67%)

17.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.84
/app/indexer/run.go
1
// Copyright 2023 Northern.tech AS
2
//
3
//    Licensed under the Apache License, Version 2.0 (the "License");
4
//    you may not use this file except in compliance with the License.
5
//    You may obtain a copy of the License at
6
//
7
//        http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//    Unless required by applicable law or agreed to in writing, software
10
//    distributed under the License is distributed on an "AS IS" BASIS,
11
//    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//    See the License for the specific language governing permissions and
13
//    limitations under the License.
14

15
package indexer
16

17
import (
18
        "context"
19
        "errors"
20
        "fmt"
21
        "os"
22
        "os/signal"
23
        "strconv"
24
        "time"
25

26
        "golang.org/x/sys/unix"
27

28
        "github.com/mendersoftware/go-lib-micro/config"
29
        "github.com/mendersoftware/go-lib-micro/log"
30

31
        "github.com/mendersoftware/reporting/client/deployments"
32
        "github.com/mendersoftware/reporting/client/deviceauth"
33
        "github.com/mendersoftware/reporting/client/inventory"
34
        "github.com/mendersoftware/reporting/client/nats"
35
        rconfig "github.com/mendersoftware/reporting/config"
36
        "github.com/mendersoftware/reporting/model"
37
        "github.com/mendersoftware/reporting/store"
38
)
39

40
const (
41
        // jobsChanSize is the buffer capacity of the channel that InitAndRun
        // passes to indexer.GetJobs to receive incoming jobs.
        jobsChanSize = 1000
42
)
43

44
// InitAndRun initializes the indexer and runs it
45
func InitAndRun(conf config.Reader, store store.Store, ds store.DataStore, nats nats.Client) error {
1✔
46
        ctx, cancel := context.WithCancel(context.Background())
1✔
47
        defer cancel()
1✔
48

1✔
49
        invClient := inventory.NewClient(
1✔
50
                conf.GetString(rconfig.SettingInventoryAddr),
1✔
51
        )
1✔
52

1✔
53
        devClient := deviceauth.NewClient(
1✔
54
                conf.GetString(rconfig.SettingDeviceAuthAddr),
1✔
55
        )
1✔
56

1✔
57
        deplClient := deployments.NewClient(
1✔
58
                conf.GetString(rconfig.SettingDeploymentsAddr),
1✔
59
        )
1✔
60

1✔
61
        indexer := NewIndexer(store, ds, nats, devClient, invClient, deplClient)
1✔
62
        jobs := make(chan model.Job, jobsChanSize)
1✔
63

1✔
64
        err := indexer.GetJobs(ctx, jobs)
1✔
65
        if err != nil {
1✔
66
                return err
×
67
        }
×
68

69
        quit := make(chan os.Signal, 1)
1✔
70
        signal.Notify(quit, unix.SIGINT, unix.SIGTERM)
1✔
71
        go func() {
2✔
72
                select {
1✔
73
                case <-quit:
1✔
74
                        cancel()
1✔
NEW
75
                case <-ctx.Done():
×
76
                }
77
        }()
78

79
        batchSize := conf.GetInt(rconfig.SettingReindexBatchSize)
1✔
80
        if batchSize <= 0 {
1✔
81
                return fmt.Errorf(
×
82
                        "%s: must be a positive integer",
×
83
                        rconfig.SettingReindexBatchSize,
×
84
                )
×
85
        }
×
86
        workerConcurrency := conf.GetInt(rconfig.SettingWorkerConcurrency)
1✔
87
        if workerConcurrency <= 0 {
1✔
88
                return fmt.Errorf(
×
89
                        "%s: must be a positive integer",
×
90
                        rconfig.SettingWorkerConcurrency,
×
91
                )
×
92
        }
×
93
        dispatch := make(chan []model.Job)
1✔
94
        jobPool := make(chan []model.Job, workerConcurrency)
1✔
95
        for i := 0; i < workerConcurrency; i++ {
11✔
96
                jobPool <- make([]model.Job, batchSize)
10✔
97
                go workerRoutine(ctx, strconv.Itoa(i+1), indexer, dispatch, jobPool)
10✔
98
        }
10✔
99

100
        maxTimeMs := conf.GetInt(rconfig.SettingReindexMaxTimeMsec)
1✔
101
        tickerTimeout := time.Duration(maxTimeMs) * time.Millisecond
1✔
102
        ticker := time.NewTimer(tickerTimeout)
1✔
103
        jobsList := <-jobPool
1✔
104
        done := ctx.Done()
1✔
105
        for err == nil {
50✔
106
                select {
49✔
107
                case <-ticker.C:
29✔
108
                        ticker.Reset(tickerTimeout)
29✔
109
                        if len(jobsList) > 0 {
35✔
110
                                jobsList, err = dispatchJobs(ctx, jobsList, dispatch, jobPool)
6✔
111
                        }
6✔
112

113
                case job, open := <-jobs:
19✔
114
                        if !open {
19✔
115
                                return errors.New("Jetstream closed")
×
116
                        }
×
117
                        jobsList = append(jobsList, job)
19✔
118
                        if len(jobsList) >= cap(jobsList) {
19✔
119
                                ticker.Reset(tickerTimeout)
×
NEW
120
                                jobsList, err = dispatchJobs(ctx, jobsList, dispatch, jobPool)
×
121
                        }
×
122

123
                case <-done:
1✔
124
                        err = ctx.Err()
1✔
125
                }
126
        }
127
        return err
1✔
128
}
129

130
func dispatchJobs(ctx context.Context,
131
        jobs []model.Job,
132
        dispatch chan<- []model.Job,
133
        jobPool <-chan []model.Job,
134
) (next []model.Job, err error) {
6✔
135
        done := ctx.Done()
6✔
136
        select {
6✔
NEW
137
        case <-done:
×
NEW
138
                return nil, ctx.Err()
×
139
        case dispatch <- jobs:
6✔
140
        }
141
        select {
6✔
NEW
142
        case <-done:
×
NEW
143
                return nil, ctx.Err()
×
144
        case next = <-jobPool:
6✔
145
        }
146
        return next[:0], nil
6✔
147
}
148

149
func workerRoutine(
150
        ctx context.Context,
151
        workerName string,
152
        indexer Indexer,
153
        jobQ <-chan []model.Job,
154
        jobPool chan<- []model.Job) {
10✔
155
        l := log.FromContext(ctx)
10✔
156
        l.Data["worker"] = workerName
10✔
157
        l.Infof("Worker %s waiting for jobs", workerName)
10✔
158
        ctx = log.WithContext(ctx, l)
10✔
159
        for jobs := range jobQ {
16✔
160
                l.Infof("processing %d jobs", len(jobs))
6✔
161
                indexer.ProcessJobs(ctx, jobs)
6✔
162
                jobPool <- jobs
6✔
163
        }
6✔
164
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc