@@ -27,11 +27,17 @@ def list_class_rowtypes() :
27
27
print (f"{ name } : { member .value } " )
28
28
29
29
@staticmethod
30
- def get_contents_from_file_names (files : list ) -> (dict [MetaElementTypes , str ], dict [MetaElementTypes , str ]):
30
+ def get_contents_from_file_names (files : list , csv_encoding : CSVEncoding ,
31
+ content_keys : dict [MetaElementTypes , list ] = None , zf : ZipFile = None ) \
32
+ -> (ContentData , list [ContentData ]):
31
33
"""Find the core content and extension contents from a list of file paths.
32
34
Core content will always be event if present, otherwise, occurrence content
33
35
34
36
:param files: list of files
37
+ :param csv_encoding: delimiter for txt file. Default is comma delimiter txt files if not supplied
38
+ :param content_keys: optional dictionary of MetaElementTypes and key list
39
+ e.g. {MetaElementTypes.OCCURRENCE: ["occurrenceID"]}
40
+ :param zf: optional open ZipFile; when supplied, each file name is opened from this archive as UTF-8 text
35
41
:return: the core ContentData and a list of ContentData for the extension contents
36
42
"""
37
43
def derive_type (file_list : list ) -> dict [str , MetaElementTypes ]:
@@ -51,9 +57,23 @@ def derive_type(file_list: list) -> dict[str, MetaElementTypes]:
51
57
core_filename = next (iter (core_file ))
52
58
core_type = core_file [core_filename ]
53
59
ext_files = {k : v for k , v in contents .items () if v != core_type }
54
- return core_file , ext_files
55
60
56
- return None
61
+ core_data = [core_filename ] if not zf else io .TextIOWrapper (zf .open (core_filename ), encoding = "utf-8" )
62
+ core_content = ContentData (data = core_data ,
63
+ type = core_type , csv_encoding = csv_encoding ,
64
+ keys = get_keys (class_type = core_type ,
65
+ override_content_keys = content_keys ))
66
+ ext_content = []
67
+ for ext_file , ext_type in ext_files .items ():
68
+ ext_data = [ext_file ] if not zf else io .TextIOWrapper (zf .open (ext_file ), encoding = "utf-8" )
69
+ ext_content .append (ContentData (data = ext_data ,
70
+ type = ext_type , csv_encoding = csv_encoding ,
71
+ keys = get_keys (class_type = ext_type ,
72
+ override_content_keys = content_keys )))
73
+ return core_content , ext_content
74
+ else :
75
+ raise ValueError ("The core content cannot be determined. Please check filenames against the class type. "
76
+ "Use list_class_rowtypes to print the class types. " )
57
77
58
78
"""Perform various DwCA operations"""
59
79
@staticmethod
@@ -71,23 +91,9 @@ def create_dwca_from_file_list(files: list, output_dwca: Union[str, BytesIO],
71
91
:param content_keys: optional dictionary of MetaElementTypes and key list
72
92
for eg. {MetaElementTypes.OCCURRENCE, ["occurrenceID"]}
73
93
"""
74
- core_content , ext_content_list = DwcaHandler .get_contents_from_file_names (files )
75
- if core_content :
76
- core_filename = next (iter (core_content ))
77
- core_type = core_content [core_filename ]
78
-
79
- core_content = ContentData (data = [core_filename ], type = core_type , csv_encoding = csv_encoding ,
80
- keys = get_keys (class_type = core_type , override_content_keys = content_keys ))
81
- ext_content = []
82
- for ext_file , ext_type in ext_content_list .items ():
83
- ext_content .append (ContentData (data = [ext_file ],
84
- type = ext_type , csv_encoding = csv_encoding ,
85
- keys = get_keys (class_type = ext_type ,
86
- override_content_keys = content_keys )))
87
- DwcaHandler .create_dwca (core_csv = core_content , ext_csv_list = ext_content , output_dwca = output_dwca ,
88
- eml_content = eml_content )
89
- else :
90
- raise ValueError ("The core content cannot be determined. Please check filename in zip file" )
94
+ core_content , ext_content_list = DwcaHandler .get_contents_from_file_names (files = files , csv_encoding = csv_encoding )
95
+ DwcaHandler .create_dwca (core_csv = core_content , ext_csv_list = ext_content_list , output_dwca = output_dwca ,
96
+ eml_content = eml_content )
91
97
92
98
@staticmethod
93
99
def create_dwca_from_zip_content (zip_file : str , output_dwca : Union [str , BytesIO ],
@@ -107,24 +113,10 @@ def create_dwca_from_zip_content(zip_file: str, output_dwca: Union[str, BytesIO]
107
113
"""
108
114
with ZipFile (zip_file , 'r' ) as zf :
109
115
files = zf .namelist ()
110
- core_content , ext_content_list = DwcaHandler .get_contents_from_file_names (files )
111
- if core_content :
112
- core_filename = next (iter (core_content ))
113
- core_type = core_content [core_filename ]
114
- core_content = ContentData (data = io .TextIOWrapper (zf .open (core_filename ), encoding = "utf-8" ),
115
- type = core_type , csv_encoding = csv_encoding ,
116
- keys = get_keys (class_type = core_type ,
117
- override_content_keys = content_keys ))
118
- ext_content = []
119
- for ext_file , ext_type in ext_content_list .items ():
120
- ext_content .append (ContentData (data = io .TextIOWrapper (zf .open (ext_file ), encoding = "utf-8" ),
121
- type = ext_type , csv_encoding = csv_encoding ,
122
- keys = get_keys (class_type = ext_type ,
123
- override_content_keys = content_keys )))
124
- DwcaHandler .create_dwca (core_csv = core_content , ext_csv_list = ext_content , output_dwca = output_dwca ,
125
- eml_content = eml_content )
126
- else :
127
- raise ValueError ("The core content cannot be determined. Please check filename in zip file" )
116
+ core_content , ext_content_list = DwcaHandler .get_contents_from_file_names (files = files , csv_encoding = csv_encoding , zf = zf )
117
+ DwcaHandler .create_dwca (core_csv = core_content , ext_csv_list = ext_content_list , output_dwca = output_dwca ,
118
+ eml_content = eml_content )
119
+ zf .close ()
128
120
129
121
@staticmethod
130
122
def create_dwca (core_csv : ContentData ,
0 commit comments