diff --git a/wis2box-management/wis2box/api/__init__.py b/wis2box-management/wis2box/api/__init__.py index bab16b84e..5db7679ad 100644 --- a/wis2box-management/wis2box/api/__init__.py +++ b/wis2box-management/wis2box/api/__init__.py @@ -232,6 +232,28 @@ def delete_collections_by_retention(days: int) -> None: backend.delete_collections_by_retention(days) +def get_plugins(record: dict) -> list: + """ + Get plugins from record + + :param record: `dict` of record + + :returns: `list` of plugins + """ + + plugins = [] + + try: + dm = record['wis2box']['data_mappings'] + for filetype in dm['plugins'].keys(): + for p in dm['plugins'][filetype]: + plugins.append(p['plugin']) + except Exception as e: + LOGGER.info(f"No plugins found for record-id={record['id']} : {e}") + + return plugins + + @click.group() def api(): """API management""" @@ -258,10 +280,15 @@ def setup(ctx, verbosity): except Exception as err: click.echo(f'Issue loading discovery-metadata: {err}') return False + # loop over records and add data-collection when bufr2geojson is used for record in records['features']: metadata_id = record['id'] + plugins = get_plugins(record) + LOGGER.info(f'Plugins used by {metadata_id}: {plugins}') + if 'wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON' not in plugins: # noqa + continue if metadata_id not in api_collections: - click.echo(f'Adding collection: {metadata_id}') + click.echo(f'Adding data-collection for: {metadata_id}') from wis2box.data import gcm meta = gcm(record) setup_collection(meta=meta) diff --git a/wis2box-management/wis2box/api/backend/elastic.py b/wis2box-management/wis2box/api/backend/elastic.py index 81220b346..0b6ab4bf8 100644 --- a/wis2box-management/wis2box/api/backend/elastic.py +++ b/wis2box-management/wis2box/api/backend/elastic.py @@ -103,6 +103,57 @@ } } +MAPPINGS_OBS = { + 'properties': { + 'geometry': { + 'type': 'geo_shape' + }, + 'properties': { + 'properties': { + 'name': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'observationTime': { + 'type': 'date', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'phenomenonTime': { + 'type': 'text' + }, + 'wigos_station_identifier': { + 'type': 'text', + 'fields': { + 'raw': {'type': 'keyword'} + } + }, + 'units': { + 'type': 'text' + }, + 'value': { + 'type': 'float', + 'coerce': True + }, + 'description': { + 'type': 'text' + }, + 'reportId': { + 'type': 'text', + 'fields': { + 'raw': { + 'type': 'keyword' + } + } + }, + } + } + } +} + MAPPINGS_STATIONS = { 'properties': { 'geometry': { @@ -216,8 +267,10 @@ def add_collection(self, collection_id: str) -> dict: if collection_id == 'stations': mappings = MAPPINGS_STATIONS - else: + elif collection_id in ['discovery-metadata', 'messages']: mappings = MAPPINGS + else: + mappings = MAPPINGS_OBS es_index = self.es_id(collection_id) @@ -316,8 +369,11 @@ def gendata(features): '_id': feature['id'], '_source': feature } - - helpers.bulk(self.conn, gendata(items)) + success, errors = helpers.bulk(self.conn, gendata(items), raise_on_error=False) # noqa + if errors: + for error in errors: + LOGGER.error(f"Indexing error: {error}") + raise RuntimeError(f"Upsert failed with {len(errors)} errors") def delete_collection_item(self, collection_id: str, item_id: str) -> str: """ diff --git a/wis2box-management/wis2box/data/__init__.py b/wis2box-management/wis2box/data/__init__.py index d68e82aed..1c5c8c8d1 100644 --- a/wis2box-management/wis2box/data/__init__.py +++ b/wis2box-management/wis2box/data/__init__.py @@ -124,13 +124,13 @@ def gcm(mcf: Union[dict, str]) -> dict: 'id': generated['id'], 'type': 'feature', 'topic_hierarchy': generated['properties']['wmo:topicHierarchy'].replace('origin/a/wis2/', '').replace('/', '.'), # noqa: E501 - 'title': generated['properties']['title'], - 'description': generated['properties']['description'], + 'title': f'bufr2geojson output ({generated["id"]})', + 'description': f'Output published by bufr2geojson for dataset with id={generated["id"]}', # noqa 'keywords': generated['properties']['keywords'], 'bbox': bbox, 'links': generated['links'], 'id_field': 'id', - 'time_field': 'resultTime', + 'time_field': 'observationTime', 'title_field': 'id' } @@ -145,7 +145,7 @@ def add_collection_data(metadata: str): """ meta = gcm(metadata) - + LOGGER.info(f'Adding data-collection for {meta["id"]}') setup_collection(meta=meta) return