pulibrary / bibdata / 1b5975ab-8c9b-4b35-a7b8-d91eeb914610

13 Jan 2025 08:17PM UTC · coverage: 92.052% (-0.02%) from 92.076%

Pull Request #2591: Upgrade to ruby 3.4.1 (CI: circleci)
christinach:
  bundle webhook Gemfile.lock with bundler version 2.6.2
  Keep ruby cimg in version 3.3.6 for the webhook spec

3428 of 3724 relevant lines covered (92.05%) · 371.99 hits per line

Source file: /marc_to_solr/lib/pul_solr_json_writer.rb (file coverage: 23.58%)
require 'yell'

require 'traject/util'
require 'traject/qualified_const_get'
require 'traject/thread_pool'

require 'json'
require 'httpclient'

require 'uri'
# for Mutex/Queue
require 'concurrent' # for atomic_fixnum

# ==========================
# This class is a copy of Traject's SolrJsonWriter
# (https://github.com/traject/traject/blob/master/lib/traject/solr_json_writer.rb)
# but it has been updated to skip records with UTF-8 values that Ruby cannot
# handle in JSON.parse() - see https://github.com/pulibrary/bibdata/issues/1677
# ==========================

# Write to Solr using the JSON interface; only works for Solr >= 3.2
#
# This should work under both MRI and JRuby, with JRuby getting much
# better performance due to the threading model.
#
# Relevant settings
#
# * solr.url (optional if solr.update_url is set) The URL to the solr core to index into
#
# * solr.update_url: The actual update url. If unset, we'll first see if
#   "#{solr.url}/update/json" exists, and if not use "#{solr.url}/update"
#
# * solr_writer.batch_size: How big a batch to send to solr. Default is 100.
#   My tests indicate that this setting doesn't change overall index speed by a ton.
#
# * solr_writer.thread_pool: How many threads to use for the writer. Default is 1.
#   Likely useful even under MRI since thread will be waiting on Solr for some time.
#
# * solr_writer.max_skipped: How many records skipped due to errors before we
#   bail out with a fatal error? Set to -1 for unlimited skips. Default 0,
#   raise and abort on a single record that could not be added to Solr.
#
# * solr_writer.commit_on_close: Set to true (or "true") if you want to commit at the
#   end of the indexing run. (Old "solrj_writer.commit_on_close" supported for backwards
#   compat only.)
#
# * solr_writer.commit_timeout: If commit_on_close, how long to wait for Solr before
#   giving up as a timeout. Default 10 minutes. Solr can be slow.
#
# * solr_json_writer.http_client Mainly intended for testing, set your own HTTPClient
#   or mock object to be used for HTTP.
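#
# A usage sketch, not part of the original file: with Traject, a writer like
# this is typically selected and configured through indexer settings. The
# URL and values below are illustrative assumptions, not taken from bibdata:
#
#   settings do
#     provide "writer_class_name", "Traject::PulSolrJsonWriter"
#     provide "solr.url", "http://localhost:8983/solr/catalog"
#     provide "solr_writer.batch_size", 100
#     provide "solr_writer.thread_pool", 1
#     provide "solr_writer.commit_on_close", "true"
#   end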

class Traject::PulSolrJsonWriter
  include Traject::QualifiedConstGet

  # Disable RuboCop so that we can mimic the original in Traject as much as possible.
  # rubocop:disable all

  DEFAULT_MAX_SKIPPED = 0
  DEFAULT_BATCH_SIZE  = 100

  # The passed-in settings
  attr_reader :settings, :thread_pool_size

  # A queue to hold documents before sending to solr
  attr_reader :batched_queue

  def initialize(argSettings)
    @settings = Traject::Indexer::Settings.new(argSettings)

    # Set max errors
    @max_skipped = (@settings['solr_writer.max_skipped'] || DEFAULT_MAX_SKIPPED).to_i
    if @max_skipped < 0
      @max_skipped = nil
    end

    @http_client = @settings["solr_json_writer.http_client"] || HTTPClient.new

    @batch_size = (settings["solr_writer.batch_size"] || DEFAULT_BATCH_SIZE).to_i
    @batch_size = 1 if @batch_size < 1

    # Store error count in an AtomicInteger, so multi threads can increment
    # it safely, if we're threaded.
    @skipped_record_incrementer = Concurrent::AtomicFixnum.new(0)


    # How many threads to use for the writer?
    # if our thread pool settings are 0, it'll just create a null threadpool that
    # executes in calling context.
    @thread_pool_size = (@settings["solr_writer.thread_pool"] || 1).to_i

    @batched_queue         = Queue.new
    @thread_pool = Traject::ThreadPool.new(@thread_pool_size)

    # old setting solrj_writer supported for backwards compat, as we make
    # this the new default writer.
    @commit_on_close = (settings["solr_writer.commit_on_close"] || settings["solrj_writer.commit_on_close"]).to_s == "true"

    # Figure out where to send updates
    @solr_update_url = self.determine_solr_update_url

    logger.info("   #{self.class.name} writing to '#{@solr_update_url}' in batches of #{@batch_size} with #{@thread_pool_size} bg threads")
  end


  # Add a single context to the queue, ready to be sent to solr
  def put(context)
    @thread_pool.raise_collected_exception!

    @batched_queue << context
    if @batched_queue.size >= @batch_size
      batch = Traject::Util.drain_queue(@batched_queue)
      @thread_pool.maybe_in_thread_pool(batch) {|batch_arg| send_batch(batch_arg) }
    end
  end
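
  # Batching flow sketch (illustrative, not part of the original file):
  # assuming a hypothetical batch_size of 3, the first two puts only enqueue;
  # the third drains the queue and hands the whole batch to the thread pool:
  #
  #   writer.put(ctx1)  # @batched_queue: [ctx1]
  #   writer.put(ctx2)  # @batched_queue: [ctx1, ctx2]
  #   writer.put(ctx3)  # drained; send_batch([ctx1, ctx2, ctx3]) runs on the
  #                     # pool, or inline when solr_writer.thread_pool is 0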

  # Send the given batch of contexts. If something goes wrong, send
  # them one at a time.
  # @param [Array<Traject::Indexer::Context>] an array of contexts
  def send_batch(batch)
    return if batch.empty?
    json_package = JSON.generate(batch.map { |c| c.output_hash })
    begin
      resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
    rescue StandardError => exception
    end

    if exception || resp.status != 200
      error_message = exception ?
        Traject::Util.exception_to_log_message(exception) :
        "Solr response: #{resp.status}: #{resp.body}"

      logger.error "Error in Solr batch add. Will retry documents individually at performance penalty: #{error_message}"

      batch.each do |c|
        send_single(c)
      end
    end
  # This rescue clause is not in the original Traject::SolrJsonWriter
  # Special handling for batch of records with UTF-8 characters that JSON.parse() cannot handle.
  rescue JSON::GeneratorError => ex
    logger.error "JSON error in Solr batch add. Will retry documents individually at performance penalty: #{ex}"
    batch.each do |c|
      send_single(c)
    end
  end
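
  # For reference, a sketch of the failure mode the rescue above guards
  # against (an illustration, not from this file): JSON.generate raises
  # JSON::GeneratorError when a value contains a string that is not valid
  # UTF-8, for example:
  #
  #   JSON.generate(["\xC3"])
  #   # => JSON::GeneratorError: source sequence is illegal/malformed utf-8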


  # Send a single context to Solr, logging an error if need be
  # @param [Traject::Indexer::Context] c The context whose document you want to send
  def send_single(c)
    json_package = JSON.generate([c.output_hash])
    begin
      resp = @http_client.post @solr_update_url, json_package, "Content-type" => "application/json"
      # Catch Timeouts and network errors as skipped records, but otherwise
      # allow unexpected errors to propagate up.
    rescue HTTPClient::TimeoutError, SocketError, Errno::ECONNREFUSED => exception
    end

    if exception || resp.status != 200
      if exception
        msg = Traject::Util.exception_to_log_message(exception)
      else
        msg = "Solr error response: #{resp.status}: #{resp.body}"
      end
      logger.error "Could not add record #{c.source_record_id} at source file position #{c.position}: #{msg}"
      logger.debug(c.source_record.to_s)

      @skipped_record_incrementer.increment
      if @max_skipped and skipped_record_count > @max_skipped
        raise RuntimeError.new("#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting")
      end

    end
  # This rescue clause is not in the original Traject::SolrJsonWriter
  # Special handling for single record with UTF-8 characters that JSON.parse() cannot handle.
  rescue JSON::GeneratorError => ex
    logger.error "Could not add record #{c.source_record_id}: #{ex}"
  end


  # Get the logger from the settings, or default to an effectively null logger
  def logger
    settings["logger"] ||= Yell.new(STDERR, :level => "gt.fatal") # null logger
  end

  # On close, we need to (a) raise any exceptions we might have, (b) send off
  # the last (possibly empty) batch, and (c) commit if instructed to do so
  # via the solr_writer.commit_on_close setting.
  def close
    @thread_pool.raise_collected_exception!

    # Finish off whatever's left. Do it in the thread pool for
    # consistency, and to ensure expected order of operations, so
    # it goes to the end of the queue behind any other work.
    batch = Traject::Util.drain_queue(@batched_queue)
    if batch.length > 0
      @thread_pool.maybe_in_thread_pool { send_batch(batch) }
    end

    # Wait for shutdown, and time it.
    logger.debug "#{self.class.name}: Shutting down thread pool, waiting if needed..."
    elapsed = @thread_pool.shutdown_and_wait
    if elapsed > 60
      logger.warn "Waited #{elapsed} seconds for all threads, you may want to increase solr_writer.thread_pool (currently #{@settings["solr_writer.thread_pool"]})"
    end
    logger.debug "#{self.class.name}: Thread pool shutdown complete"
    logger.warn "#{self.class.name}: #{skipped_record_count} skipped records" if skipped_record_count > 0

    # check again now that we've waited, there could still be some
    # that didn't show up before.
    @thread_pool.raise_collected_exception!

    # Commit if we're supposed to
    if @commit_on_close
      commit
    end
  end


  # Send a commit
  def commit
    logger.info "#{self.class.name} sending commit to solr at url #{@solr_update_url}..."

    original_timeout = @http_client.receive_timeout

    @http_client.receive_timeout = (settings["commit_timeout"] || (10 * 60)).to_i

    resp = @http_client.get(@solr_update_url, {"commit" => 'true'})
    unless resp.status == 200
      raise RuntimeError.new("Could not commit to Solr: #{resp.status} #{resp.body}")
    end

    @http_client.receive_timeout = original_timeout
  end
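
  # Illustrative note, not in the original: HTTPClient#get sends the hash as
  # query parameters, so the commit above amounts to
  #
  #   GET <solr_update_url>?commit=true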


  # Return count of encountered skipped records. Most accurate to call
  # it after #close, in which case it should include full count, even
  # under async thread_pool.
  def skipped_record_count
    @skipped_record_incrementer.value
  end


  # Relatively complex logic to determine if we have a valid URL and what it is
  def determine_solr_update_url
    if settings['solr.update_url']
      check_solr_update_url(settings['solr.update_url'])
    else
      derive_solr_update_url_from_solr_url(settings['solr.url'])
    end
  end


  # If we've got a solr.update_url, make sure it's ok
  def check_solr_update_url(url)
    unless url =~ /^#{URI::regexp}$/
      raise ArgumentError.new("#{self.class.name} setting `solr.update_url` doesn't look like a URL: `#{url}`")
    end
    url
  end

  def derive_solr_update_url_from_solr_url(url)
    # Nil? Then we bail
    if url.nil?
      raise ArgumentError.new("#{self.class.name}: Neither solr.update_url nor solr.url set; need at least one")
    end

    # Not a URL? Bail
    unless url =~ /^#{URI::regexp}$/
      raise ArgumentError.new("#{self.class.name} setting `solr.url` doesn't look like a URL: `#{url}`")
    end

    # First, try the /update/json handler
    candidate = [url.chomp('/'), 'update', 'json'].join('/')
    resp      = @http_client.get(candidate)
    if resp.status == 404
      candidate = [url.chomp('/'), 'update'].join('/')
    end
    candidate
  end
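
  # Derivation sketch (illustrative URL, not from this file): with
  # solr.url = "http://localhost:8983/solr/catalog" the method first probes
  #
  #   http://localhost:8983/solr/catalog/update/json
  #
  # and, if Solr answers 404, falls back to
  #
  #   http://localhost:8983/solr/catalog/update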

  # rubocop:enable all
end