diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 3bf7a999..9a50e28f 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -16,7 +16,7 @@ jobs: services: elasticsearch_8_svc: - image: docker.elastic.co/elasticsearch/elasticsearch:8.1.3 + image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0 env: cluster.name: stac-cluster node.name: es01 @@ -58,7 +58,7 @@ jobs: # Setup Python (faster than using Python container) - name: Setup Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Lint code diff --git a/CHANGELOG.md b/CHANGELOG.md index 52d2701b..94bec4de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,18 +9,39 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added -- Collection-level Assets to the CollectionSerializer [#148](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/148) +### Changed + +- Elasticsearch drivers from 7.17.9 to 8.11.0 [#169](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/169) + +### Fixed + +- Exclude unset fields in search response [#166](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/166) +- Upgrade stac-fastapi to v2.4.9 [#172](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/172) + +## [v1.0.0] ### Added +- Collection-level Assets to the CollectionSerializer [#148](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/148) +- Pagination for /collections - GET all collections - route [#164](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/164) - Examples folder with example docker setup for running sfes from pip [#147](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/147) +- GET /search filter extension queries [#163](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/163) +- Added support for GET /search intersection queries 
[#158](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/158) ### Changed + +- Update elasticsearch version from 8.1.3 to 8.10.4 in cicd, gh actions [#164](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/164) +- Updated core stac-fastapi libraries to 2.4.8 from 2.4.3 [#151](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/151) +- Use aliases on Elasticsearch indices, add number suffix in index name. [#152](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/152) + ### Fixed - Corrected the closing of client connections in ES index management functions [#132](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/132) - Corrected the automatic converstion of float values to int when building Filter Clauses [#135](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/135) +- Do not index `proj:geometry` field as geo_shape [#154](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/154) - Remove unsupported characters from Elasticsearch index names [#153](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/153) +- Fixed GET /search sortby requests [#25](https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/25) + ## [v0.3.0] @@ -34,11 +55,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Fields Extension [#129](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/129) - Support for Python 3.11 [#131](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/131) - ### Changed - Updated core stac-fastapi libraries to 2.4.3 from 2.3.0 [#127](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/127) + ## [v0.2.0] ### Added @@ -61,6 +82,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - When using bulk ingest, items will continue to be ingested if any of them fail. Previously, the call would fail immediately if any items failed. 
+ ## [v0.1.0] ### Changed @@ -72,7 +94,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Use genexp in execute_search and get_all_collections to return results. - Added db_to_stac serializer to item_collection method in core.py. -[Unreleased]: + +[Unreleased]: +[v1.0.0]: [v0.3.0]: [v0.2.0]: [v0.1.0]: diff --git a/README.md b/README.md index a38ce14a..5b4b7685 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,17 @@ curl -X "POST" "http://localhost:8080/collections" \ ``` Note: this "Collections Transaction" behavior is not part of the STAC API, but may be soon. + + +## Collection pagination + +The collections route handles optional `limit` and `token` parameters. The `links` field that is +returned from the `/collections` route contains a `next` link with the token that can be used to +get the next page of results. +```shell +curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token" +``` ## Testing diff --git a/VERSION b/VERSION index 9325c3cc..afaf360d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.3.0 \ No newline at end of file +1.0.0 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index d17e3af2..db3352fb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,7 +31,7 @@ services: elasticsearch: container_name: es-container - image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.1.3} + image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0} environment: ES_JAVA_OPTS: -Xms512m -Xmx1g volumes: diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index 7b0edddb..3106c512 100644 --- a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -8,15 +8,16 @@ install_requires = [ "fastapi", "attrs", - "pydantic[dotenv]", + "pydantic[dotenv]<2", "stac_pydantic==2.0.*", - "stac-fastapi.types==2.4.3", - "stac-fastapi.api==2.4.3", - 
"stac-fastapi.extensions==2.4.3", - "elasticsearch[async]==7.17.9", - "elasticsearch-dsl==7.4.1", + "stac-fastapi.types==2.4.9", + "stac-fastapi.api==2.4.9", + "stac-fastapi.extensions==2.4.9", + "elasticsearch[async]==8.11.0", + "elasticsearch-dsl==8.11.0", "pystac[validation]", "uvicorn", + "orjson", "overrides", "starlette", "geojson-pydantic", diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index 84eece90..e3c4bc64 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -1,8 +1,4 @@ """FastAPI application.""" -from typing import List - -import attr - from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import create_get_request_model, create_post_request_model from stac_fastapi.elasticsearch.config import ElasticsearchSettings @@ -28,52 +24,15 @@ settings = ElasticsearchSettings() session = Session.create_from_settings(settings) - -@attr.s -class FixedSortExtension(SortExtension): - """SortExtension class fixed with correct paths, removing extra forward-slash.""" - - conformance_classes: List[str] = attr.ib( - factory=lambda: ["https://api.stacspec.org/v1.0.0-beta.4/item-search#sort"] - ) - - -@attr.s -class FixedFilterExtension(FilterExtension): - """FilterExtension class fixed with correct paths, removing extra forward-slash.""" - - conformance_classes: List[str] = attr.ib( - default=[ - "https://api.stacspec.org/v1.0.0-rc.1/item-search#filter", - "http://www.opengis.net/spec/ogcapi-features-3/1.0/conf/filter", - "http://www.opengis.net/spec/ogcapi-features-3/1.0/conf/features-filter", - "http://www.opengis.net/spec/cql2/1.0/conf/cql2-text", - "http://www.opengis.net/spec/cql2/1.0/conf/cql2-json", - "http://www.opengis.net/spec/cql2/1.0/conf/basic-cql2", - "http://www.opengis.net/spec/cql2/1.0/conf/basic-spatial-operators", - ] - ) - client = 
attr.ib(factory=EsAsyncBaseFiltersClient) - - -@attr.s -class FixedQueryExtension(QueryExtension): - """Fixed Query Extension string.""" - - conformance_classes: List[str] = attr.ib( - factory=lambda: ["https://api.stacspec.org/v1.0.0-beta.4/item-search#query"] - ) - - extensions = [ TransactionExtension(client=TransactionsClient(session=session), settings=settings), BulkTransactionExtension(client=BulkTransactionsClient(session=session)), FieldsExtension(), - FixedQueryExtension(), - FixedSortExtension(), + QueryExtension(), + SortExtension(), TokenPaginationExtension(), ContextExtension(), - FixedFilterExtension(), + FilterExtension(client=EsAsyncBaseFiltersClient()), ] post_request_model = create_post_request_model(extensions) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py index e1630d67..8634d3b9 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py @@ -1,5 +1,6 @@ """API configuration.""" import os +import ssl from typing import Any, Dict, Set from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore @@ -7,24 +8,36 @@ def _es_config() -> Dict[str, Any]: + # Determine the scheme (http or https) + use_ssl = os.getenv("ES_USE_SSL", "true").lower() == "true" + scheme = "https" if use_ssl else "http" + + # Configure the hosts parameter with the correct scheme + hosts = [f"{scheme}://{os.getenv('ES_HOST')}:{os.getenv('ES_PORT')}"] + + # Initialize the configuration dictionary config = { - "hosts": [{"host": os.getenv("ES_HOST"), "port": os.getenv("ES_PORT")}], + "hosts": hosts, "headers": {"accept": "application/vnd.elasticsearch+json; compatible-with=7"}, - "use_ssl": True, - "verify_certs": True, } - if (u := os.getenv("ES_USER")) and (p := os.getenv("ES_PASS")): - config["http_auth"] = (u, p) + # Explicitly exclude SSL settings when not using SSL + if not 
use_ssl: + return config - if (v := os.getenv("ES_USE_SSL")) and v == "false": - config["use_ssl"] = False + # Include SSL settings if using https + config["ssl_version"] = ssl.TLSVersion.TLSv1_3 # type: ignore + config["verify_certs"] = os.getenv("ES_VERIFY_CERTS", "true").lower() != "false" # type: ignore - if (v := os.getenv("ES_VERIFY_CERTS")) and v == "false": - config["verify_certs"] = False + # Include CA Certificates if verifying certs + if config["verify_certs"]: + config["ca_certs"] = os.getenv( + "CURL_CA_BUNDLE", "/etc/ssl/certs/ca-certificates.crt" + ) - if v := os.getenv("CURL_CA_BUNDLE"): - config["ca_certs"] = v + # Handle authentication + if (u := os.getenv("ES_USER")) and (p := os.getenv("ES_PASS")): + config["http_auth"] = (u, p) return config diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index 1342b653..64ad6cf1 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -1,19 +1,22 @@ """Item crud client.""" -import json import logging +import re +from base64 import urlsafe_b64encode from datetime import datetime as datetime_type from datetime import timezone from typing import Any, Dict, List, Optional, Set, Type, Union -from urllib.parse import urljoin +from urllib.parse import unquote_plus, urljoin import attr +import orjson import stac_pydantic -from fastapi import HTTPException +from fastapi import HTTPException, Request from overrides import overrides from pydantic import ValidationError +from pygeofilter.backends.cql2_json import to_cql2 +from pygeofilter.parsers.cql2_text import parse as parse_cql2_text from stac_pydantic.links import Relations from stac_pydantic.shared import MimeTypes -from starlette.requests import Request from stac_fastapi.elasticsearch import serializers from stac_fastapi.elasticsearch.config import ElasticsearchSettings @@ -23,6 +26,7 @@ 
from stac_fastapi.elasticsearch.session import Session from stac_fastapi.extensions.third_party.bulk_transactions import ( BaseBulkTransactionsClient, + BulkTransactionMethod, Items, ) from stac_fastapi.types import stac as stac_types @@ -80,30 +84,58 @@ async def all_collections(self, **kwargs) -> Collections: Raises: Exception: If any error occurs while reading the collections from the database. """ + request: Request = kwargs["request"] base_url = str(kwargs["request"].base_url) + limit = ( + int(request.query_params["limit"]) + if "limit" in request.query_params + else 10 + ) + token = ( + request.query_params["token"] if "token" in request.query_params else None + ) + + hits = await self.database.get_all_collections(limit=limit, token=token) + + next_search_after = None + next_link = None + if len(hits) == limit: + last_hit = hits[-1] + next_search_after = last_hit["sort"] + next_token = urlsafe_b64encode( + ",".join(map(str, next_search_after)).encode() + ).decode() + paging_links = PagingLinks(next=next_token, request=request) + next_link = paging_links.link_next() + + links = [ + { + "rel": Relations.root.value, + "type": MimeTypes.json, + "href": base_url, + }, + { + "rel": Relations.parent.value, + "type": MimeTypes.json, + "href": base_url, + }, + { + "rel": Relations.self.value, + "type": MimeTypes.json, + "href": urljoin(base_url, "collections"), + }, + ] + + if next_link: + links.append(next_link) + return Collections( collections=[ - self.collection_serializer.db_to_stac(c, base_url=base_url) - for c in await self.database.get_all_collections() - ], - links=[ - { - "rel": Relations.root.value, - "type": MimeTypes.json, - "href": base_url, - }, - { - "rel": Relations.parent.value, - "type": MimeTypes.json, - "href": base_url, - }, - { - "rel": Relations.self.value, - "type": MimeTypes.json, - "href": urljoin(base_url, "collections"), - }, + self.collection_serializer.db_to_stac(c["_source"], base_url=base_url) + for c in hits ], + links=links, ) 
@overrides @@ -274,9 +306,9 @@ def _return_date(interval_str): return {"lte": end_date, "gte": start_date} - @overrides async def get_search( self, + request: Request, collections: Optional[List[str]] = None, ids: Optional[List[str]] = None, bbox: Optional[List[NumType]] = None, @@ -286,8 +318,9 @@ async def get_search( token: Optional[str] = None, fields: Optional[List[str]] = None, sortby: Optional[str] = None, - # filter: Optional[str] = None, # todo: requires fastapi > 2.3 unreleased - # filter_lang: Optional[str] = None, # todo: requires fastapi > 2.3 unreleased + intersects: Optional[str] = None, + filter: Optional[str] = None, + filter_lang: Optional[str] = None, **kwargs, ) -> ItemCollection: """Get search results from the database. @@ -302,6 +335,7 @@ async def get_search( token (Optional[str]): Access token to use when searching the catalog. fields (Optional[List[str]]): Fields to include or exclude from the results. sortby (Optional[str]): Sorting options for the results. + intersects (Optional[str]): GeoJSON geometry to search in. kwargs: Additional parameters to be passed to the API. 
Returns: @@ -316,30 +350,42 @@ "bbox": bbox, "limit": limit, "token": token, - "query": json.loads(query) if query else query, + "query": orjson.loads(query) if query else query, } + # this is borrowed from stac-fastapi-pgstac + # Kludgy fix because using factory does not allow alias for filter-lang + query_params = str(request.query_params) + if filter_lang is None: + match = re.search(r"filter-lang=([a-z0-9-]+)", query_params, re.IGNORECASE) + if match: + filter_lang = match.group(1) + if datetime: base_args["datetime"] = datetime + if intersects: + base_args["intersects"] = orjson.loads(unquote_plus(intersects)) + if sortby: - # https://github.com/radiantearth/stac-spec/tree/master/api-spec/extensions/sort#http-get-or-post-form sort_param = [] for sort in sortby: sort_param.append( { "field": sort[1:], - "direction": "asc" if sort[0] == "+" else "desc", + "direction": "desc" if sort[0] == "-" else "asc", } ) + print(sort_param) base_args["sortby"] = sort_param - # todo: requires fastapi > 2.3 unreleased - # if filter: - # if filter_lang == "cql2-text": - # base_args["filter-lang"] = "cql2-json" - # base_args["filter"] = orjson.loads(to_cql2(parse_cql2_text(filter))) - # print(f'>>> {base_args["filter"]}') + if filter: + if filter_lang == "cql2-text": + base_args["filter-lang"] = "cql2-json" + base_args["filter"] = orjson.loads(to_cql2(parse_cql2_text(filter))) + else: + base_args["filter-lang"] = "cql2-json" + base_args["filter"] = orjson.loads(unquote_plus(filter)) if fields: includes = set() @@ -358,13 +404,12 @@ async def get_search( search_request = self.post_request_model(**base_args) except ValidationError: raise HTTPException(status_code=400, detail="Invalid parameters provided") - resp = await self.post_search(search_request, request=kwargs["request"]) + resp = await self.post_search(search_request=search_request, request=request) return resp - @overrides async def post_search( - self, search_request: BaseSearchPostRequest, **kwargs 
+ self, search_request: BaseSearchPostRequest, request: Request ) -> ItemCollection: """ Perform a POST search on the catalog. @@ -379,7 +424,6 @@ async def post_search( Raises: HTTPException: If there is an error with the cql2_json filter. """ - request: Request = kwargs["request"] base_url = str(request.base_url) search = self.database.make_search() @@ -466,7 +510,9 @@ async def post_search( filter_kwargs = search_request.fields.filter_fields items = [ - json.loads(stac_pydantic.Item(**feat).json(**filter_kwargs)) + orjson.loads( + stac_pydantic.Item(**feat).json(**filter_kwargs, exclude_unset=True) + ) for feat in items ] @@ -523,7 +569,7 @@ async def create_item( if item["type"] == "FeatureCollection": bulk_client = BulkTransactionsClient() processed_items = [ - bulk_client.preprocess_item(item, base_url) for item in item["features"] # type: ignore + bulk_client.preprocess_item(item, base_url, BulkTransactionMethod.INSERT) for item in item["features"] # type: ignore ] await self.database.bulk_async( @@ -682,17 +728,23 @@ def __attrs_post_init__(self): settings = ElasticsearchSettings() self.client = settings.create_client - def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item: + def preprocess_item( + self, item: stac_types.Item, base_url, method: BulkTransactionMethod + ) -> stac_types.Item: """Preprocess an item to match the data model. Args: item: The item to preprocess. base_url: The base URL of the request. + method: The bulk transaction method. Returns: The preprocessed item. 
""" - return self.database.sync_prep_create_item(item=item, base_url=base_url) + exist_ok = method == BulkTransactionMethod.UPSERT + return self.database.sync_prep_create_item( + item=item, base_url=base_url, exist_ok=exist_ok + ) @overrides def bulk_item_insert( @@ -715,7 +767,8 @@ def bulk_item_insert( base_url = "" processed_items = [ - self.preprocess_item(item, base_url) for item in items.items.values() + self.preprocess_item(item, base_url, items.method) + for item in items.items.values() ] # not a great way to get the collection_id-- should be part of the method signature diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index d1190243..35430e7d 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -39,7 +39,7 @@ ":", } -DEFAULT_INDICES = f"*,-*kibana*,-{COLLECTIONS_INDEX}" +ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*" DEFAULT_SORT = { "properties.datetime": {"order": "desc"}, @@ -87,7 +87,7 @@ { "proj_geometry": { "match": "proj:geometry", - "mapping": {"type": "geo_shape"}, + "mapping": {"type": "object", "enabled": False}, } }, { @@ -164,23 +164,25 @@ def indices(collection_ids: Optional[List[str]]) -> str: A string of comma-separated index names. If `collection_ids` is None, returns the default indices. """ if collection_ids is None: - return DEFAULT_INDICES + return ITEM_INDICES else: return ",".join([index_by_collection_id(c) for c in collection_ids]) async def create_collection_index() -> None: - """Create the index for Collections in Elasticsearch. + """ + Create the index for a Collection. + + Returns: + None - This function creates the Elasticsearch index for the `Collections` with the predefined mapping. - If the index already exists, the function ignores the error and continues execution. 
""" client = AsyncElasticsearchSettings().create_client - await client.indices.create( - index=COLLECTIONS_INDEX, + await client.options(ignore_status=400).indices.create( + index=f"{COLLECTIONS_INDEX}-000001", + aliases={COLLECTIONS_INDEX: {}}, mappings=ES_COLLECTIONS_MAPPINGS, - ignore=400, # ignore 400 already exists code ) await client.close() @@ -197,12 +199,13 @@ async def create_item_index(collection_id: str): """ client = AsyncElasticsearchSettings().create_client + index_name = index_by_collection_id(collection_id) - await client.indices.create( - index=index_by_collection_id(collection_id), + await client.options(ignore_status=400).indices.create( + index=f"{index_by_collection_id(collection_id)}-000001", + aliases={index_name: {}}, mappings=ES_ITEMS_MAPPINGS, settings=ES_ITEMS_SETTINGS, - ignore=400, # ignore 400 already exists code ) await client.close() @@ -215,7 +218,14 @@ async def delete_item_index(collection_id: str): """ client = AsyncElasticsearchSettings().create_client - await client.indices.delete(index=index_by_collection_id(collection_id)) + name = index_by_collection_id(collection_id) + resolved = await client.indices.resolve_index(name=name) + if "aliases" in resolved and resolved["aliases"]: + [alias] = resolved["aliases"] + await client.indices.delete_alias(index=alias["indices"], name=alias["name"]) + await client.indices.delete(index=alias["indices"]) + else: + await client.indices.delete(index=name) await client.close() @@ -295,21 +305,34 @@ class DatabaseLogic: """CORE LOGIC""" - async def get_all_collections(self) -> Iterable[Dict[str, Any]]: + async def get_all_collections( + self, token: Optional[str], limit: int + ) -> Iterable[Dict[str, Any]]: """Retrieve a list of all collections from the database. + Args: + token (Optional[str]): The token used to return the next set of results. 
+ limit (int): Number of results to return + Returns: collections (Iterable[Dict[str, Any]]): A list of dictionaries containing the source data for each collection. Notes: The collections are retrieved from the Elasticsearch database using the `client.search` method, - with the `COLLECTIONS_INDEX` as the target index and `size=1000` to retrieve up to 1000 records. + with the `COLLECTIONS_INDEX` as the target index and `size=limit` to retrieve records. The result is a generator of dictionaries containing the source data for each collection. """ - # https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/65 - # collections should be paginated, but at least return more than the default 10 for now - collections = await self.client.search(index=COLLECTIONS_INDEX, size=1000) - return (c["_source"] for c in collections["hits"]["hits"]) + search_after = None + if token: + search_after = urlsafe_b64decode(token.encode()).decode().split(",") + collections = await self.client.search( + index=COLLECTIONS_INDEX, + search_after=search_after, + size=limit, + sort={"id": {"order": "asc"}}, + ) + hits = collections["hits"]["hits"] + return hits async def get_one_item(self, collection_id: str, item_id: str) -> Dict: """Retrieve a single item from the database. @@ -568,13 +591,16 @@ async def check_collection_exists(self, collection_id: str): if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - async def prep_create_item(self, item: Item, base_url: str) -> Item: + async def prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: """ Preps an item for insertion into the database. Args: item (Item): The item to be prepped for insertion. base_url (str): The base URL used to create the item's self URL. + exist_ok (bool): Indicates whether the item can exist already. Returns: Item: The prepped item. 
@@ -585,7 +611,7 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item: """ await self.check_collection_exists(collection_id=item["collection"]) - if await self.client.exists( + if not exist_ok and await self.client.exists( index=index_by_collection_id(item["collection"]), id=mk_item_id(item["id"], item["collection"]), ): @@ -595,17 +621,20 @@ async def prep_create_item(self, item: Item, base_url: str) -> Item: return self.item_serializer.stac_to_db(item, base_url) - def sync_prep_create_item(self, item: Item, base_url: str) -> Item: + def sync_prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: """ Prepare an item for insertion into the database. This method performs pre-insertion preparation on the given `item`, such as checking if the collection the item belongs to exists, - and verifying that an item with the same ID does not already exist in the database. + and optionally verifying that an item with the same ID does not already exist in the database. Args: item (Item): The item to be inserted into the database. base_url (str): The base URL used for constructing URLs for the item. + exist_ok (bool): Indicates whether the item can exist already. Returns: Item: The item after preparation is done. @@ -619,7 +648,7 @@ def sync_prep_create_item(self, item: Item, base_url: str) -> Item: if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id): raise NotFoundError(f"Collection {collection_id} does not exist") - if self.sync_client.exists( + if not exist_ok and self.sync_client.exists( index=index_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), ): @@ -820,14 +849,11 @@ async def bulk_async( `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the index is refreshed after the bulk insert. The function does not return any value. 
""" - await asyncio.get_event_loop().run_in_executor( - None, - lambda: helpers.bulk( - self.sync_client, - mk_actions(collection_id, processed_items), - refresh=refresh, - raise_on_error=False, - ), + await helpers.async_bulk( + self.client, + mk_actions(collection_id, processed_items), + refresh=refresh, + raise_on_error=False, ) def bulk_sync( @@ -858,7 +884,7 @@ def bulk_sync( async def delete_items(self) -> None: """Danger. this is only for tests.""" await self.client.delete_by_query( - index=DEFAULT_INDICES, + index=ITEM_INDICES, body={"query": {"match_all": {}}}, wait_for_completion=True, ) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py index 68999e11..1eeef171 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/version.py @@ -1,2 +1,2 @@ """library version.""" -__version__ = "0.3.0" +__version__ = "1.0.0" diff --git a/stac_fastapi/elasticsearch/tests/api/test_api.py b/stac_fastapi/elasticsearch/tests/api/test_api.py index 7dbf3996..74f0bb55 100644 --- a/stac_fastapi/elasticsearch/tests/api/test_api.py +++ b/stac_fastapi/elasticsearch/tests/api/test_api.py @@ -2,6 +2,8 @@ import uuid from datetime import datetime, timedelta +import pytest + from ..conftest import create_collection, create_item ROUTES = { @@ -31,17 +33,20 @@ } +@pytest.mark.asyncio async def test_post_search_content_type(app_client, ctx): params = {"limit": 1} resp = await app_client.post("/search", json=params) assert resp.headers["content-type"] == "application/geo+json" +@pytest.mark.asyncio async def test_get_search_content_type(app_client, ctx): resp = await app_client.get("/search") assert resp.headers["content-type"] == "application/geo+json" +@pytest.mark.asyncio async def test_api_headers(app_client): resp = await app_client.get("/api") assert ( @@ -50,11 +55,13 @@ async def 
test_api_headers(app_client): assert resp.status_code == 200 +@pytest.mark.asyncio async def test_router(app): api_routes = set([f"{list(route.methods)[0]} {route.path}" for route in app.routes]) assert len(api_routes - ROUTES) == 0 +@pytest.mark.asyncio async def test_app_transaction_extension(app_client, ctx): item = copy.deepcopy(ctx.item) item["id"] = str(uuid.uuid4()) @@ -64,6 +71,7 @@ async def test_app_transaction_extension(app_client, ctx): await app_client.delete(f"/collections/{item['collection']}/items/{item['id']}") +@pytest.mark.asyncio async def test_app_search_response(app_client, ctx): resp = await app_client.get("/search", params={"ids": ["test-item"]}) assert resp.status_code == 200 @@ -75,6 +83,7 @@ async def test_app_search_response(app_client, ctx): assert resp_json.get("stac_extensions") is None +@pytest.mark.asyncio async def test_app_context_extension(app_client, ctx, txn_client): test_item = ctx.item test_item["id"] = "test-item-2" @@ -108,6 +117,7 @@ async def test_app_context_extension(app_client, ctx, txn_client): assert matched == 1 +@pytest.mark.asyncio async def test_app_fields_extension(app_client, ctx, txn_client): resp = await app_client.get("/search", params={"collections": ["test-collection"]}) assert resp.status_code == 200 @@ -115,6 +125,7 @@ async def test_app_fields_extension(app_client, ctx, txn_client): assert list(resp_json["features"][0]["properties"]) == ["datetime"] +@pytest.mark.asyncio async def test_app_fields_extension_query(app_client, ctx, txn_client): resp = await app_client.post( "/search", @@ -128,6 +139,7 @@ async def test_app_fields_extension_query(app_client, ctx, txn_client): assert list(resp_json["features"][0]["properties"]) == ["datetime", "proj:epsg"] +@pytest.mark.asyncio async def test_app_fields_extension_no_properties_get(app_client, ctx, txn_client): resp = await app_client.get( "/search", params={"collections": ["test-collection"], "fields": "-properties"} @@ -137,6 +149,7 @@ async def 
test_app_fields_extension_no_properties_get(app_client, ctx, txn_clien assert "properties" not in resp_json["features"][0] +@pytest.mark.asyncio async def test_app_fields_extension_no_properties_post(app_client, ctx, txn_client): resp = await app_client.post( "/search", @@ -150,6 +163,24 @@ async def test_app_fields_extension_no_properties_post(app_client, ctx, txn_clie assert "properties" not in resp_json["features"][0] +@pytest.mark.asyncio +async def test_app_fields_extension_no_null_fields(app_client, ctx, txn_client): + resp = await app_client.get("/search", params={"collections": ["test-collection"]}) + assert resp.status_code == 200 + resp_json = resp.json() + # check if no null fields: https://github.com/stac-utils/stac-fastapi-elasticsearch/issues/166 + for feature in resp_json["features"]: + # assert "bbox" not in feature["geometry"] + for link in feature["links"]: + assert all(a not in link or link[a] is not None for a in ("title", "asset")) + for asset in feature["assets"]: + assert all( + a not in asset or asset[a] is not None + for a in ("start_datetime", "created") + ) + + +@pytest.mark.asyncio async def test_app_fields_extension_return_all_properties(app_client, ctx, txn_client): item = ctx.item resp = await app_client.get( @@ -166,6 +197,7 @@ async def test_app_fields_extension_return_all_properties(app_client, ctx, txn_c assert feature["properties"][expected_prop] == expected_value +@pytest.mark.asyncio async def test_app_query_extension_gt(app_client, ctx): params = {"query": {"proj:epsg": {"gt": ctx.item["properties"]["proj:epsg"]}}} resp = await app_client.post("/search", json=params) @@ -174,6 +206,7 @@ async def test_app_query_extension_gt(app_client, ctx): assert len(resp_json["features"]) == 0 +@pytest.mark.asyncio async def test_app_query_extension_gte(app_client, ctx): params = {"query": {"proj:epsg": {"gte": ctx.item["properties"]["proj:epsg"]}}} resp = await app_client.post("/search", json=params) @@ -182,21 +215,97 @@ async def 
test_app_query_extension_gte(app_client, ctx): assert len(resp.json()["features"]) == 1 +@pytest.mark.asyncio async def test_app_query_extension_limit_lt0(app_client): assert (await app_client.post("/search", json={"limit": -1})).status_code == 400 +@pytest.mark.asyncio async def test_app_query_extension_limit_gt10000(app_client): - assert (await app_client.post("/search", json={"limit": 10001})).status_code == 400 + resp = await app_client.post("/search", json={"limit": 10001}) + assert resp.status_code == 200 + assert resp.json()["context"]["limit"] == 10000 +@pytest.mark.asyncio async def test_app_query_extension_limit_10000(app_client): params = {"limit": 10000} resp = await app_client.post("/search", json=params) assert resp.status_code == 200 -async def test_app_sort_extension(app_client, txn_client, ctx): +@pytest.mark.asyncio +async def test_app_sort_extension_get_asc(app_client, txn_client, ctx): + first_item = ctx.item + item_date = datetime.strptime( + first_item["properties"]["datetime"], "%Y-%m-%dT%H:%M:%SZ" + ) + + second_item = dict(first_item) + second_item["id"] = "another-item" + another_item_date = item_date - timedelta(days=1) + second_item["properties"]["datetime"] = another_item_date.strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + await create_item(txn_client, second_item) + + resp = await app_client.get("/search?sortby=+properties.datetime") + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][1]["id"] == first_item["id"] + assert resp_json["features"][0]["id"] == second_item["id"] + + +@pytest.mark.asyncio +async def test_app_sort_extension_get_desc(app_client, txn_client, ctx): + first_item = ctx.item + item_date = datetime.strptime( + first_item["properties"]["datetime"], "%Y-%m-%dT%H:%M:%SZ" + ) + + second_item = dict(first_item) + second_item["id"] = "another-item" + another_item_date = item_date - timedelta(days=1) + second_item["properties"]["datetime"] = another_item_date.strftime( + "%Y-%m-%dT%H:%M:%SZ" + 
) + await create_item(txn_client, second_item) + + resp = await app_client.get("/search?sortby=-properties.datetime") + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][0]["id"] == first_item["id"] + assert resp_json["features"][1]["id"] == second_item["id"] + + +@pytest.mark.asyncio +async def test_app_sort_extension_post_asc(app_client, txn_client, ctx): + first_item = ctx.item + item_date = datetime.strptime( + first_item["properties"]["datetime"], "%Y-%m-%dT%H:%M:%SZ" + ) + + second_item = dict(first_item) + second_item["id"] = "another-item" + another_item_date = item_date - timedelta(days=1) + second_item["properties"]["datetime"] = another_item_date.strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + await create_item(txn_client, second_item) + + params = { + "collections": [first_item["collection"]], + "sortby": [{"field": "properties.datetime", "direction": "asc"}], + } + resp = await app_client.post("/search", json=params) + assert resp.status_code == 200 + resp_json = resp.json() + assert resp_json["features"][1]["id"] == first_item["id"] + assert resp_json["features"][0]["id"] == second_item["id"] + + +@pytest.mark.asyncio +async def test_app_sort_extension_post_desc(app_client, txn_client, ctx): first_item = ctx.item item_date = datetime.strptime( first_item["properties"]["datetime"], "%Y-%m-%dT%H:%M:%SZ" @@ -221,6 +330,7 @@ async def test_app_sort_extension(app_client, txn_client, ctx): assert resp_json["features"][1]["id"] == second_item["id"] +@pytest.mark.asyncio async def test_search_invalid_date(app_client, ctx): params = { "datetime": "2020-XX-01/2020-10-30", @@ -231,7 +341,30 @@ async def test_search_invalid_date(app_client, ctx): assert resp.status_code == 400 -async def test_search_point_intersects(app_client, ctx): +@pytest.mark.asyncio +async def test_search_point_intersects_get(app_client, ctx): + resp = await app_client.get( + '/search?intersects={"type":"Point","coordinates":[150.04,-33.14]}' + ) + + assert 
resp.status_code == 200 + resp_json = resp.json() + assert len(resp_json["features"]) == 1 + + +@pytest.mark.asyncio +async def test_search_polygon_intersects_get(app_client, ctx): + resp = await app_client.get( + '/search?intersects={"type":"Polygon","coordinates":[[[149.04, -34.14],[149.04, -32.14],[151.04, -32.14],[151.04, -34.14],[149.04, -34.14]]]}' + ) + + assert resp.status_code == 200 + resp_json = resp.json() + assert len(resp_json["features"]) == 1 + + +@pytest.mark.asyncio +async def test_search_point_intersects_post(app_client, ctx): point = [150.04, -33.14] intersects = {"type": "Point", "coordinates": point} @@ -246,6 +379,7 @@ async def test_search_point_intersects(app_client, ctx): assert len(resp_json["features"]) == 1 +@pytest.mark.asyncio async def test_search_point_does_not_intersect(app_client, ctx): point = [15.04, -3.14] intersects = {"type": "Point", "coordinates": point} @@ -261,6 +395,7 @@ async def test_search_point_does_not_intersect(app_client, ctx): assert len(resp_json["features"]) == 0 +@pytest.mark.asyncio async def test_datetime_non_interval(app_client, ctx): dt_formats = [ "2020-02-12T12:30:22+00:00", @@ -282,6 +417,7 @@ async def test_datetime_non_interval(app_client, ctx): assert resp_json["features"][0]["properties"]["datetime"][0:19] == dt[0:19] +@pytest.mark.asyncio async def test_bbox_3d(app_client, ctx): australia_bbox = [106.343365, -47.199523, 0.1, 168.218365, -19.437288, 0.1] params = { @@ -294,6 +430,7 @@ async def test_bbox_3d(app_client, ctx): assert len(resp_json["features"]) == 1 +@pytest.mark.asyncio async def test_search_line_string_intersects(app_client, ctx): line = [[150.04, -33.14], [150.22, -33.89]] intersects = {"type": "LineString", "coordinates": line} diff --git a/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py b/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py index e46d4d1f..3da8f86d 100644 --- a/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py +++ 
b/stac_fastapi/elasticsearch/tests/clients/test_elasticsearch.py @@ -11,6 +11,7 @@ from ..conftest import MockRequest, create_item +@pytest.mark.asyncio async def test_create_collection(app_client, ctx, core_client, txn_client): in_coll = deepcopy(ctx.collection) in_coll["id"] = str(uuid.uuid4()) @@ -20,6 +21,7 @@ async def test_create_collection(app_client, ctx, core_client, txn_client): await txn_client.delete_collection(in_coll["id"]) +@pytest.mark.asyncio async def test_create_collection_already_exists(app_client, ctx, txn_client): data = deepcopy(ctx.collection) @@ -32,6 +34,7 @@ async def test_create_collection_already_exists(app_client, ctx, txn_client): await txn_client.delete_collection(data["id"]) +@pytest.mark.asyncio async def test_update_collection( core_client, txn_client, @@ -49,6 +52,7 @@ async def test_update_collection( await txn_client.delete_collection(data["id"]) +@pytest.mark.asyncio async def test_delete_collection( core_client, txn_client, @@ -63,6 +67,7 @@ async def test_delete_collection( await core_client.get_collection(data["id"], request=MockRequest) +@pytest.mark.asyncio async def test_get_collection( core_client, txn_client, @@ -76,6 +81,7 @@ async def test_get_collection( await txn_client.delete_collection(data["id"]) +@pytest.mark.asyncio async def test_get_item(app_client, ctx, core_client): got_item = await core_client.get_item( item_id=ctx.item["id"], @@ -86,6 +92,7 @@ async def test_get_item(app_client, ctx, core_client): assert got_item["collection"] == ctx.item["collection"] +@pytest.mark.asyncio async def test_get_collection_items(app_client, ctx, core_client, txn_client): coll = ctx.collection num_of_items_to_create = 5 @@ -106,6 +113,7 @@ async def test_get_collection_items(app_client, ctx, core_client, txn_client): assert item["collection"] == coll["id"] +@pytest.mark.asyncio async def test_create_item(ctx, core_client, txn_client): resp = await core_client.get_item( ctx.item["id"], ctx.item["collection"], 
request=MockRequest @@ -115,6 +123,7 @@ async def test_create_item(ctx, core_client, txn_client): ) == Item(**resp).dict(exclude={"links": ..., "properties": {"created", "updated"}}) +@pytest.mark.asyncio async def test_create_item_already_exists(ctx, txn_client): with pytest.raises(ConflictError): await txn_client.create_item( @@ -125,6 +134,7 @@ async def test_create_item_already_exists(ctx, txn_client): ) +@pytest.mark.asyncio async def test_update_item(ctx, core_client, txn_client): ctx.item["properties"]["foo"] = "bar" collection_id = ctx.item["collection"] @@ -139,6 +149,7 @@ async def test_update_item(ctx, core_client, txn_client): assert updated_item["properties"]["foo"] == "bar" +@pytest.mark.asyncio async def test_update_geometry(ctx, core_client, txn_client): new_coordinates = [ [ @@ -163,6 +174,7 @@ async def test_update_geometry(ctx, core_client, txn_client): assert updated_item["geometry"]["coordinates"] == new_coordinates +@pytest.mark.asyncio async def test_delete_item(ctx, core_client, txn_client): await txn_client.delete_item(ctx.item["id"], ctx.item["collection"]) @@ -172,6 +184,7 @@ async def test_delete_item(ctx, core_client, txn_client): ) +@pytest.mark.asyncio async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client): items = {} for _ in range(10): @@ -193,6 +206,7 @@ async def test_bulk_item_insert(ctx, core_client, txn_client, bulk_txn_client): # ) +@pytest.mark.asyncio async def test_feature_collection_insert( core_client, txn_client, @@ -212,6 +226,7 @@ async def test_feature_collection_insert( assert len(fc["features"]) >= 10 +@pytest.mark.asyncio async def test_landing_page_no_collection_title(ctx, core_client, txn_client, app): ctx.collection["id"] = "new_id" del ctx.collection["title"] diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py index b755425c..fa093af2 100644 --- a/stac_fastapi/elasticsearch/tests/conftest.py +++ 
b/stac_fastapi/elasticsearch/tests/conftest.py @@ -10,11 +10,6 @@ from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import create_get_request_model, create_post_request_model -from stac_fastapi.elasticsearch.app import ( - FixedFilterExtension, - FixedQueryExtension, - FixedSortExtension, -) from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings from stac_fastapi.elasticsearch.core import ( BulkTransactionsClient, @@ -22,9 +17,12 @@ TransactionsClient, ) from stac_fastapi.elasticsearch.database_logic import create_collection_index +from stac_fastapi.elasticsearch.extensions import QueryExtension from stac_fastapi.extensions.core import ( # FieldsExtension, ContextExtension, FieldsExtension, + FilterExtension, + SortExtension, TokenPaginationExtension, TransactionExtension, ) @@ -43,11 +41,16 @@ class MockRequest: base_url = "http://test-server" def __init__( - self, method: str = "GET", url: str = "XXXX", app: Optional[Any] = None + self, + method: str = "GET", + url: str = "XXXX", + app: Optional[Any] = None, + query_params: Dict[str, Any] = {"limit": "10"}, ): self.method = method self.url = url self.app = app + self.query_params = query_params or {} class TestSettings(AsyncElasticsearchSettings): @@ -61,7 +64,10 @@ class Config: @pytest.fixture(scope="session") def event_loop(): - return asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() def _load_file(filename: str) -> Dict: @@ -160,11 +166,11 @@ async def app(): client=TransactionsClient(session=None), settings=settings ), ContextExtension(), - FixedSortExtension(), + SortExtension(), FieldsExtension(), - FixedQueryExtension(), + QueryExtension(), TokenPaginationExtension(), - FixedFilterExtension(), + FilterExtension(), ] post_request_model = create_post_request_model(extensions) diff --git a/stac_fastapi/elasticsearch/tests/extensions/test_filter.py 
b/stac_fastapi/elasticsearch/tests/extensions/test_filter.py index 43aadf18..d9db48cd 100644 --- a/stac_fastapi/elasticsearch/tests/extensions/test_filter.py +++ b/stac_fastapi/elasticsearch/tests/extensions/test_filter.py @@ -3,10 +3,13 @@ from os import listdir from os.path import isfile, join +import pytest + THIS_DIR = os.path.dirname(os.path.abspath(__file__)) -async def test_search_filters(app_client, ctx): +@pytest.mark.asyncio +async def test_search_filters_post(app_client, ctx): filters = [] pwd = f"{THIS_DIR}/cql2" @@ -19,7 +22,18 @@ async def test_search_filters(app_client, ctx): assert resp.status_code == 200 -async def test_search_filter_extension_eq(app_client, ctx): +@pytest.mark.asyncio +async def test_search_filter_extension_eq_get(app_client, ctx): + resp = await app_client.get( + '/search?filter-lang=cql2-json&filter={"op":"=","args":[{"property":"id"},"test-item"]}' + ) + assert resp.status_code == 200 + resp_json = resp.json() + assert len(resp_json["features"]) == 1 + + +@pytest.mark.asyncio +async def test_search_filter_extension_eq_post(app_client, ctx): params = {"filter": {"op": "=", "args": [{"property": "id"}, ctx.item["id"]]}} resp = await app_client.post("/search", json=params) assert resp.status_code == 200 @@ -27,7 +41,26 @@ async def test_search_filter_extension_eq(app_client, ctx): assert len(resp_json["features"]) == 1 -async def test_search_filter_extension_gte(app_client, ctx): +@pytest.mark.asyncio +async def test_search_filter_extension_gte_get(app_client, ctx): + # there's one item that can match, so one of these queries should match it and the other shouldn't + resp = await app_client.get( + '/search?filter-lang=cql2-json&filter={"op":"<=","args":[{"property": "properties.proj:epsg"},32756]}' + ) + + assert resp.status_code == 200 + assert len(resp.json()["features"]) == 1 + + resp = await app_client.get( + '/search?filter-lang=cql2-json&filter={"op":">","args":[{"property": "properties.proj:epsg"},32756]}' + ) + + assert 
resp.status_code == 200 + assert len(resp.json()["features"]) == 0 + + +@pytest.mark.asyncio +async def test_search_filter_extension_gte_post(app_client, ctx): # there's one item that can match, so one of these queries should match it and the other shouldn't params = { "filter": { @@ -58,7 +91,53 @@ async def test_search_filter_extension_gte(app_client, ctx): assert len(resp.json()["features"]) == 0 -async def test_search_filter_ext_and(app_client, ctx): +@pytest.mark.asyncio +async def test_search_filter_ext_and_get(app_client, ctx): + resp = await app_client.get( + '/search?filter-lang=cql2-json&filter={"op":"and","args":[{"op":"<=","args":[{"property":"properties.proj:epsg"},32756]},{"op":"=","args":[{"property":"id"},"test-item"]}]}' + ) + + assert resp.status_code == 200 + assert len(resp.json()["features"]) == 1 + + +@pytest.mark.asyncio +async def test_search_filter_ext_and_get_cql2text_id(app_client, ctx): + collection = ctx.item["collection"] + id = ctx.item["id"] + filter = f"id='{id}' AND collection='{collection}'" + resp = await app_client.get(f"/search?filter-lang=cql2-text&filter={filter}") + + assert resp.status_code == 200 + assert len(resp.json()["features"]) == 1 + + +@pytest.mark.asyncio +async def test_search_filter_ext_and_get_cql2text_cloud_cover(app_client, ctx): + collection = ctx.item["collection"] + cloud_cover = ctx.item["properties"]["eo:cloud_cover"] + filter = f"cloud_cover={cloud_cover} AND collection='{collection}'" + resp = await app_client.get(f"/search?filter-lang=cql2-text&filter={filter}") + + assert resp.status_code == 200 + assert len(resp.json()["features"]) == 1 + + +@pytest.mark.asyncio +async def test_search_filter_ext_and_get_cql2text_cloud_cover_no_results( + app_client, ctx +): + collection = ctx.item["collection"] + cloud_cover = ctx.item["properties"]["eo:cloud_cover"] + 1 + filter = f"cloud_cover={cloud_cover} AND collection='{collection}'" + resp = await 
app_client.get(f"/search?filter-lang=cql2-text&filter={filter}") + + assert resp.status_code == 200 + assert len(resp.json()["features"]) == 0 + + +@pytest.mark.asyncio +async def test_search_filter_ext_and_post(app_client, ctx): params = { "filter": { "op": "and", @@ -80,7 +159,32 @@ async def test_search_filter_ext_and(app_client, ctx): assert len(resp.json()["features"]) == 1 -async def test_search_filter_extension_floats(app_client, ctx): +@pytest.mark.asyncio +async def test_search_filter_extension_floats_get(app_client, ctx): + resp = await app_client.get( + """/search?filter={"op":"and","args":[{"op":"=","args":[{"property":"id"},"test-item"]},{"op":">","args":[{"property":"properties.view:sun_elevation"},"-37.30891534"]},{"op":"<","args":[{"property":"properties.view:sun_elevation"},"-37.30691534"]}]}""" + ) + + assert resp.status_code == 200 + assert len(resp.json()["features"]) == 1 + + resp = await app_client.get( + """/search?filter={"op":"and","args":[{"op":"=","args":[{"property":"id"},"test-item-7"]},{"op":">","args":[{"property":"properties.view:sun_elevation"},"-37.30891534"]},{"op":"<","args":[{"property":"properties.view:sun_elevation"},"-37.30691534"]}]}""" + ) + + assert resp.status_code == 200 + assert len(resp.json()["features"]) == 0 + + resp = await app_client.get( + """/search?filter={"op":"and","args":[{"op":"=","args":[{"property":"id"},"test-item"]},{"op":">","args":[{"property":"properties.view:sun_elevation"},"-37.30591534"]},{"op":"<","args":[{"property":"properties.view:sun_elevation"},"-37.30491534"]}]}""" + ) + + assert resp.status_code == 200 + assert len(resp.json()["features"]) == 0 + + +@pytest.mark.asyncio +async def test_search_filter_extension_floats_post(app_client, ctx): sun_elevation = ctx.item["properties"]["view:sun_elevation"] params = { diff --git a/stac_fastapi/elasticsearch/tests/resources/test_collection.py b/stac_fastapi/elasticsearch/tests/resources/test_collection.py index f37b36b0..9061ac1e 100644 --- 
a/stac_fastapi/elasticsearch/tests/resources/test_collection.py +++ b/stac_fastapi/elasticsearch/tests/resources/test_collection.py @@ -1,6 +1,12 @@ +import uuid + import pystac +import pytest + +from ..conftest import create_collection, delete_collections_and_items, refresh_indices +@pytest.mark.asyncio async def test_create_and_delete_collection(app_client, load_test_data): """Test creation and deletion of a collection""" test_collection = load_test_data("test_collection.json") @@ -13,6 +19,7 @@ async def test_create_and_delete_collection(app_client, load_test_data): assert resp.status_code == 204 +@pytest.mark.asyncio async def test_create_collection_conflict(app_client, ctx): """Test creation of a collection which already exists""" # This collection ID is created in the fixture, so this should be a conflict @@ -20,12 +27,14 @@ async def test_create_collection_conflict(app_client, ctx): assert resp.status_code == 409 +@pytest.mark.asyncio async def test_delete_missing_collection(app_client): """Test deletion of a collection which does not exist""" resp = await app_client.delete("/collections/missing-collection") assert resp.status_code == 404 +@pytest.mark.asyncio async def test_update_collection_already_exists(ctx, app_client): """Test updating a collection which already exists""" ctx.collection["keywords"].append("test") @@ -38,6 +47,7 @@ async def test_update_collection_already_exists(ctx, app_client): assert "test" in resp_json["keywords"] +@pytest.mark.asyncio async def test_update_new_collection(app_client, load_test_data): """Test updating a collection which does not exist (same as creation)""" test_collection = load_test_data("test_collection.json") @@ -47,12 +57,14 @@ async def test_update_new_collection(app_client, load_test_data): assert resp.status_code == 404 +@pytest.mark.asyncio async def test_collection_not_found(app_client): """Test read a collection which does not exist""" resp = await app_client.get("/collections/does-not-exist") assert 
resp.status_code == 404 +@pytest.mark.asyncio async def test_returns_valid_collection(ctx, app_client): """Test validates fetched collection with jsonschema""" resp = await app_client.put("/collections", json=ctx.collection) @@ -70,3 +82,42 @@ async def test_returns_valid_collection(ctx, app_client): resp_json, root=mock_root, preserve_dict=False ) collection.validate() + + +@pytest.mark.asyncio +async def test_pagination_collection(app_client, ctx, txn_client): + """Test collection pagination links""" + + # Clear existing collections if necessary + await delete_collections_and_items(txn_client) + + # Ingest 6 collections + ids = set() + for _ in range(6): + ctx.collection["id"] = str(uuid.uuid4()) + await create_collection(txn_client, collection=ctx.collection) + ids.add(ctx.collection["id"]) + + await refresh_indices(txn_client) + + # Paginate through all 6 collections with a limit of 1 + collection_ids = set() + page = await app_client.get("/collections", params={"limit": 1}) + while True: + page_data = page.json() + assert ( + len(page_data["collections"]) <= 1 + ) # Each page should have 1 or 0 collections + collection_ids.update(coll["id"] for coll in page_data["collections"]) + + next_link = next( + (link for link in page_data["links"] if link["rel"] == "next"), None + ) + if not next_link: + break # No more pages + + href = next_link["href"][len("http://test-server") :] + page = await app_client.get(href) + + # Confirm we have paginated through all collections + assert collection_ids == ids diff --git a/stac_fastapi/elasticsearch/tests/resources/test_conformance.py b/stac_fastapi/elasticsearch/tests/resources/test_conformance.py index ab70a00b..d93d8b81 100644 --- a/stac_fastapi/elasticsearch/tests/resources/test_conformance.py +++ b/stac_fastapi/elasticsearch/tests/resources/test_conformance.py @@ -20,6 +20,7 @@ def get_link(landing_page, rel_type): ) +@pytest.mark.asyncio async def test_landing_page_health(response): """Test landing page""" assert 
response.status_code == 200 @@ -39,6 +40,7 @@ async def test_landing_page_health(response): ] +@pytest.mark.asyncio @pytest.mark.parametrize("rel_type,expected_media_type,expected_path", link_tests) async def test_landing_page_links( response_json, app_client, rel_type, expected_media_type, expected_path @@ -59,6 +61,7 @@ async def test_landing_page_links( # code here seems meaningless since it would be the same as if the endpoint did not exist. Once # https://github.com/stac-utils/stac-fastapi/pull/227 has been merged we can add this to the # parameterized tests above. +@pytest.mark.asyncio async def test_search_link(response_json): search_link = get_link(response_json, "search") diff --git a/stac_fastapi/elasticsearch/tests/resources/test_item.py b/stac_fastapi/elasticsearch/tests/resources/test_item.py index 76f38f79..5b382873 100644 --- a/stac_fastapi/elasticsearch/tests/resources/test_item.py +++ b/stac_fastapi/elasticsearch/tests/resources/test_item.py @@ -23,6 +23,7 @@ def rfc3339_str_to_datetime(s: str) -> datetime: return ciso8601.parse_rfc3339(s) +@pytest.mark.asyncio async def test_create_and_delete_item(app_client, ctx, txn_client): """Test creation and deletion of a single item (transactions extension)""" @@ -46,6 +47,7 @@ async def test_create_and_delete_item(app_client, ctx, txn_client): assert resp.status_code == 404 +@pytest.mark.asyncio async def test_create_item_conflict(app_client, ctx): """Test creation of an item which already exists (transactions extension)""" @@ -57,6 +59,7 @@ async def test_create_item_conflict(app_client, ctx): assert resp.status_code == 409 +@pytest.mark.asyncio async def test_delete_missing_item(app_client, load_test_data): """Test deletion of an item which does not exist (transactions extension)""" test_item = load_test_data("test_item.json") @@ -66,6 +69,7 @@ async def test_delete_missing_item(app_client, load_test_data): assert resp.status_code == 404 +@pytest.mark.asyncio async def 
test_create_item_missing_collection(app_client, ctx): """Test creation of an item without a parent collection (transactions extension)""" ctx.item["collection"] = "stac_is_cool" @@ -75,6 +79,7 @@ async def test_create_item_missing_collection(app_client, ctx): assert resp.status_code == 404 +@pytest.mark.asyncio async def test_create_uppercase_collection_with_item(app_client, ctx, txn_client): """Test creation of a collection and item with uppercase collection ID (transactions extension)""" collection_id = "UPPERCASE" @@ -87,6 +92,7 @@ async def test_create_uppercase_collection_with_item(app_client, ctx, txn_client assert resp.status_code == 200 +@pytest.mark.asyncio async def test_update_item_already_exists(app_client, ctx): """Test updating an item which already exists (transactions extension)""" @@ -106,6 +112,7 @@ async def test_update_item_already_exists(app_client, ctx): ) +@pytest.mark.asyncio async def test_update_new_item(app_client, ctx): """Test updating an item which does not exist (transactions extension)""" test_item = ctx.item @@ -118,6 +125,7 @@ async def test_update_new_item(app_client, ctx): assert resp.status_code == 404 +@pytest.mark.asyncio async def test_update_item_missing_collection(app_client, ctx): """Test updating an item without a parent collection (transactions extension)""" # Try to update collection of the item @@ -128,6 +136,7 @@ async def test_update_item_missing_collection(app_client, ctx): assert resp.status_code == 404 +@pytest.mark.asyncio async def test_update_item_geometry(app_client, ctx): ctx.item["id"] = "update_test_item_1" @@ -162,6 +171,7 @@ async def test_update_item_geometry(app_client, ctx): assert resp.json()["geometry"]["coordinates"] == new_coordinates +@pytest.mark.asyncio async def test_get_item(app_client, ctx): """Test read an item by id (core)""" get_item = await app_client.get( @@ -170,6 +180,7 @@ async def test_get_item(app_client, ctx): assert get_item.status_code == 200 +@pytest.mark.asyncio async def 
test_returns_valid_item(app_client, ctx): """Test validates fetched item with jsonschema""" test_item = ctx.item @@ -186,6 +197,7 @@ async def test_returns_valid_item(app_client, ctx): item.validate() +@pytest.mark.asyncio async def test_get_item_collection(app_client, ctx, txn_client): """Test read an item collection (core)""" item_count = randint(1, 4) @@ -202,6 +214,7 @@ async def test_get_item_collection(app_client, ctx, txn_client): assert matched == item_count + 1 +@pytest.mark.asyncio async def test_item_collection_filter_bbox(app_client, ctx): item = ctx.item collection = item["collection"] @@ -223,6 +236,7 @@ async def test_item_collection_filter_bbox(app_client, ctx): assert len(resp_json["features"]) == 0 +@pytest.mark.asyncio async def test_item_collection_filter_datetime(app_client, ctx): item = ctx.item collection = item["collection"] @@ -244,6 +258,7 @@ async def test_item_collection_filter_datetime(app_client, ctx): assert len(resp_json["features"]) == 0 +@pytest.mark.asyncio @pytest.mark.skip(reason="Pagination extension not implemented") async def test_pagination(app_client, load_test_data): """Test item collection pagination (paging extension)""" @@ -272,6 +287,7 @@ async def test_pagination(app_client, load_test_data): assert second_page["context"]["returned"] == 3 +@pytest.mark.asyncio async def test_item_timestamps(app_client, ctx): """Test created and updated timestamps (common metadata)""" # start_time = now_to_rfc3339_str() @@ -300,6 +316,7 @@ async def test_item_timestamps(app_client, ctx): ) +@pytest.mark.asyncio async def test_item_search_by_id_post(app_client, ctx, txn_client): """Test POST search by item id (core)""" ids = ["test1", "test2", "test3"] @@ -315,6 +332,7 @@ async def test_item_search_by_id_post(app_client, ctx, txn_client): assert set([feat["id"] for feat in resp_json["features"]]) == set(ids) +@pytest.mark.asyncio async def test_item_search_spatial_query_post(app_client, ctx): """Test POST search with spatial query 
(core)""" test_item = ctx.item @@ -329,6 +347,7 @@ async def test_item_search_spatial_query_post(app_client, ctx): assert resp_json["features"][0]["id"] == test_item["id"] +@pytest.mark.asyncio async def test_item_search_temporal_query_post(app_client, ctx): """Test POST search with single-tailed spatio-temporal query (core)""" @@ -347,6 +366,7 @@ async def test_item_search_temporal_query_post(app_client, ctx): assert resp_json["features"][0]["id"] == test_item["id"] +@pytest.mark.asyncio async def test_item_search_temporal_window_post(app_client, ctx): """Test POST search with two-tailed spatio-temporal query (core)""" test_item = ctx.item @@ -365,6 +385,7 @@ async def test_item_search_temporal_window_post(app_client, ctx): assert resp_json["features"][0]["id"] == test_item["id"] +@pytest.mark.asyncio @pytest.mark.skip(reason="KeyError: 'features") async def test_item_search_temporal_open_window(app_client, ctx): """Test POST search with open spatio-temporal query (core)""" @@ -379,39 +400,7 @@ async def test_item_search_temporal_open_window(app_client, ctx): assert resp_json["features"][0]["id"] == test_item["id"] -@pytest.mark.skip(reason="sortby date not implemented") -async def test_item_search_sort_post(app_client, load_test_data): - """Test POST search with sorting (sort extension)""" - first_item = load_test_data("test_item.json") - item_date = rfc3339_str_to_datetime(first_item["properties"]["datetime"]) - resp = await app_client.post( - f"/collections/{first_item['collection']}/items", json=first_item - ) - assert resp.status_code == 200 - - second_item = load_test_data("test_item.json") - second_item["id"] = "another-item" - another_item_date = item_date - timedelta(days=1) - second_item["properties"]["datetime"] = datetime_to_str(another_item_date) - resp = await app_client.post( - f"/collections/{second_item['collection']}/items", json=second_item - ) - assert resp.status_code == 200 - - params = { - "collections": [first_item["collection"]], - 
"sortby": [{"field": "datetime", "direction": "desc"}], - } - resp = await app_client.post("/search", json=params) - assert resp.status_code == 200 - resp_json = resp.json() - assert resp_json["features"][0]["id"] == first_item["id"] - assert resp_json["features"][1]["id"] == second_item["id"] - await app_client.delete( - f"/collections/{first_item['collection']}/items/{first_item['id']}" - ) - - +@pytest.mark.asyncio async def test_item_search_by_id_get(app_client, ctx, txn_client): """Test GET search by item id (core)""" ids = ["test1", "test2", "test3"] @@ -427,6 +416,7 @@ async def test_item_search_by_id_get(app_client, ctx, txn_client): assert set([feat["id"] for feat in resp_json["features"]]) == set(ids) +@pytest.mark.asyncio async def test_item_search_bbox_get(app_client, ctx): """Test GET search with spatial query (core)""" params = { @@ -439,6 +429,7 @@ async def test_item_search_bbox_get(app_client, ctx): assert resp_json["features"][0]["id"] == ctx.item["id"] +@pytest.mark.asyncio async def test_item_search_get_without_collections(app_client, ctx): """Test GET search without specifying collections""" @@ -449,6 +440,7 @@ async def test_item_search_get_without_collections(app_client, ctx): assert resp.status_code == 200 +@pytest.mark.asyncio async def test_item_search_get_with_non_existent_collections(app_client, ctx): """Test GET search with non-existent collections""" @@ -457,6 +449,7 @@ async def test_item_search_get_with_non_existent_collections(app_client, ctx): assert resp.status_code == 200 +@pytest.mark.asyncio async def test_item_search_temporal_window_get(app_client, ctx): """Test GET search with spatio-temporal query (core)""" test_item = ctx.item @@ -474,27 +467,7 @@ async def test_item_search_temporal_window_get(app_client, ctx): assert resp_json["features"][0]["id"] == test_item["id"] -@pytest.mark.skip(reason="sorting not fully implemented") -async def test_item_search_sort_get(app_client, ctx, txn_client): - """Test GET search with sorting 
(sort extension)""" - first_item = ctx.item - item_date = rfc3339_str_to_datetime(first_item["properties"]["datetime"]) - await create_item(txn_client, ctx.item) - - second_item = ctx.item.copy() - second_item["id"] = "another-item" - another_item_date = item_date - timedelta(days=1) - second_item.update({"properties": {"datetime": datetime_to_str(another_item_date)}}) - await create_item(txn_client, second_item) - - params = {"collections": [first_item["collection"]], "sortby": "-datetime"} - resp = await app_client.get("/search", params=params) - assert resp.status_code == 200 - resp_json = resp.json() - assert resp_json["features"][0]["id"] == first_item["id"] - assert resp_json["features"][1]["id"] == second_item["id"] - - +@pytest.mark.asyncio async def test_item_search_post_without_collection(app_client, ctx): """Test POST search without specifying a collection""" test_item = ctx.item @@ -505,6 +478,7 @@ async def test_item_search_post_without_collection(app_client, ctx): assert resp.status_code == 200 +@pytest.mark.asyncio async def test_item_search_properties_es(app_client, ctx): """Test POST search with JSONB query (query extension)""" @@ -517,6 +491,7 @@ async def test_item_search_properties_es(app_client, ctx): assert len(resp_json["features"]) == 0 +@pytest.mark.asyncio async def test_item_search_properties_field(app_client): """Test POST search indexed field with query (query extension)""" @@ -528,6 +503,7 @@ async def test_item_search_properties_field(app_client): assert len(resp_json["features"]) == 0 +@pytest.mark.asyncio async def test_item_search_get_query_extension(app_client, ctx): """Test GET search with JSONB query (query extension)""" @@ -554,12 +530,14 @@ async def test_item_search_get_query_extension(app_client, ctx): ) +@pytest.mark.asyncio async def test_get_missing_item_collection(app_client): """Test reading a collection which does not exist""" resp = await app_client.get("/collections/invalid-collection/items") assert resp.status_code 
== 404 +@pytest.mark.asyncio async def test_pagination_item_collection(app_client, ctx, txn_client): """Test item collection pagination links (paging extension)""" ids = [ctx.item["id"]] @@ -596,6 +574,7 @@ async def test_pagination_item_collection(app_client, ctx, txn_client): assert not set(item_ids) - set(ids) +@pytest.mark.asyncio async def test_pagination_post(app_client, ctx, txn_client): """Test POST pagination (paging extension)""" ids = [ctx.item["id"]] @@ -631,6 +610,7 @@ async def test_pagination_post(app_client, ctx, txn_client): assert not set(item_ids) - set(ids) +@pytest.mark.asyncio async def test_pagination_token_idempotent(app_client, ctx, txn_client): """Test that pagination tokens are idempotent (paging extension)""" ids = [ctx.item["id"]] @@ -661,6 +641,7 @@ async def test_pagination_token_idempotent(app_client, ctx, txn_client): ] +@pytest.mark.asyncio async def test_field_extension_get_includes(app_client, ctx): """Test GET search with included fields (fields extension)""" test_item = ctx.item @@ -673,6 +654,7 @@ async def test_field_extension_get_includes(app_client, ctx): assert not set(feat_properties) - {"proj:epsg", "gsd", "datetime"} +@pytest.mark.asyncio async def test_field_extension_get_excludes(app_client, ctx): """Test GET search with included fields (fields extension)""" test_item = ctx.item @@ -686,6 +668,7 @@ async def test_field_extension_get_excludes(app_client, ctx): assert "gsd" not in resp_json["features"][0]["properties"].keys() +@pytest.mark.asyncio async def test_field_extension_post(app_client, ctx): """Test POST search with included and excluded fields (fields extension)""" test_item = ctx.item @@ -707,6 +690,7 @@ async def test_field_extension_post(app_client, ctx): } +@pytest.mark.asyncio async def test_field_extension_exclude_and_include(app_client, ctx): """Test POST search including/excluding same field (fields extension)""" test_item = ctx.item @@ -723,6 +707,7 @@ async def 
test_field_extension_exclude_and_include(app_client, ctx): assert "eo:cloud_cover" not in resp_json["features"][0]["properties"] +@pytest.mark.asyncio async def test_field_extension_exclude_default_includes(app_client, ctx): """Test POST search excluding a forbidden field (fields extension)""" test_item = ctx.item @@ -733,6 +718,7 @@ async def test_field_extension_exclude_default_includes(app_client, ctx): assert "gsd" not in resp_json["features"][0] +@pytest.mark.asyncio async def test_search_intersects_and_bbox(app_client): """Test POST search intersects and bbox are mutually exclusive (core)""" bbox = [-118, 34, -117, 35] @@ -742,6 +728,7 @@ async def test_search_intersects_and_bbox(app_client): assert resp.status_code == 400 +@pytest.mark.asyncio async def test_get_missing_item(app_client, load_test_data): """Test read item which does not exist (transactions extension)""" test_coll = load_test_data("test_collection.json") @@ -749,6 +736,7 @@ async def test_get_missing_item(app_client, load_test_data): assert resp.status_code == 404 +@pytest.mark.asyncio @pytest.mark.skip(reason="invalid queries not implemented") async def test_search_invalid_query_field(app_client): body = {"query": {"gsd": {"lt": 100}, "invalid-field": {"eq": 50}}} @@ -756,6 +744,7 @@ async def test_search_invalid_query_field(app_client): assert resp.status_code == 400 +@pytest.mark.asyncio async def test_search_bbox_errors(app_client): body = {"query": {"bbox": [0]}} resp = await app_client.post("/search", json=body) @@ -770,6 +759,7 @@ async def test_search_bbox_errors(app_client): assert resp.status_code == 400 +@pytest.mark.asyncio async def test_conformance_classes_configurable(): """Test conformance class configurability""" landing = LandingPageMixin() @@ -787,6 +777,7 @@ async def test_conformance_classes_configurable(): assert client.conformance_classes()[0] == "this is a test" +@pytest.mark.asyncio async def test_search_datetime_validation_errors(app_client): bad_datetimes = [ 
"37-01-01T12:00:27.87Z", diff --git a/stac_fastapi/elasticsearch/tests/resources/test_mgmt.py b/stac_fastapi/elasticsearch/tests/resources/test_mgmt.py index 9d2bc3dc..2b7d9728 100644 --- a/stac_fastapi/elasticsearch/tests/resources/test_mgmt.py +++ b/stac_fastapi/elasticsearch/tests/resources/test_mgmt.py @@ -1,3 +1,7 @@ +import pytest + + +@pytest.mark.asyncio async def test_ping_no_param(app_client): """ Test ping endpoint with a mocked client.