Skip to content

Commit db00708

Browse files
committed
use ingest_metis_data_workflow for sftp_metis_uploader
1 parent 8abad95 commit db00708

File tree

7 files changed

+63
-56
lines changed

7 files changed

+63
-56
lines changed

etna/lib/etna/clients/metis/workflows/ingest_metis_data_workflow.rb

+15-4
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,35 @@ class IngestMetisDataWorkflow < Struct.new(:metis_filesystem, :ingest_filesystem
99
# Since we are doing manual triage of files,
1010
# do not automatically copy directory trees.
1111
# srcs must be a list of full paths to files.
12-
def copy_files(srcs, &block)
13-
srcs.each do |src|
12+
def copy_files(files, &block)
13+
files.each do |file|
14+
if file.is_a?(Array)
15+
src, dest = file
16+
else
17+
src = dest = file
18+
end
19+
1420
if !ingest_filesystem.exist?(src)
1521
logger&.warn("#{src} does not exist on source filesystem. Skipping.")
22+
yield src, false if block_given?
1623
next
1724
end
1825

1926
logger&.info("Copying file #{src} (#{Etna::Formatting.as_size(ingest_filesystem.stat(src).size)})")
2027

2128
# For ingestion triage, just copy over the exact path + filename.
22-
copy_file(dest: src, src: src, &block)
29+
begin
30+
copy_file(dest: dest, src: src, &block)
31+
rescue Exception => e
32+
yield src, false if block_given?
33+
end
2334
end
2435
end
2536

2637
def copy_file(dest:, src:, &block)
2738
ingest_filesystem.with_readable(src, "r") do |io|
2839
metis_filesystem.do_streaming_upload(io, dest, ingest_filesystem.stat(src).size)
29-
yield src if block_given?
40+
yield src, true if block_given?
3041
end
3142
end
3243
end

etna/lib/etna/clients/metis/workflows/metis_upload_workflow.rb

+1
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ def next_blob_bytes
197197
end
198198
end
199199
end
200+
@last_bytes
200201
end
201202
end
202203
end

etna/lib/etna/filesystem.rb

+4-2
Original file line numberDiff line numberDiff line change
@@ -438,9 +438,11 @@ def curl_cmd(path, opts=[])
438438
end
439439

440440
def sftp_file_from_path(src)
441-
file = ls(::File.dirname(src)).split("\n").map do |listing|
441+
files = ls(::File.dirname(src)).split("\n").map do |listing|
442442
SftpFile.new(listing)
443-
end.select do |file|
443+
end
444+
445+
file = files.select do |file|
444446
file.name == ::File.basename(src)
445447
end
446448

polyphemus/lib/data_eng/jobs/sftp_metis_uploader.rb

+30-35
Original file line numberDiff line numberDiff line change
@@ -68,46 +68,41 @@ def pre(context)
6868
return true
6969
end
7070

71+
def metis_filesystem
72+
Etna::Filesystem::Metis.new(
73+
project_name: project_name,
74+
bucket_name: bucket_name,
75+
metis_client: metis_client,
76+
root: metis_root_path
77+
)
78+
end
79+
80+
def ingest_filesystem
81+
Etna::Filesystem::SftpFilesystem.new(
82+
username: config["secrets"]["sftp_ingest_user"],
83+
password: config["secrets"]["sftp_ingest_password"],
84+
host: config["secrets"]["sftp_ingest_host"]
85+
)
86+
end
87+
7188
def process(context)
7289
context[:failed_files] = []
7390

74-
context[:files_to_update].each do |file|
75-
sftp_path = file[:path]
76-
modified_time = file[:modified_time]
77-
78-
begin
79-
file_stream = sftp_client.download_as_stream(sftp_path)
80-
if file_stream.nil?
81-
logger.info("Failed to download #{sftp_path}")
82-
next
83-
end
84-
rescue StandardError => e
85-
logger.info("Failed to download from sftp server, #{sftp_path}: #{e.message}")
86-
context[:failed_files] << sftp_path
87-
next
88-
end
91+
workflow = Etna::Clients::Metis::IngestMetisDataWorkflow.new(
92+
metis_filesystem: metis_filesystem,
93+
ingest_filesystem: ingest_filesystem,
94+
logger: nil,
95+
)
8996

90-
begin
91-
uploader = Etna::Clients::Metis::MetisUploadWorkflow.new(
92-
metis_client: metis_client,
93-
project_name: project_name,
94-
bucket_name: bucket_name,
95-
metis_uid: metis_uid,
96-
)
97-
98-
metis_path = File.join(metis_root_path, sftp_path.gsub(name_regex,''))
99-
uploader.do_upload(
100-
Etna::Clients::Metis::MetisUploadWorkflow::StreamingIOUpload.new(
101-
readable_io: file_stream,
102-
size_hint: file_stream.size,
103-
),
104-
metis_path
105-
)
106-
rescue StandardError => e
107-
logger.warn("Failed to upload to metis: #{metis_path}. Error: #{e.message}")
108-
context[:failed_files] << sftp_path
97+
workflow.copy_files(context[:files_to_update].map do |file|
98+
[ file[:path], file[:path].gsub(name_regex, '') ]
99+
end) do |filename, success|
100+
if success
101+
context[:successful_files] << filename
102+
else
103+
logger.warn("Failed to upload to metis: #{filename}")
104+
context[:failed_files] << filename
109105
end
110-
111106
end
112107
end
113108

polyphemus/lib/etls/metis/loader.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def data_frame_update(update, tail, metis)
133133
)
134134

135135
data.vectors.to_a.each do |name|
136-
data.delete name unless column_map.has_key?(name)
136+
data.delete_vector name unless column_map.has_key?(name)
137137
end
138138

139139
# Blank data equaling values_to_ignore by setting as nil

polyphemus/spec/data_eng/sftp_metis_uploader_spec.rb

+12-2
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,19 @@ def stub_upload_file_with_stream(file_path, stream, force_error: false)
9393
stub_polyphemus_update_run(config["project_name"], run_id, captured_requests)
9494
stub_metis_setup
9595
stub_create_folder(bucket: config["config"]["bucket_name"], project: config["project_name"])
96-
end
96+
97+
stub_request(
98+
:get, "http://sftp//some-sftp-host/SSD/20240919_LH00416_0184_B22NF2WLT3/ACMK02/"
99+
).with(
100+
headers: { 'Authorization'=>'Basic dXNlcjpwYXNzd29yZA==' }
101+
).to_return(status: 200, body: sftp_files.map { |f|
102+
"drwxrwxr-x 4 eurystheus labors 32 Aug 26 2022 #{::File.basename(f[:path])}"
103+
}.join("\n"), headers: {})
104+
end
97105

98106
it 'successfully uploads files' do
99107
stub_upload_file_with_stream(file_to_upload, fake_stream)
108+
allow_any_instance_of(Etna::Filesystem::SftpFilesystem).to receive(:with_readable).and_yield(StringIO.new("A"*32))
100109

101110
job = create_job(config, runtime_config)
102111
context = job.execute
@@ -105,7 +114,8 @@ def stub_upload_file_with_stream(file_path, stream, force_error: false)
105114
end
106115

107116
it 'fails to upload files' do
108-
stub_upload_file_with_stream(file_to_upload, fake_stream, force_error: true)
117+
allow_any_instance_of(Etna::Filesystem::SftpFilesystem).to receive(:with_readable).and_yield(StringIO.new)
118+
stub_upload_file_with_stream(file_to_upload, fake_stream)
109119

110120
allow(Polyphemus.instance.logger).to receive(:warn)
111121
expect(Polyphemus.instance.logger).to receive(:warn).with(/Failed to upload to metis/)

polyphemus/workflows/argo/ingestion/workflow.yaml

-12
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,6 @@ spec:
2323
value: "{{workflow.parameters.config_id}}"
2424
- name: version_number
2525
value: "{{workflow.parameters.version_number}}"
26-
- - name: c4-update
27-
templateRef:
28-
name: run-job-template
29-
template: run-job
30-
arguments:
31-
parameters:
32-
- name: job
33-
value: sftp_deposit_uploader
34-
- name: config_id
35-
value: "{{workflow.parameters.config_id}}"
36-
- name: version_number
37-
value: "{{workflow.parameters.version_number}}"
3826
- - name: metis-update
3927
templateRef:
4028
name: run-job-template

0 commit comments

Comments
 (0)