Skip to content

Commit

Permalink
first version reorganizing the data-collection for bufr2geojson output
Browse files Browse the repository at this point in the history
  • Loading branch information
maaikelimper committed Nov 27, 2024
1 parent 489b007 commit 412e195
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
29 changes: 28 additions & 1 deletion wis2box-management/wis2box/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,28 @@ def delete_collections_by_retention(days: int) -> None:
backend.delete_collections_by_retention(days)


def get_plugins(record: dict) -> list:
"""
Get plugins from record
:param record: `dict` of record
:returns: `list` of plugins
"""

plugins = []

try:
dm = record['wis2box']['data_mappings']
for filetype in dm['plugins'].keys():
for p in dm['plugins'][filetype]:
plugins.append(p['plugin'])
except Exception as e:
LOGGER.info(f"No plugins found for record-id={record['id']} : {e}")

return plugins


@click.group()
def api():
"""API management"""
Expand All @@ -258,10 +280,15 @@ def setup(ctx, verbosity):
except Exception as err:
click.echo(f'Issue loading discovery-metadata: {err}')
return False
# loop over records and add data-collection when bufr2geojson is used
for record in records['features']:
metadata_id = record['id']
plugins = get_plugins(record)
LOGGER.info(f'Plugins used by {metadata_id}: {plugins}')
if 'wis2box.data.bufr2geojson.ObservationDataBUFR2GeoJSON' not in plugins: # noqa
continue
if metadata_id not in api_collections:
click.echo(f'Adding collection: {metadata_id}')
click.echo(f'Adding data-collection for: {metadata_id}')
from wis2box.data import gcm
meta = gcm(record)
setup_collection(meta=meta)
Expand Down
62 changes: 59 additions & 3 deletions wis2box-management/wis2box/api/backend/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,57 @@
}
}

MAPPINGS_OBS = {
'properties': {
'geometry': {
'type': 'geo_shape'
},
'properties': {
'properties': {
'name': {
'type': 'text',
'fields': {
'raw': {'type': 'keyword'}
}
},
'observationTime': {
'type': 'date',
'fields': {
'raw': {'type': 'keyword'}
}
},
'phenomenonTime': {
'type': 'text'
},
'wigos_station_identifier': {
'type': 'text',
'fields': {
'raw': {'type': 'keyword'}
}
},
'units': {
'type': 'text'
},
'value': {
'type': 'float',
'coerce': True
},
'description': {
'type': 'text'
},
'reportId': {
'type': 'text',
'fields': {
'raw': {
'type': 'keyword'
}
}
},
}
}
}
}

MAPPINGS_STATIONS = {
'properties': {
'geometry': {
Expand Down Expand Up @@ -216,8 +267,10 @@ def add_collection(self, collection_id: str) -> dict:

if collection_id == 'stations':
mappings = MAPPINGS_STATIONS
else:
elif collection_id in ['discovery-metadata', 'messages']:
mappings = MAPPINGS
else:
mappings = MAPPINGS_OBS

es_index = self.es_id(collection_id)

Expand Down Expand Up @@ -316,8 +369,11 @@ def gendata(features):
'_id': feature['id'],
'_source': feature
}

helpers.bulk(self.conn, gendata(items))
success, errors = helpers.bulk(self.conn, gendata(items), raise_on_error=False) # noqa
if errors:
for error in errors:
LOGGER.error(f"Indexing error: {error}")
raise RuntimeError(f"Upsert failed with {len(errors)} errors")

def delete_collection_item(self, collection_id: str, item_id: str) -> str:
"""
Expand Down
8 changes: 4 additions & 4 deletions wis2box-management/wis2box/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ def gcm(mcf: Union[dict, str]) -> dict:
'id': generated['id'],
'type': 'feature',
'topic_hierarchy': generated['properties']['wmo:topicHierarchy'].replace('origin/a/wis2/', '').replace('/', '.'), # noqa: E501
'title': generated['properties']['title'],
'description': generated['properties']['description'],
'title': f'bufr2geojson output ({generated["id"]})',
'description': f'Output published by bufr2geojson for dataset with id={generated["id"]}', # noqa
'keywords': generated['properties']['keywords'],
'bbox': bbox,
'links': generated['links'],
'id_field': 'id',
'time_field': 'resultTime',
'time_field': 'observationTime',
'title_field': 'id'
}

Expand All @@ -145,7 +145,7 @@ def add_collection_data(metadata: str):
"""

meta = gcm(metadata)

LOGGER.info(f'Adding data-collection for {meta["id"]}')
setup_collection(meta=meta)

return
Expand Down

0 comments on commit 412e195

Please sign in to comment.