Skip to content

Commit d81e1bd

Browse files
committed
use state instead of files for sftp jobs
1 parent 8f7922d commit d81e1bd

11 files changed, +125 -186 lines changed

etna/lib/etna/remote.rb

+1
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ def initialize(host:, username:, password: nil, port: 22, root:, **args)
1313
@host = host
1414
@port = port
1515
@root = root
16+
raise "RemoteSSH must have host, username and password" unless @host && @username && @password
1617
end
1718

1819
def ssh

polyphemus/lib/data_eng/clients/sftp_client.rb

+3
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,9 @@ def initialize(host, username, password, port: 22)
66
@username = username
77
@password = password
88
@port = port
9+
10+
raise "SFTPClient must have host, username and password" unless @host && @username && @password
11+
912
begin
1013
Net::SFTP.start(@host, @username, password: @password, port: @port) do |sftp|
1114
sftp.dir.entries('.')
+2-8
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,3 @@
1-
def build_pipeline_state_dir(path_to_write_files, run_id)
2-
dir_path = File.join(path_to_write_files, run_id)
3-
FileUtils.mkdir_p(dir_path)
4-
dir_path
5-
end
6-
71
def remove_dscolab_prefix(path)
8-
path.gsub('DSCOLAB_', '')
9-
end
2+
path.gsub('DSCOLAB_', '')
3+
end

polyphemus/lib/data_eng/jobs/sftp_deposit_uploader.rb

+11-17
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,12 @@ def workflow_version
2020
config['version_number']
2121
end
2222

23-
def path_to_write_files
24-
config['config']['path_to_write_files']
23+
def magic_string
24+
config['config']['magic_string']
25+
end
26+
27+
def name_regex
28+
Regexp.new("#{magic_string}(-|_)")
2529
end
2630

2731
def deposit_root_path
@@ -57,9 +61,7 @@ def pre(context)
5761
raise "Run #{run_id} not found"
5862
end
5963

60-
context[:files_to_update] = CSV.foreach(run["state"]["files_to_update_path"], headers: true).map do |row|
61-
{ path: row['path'], modified_time: row['modified_time'].to_i }
62-
end
64+
context[:files_to_update] = run["state"]["files_to_update"].map(&:symbolize_keys)
6365

6466
if context[:files_to_update].empty?
6567
logger.info("No new files to upload...")
@@ -89,7 +91,7 @@ def process(context)
8991
end
9092

9193
begin
92-
deposit_path = File.join(deposit_root_path, remove_dscolab_prefix(sftp_path))
94+
deposit_path = File.join(deposit_root_path, sftp_path.gsub(name_regex,''))
9395
remote_ssh.file_upload(deposit_path, file_stream)
9496
rescue Etna::RemoteSSH::RemoteSSHError => e
9597
logger.warn("Failed to upload to deposit host #{deposit_host}: #{deposit_path}. Error: #{e.message}")
@@ -100,23 +102,15 @@ def process(context)
100102

101103
def post(context)
102104
if context[:failed_files].any?
103-
writable_dir = build_pipeline_state_dir(path_to_write_files, run_id)
104-
context[:failed_files_path] = File.join(writable_dir, DEPOSIT_FAILED_FILES_CSV)
105-
106-
logger.warn("Writing failed files to #{context[:failed_files_path]}")
107105
logger.warn("Found #{context[:failed_files].size} failed files")
108106

109-
CSV.open(context[:failed_files_path], "wb") do |csv|
110-
csv << ["path"]
111-
context[:failed_files].each do |path|
112-
csv << [path]
113-
end
107+
context[:failed_files].each do |file_path|
108+
logger.warn(file_path)
114109
end
115110

116111
polyphemus_client.update_run(project_name, run_id, {
117112
state: {
118-
deposit_num_failed_files: context[:failed_files].size,
119-
deposit_failed_files_path: context[:failed_files_path],
113+
deposit_num_failed_files: context[:failed_files].size
120114
},
121115
})
122116
else

polyphemus/lib/data_eng/jobs/sftp_file_discovery.rb

+11-31
Original file line number | Diff line number | Diff line change
@@ -6,8 +6,6 @@ class SftpFileDiscoveryJob < Polyphemus::ETLJob
66
include WithEtnaClients
77
include WithLogger
88

9-
SFTP_FILES_TO_UPDATE_CSV = "sftp_files_to_update.csv"
10-
119
private
1210

1311
def project_name
@@ -22,16 +20,16 @@ def workflow_version
2220
config['version_number']
2321
end
2422

25-
def file_regex
26-
config['config']['file_regex']
23+
def magic_string
24+
config['config']['magic_string']
2725
end
2826

29-
def sftp_root_dir
30-
config['config']['sftp_root_dir']
27+
def file_regex
28+
Regexp.new("#{magic_string}(-|_).*")
3129
end
3230

33-
def path_to_write_files
34-
config['config']['path_to_write_files']
31+
def ingest_root_path
32+
config['config']['ingest_root_path']
3533
end
3634

3735
def restart_scan?
@@ -43,7 +41,7 @@ def initial_start_scan_time
4341
end
4442

4543
def override_interval
46-
runtime_config['config']['override_interval'] || nil
44+
runtime_config['config']['override_interval']
4745
end
4846

4947
def sftp_client
@@ -69,27 +67,20 @@ def pre(context)
6967
def process(context)
7068
logger.info("Searching for files from #{context[:start_time]} to #{context[:end_time]}")
7169
files_to_update = sftp_client.search_files(
72-
sftp_root_dir,
70+
ingest_root_path,
7371
file_regex,
7472
context[:start_time],
7573
context[:end_time ]
7674
)
77-
if files_to_update.empty?
78-
context[:num_files_to_update] = 0
79-
logger.info("No files found to update...")
80-
else
81-
context[:num_files_to_update] = files_to_update.size
82-
writable_dir = build_pipeline_state_dir(path_to_write_files, run_id)
83-
context[:files_to_update_path] = write_csv(writable_dir, files_to_update)
84-
logger.info("Found #{files_to_update.size} files to update...")
85-
end
75+
logger.info("Found #{files_to_update.size} files to update...")
76+
context[:files_to_update] = files_to_update
8677
end
8778

8879
def post(context)
8980
polyphemus_client.update_run(project_name, run_id, {
9081
state: {
9182
num_files_to_update: context[:num_files_to_update],
92-
files_to_update_path: context[:files_to_update_path],
83+
files_to_update: context[:files_to_update],
9384
start_time: context[:start_time],
9485
end_time: context[:end_time]
9586
}
@@ -117,15 +108,4 @@ def fetch_last_scan
117108
response["end_time"]
118109
end
119110
end
120-
121-
def write_csv(dir_path, files_to_update)
122-
filepath = File.join(dir_path, SFTP_FILES_TO_UPDATE_CSV)
123-
CSV.open(filepath, "wb") do |csv|
124-
csv << ["path", "modified_time"]
125-
files_to_update.each do |file|
126-
csv << [file[:path], file[:modified_time]]
127-
end
128-
end
129-
filepath
130-
end
131111
end

polyphemus/lib/data_eng/jobs/sftp_metis_uploader.rb

+20-24
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,14 @@ def workflow_version
2323
config['version_number']
2424
end
2525

26+
def magic_string
27+
config['config']['magic_string']
28+
end
29+
30+
def name_regex
31+
Regexp.new("#{magic_string}(-|_)")
32+
end
33+
2634
def bucket_name
2735
config['config']['bucket_name']
2836
end
@@ -35,10 +43,6 @@ def metis_root_path
3543
config['config']['metis_root_path']
3644
end
3745

38-
def path_to_write_files
39-
config['config']['path_to_write_files']
40-
end
41-
4246
def sftp_client
4347
@sftp_client ||= SFTPClient.new(
4448
config["secrets"]["sftp_ingest_host"],
@@ -51,19 +55,17 @@ def sftp_client
5155

5256
def pre(context)
5357
run = polyphemus_client.get_run(project_name, run_id)
54-
unless run
55-
raise "Run #{run_id} not found"
56-
end
57-
context[:files_to_update] = CSV.foreach(run["state"]["files_to_update_path"], headers: true).map do |row|
58-
{ path: row['path'], modified_time: row['modified_time'].to_i }
59-
end
58+
59+
raise "Run #{run_id} not found" unless run
60+
61+
context[:files_to_update] = run["state"]["files_to_update"].map(&:symbolize_keys)
6062

6163
if context[:files_to_update].empty?
6264
logger.info("No new files to upload...")
6365
return false
64-
else
65-
return true
6666
end
67+
68+
return true
6769
end
6870

6971
def process(context)
@@ -93,7 +95,7 @@ def process(context)
9395
metis_uid: metis_uid,
9496
)
9597

96-
metis_path = File.join(metis_root_path, remove_dscolab_prefix(sftp_path))
98+
metis_path = File.join(metis_root_path, sftp_path.gsub(name_regex,''))
9799
uploader.do_upload(
98100
Etna::Clients::Metis::MetisUploadWorkflow::StreamingIOUpload.new(
99101
readable_io: file_stream,
@@ -111,21 +113,15 @@ def process(context)
111113

112114
def post(context)
113115
if context[:failed_files].any?
114-
writable_dir = build_pipeline_state_dir(path_to_write_files, run_id)
115-
context[:failed_files_path] = File.join(writable_dir, METIS_FAILED_FILES_CSV)
116-
117-
logger.warn("Writing failed files to #{context[:failed_files_path]}")
118116
logger.warn("Found #{context[:failed_files].size} failed files")
119-
CSV.open(context[:failed_files_path], "wb") do |csv|
120-
csv << ["path"]
121-
context[:failed_files].each do |path|
122-
csv << [path]
123-
end
117+
118+
context[:failed_files].each do |file_path|
119+
logger.warn(file_path)
124120
end
121+
125122
polyphemus_client.update_run(project_name, run_id, {
126123
state: {
127-
metis_num_failed_files: context[:failed_files].size,
128-
metis_failed_files_path: context[:failed_files_path],
124+
metis_num_failed_files: context[:failed_files].size
129125
},
130126
})
131127
else

polyphemus/lib/data_eng/workflow_manifests/ingestion.rb

+8-6
Original file line number | Diff line number | Diff line change
@@ -10,20 +10,22 @@ def as_json
1010
description: "This loader downloads data from a remote host to Metis or another remote host",
1111
title: "Ingestion Loader",
1212
properties: {
13-
root_dir: { type: 'string' },
14-
file_regex: { type: 'string' },
15-
sftp_root_dir: { type: 'string' },
16-
path_to_write_files: { type: 'string' },
1713
bucket_name: { type: 'string' },
14+
magic_string: { type: 'string' },
15+
ingest_root_path: { type: 'string' },
1816
metis_root_path: { type: 'string' },
1917
deposit_root_path: { type: 'string' }
2018
}
2119
},
2220
runtime_params: {
23-
commit: 'boolean',
2421
initial_start_scan_time: 'integer', # unix timestamp
22+
override_interval: 'integer',
23+
restart_scan: 'boolean'
2524
},
26-
secrets: [:sftp_host, :sftp_user, :sftp_password, :sftp_port],
25+
secrets: [
26+
:sftp_deposit_host, :sftp_deposit_password, :sftp_deposit_user,
27+
:sftp_ingest_host, :sftp_ingest_password, :sftp_ingest_user
28+
],
2729
workflow_path: '/app/workflows/argo/ingestion/workflow.yaml'
2830
}
2931
end

polyphemus/spec/data_eng/sftp_deposit_uploader_spec.rb

+15-21
Original file line number | Diff line number | Diff line change
@@ -7,14 +7,16 @@ def create_job(config, runtime_config)
77
config = {
88
"project_name" => "labors",
99
"secrets" => {
10-
"sftp_host" => "some-sftp-host",
11-
"sftp_user" => "user",
12-
"sftp_password" => "password",
13-
"sftp_port" => "22"
10+
"sftp_ingest_host" => "some-sftp-host",
11+
"sftp_ingest_user" => "user",
12+
"sftp_ingest_password" => "password",
13+
"sftp_deposit_host" => "other-sftp-host",
14+
"sftp_deposit_user" => "user",
15+
"sftp_deposit_password" => "password"
1416
},
1517
"config" => {
16-
"file_regex" => "LABORS(-|_).*",
17-
"sftp_root_dir" => "SSD",
18+
"magic_string" => "LABORS",
19+
"ingest_root_path" => "SSD",
1820
"path_to_write_files" => "/tmp/",
1921
"bucket_name" => "deposit",
2022
"metis_root_path" => "files/some-sftp-host",
@@ -49,24 +51,16 @@ def create_job(config, runtime_config)
4951
run_id: run_id,
5052
config_id: config["config_id"],
5153
version_number: config["version_number"],
52-
state: {files_to_update_path: "/tmp/#{run_id}/#{SftpFileDiscoveryJob::SFTP_FILES_TO_UPDATE_CSV}"},
54+
state: {
55+
files_to_update: sftp_files
56+
},
5357
orchestrator_metadata: {},
5458
output: nil,
5559
created_at: Time.now,
5660
updated_at: Time.now
5761
}
5862
}
5963

60-
def create_files_to_update_csv
61-
FileUtils.mkdir_p("/tmp/#{run_id}")
62-
CSV.open(run_record[:state][:files_to_update_path], "wb") do |csv|
63-
csv << ["path", "modified_time"]
64-
sftp_files.each do |file|
65-
csv << [file[:path], file[:modified_time]]
66-
end
67-
end
68-
end
69-
7064
before do
7165
ENV['TOKEN'] = TEST_TOKEN
7266
ENV['KUBE_ID'] = run_id
@@ -78,7 +72,6 @@ def create_files_to_update_csv
7872
let(:captured_requests) { [] }
7973

8074
before do
81-
create_files_to_update_csv
8275
stub_initial_sftp_connection
8376
stub_initial_ssh_connection
8477
stub_polyphemus_get_run(config["project_name"], run_id, run_record)
@@ -100,15 +93,16 @@ def create_files_to_update_csv
10093
end
10194

10295
it 'fails to upload files' do
96+
allow(Polyphemus.instance.logger).to receive(:warn)
97+
expect(Polyphemus.instance.logger).to receive(:warn).with(/Failed to upload to deposit host other-sftp-host/)
98+
expect(Polyphemus.instance.logger).to receive(:warn).with(/Found 1 failed files/)
99+
expect(Polyphemus.instance.logger).to receive(:warn).with(/SSD.*LABORS_S1.fastq.gz/)
103100
stub_remote_ssh_file_upload(success: false)
104101

105102
job = create_job(config, runtime_config)
106103
context = job.execute
107104

108-
failed_files_path = "/tmp/1234567890/#{SftpDepositUploaderJob::DEPOSIT_FAILED_FILES_CSV}"
109-
expect(File.exist?(failed_files_path)).to be_truthy
110105
expect(captured_requests[0][:state][:deposit_num_failed_files]).to eq(1)
111-
expect(captured_requests[0][:state][:deposit_failed_files_path]).to eq(failed_files_path)
112106
end
113107

114108
end

0 commit comments

Comments
 (0)