|
| 1 | +import logging |
| 2 | + |
| 3 | +import xml.etree.ElementTree as ET |
| 4 | + |
| 5 | +from converter_app.models import File |
| 6 | +from converter_app.readers.helper.reader import Readers |
| 7 | +from converter_app.readers.helper.base import Reader |
| 8 | + |
| 9 | +logger = logging.getLogger(__name__) |
| 10 | + |
| 11 | + |
class XMLReader(Reader):
    """
    Reader for XML files.

    The element tree is walked recursively. Text and attribute values are
    stored as metadata on the first table, keyed by a dotted path that starts
    at ``#``. Two kinds of numeric columns are collected along the way:

    * elements whose text is a space separated list of more than one value,
      all of which ``get_shape`` classifies as ``'f'``;
    * paths that occur repeatedly and carry a single value matching
      ``float_pattern`` on every occurrence after the column was started.

    Columns are grouped into additional tables by ``_merge_tables``.
    """

    identifier = 'xml_reader'
    priority = 10

    def __init__(self, file: File):
        super().__init__(file)
        self._file_extensions = ['.xml']
        # Table that currently receives metadata / rows.
        self._table = None
        # Columns found by _filter_data_rows, in document order.
        self._data_tables = []
        # Path -> column dict, or None once the path has been disqualified.
        self._potential_data_tables = {}

    def check(self):
        """Return True if the file suffix (case-insensitive) is supported."""
        return self.file.suffix.lower() in self._file_extensions

    def _get_tag_name(self, node: ET.Element):
        """Return the tag of ``node`` without a leading ``{namespace}`` part."""
        return node.tag.split('}', 1)[-1]

    def _filter_data_rows(self, node: ET.Element, text: str, xml_path: str) -> bool:
        """
        Register the text of ``node`` as a data column if it is a numeric list.

        :param node: XML node the text belongs to
        :param text: Text content of the node
        :param xml_path: Path in global XML-file to this node
        :return: True if a data column was registered, otherwise False
        """
        text_array = [x for x in text.strip().split(' ') if x != '']
        # get_shape comes from the base class; presumably it yields one type
        # code per token, with 'f' marking a float -- verify against Reader.
        shape = self.get_shape(text_array)
        if all(x == 'f' for x in shape) and len(shape) > 1:
            self._data_tables.append(self._generate_data_table(shape, xml_path, text_array, node))
            return True
        return False

    def _generate_data_table(self, shape: list[str], xml_path: str, text_array: list[str], node: ET.Element):
        """
        Build the dict that describes one data column.

        :param shape: Type codes of the values; stored joined into one string
        :param xml_path: Path in global XML-file to the source node
        :param text_array: Raw string values; converted with ``as_number``
        :param node: XML node the values were read from
        :return: dict with the keys 'shape', 'path', 'values' and 'node'
        """
        return {
            'shape': ''.join(shape),
            'path': xml_path,
            'values': [self.as_number(x) for x in text_array],
            'node': node
        }

    def handle_node(self, node: ET.Element, xml_path: str, node_name: str):
        """
        This method can be overridden to handle special nodes separately.

        It is called by ``_read_node`` for every element before its text and
        attributes are processed. The default implementation does nothing.

        :param node: XML node Object
        :param xml_path: Path of the *parent* element as passed by
            ``_read_node``; the path of ``node`` itself is
            ``f'{xml_path}.{node_name}'``
        :param node_name: Name of the Node (without namespace)
        """

    def _add_metadata(self, key: str, val: str, node: ET.Element):
        """
        Store ``val`` as metadata and track ``key`` as a potential data column.

        A key starts a column the first time its value matches
        ``float_pattern``. Further matching values are appended; a
        non-matching value replaces the column with None, which disqualifies
        the key permanently.

        :param key: Metadata key (XML path of the value)
        :param val: String value
        :param node: XML node that owns the value
        """
        is_float = self.float_pattern.fullmatch(val)
        if key in self._potential_data_tables:
            column = self._potential_data_tables[key]
            if is_float and column is not None:
                column['values'].append(self.as_number(val))
                column['shape'] += 'f'
            else:
                self._potential_data_tables[key] = None
        elif is_float:
            # NOTE(review): a key whose first value is non-numeric is not
            # disqualified and may still start a column later -- confirm
            # that this is intended.
            self._potential_data_tables[key] = self._generate_data_table(['f'], key, [val], node)
        self._table.add_metadata(key, val)

    def _read_node(self, node: ET.Element, xml_path: str = '#'):
        """
        Recursively process all descendants of ``node``.

        :param node: XML node whose children are processed
        :param xml_path: Path in global XML-file to ``node``
        """
        for child in node:
            text = child.text

            try:
                local_name = self._get_tag_name(child)
                new_path = f'{xml_path}.{local_name}'
            except ValueError:
                # NOTE(review): str.split is not expected to raise ValueError;
                # kept as a defensive fallback.
                new_path = 'Unknown'
                local_name = ''

            self.handle_node(child, xml_path, local_name)

            # The value belongs to ``child`` (not to its parent), so that is
            # the node recorded for column naming in _merge_tables -- the
            # same node _filter_data_rows receives.
            if text is not None and not self._filter_data_rows(child, text, new_path):
                self._add_metadata(new_path, text.strip(), child)
            for k, v in child.attrib.items():
                self._add_metadata(f'{new_path}.{k}', v, child)

            self._read_node(child, new_path)

    def prepare_tables(self):
        """
        Parse the file content and build all tables.

        The first table receives the metadata. It is followed by tables built
        from the explicit data columns and then by tables built from the
        potential data columns that collected more than one value.

        :return: list of tables
        """
        tables = []
        self._table = self.append_table(tables)
        # NOTE(review): xml.etree.ElementTree is not hardened against
        # maliciously constructed XML; consider a hardened parser if the
        # files are untrusted.
        root = ET.XML(self.file.content)
        self._read_node(root)
        self._merge_tables(self._data_tables, tables)

        # Disqualified keys are stored as None and must be skipped here,
        # otherwise subscripting them raises a TypeError.
        potential_tables = [
            column for column in self._potential_data_tables.values()
            if column is not None and len(column['values']) > 1
        ]
        # Sorting by length places columns of equal shape next to each other
        # so that _merge_tables puts them into the same table.
        potential_tables.sort(key=lambda column: len(column['values']))
        self._merge_tables(potential_tables, tables)

        return tables

    def _merge_tables(self, data_tables: list, tables):
        """
        Append the given columns to ``tables``.

        A new table is started whenever the shape of a column differs from
        the shape of the previous one; consecutive columns of equal shape
        become columns of the same table.

        :param data_tables: Column dicts as built by ``_generate_data_table``
        :param tables: list of tables that is extended in place
        """
        current_shape = ''
        for table_col in data_tables:
            if current_shape != table_col['shape']:
                current_shape = table_col['shape']
                self._table = self.append_table(tables)
                self._table['rows'] = [[] for _ in table_col['values']]

            tag_name = self._get_tag_name(table_col['node'])
            # Index of the column that is about to be appended.
            col_index = len(self._table['rows'][0])
            self._table.add_metadata(f"COL #{col_index}", tag_name)
            self._table.add_metadata(f"COL #{col_index} XML PATH", table_col['path'])

            for i, v in enumerate(table_col['values']):
                self._table['rows'][i].append(v)

            for k, v in table_col['node'].attrib.items():
                self._table.add_metadata(f'{tag_name}.{k}', v)
| 123 | + |
| 124 | + |
# Runs at import time: hands the XMLReader class to the shared Readers instance.
Readers.instance().register(XMLReader)
0 commit comments