pulibrary / pdc_describe, commit 205ab3b0-31e5-4157-994d-4db2048579f5

11 Dec 2023 01:45PM UTC. Coverage: 95.494% (-0.05%) from 95.543%.

Pull Request #1632 (carolyncole, built on circleci): Adding a method to just count the files instead of fully processing them

7 of 9 new or added lines in 2 files covered (77.78%)
22 existing lines in 1 file now uncovered
3094 of 3240 relevant lines covered (95.49%)
204.45 hits per line

Source file: /app/services/s3_query_service.rb (89.59% of lines covered)
# frozen_string_literal: true

require "aws-sdk-s3"

# A service to query an S3 bucket for information about a given data set
# rubocop:disable Metrics/ClassLength
class S3QueryService
  attr_reader :model

  PRECURATION = "precuration"
  POSTCURATION = "postcuration"
  PRESERVATION = "preservation"

  def self.configuration
    Rails.configuration.s3
  end

  def self.pre_curation_config
    configuration.pre_curation
  end

  def self.post_curation_config
    configuration.post_curation
  end

  def self.preservation_config
    configuration.preservation
  end

  attr_reader :part_size, :last_response

  ##
  # @param [Work] model
  # @param [String] mode Valid values are "precuration", "postcuration", "preservation".
  #                      This value controls the AWS S3 bucket used to access the files.
  # @example S3QueryService.new(Work.find(1), "precuration")
  def initialize(model, mode = "precuration")
    @model = model
    @doi = model.doi
    @mode = mode
    @part_size = 5_368_709_120 # 5 GB is the maximum part size for AWS
    @last_response = nil
  end

  def config
    if @mode == PRESERVATION
      self.class.preservation_config
    elsif @mode == POSTCURATION
      self.class.post_curation_config
    elsif @mode == PRECURATION
      self.class.pre_curation_config
    else
      raise ArgumentError, "Invalid mode value: #{@mode}"
    end
  end

  def pre_curation?
    @mode == PRECURATION
  end

  def post_curation?
    @mode == POSTCURATION
  end

  ##
  # The name of the bucket this class is configured to use.
  # See config/s3.yml for the configuration file.
  def bucket_name
    config.fetch(:bucket, nil)
  end

  def region
    config.fetch(:region, nil)
  end

  ##
  # The S3 prefix for this object, i.e., the address within the S3 bucket,
  # which is based on the DOI
  def prefix
    "#{@doi}/#{model.id}/"
  end

  ##
  # Construct an S3 address for this data set
  def s3_address
    "s3://#{bucket_name}/#{prefix}"
  end

  ##
  # Public signed URL to fetch this file from S3 (valid for a limited time)
  def file_url(key)
    signer = Aws::S3::Presigner.new(client:)
    signer.presigned_url(:get_object, bucket: bucket_name, key:)
  end

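  # Illustrative sketch: fetching a download link for a key under this work's
  # prefix; `work` is a hypothetical persisted Work record.
  #
  #   service = S3QueryService.new(work, "postcuration")
  #   service.file_url("#{service.prefix}data.csv")
  #   # => a presigned HTTPS URL that expires after the SDK's default window
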
  # There is probably a better way to fetch the current ActiveStorage configuration but we have
  # not found it.
  def active_storage_configuration
    Rails.configuration.active_storage.service_configurations[Rails.configuration.active_storage.service.to_s]
  end

  def access_key_id
    active_storage_configuration["access_key_id"]
  end

  def secret_access_key
    active_storage_configuration["secret_access_key"]
  end

  def credentials
    @credentials ||= Aws::Credentials.new(access_key_id, secret_access_key)
  end

  def client
    @client ||= Aws::S3::Client.new(region:, credentials:)
  end

  # required, accepts ETag, Checksum, ObjectParts, StorageClass, ObjectSize
  def self.object_attributes
    [
      "ETag",
      "Checksum",
      "ObjectParts",
      "StorageClass",
      "ObjectSize"
    ]
  end

  def get_s3_object_attributes(key:)
    response = client.get_object_attributes({
                                              bucket: bucket_name,
                                              key:,
                                              object_attributes: self.class.object_attributes
                                            })
    response.to_h
  end

  def get_s3_object(key:)
    response = client.get_object({
                                   bucket: bucket_name,
                                   key:
                                 })
    object = response.to_h
    return if object.empty?

    object
  rescue Aws::Errors::ServiceError => aws_service_error
    message = "An error was encountered when requesting the AWS S3 Object #{key}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

  def build_s3_object_key(filename:)
    "#{prefix}#{filename}"
  end

  def find_s3_file(filename:)
    s3_object_key = build_s3_object_key(filename:)

    object = get_s3_object_attributes(key: s3_object_key)
    return if object.nil?

    S3File.new(work: model, filename: s3_object_key, last_modified: object[:last_modified], size: object[:object_size], checksum: object[:etag])
  end

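  # Illustrative sketch: looking up one file's metadata. The filename is
  # relative to the work's prefix; the full key is built by build_s3_object_key.
  #
  #   service.find_s3_file(filename: "data.csv")
  #   # => S3File with last_modified, size, and checksum populated
  #   #    (nil when S3 returns no attributes for the key)
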
  # Retrieve the S3 resources uploaded to the S3 Bucket
  # @return [Array<S3File>]
  def client_s3_files(reload: false, bucket_name: self.bucket_name, prefix: self.prefix, ignore_directories: true)
    @client_s3_files = nil if reload # force a reload
    @client_s3_files ||= get_s3_objects(bucket_name:, prefix:, ignore_directories:)
  end

  def client_s3_empty_files(reload: false, bucket_name: self.bucket_name, prefix: self.prefix)
    @client_s3_empty_files = nil if reload # force a reload
    @client_s3_empty_files = begin
      files_and_directories = get_s3_objects(bucket_name:, prefix:, ignore_directories: false)
      files_and_directories.select { |object| !object.filename.ends_with?("/") && object.empty? }
    end
  end

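  # Illustrative sketch: client_s3_files memoizes its result, so repeated
  # calls do not re-query S3 unless reload: true is passed.
  #
  #   service.client_s3_files               # hits S3 and caches the result
  #   service.client_s3_files               # returns the cached Array<S3File>
  #   service.client_s3_files(reload: true) # forces a fresh list_objects_v2 scan
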
  def file_count
    client_s3_files.count
  rescue Aws::Errors::ServiceError => aws_service_error
    message = "An error was encountered when requesting AWS S3 Objects from the bucket #{bucket_name} with the prefix #{prefix}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

  ##
  # Query the S3 bucket for what we know about the DOI
  # For docs see:
  # * https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/Client.html#list_objects_v2-instance_method
  # * https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/S3/Client.html#get_object_attributes-instance_method
  # @return Hash with two properties {objects: [<S3File>], ok: Bool}
  #   objects is an Array of S3File objects
  #   ok is false if there is an error connecting to S3. Otherwise true.
  def data_profile
    { objects: client_s3_files, ok: true }
  rescue => ex
    Rails.logger.error("Error querying S3. Bucket: #{bucket_name}. DOI: #{@doi}. Exception: #{ex.message}")

    { objects: [], ok: false }
  end

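  # Illustrative sketch: data_profile never raises; a failed S3 query is
  # reported through the ok flag instead.
  #
  #   service.data_profile
  #   # => { objects: [S3File, ...], ok: true }  on success
  #   # => { objects: [], ok: false }            when the query raises
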
  ##
  # Copies the existing files from the pre-curation bucket to the post-curation bucket.
  # Note that the copy process happens at AWS (i.e. the files are not downloaded and re-uploaded).
  # Returns true once a move job has been enqueued for every file.
  def publish_files(current_user)
    source_bucket = S3QueryService.pre_curation_config[:bucket]
    target_bucket = S3QueryService.post_curation_config[:bucket]
    empty_files = client_s3_empty_files(reload: true, bucket_name: source_bucket)
    # Do not move the empty files; however, ensure that their presence is
    #   noted in the provenance log.
    unless empty_files.empty?
      empty_files.each do |empty_file|
        message = "Warning: Attempted to publish empty S3 file #{empty_file.filename}."
        WorkActivity.add_work_activity(model.id, message, current_user.id, activity_type: WorkActivity::SYSTEM)
      end
    end

    files = client_s3_files(reload: true, bucket_name: source_bucket)
    snapshot = ApprovedUploadSnapshot.new(work: model)
    snapshot.store_files(files, current_user:)
    snapshot.save
    files.each do |file|
      ApprovedFileMoveJob.perform_later(work_id: model.id, source_bucket:, source_key: file.key, target_bucket:,
                                        target_key: file.key, size: file.size, snapshot_id: snapshot.id)
    end
    true
  end

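  # Illustrative sketch: publishing moves files between buckets via queued
  # jobs; `current_user` is a hypothetical User record.
  #
  #   S3QueryService.new(work, "precuration").publish_files(current_user)
  #   # records a WorkActivity warning per empty file, snapshots the uploads,
  #   # enqueues one ApprovedFileMoveJob per file, and returns true
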
  def copy_file(source_key:, target_bucket:, target_key:, size:)
    Rails.logger.info("Copying #{source_key} to #{target_bucket}/#{target_key}")
    if size > part_size
      copy_multi_part(source_key:, target_bucket:, target_key:, size:)
    else
      client.copy_object(copy_source: source_key, bucket: target_bucket, key: target_key, checksum_algorithm: "SHA256")
    end
  rescue Aws::Errors::ServiceError => aws_service_error
    message = "An error was encountered when requesting to copy the AWS S3 Object from #{source_key} to #{target_key} in the bucket #{target_bucket}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

  def copy_multi_part(source_key:, target_bucket:, target_key:, size:)
    multi = client.create_multipart_upload(bucket: target_bucket, key: target_key, checksum_algorithm: "SHA256")
    part_num = 0
    start_byte = 0
    parts = []
    while start_byte < size
      part_num += 1
      end_byte = [start_byte + part_size, size].min - 1
      resp = client.upload_part_copy(bucket: target_bucket, copy_source: source_key, key: multi.key, part_number: part_num,
                                     upload_id: multi.upload_id, copy_source_range: "bytes=#{start_byte}-#{end_byte}")
      parts << { etag: resp.copy_part_result.etag, part_number: part_num, checksum_sha256: resp.copy_part_result.checksum_sha256 }
      start_byte = end_byte + 1
    end
    client.complete_multipart_upload(bucket: target_bucket, key: target_key, upload_id: multi.upload_id, multipart_upload: { parts: })
  rescue Aws::Errors::ServiceError => aws_service_error
    message = "An error was encountered when requesting to multipart copy the AWS S3 Object from #{source_key} to #{target_key} in the bucket #{target_bucket}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

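  # Worked example: with the 5 GiB part_size, a 12 GiB (12,884,901,888-byte)
  # object is copied in three ranged parts:
  #
  #   part 1: bytes=0-5368709119
  #   part 2: bytes=5368709120-10737418239
  #   part 3: bytes=10737418240-12884901887
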
  def copy_directory(source_key:, target_bucket:, target_key:)
    client.copy_object(copy_source: source_key, bucket: target_bucket, key: target_key)
  rescue Aws::Errors::ServiceError => aws_service_error
    message = "An error was encountered when requesting to copy the AWS S3 directory Object from #{source_key} to #{target_key} in the bucket #{target_bucket}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

  def delete_s3_object(s3_file_key, bucket: bucket_name)
    resp = client.delete_object({ bucket:, key: s3_file_key })
    resp.to_h
  rescue Aws::Errors::ServiceError => aws_service_error
    # Log the bucket actually targeted, which may differ from the default bucket_name
    message = "An error was encountered when requesting to delete the AWS S3 Object #{s3_file_key} in the bucket #{bucket}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

  def create_directory
    client.put_object({ bucket: bucket_name, key: prefix, content_length: 0 })
  rescue Aws::Errors::ServiceError => aws_service_error
    message = "An error was encountered when requesting to create the AWS S3 directory Object in the bucket #{bucket_name} with the key #{prefix}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

  def upload_file(io:, filename:, size:, md5_digest: nil)
    key = "#{prefix}#{filename}"
    if size > part_size
      upload_multipart_file(target_bucket: bucket_name, target_key: key, size:, io:)
    else
      # upload the file from io in a single request; a single PUT may not exceed 5 GB
      md5_digest ||= md5(io:)
      @last_response = client.put_object(bucket: bucket_name, key:, body: io, content_md5: md5_digest)
    end
    key
  rescue Aws::S3::Errors::SignatureDoesNotMatch => e
    Honeybadger.notify("Error uploading file #{filename} for object: #{s3_address} Signature did not match! error: #{e}")
    false
  rescue Aws::Errors::ServiceError => aws_service_error
    message = "An error was encountered when requesting to create the AWS S3 Object in the bucket #{bucket_name} with the key #{key}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

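  # Illustrative sketch: uploading an in-memory file.
  #
  #   io = StringIO.new("hello world")
  #   service.upload_file(io: io, filename: "hello.txt", size: io.size)
  #   # => "<prefix>hello.txt" (the new object's key), or false when the
  #   #    request signature does not match
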
  def check_file(bucket:, key:)
    client.head_object({ bucket:, key: })
  rescue Aws::Errors::ServiceError => aws_service_error
    message = "An error was encountered when requesting to check the status of the AWS S3 Object in the bucket #{bucket} with the key #{key}: #{aws_service_error}"
    Rails.logger.error(message)
    raise aws_service_error
  end

  def md5(io:)
    md5 = Digest::MD5.new
    # read in chunks of at most 10,000 bytes so large files are never held in memory
    io.each(10_000) { |block| md5.update block }
    io.rewind
    md5.base64digest
  end

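  # Illustrative sketch: the digest is base64-encoded, as the content_md5
  # request header expects.
  #
  #   service.md5(io: StringIO.new("hello world"))
  #   # => "XrY7u+Ae7tCTyyK7j1rNww=="
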
  ##
  # Count the objects under the prefix by paging through list_objects_v2 and
  # summing each page's key_count, without building S3File objects.
  def count_objects(bucket_name: self.bucket_name, prefix: self.prefix)
    resp = client.list_objects_v2({ bucket: bucket_name, max_keys: 1000, prefix: })
    total_count = resp.key_count
    while resp.is_truncated
      resp = client.list_objects_v2({ bucket: bucket_name, max_keys: 1000, prefix:, continuation_token: resp.next_continuation_token })
      total_count += resp.key_count
    end
    total_count
  end

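  # Illustrative sketch: counting the objects for this work (the method added
  # in this pull request) without the per-object parsing that client_s3_files
  # performs.
  #
  #   service.count_objects
  #   # => 1042 (hypothetical count for the work's prefix)
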
  private

    def get_s3_objects(bucket_name:, prefix:, ignore_directories:)
      start = Time.zone.now
      resp = client.list_objects_v2({ bucket: bucket_name, max_keys: 1000, prefix: })
      resp_hash = resp.to_h
      objects = parse_objects(resp_hash, ignore_directories:)
      objects += parse_continuation(resp_hash, bucket_name:, prefix:, ignore_directories:)
      elapsed = Time.zone.now - start
      Rails.logger.info("Loading S3 objects. Bucket: #{bucket_name}. Prefix: #{prefix}. Elapsed: #{elapsed} seconds")
      objects
    end

    def parse_objects(resp, ignore_directories: true)
      objects = []
      resp_hash = resp.to_h
      response_objects = resp_hash[:contents]
      response_objects&.each do |object|
        next if object[:size] == 0 && ignore_directories
        s3_file = S3File.new(work: model, filename: object[:key], last_modified: object[:last_modified], size: object[:size], checksum: object[:etag])
        objects << s3_file
      end
      objects
    end

    def parse_continuation(resp_hash, bucket_name: self.bucket_name, prefix: self.prefix, ignore_directories: true)
      objects = []
      while resp_hash[:is_truncated]
        token = resp_hash[:next_continuation_token]
        resp = client.list_objects_v2({ bucket: bucket_name, max_keys: 1000, prefix:, continuation_token: token })
        resp_hash = resp.to_h
        objects += parse_objects(resp_hash, ignore_directories:)
      end
      objects
    rescue Aws::Errors::ServiceError => aws_service_error
      message = "An error was encountered when requesting to list the AWS S3 Objects in the bucket #{bucket_name} with the prefix #{prefix}: #{aws_service_error}"
      Rails.logger.error(message)
      raise aws_service_error
    end

    def upload_multipart_file(target_bucket:, target_key:, size:, io:)
      multi = client.create_multipart_upload(bucket: target_bucket, key: target_key)
      part_num = 0
      start_byte = 0
      parts = []
      while start_byte < size
        part_num += 1
        Tempfile.open("multipart-upload") do |file|
          IO.copy_stream(io, file, part_size)
          file.rewind
          checksum = md5(io: file)
          resp = client.upload_part(body: file, bucket: target_bucket, key: multi.key, part_number: part_num, upload_id: multi.upload_id, content_md5: checksum)
          parts << { etag: resp.etag, part_number: part_num }
        end
        start_byte += part_size
      end
      @last_response = client.complete_multipart_upload(bucket: target_bucket, key: target_key, upload_id: multi.upload_id, multipart_upload: { parts: })
    rescue Aws::Errors::ServiceError => aws_service_error
      message = "An error was encountered when requesting to multipart upload the AWS S3 Object #{target_key} in the bucket #{target_bucket}: #{aws_service_error}"
      Rails.logger.error(message)
      raise aws_service_error
    end
end
# rubocop:enable Metrics/ClassLength