Change Pairtree to directory storage
shsdev committed Nov 24, 2024
1 parent df1f0ca commit a1c4444
Showing 1 changed file with 45 additions and 141 deletions.
186 changes: 45 additions & 141 deletions eatb/pairtree_storage.py
@@ -2,9 +2,7 @@
import pathlib
import re
import shutil
-import tarfile
-import fnmatch

from itertools import groupby
from eatb import VersionDirFormat
from eatb.checksum import check_transfer
@@ -13,7 +11,6 @@
from eatb.utils.fileutils import to_safe_filename, list_files_in_dir, copy_file_with_base_directory
from pairtree import PairtreeStorageFactory, ObjectNotFoundException
from eatb import logger
-from eatb.checksum import files_identical
from eatb.packaging import ChunkedTarEntryReader
from eatb.utils.reporters import default_reporter

@@ -75,13 +72,6 @@ def store(self, identifier, source_file_path, progress_reporter=default_reporter
progress_reporter(100)
return next_version
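
For orientation, a minimal usage sketch of the storage API as it stands after this change; the storage root and identifier below are hypothetical:

storage = PairtreeStorage("/var/repo/storage")  # hypothetical storage root
new_version = storage.store("urn:demo:object-1", "/tmp/demo-package")
print(new_version)  # e.g. "v00001", the next free version directory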


-def get_bag_name(self, identifier: str, version: str, bagnr: int):
-safe_identifier_name = to_safe_filename(identifier)
-bag_dir_name = "b%05d" % int(bagnr)
-bag_name = "%s_%s_%s" % (safe_identifier_name, version, bag_dir_name)
-return bag_name

def identifier_object_exists(self, identifier):
"""
Verify if an object of the given identifier exists in the repository
@@ -91,7 +81,7 @@ def identifier_object_exists(self, identifier):
@rtype: boolean
@return: True if the object exists, false otherwise
"""
logger.debug("Looking for object at path: %s/data" % (self.repo_storage_client._id_to_dirpath(identifier)))
logger.debug(f"Looking for object at path: {self.repo_storage_client._id_to_dirpath(identifier)}/data")
return self.repo_storage_client.exists(identifier, os.path.join("data"))

def identifier_version_object_exists(self, identifier, version_num):
@@ -142,64 +132,78 @@ def curr_version_num(self, identifier):
"""
if not self.identifier_object_exists(identifier):
return -1
-version_num = 0
+version_num = 1
while self.identifier_version_object_exists(identifier, version_num):
version_num += 1
version_num -= 1
return version_num
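
A sketch of the numbering convention the loop above assumes; the five-digit v%05d pattern is an assumption inferred from the v[0-9]{5,5} regex used further down, not confirmed by this diff:

# Version directories are assumed to be named "v00001", "v00002", ...
# curr_version_num probes upward from 1 and returns the last existing
# version number, or -1 if the object does not exist at all.
assert "v%05d" % 2 == "v00002"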

def get_object_path(self, identifier):
"""
-Get absolute file path of the stored object. If the version number is omitted, the path of the highest version
-number is returned.
+Get the absolute path of the stored object directory. Verifies that the directory is not empty.
:param identifier: identifier
-:param version_num: version number
-:return: absolute file path of the stored object
+:return: absolute path to the stored object directory
"""
curr_version = self.curr_version(identifier)
dirpath = self.repo_storage_client._id_to_dirpath(identifier)
target_data_directory = os.path.join(dirpath, "data")
target_data_version_directory = os.path.join(target_data_directory, curr_version)
-path_to_object = os.path.join(target_data_version_directory, "%s.tar" % to_safe_filename(identifier))
-if os.path.exists(path_to_object):
-logger.debug("Package file found at: %s" % path_to_object)
-return path_to_object
-raise ObjectNotFoundException("Package file not found")

-def get_chunked_tar_entry_reader(self, identifier: str) -> ChunkedTarEntryReader:
-tar_file_path = os.path.join(self.get_object_path(identifier), "%s.tar" % to_safe_filename(identifier))
-tar_file = tarfile.open(tar_file_path, 'r')
-return ChunkedTarEntryReader(tar_file)
+if not os.path.exists(target_data_version_directory):
+raise ObjectNotFoundException(f"Version directory not found: {target_data_version_directory}")

+# Check if the directory is empty
+if not os.listdir(target_data_version_directory): # os.listdir returns an empty list if the directory is empty
+raise ObjectNotFoundException(f"Directory is empty: {target_data_version_directory}")

+logger.debug("Valid data directory found at: %s", target_data_version_directory)
+return target_data_version_directory
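
A hedged usage sketch of the new directory-based lookup; identifier and paths are hypothetical:

storage = PairtreeStorage("/var/repo/storage")  # hypothetical storage root
try:
    object_dir = storage.get_object_path("urn:demo:object-1")
    # Now a version directory such as .../data/v00002 is returned,
    # not a path to a packaged .tar file as before this change.
except ObjectNotFoundException:
    pass  # raised if the version directory is missing or empty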

# noinspection PyProtectedMember
def latest_version_ip_list(self) -> list:
"""
-Get a list of latest version packages from repository storage.
-:return: list of latest version packages
+Get a list of the latest version directories from repository storage.
+:return: list of latest version directories
"""
-files = rec_find_files(self.repository_storage_dir)
-sortkeyfn = lambda s: s[1]
+# Find all directories recursively in the repository storage
+directories = rec_find_directories(self.repository_storage_dir)
+sortkeyfn = lambda s: s[1] # Sort by version number
tuples = []
-for repofile in files:
-if repofile.endswith(".tar"):
-f, _ = os.path.split(repofile)
-version = re.search(r'v[0-9]{5,5}', f).group(0)
-repoitem = (repofile, int(re.search(r'\d+', version).group(0)))
-tuples.append(repoitem)

+for directory in directories:
+if "/data/v" in directory:
+# Extract version from the path
+version_match = re.search(r'v[0-9]{5,5}', directory)
+if version_match:
+version = version_match.group(0)
+repoitem = (directory, int(re.search(r'\d+', version).group(0)))
+tuples.append(repoitem)

+# Sort by version number (descending order)
tuples.sort(key=sortkeyfn, reverse=True)

+# Group by version
items_grouped_by_version = []
for key, valuesiter in groupby(tuples, key=sortkeyfn):
items_grouped_by_version.append(dict(version=key, items=list(v[0] for v in valuesiter)))

lastversionfiles = []
for version_items in items_grouped_by_version:
for item in version_items['items']:
-p, f = os.path.split(item)
-p2 = os.path.join(self.repository_storage_dir, p[:p.find("/data/v")])
-obj_id = self.repo_storage_client._get_id_from_dirpath(p2)
+# Get the root directory for the object
+p, _ = os.path.split(item)
+root_dir = os.path.join(self.repository_storage_dir, p[:p.find("/data/v")])
+obj_id = self.repo_storage_client._get_id_from_dirpath(root_dir)

+# Avoid duplicates based on object ID
if obj_id not in [x['id'] for x in lastversionfiles]:
-lastversionfiles.append({"id": obj_id, "version": version_items['version'], "path": item})
-return lastversionfiles
+lastversionfiles.append({
+"id": obj_id,
+"version": version_items['version'],
+"path": item
+})

+return lastversionfiles
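
rec_find_directories is called above but not shown in this diff; a minimal sketch of a helper with the semantics the loop assumes (every directory below the storage root) could look like this, though the project's actual implementation may differ:

import os

def rec_find_directories(directory):
    # Yield every directory below `directory`, recursively.
    for root, subdirs, _files in os.walk(directory):
        for subdir in subdirs:
            yield os.path.join(root, subdir)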

# noinspection PyProtectedMember
def get_dir_path_from_id(self, identifier):
@@ -210,70 +214,6 @@ def get_dir_path_from_id(self, identifier):
"""
return self.repo_storage_client._id_to_dirpath(identifier)

-def get_tar_file_path(self, identifier):
-curr_version = self.curr_version(identifier)
-dirpath = self.repo_storage_client._id_to_dirpath(identifier)
-target_data_directory = os.path.join(dirpath, "data")
-target_data_version_directory = os.path.join(target_data_directory, curr_version)
-bag_name = self.get_bag_name(identifier, curr_version, 1)
-target_data_version_asset_directory = os.path.join(target_data_version_directory, "content")
-tar_file_path = os.path.join(target_data_version_asset_directory, "%s.tar" % bag_name)
-if os.path.exists(tar_file_path):
-logger.debug("Package file found at: %s" % tar_file_path)
-return tar_file_path
-raise ObjectNotFoundException("Package file not found")

-def get_object_item_stream(self, identifier, entry, tar_file=None):
-"""
-Get stream of a representation tar file entry
-:param identifier: package identifier
-:param entry: entry of the tar file
-:tar_file: path to tar file (optional)
-:return: chunks iterator of the tar file
-"""
-tar_file_path = self.get_object_path(identifier)
-if os.path.exists(tar_file_path):
-logger.debug("tar file found at: %s" % entry)
-t = tar_file if tar_file else tarfile.open(tar_file_path, 'r')
-logger.debug("Accessing access entry %s" % entry)
-try:
-inst = ChunkedTarEntryReader(t)
-return inst.chunks(entry)
-except KeyError:
-logger.error('ERROR: Did not find %s in tar archive' % entry)
-raise ObjectNotFoundException("Entry not found in repository object")

-def trigger_new_version(self, uuid, identifier, config_path_work, storage_directory):
-"""
-Trigger new version depending on changed files in working directory compared to the data set in storage.
-:param storage_directory:
-:param config_path_work:
-:param uuid: UUID of working directory
-:param identifier: Data asset identifier
-:return: True, if new version is triggered, False otherwise
-"""
-working_dir = os.path.join(config_path_work, uuid)
-if self.identifier_object_exists(identifier):
-version = self.curr_version(identifier)
-data_asset_last_version_path = os.path.join(
-make_storage_data_directory_path(identifier, storage_directory),
-version, to_safe_filename(identifier))
-working_distributions_dir = os.path.join(working_dir, self.representations_directory)
-if not os.path.exists(working_distributions_dir):
-logger.debug("New version is not triggered because working catalogue directory does not exist.")
-return False
-stored_distributions_dir = os.path.join(data_asset_last_version_path, self.representations_directory)
-distribution_files = list_files_in_dir(working_distributions_dir)
-for dataset_dir in distribution_files:
-dataset_package_file = os.path.join(working_distributions_dir, "%s.tar" % dataset_dir)
-dataset_package_stored_file = os.path.join(stored_distributions_dir, "%s.tar" % dataset_dir)
-files_ident = files_identical(dataset_package_file, dataset_package_stored_file)
-if not files_ident:
-logger.debug("New version triggered because hash of dataset packages is not identical")
-return True
-logger.debug("New version not triggered.")
-return False


def update_state(state_xml_file, identifier, version):
ip_state = IpState.from_path(state_xml_file) if os.path.exists(state_xml_file) \
@@ -283,24 +223,6 @@
ip_state.write_doc(state_xml_file)


-def get_package_from_directory_storage(task_context_path, package_uuid, package_extension,
-tl, config_path_storage):
-pts = PairtreeStorage(config_path_storage)
-parent_object_path = pts.get_object_path(package_uuid)

-package_in_dip_work_dir = os.path.join(task_context_path, ("%s.%s" % (package_uuid, package_extension)))
-package_source_size = fsize(parent_object_path)
-tl.addinfo("Source: %s (%d)" % (parent_object_path, package_source_size))
-tl.addinfo("Target: %s" % package_in_dip_work_dir)
-total_bytes_read = 0
-with open(package_in_dip_work_dir, 'wb') as target_file:
-for chunk in FileBinaryDataChunks(parent_object_path, 65536).chunks(total_bytes_read):
-target_file.write(chunk)
-total_bytes_read += package_source_size
-target_file.close()
-check_transfer(parent_object_path, package_in_dip_work_dir)


def make_storage_directory_path(identifier, version, config_path_storage):
"""Used for remote (no access to storage backend)"""
pts = PairtreeStorage(config_path_storage)
@@ -311,21 +233,3 @@ def make_storage_data_directory_path(identifier, config_path_storage):
"""Used for remote (no access to storage backend)"""
pts = PairtreeStorage(config_path_storage)
return os.path.join(pts.get_dir_path_from_id(identifier), "data")
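
For illustration, how the module-level helpers compose; the storage path and identifier are hypothetical:

config_path_storage = "/var/repo/storage"  # hypothetical
data_dir = make_storage_data_directory_path("urn:demo:object-1", config_path_storage)
# data_dir ends in ".../data"; version directories such as v00001 live below it.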


-def get_package_from_storage(storage, task_context_path, package_uuid, package_extension, tl):

-pts = PairtreeStorage(storage)
-parent_object_path = pts.get_object_path(package_uuid)

-package_in_dip_work_dir = os.path.join(task_context_path, ("%s.%s" % (package_uuid, package_extension)))
-package_source_size = fsize(parent_object_path)
-tl.addinfo("Source: %s (%d)" % (parent_object_path, package_source_size))
-tl.addinfo("Target: %s" % package_in_dip_work_dir)
-total_bytes_read = 0
-with open(package_in_dip_work_dir, 'wb') as target_file:
-for chunk in FileBinaryDataChunks(parent_object_path, 65536).chunks(total_bytes_read):
-target_file.write(chunk)
-total_bytes_read += package_source_size
-target_file.close()
-check_transfer(parent_object_path, package_in_dip_work_dir)
