From 4d1c49985a17b473fea5356f82be119e7b410a14 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Mon, 23 Sep 2024 09:48:14 +0200 Subject: [PATCH 1/7] Remove dependency on numpy except from convert_to_default_dataframe --- python/hopsworks_common/core/constants.py | 18 +++++++++++ python/hsfs/builtin_transformations.py | 5 +-- python/hsfs/constructor/query.py | 6 +++- python/hsfs/core/feature_view_engine.py | 10 ++++-- python/hsfs/core/kafka_engine.py | 7 +++-- python/hsfs/core/vector_server.py | 25 ++++++++++----- python/hsfs/engine/python.py | 17 +++++++--- python/hsfs/engine/spark.py | 38 ++++++++++++++++++++--- python/hsfs/feature_group.py | 10 +++--- python/hsfs/feature_group_writer.py | 6 +++- python/hsfs/feature_store.py | 6 +++- python/hsfs/feature_view.py | 6 +++- python/hsfs/storage_connector.py | 6 +++- python/hsfs/training_dataset.py | 6 +++- 14 files changed, 131 insertions(+), 35 deletions(-) diff --git a/python/hopsworks_common/core/constants.py b/python/hopsworks_common/core/constants.py index 56f98d01e..d3e905d87 100644 --- a/python/hopsworks_common/core/constants.py +++ b/python/hopsworks_common/core/constants.py @@ -20,6 +20,13 @@ # Avro HAS_FAST_AVRO: bool = importlib.util.find_spec("fastavro") is not None HAS_AVRO: bool = importlib.util.find_spec("avro") is not None +avro_not_installed_message = ( + "Avro package not found. " + "If you want to use avro with Hopsworks you can install the corresponding extra via " + '`pip install "hopsworks[avro]"`. ' + "You can also install avro directly in your environment with `pip install fastavro` or `pip install avro`. " + "You will need to restart your kernel if applicable." +) # Confluent Kafka HAS_CONFLUENT_KAFKA: bool = importlib.util.find_spec("confluent_kafka") is not None @@ -30,6 +37,7 @@ "You can also install confluent-kafka directly in your environment e.g `pip install confluent-kafka`. " "You will need to restart your kernel if applicable." ) + # Data Validation / Great Expectations HAS_GREAT_EXPECTATIONS: bool = ( importlib.util.find_spec("great_expectations") is not None @@ -45,7 +53,17 @@ HAS_ARROW: bool = importlib.util.find_spec("pyarrow") is not None HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None + +# NumPy HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None +numpy_not_installed_message = ( + "Numpy package not found. " + "If you want to use numpy with Hopsworks you can install the corresponding extra via " + '`pip install "hopsworks[numpy]"`. ' + "You can also install numpy directly in your environment with `pip install numpy`. " + "You will need to restart your kernel if applicable." +) + HAS_POLARS: bool = importlib.util.find_spec("polars") is not None # SQL packages diff --git a/python/hsfs/builtin_transformations.py b/python/hsfs/builtin_transformations.py index fa8348553..1fc2ce670 100644 --- a/python/hsfs/builtin_transformations.py +++ b/python/hsfs/builtin_transformations.py @@ -14,7 +14,8 @@ # limitations under the License. # -import numpy as np +import math + import pandas as pd from hsfs.hopsworks_udf import udf from hsfs.transformation_statistics import TransformationStatistics @@ -49,7 +50,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Serie # Unknown categories not present in training dataset are encoded as -1. return pd.Series( [ - value_to_index.get(data, -1) if not pd.isna(data) else np.nan + value_to_index.get(data, -1) if not pd.isna(data) else math.nan for data in feature ] ) diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py index 9a6c74272..1351f1437 100644 --- a/python/hsfs/constructor/query.py +++ b/python/hsfs/constructor/query.py @@ -21,9 +21,9 @@ from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union import humps -import numpy as np import pandas as pd from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import engine, storage_connector, util from hsfs import feature_group as fg_mod from hsfs.constructor import join @@ -34,6 +34,10 @@ from hsfs.feature import Feature +if HAS_NUMPY: + import numpy as np + + @typechecked class Query: ERROR_MESSAGE_FEATURE_AMBIGUOUS = ( diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index afdc9c75f..7f5b79158 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -19,10 +19,10 @@ import warnings from typing import Any, Dict, List, Optional, TypeVar, Union -import numpy as np import pandas as pd from hopsworks_common import client from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import ( engine, feature_group, @@ -46,6 +46,10 @@ from hsfs.training_dataset_split import TrainingDatasetSplit +if HAS_NUMPY: + import numpy as np + + class FeatureViewEngine: ENTITY_TYPE = "featureview" _TRAINING_DATA_API_PATH = "trainingdatasets" @@ -1227,7 +1231,9 @@ def _get_feature_logging_data( model_col_name=FeatureViewEngine._HSML_MODEL, predictions=predictions, training_dataset_version=training_dataset_version, - hsml_model=self.get_hsml_model_value(hsml_model) if hsml_model else None, + hsml_model=self.get_hsml_model_value(hsml_model) + if hsml_model + else None, ) else: return engine.get_instance().get_feature_logging_df( diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py index 66ebba47f..d21b6ec22 100644 --- a/python/hsfs/core/kafka_engine.py +++ b/python/hsfs/core/kafka_engine.py @@ -20,14 +20,17 @@ from io import BytesIO from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Optional, Tuple, Union -import numpy as np import pandas as pd from hopsworks_common import client +from hopsworks_common.core.constants import HAS_NUMPY from hsfs.core import storage_connector_api from hsfs.core.constants import HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO from tqdm import tqdm +if HAS_NUMPY: + import numpy as np + if HAS_CONFLUENT_KAFKA: from confluent_kafka import Consumer, KafkaError, Producer, TopicPartition @@ -202,7 +205,7 @@ def encode_row(complex_feature_writers, writer, row): if isinstance(row, dict): for k in row.keys(): # for avro to be able to serialize them, they need to be python data types - if isinstance(row[k], np.ndarray): + if HAS_NUMPY and isinstance(row[k], np.ndarray): row[k] = row[k].tolist() if isinstance(row[k], pd.Timestamp): row[k] = row[k].to_pydatetime() diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 3b28bd86a..aceee2828 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -25,10 +25,16 @@ import avro.io import avro.schema -import numpy as np import pandas as pd import polars as pl from hopsworks_common import client +from hopsworks_common.core.constants import ( + HAS_AVRO, + HAS_FAST_AVRO, + HAS_NUMPY, + avro_not_installed_message, + numpy_not_installed_message, +) from hsfs import ( feature_view, training_dataset, @@ -48,14 +54,15 @@ ) -HAS_FASTAVRO = False -try: - from fastavro import schemaless_reader +if HAS_NUMPY: + import numpy as np - HAS_FASTAVRO = True -except ImportError: +if HAS_FAST_AVRO: + from fastavro import schemaless_reader +elif HAS_AVRO: from avro.io import BinaryDecoder + _logger = logging.getLogger(__name__) @@ -803,6 +810,8 @@ def handle_feature_vector_return_type( return feature_vectorz elif return_type.lower() == "numpy" and not inference_helper: _logger.debug("Returning feature vector as numpy array") + if not HAS_NUMPY: + raise ModuleNotFoundError(numpy_not_installed_message) return np.array(feature_vectorz) # Only inference helper can return dict elif return_type.lower() == "dict" and inference_helper: @@ -1076,7 +1085,7 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]: _logger.debug( f"Building complex feature decoders corresponding to {complex_feature_schemas}." ) - if HAS_FASTAVRO: + if HAS_FAST_AVRO: _logger.debug("Using fastavro for deserialization.") return { f_name: ( @@ -1100,6 +1109,8 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]: for (f_name, schema) in complex_feature_schemas.items() } else: + if not HAS_AVRO: + raise ModuleNotFoundError(avro_not_installed_message) _logger.debug("Fast Avro not found, using avro for deserialization.") return { f_name: ( diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index d05e7c2a7..ec53902b9 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -50,7 +50,6 @@ import boto3 import hsfs -import numpy as np import pandas as pd import polars as pl import pyarrow as pa @@ -84,6 +83,7 @@ HAS_AIOMYSQL, HAS_ARROW, HAS_GREAT_EXPECTATIONS, + HAS_NUMPY, HAS_PANDAS, HAS_SQLALCHEMY, ) @@ -98,6 +98,9 @@ if HAS_GREAT_EXPECTATIONS: import great_expectations +if HAS_NUMPY: + import numpy as np + if HAS_ARROW: from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING if HAS_AIOMYSQL and HAS_SQLALCHEMY: @@ -1416,11 +1419,13 @@ def _start_offline_materialization(offline_write_options: Dict[str, Any]) -> boo def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame: if feature_log is None and cols: return pd.DataFrame(columns=cols) - if not ( - isinstance(feature_log, (list, np.ndarray, pd.DataFrame, pl.DataFrame)) + if not (isinstance(feature_log, (list, pd.DataFrame, pl.DataFrame))) or ( + HAS_NUMPY and isinstance(feature_log, np.ndarray) ): raise ValueError(f"Type '{type(feature_log)}' not accepted") - if isinstance(feature_log, list) or isinstance(feature_log, np.ndarray): + if isinstance(feature_log, list) or ( + HAS_NUMPY and isinstance(feature_log, np.ndarray) + ): Engine._validate_logging_list(feature_log, cols) return pd.DataFrame(feature_log, columns=cols) else: @@ -1431,7 +1436,9 @@ def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame: @staticmethod def _validate_logging_list(feature_log, cols): - if isinstance(feature_log[0], list) or isinstance(feature_log[0], np.ndarray): + if isinstance(feature_log[0], list) or ( + HAS_NUMPY and isinstance(feature_log[0], np.ndarray) + ): provided_len = len(feature_log[0]) else: provided_len = 1 diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index a1fe19a62..81797fa47 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -31,15 +31,19 @@ from pyspark.rdd import RDD from pyspark.sql import DataFrame -import numpy as np import pandas as pd import tzlocal +from hopsworks_common.core.constants import HAS_NUMPY from hsfs.constructor import query # in case importing in %%local from hsfs.core.vector_db_client import VectorDbClient +if HAS_NUMPY: + import numpy as np + + try: import pyspark from pyspark import SparkFiles @@ -258,9 +262,33 @@ def _return_dataframe_type(self, dataframe, dataframe_type): def convert_to_default_dataframe(self, dataframe): if isinstance(dataframe, list): - dataframe = np.array(dataframe) - - if isinstance(dataframe, np.ndarray): + #################### TODO TODO TODO TODO TODO #################### + if HAS_NUMPY: + dataframe = np.array(dataframe) + else: + try: + dataframe[0][0] + except TypeError: + raise TypeError( + "Cannot convert a list that has less than two dimensions to a dataframe." + ) from None + ok = False + try: + dataframe[0][0][0] + except TypeError: + ok = True + if not ok: + raise TypeError( + "Cannot convert a list that has more than two dimensions to a dataframe." + ) from None + num_cols = len(dataframe[0]) + dataframe_dict = {} + for n_col in list(range(num_cols)): + col_name = "col_" + str(n_col) + dataframe_dict[col_name] = dataframe[:, n_col] + dataframe = pd.DataFrame(dataframe_dict) + + if HAS_NUMPY and isinstance(dataframe, np.ndarray): if dataframe.ndim != 2: raise TypeError( "Cannot convert numpy array that do not have two dimensions to a dataframe. " @@ -284,7 +312,7 @@ def convert_to_default_dataframe(self, dataframe): ): # convert to utc timestamp dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None) - if dataframe_copy[c].dtype == np.dtype("datetime64[ns]"): + if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"): # set the timezone to the client's timezone because that is # what spark expects. dataframe_copy[c] = dataframe_copy[c].dt.tz_localize( diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index 95604d9ff..40c6b3bc5 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -22,7 +22,6 @@ import warnings from datetime import date, datetime from typing import ( - TYPE_CHECKING, Any, Dict, List, @@ -33,17 +32,13 @@ Union, ) - -if TYPE_CHECKING: - import great_expectations - import avro.schema import hsfs.expectation_suite import humps -import numpy as np import pandas as pd import polars as pl from hopsworks_common.client.exceptions import FeatureStoreException, RestAPIError +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import ( engine, feature, @@ -104,6 +99,9 @@ if HAS_CONFLUENT_KAFKA: import confluent_kafka +if HAS_NUMPY: + import numpy as np + _logger = logging.getLogger(__name__) diff --git a/python/hsfs/feature_group_writer.py b/python/hsfs/feature_group_writer.py index 146d47bed..de63bd5a4 100644 --- a/python/hsfs/feature_group_writer.py +++ b/python/hsfs/feature_group_writer.py @@ -17,12 +17,16 @@ from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union -import numpy as np import pandas as pd +from hopsworks_common.core.constants import HAS_NUMPY from hsfs.core.job import Job from hsfs.validation_report import ValidationReport +if HAS_NUMPY: + import numpy as np + + class FeatureGroupWriter: def __init__(self, feature_group): self._feature_group = feature_group diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 6cf52e87f..a5e00179d 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -21,9 +21,9 @@ from typing import Any, Dict, List, Optional, TypeVar, Union import humps -import numpy as np import pandas as pd import polars as pl +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import ( expectation_suite, feature, @@ -53,6 +53,10 @@ from hsfs.transformation_function import TransformationFunction +if HAS_NUMPY: + import numpy as np + + @typechecked class FeatureStore: DEFAULT_VERSION = 1 diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py index 3ecbf2068..ec56e9c0c 100644 --- a/python/hsfs/feature_view.py +++ b/python/hsfs/feature_view.py @@ -33,10 +33,10 @@ ) import humps -import numpy as np import pandas as pd import polars as pl from hopsworks_common.client.exceptions import FeatureStoreException +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import ( feature_group, storage_connector, @@ -76,6 +76,10 @@ from hsml.model import Model +if HAS_NUMPY: + import numpy as np + + _logger = logging.getLogger(__name__) TrainingDatasetDataFrameTypes = Union[ diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py index 9f959b1da..ce18090fb 100644 --- a/python/hsfs/storage_connector.py +++ b/python/hsfs/storage_connector.py @@ -24,14 +24,18 @@ from typing import Any, Dict, List, Optional, TypeVar, Union import humps -import numpy as np import pandas as pd import polars as pl from hopsworks_common import client +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import engine from hsfs.core import storage_connector_api +if HAS_NUMPY: + import numpy as np + + _logger = logging.getLogger(__name__) diff --git a/python/hsfs/training_dataset.py b/python/hsfs/training_dataset.py index 92db6d23e..94688b692 100644 --- a/python/hsfs/training_dataset.py +++ b/python/hsfs/training_dataset.py @@ -19,10 +19,10 @@ from typing import Any, Dict, List, Optional, Set, TypeVar, Union import humps -import numpy as np import pandas as pd from hopsworks_common import client from hopsworks_common.client.exceptions import RestAPIError +from hopsworks_common.core.constants import HAS_NUMPY from hsfs import engine, training_dataset_feature, util from hsfs.constructor import filter, query from hsfs.core import ( @@ -36,6 +36,10 @@ from hsfs.training_dataset_split import TrainingDatasetSplit +if HAS_NUMPY: + import numpy as np + + class TrainingDatasetBase: # NOTE: This class is exposed to users with the only purpose of providing information about a Training Dataset # and, therefore, it should not implement any functionality and remain with as minimal as possible From f1f7cfd3dc4166c42077c76423ee36f6928bd145 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Thu, 26 Sep 2024 15:21:41 +0200 Subject: [PATCH 2/7] Ruff --- locust_benchmark/create_feature_group.py | 1 - python/hsml/core/serving_api.py | 4 +++- utils/python/hsfs_utils.py | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/locust_benchmark/create_feature_group.py b/locust_benchmark/create_feature_group.py index b61a69a41..2ac6cf568 100644 --- a/locust_benchmark/create_feature_group.py +++ b/locust_benchmark/create_feature_group.py @@ -1,7 +1,6 @@ from common.hopsworks_client import HopsworksClient if __name__ == "__main__": - hopsworks_client = HopsworksClient() fg = hopsworks_client.get_or_create_fg() hopsworks_client.insert_data(fg) diff --git a/python/hsml/core/serving_api.py b/python/hsml/core/serving_api.py index c17eba65c..92d947728 100644 --- a/python/hsml/core/serving_api.py +++ b/python/hsml/core/serving_api.py @@ -291,7 +291,9 @@ def _send_inference_request_via_grpc_protocol( # the channel, which will be reused in all following calls on the same deployment object. # The gRPC channel is freed when calling deployment.stop() print("Initializing gRPC channel...") - deployment_instance._grpc_channel = self._create_grpc_channel(deployment_instance) + deployment_instance._grpc_channel = self._create_grpc_channel( + deployment_instance + ) # build an infer request request = InferRequest( infer_inputs=data, diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 5af468873..dfc0badfb 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -318,4 +318,3 @@ def parse_isoformat_date(da: str) -> datetime: sys.exit(1) sys.exit(0) - From f122515a875dc6f937c00c4d3a5f9f91055af7d6 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Thu, 26 Sep 2024 15:27:55 +0200 Subject: [PATCH 3/7] Update pyproject.toml extras --- python/pyproject.toml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 6cd64077e..128e243c2 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -41,8 +41,6 @@ dependencies = [ "requests", "furl", "boto3", - "pandas<2.2.0", - "numpy<2", "pyjks", "mock", "avro==1.11.3", @@ -60,6 +58,8 @@ dependencies = [ [project.optional-dependencies] python = [ + "numpy<2", + "pandas<2.2.0", "pyarrow>=10.0", "confluent-kafka<=2.3.0", "fastavro>=1.4.11,<=1.8.4", @@ -86,7 +86,10 @@ dev-pandas1 = [ "sqlalchemy<=1.4.48", ] dev = ["hopsworks[dev-no-opt,great-expectations,polars]"] -polars=["polars>=0.20.18,<=0.21.0"] +polars=[ + "polars>=0.20.18,<=0.21.0", + "pyarrow>=10.0", +] [build-system] requires = ["setuptools", "wheel"] From 2fb302efeb36cab4d87778e0d74ca4947feeb760 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Thu, 26 Sep 2024 15:30:04 +0200 Subject: [PATCH 4/7] Fix --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 128e243c2..d655534df 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -41,6 +41,7 @@ dependencies = [ "requests", "furl", "boto3", + "pandas<2.2.0", "pyjks", "mock", "avro==1.11.3", @@ -59,7 +60,6 @@ dependencies = [ [project.optional-dependencies] python = [ "numpy<2", - "pandas<2.2.0", "pyarrow>=10.0", "confluent-kafka<=2.3.0", "fastavro>=1.4.11,<=1.8.4", From f06ee3f52ddebd69f17307adaa396c92a524ff5f Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Thu, 26 Sep 2024 16:25:07 +0200 Subject: [PATCH 5/7] Fix --- python/hsfs/engine/python.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 42f208110..4e19ed3e6 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -101,7 +101,7 @@ if HAS_NUMPY: import numpy as np -if HAS_ARROW: +if HAS_PYARROW: from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING if HAS_AIOMYSQL and HAS_SQLALCHEMY: From bfbe7aabed58931be068c54fcbc23a72d3294396 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Tue, 1 Oct 2024 16:53:58 +0200 Subject: [PATCH 6/7] Attempt making numpy optional in convert_to_default_dataframe --- python/hsfs/engine/spark.py | 139 +++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 58 deletions(-) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index 7e4c8b13b..e7409f34c 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -33,7 +33,7 @@ import pandas as pd import tzlocal -from hopsworks_common.core.constants import HAS_NUMPY +from hopsworks_common.core.constants import HAS_NUMPY, HAS_PANDAS from hsfs.constructor import query # in case importing in %%local @@ -262,63 +262,11 @@ def _return_dataframe_type(self, dataframe, dataframe_type): def convert_to_default_dataframe(self, dataframe): if isinstance(dataframe, list): - #################### TODO TODO TODO TODO TODO #################### - if HAS_NUMPY: - dataframe = np.array(dataframe) - else: - try: - dataframe[0][0] - except TypeError: - raise TypeError( - "Cannot convert a list that has less than two dimensions to a dataframe." - ) from None - ok = False - try: - dataframe[0][0][0] - except TypeError: - ok = True - if not ok: - raise TypeError( - "Cannot convert a list that has more than two dimensions to a dataframe." - ) from None - num_cols = len(dataframe[0]) - dataframe_dict = {} - for n_col in list(range(num_cols)): - col_name = "col_" + str(n_col) - dataframe_dict[col_name] = dataframe[:, n_col] - dataframe = pd.DataFrame(dataframe_dict) - - if HAS_NUMPY and isinstance(dataframe, np.ndarray): - if dataframe.ndim != 2: - raise TypeError( - "Cannot convert numpy array that do not have two dimensions to a dataframe. " - "The number of dimensions are: {}".format(dataframe.ndim) - ) - num_cols = dataframe.shape[1] - dataframe_dict = {} - for n_col in list(range(num_cols)): - col_name = "col_" + str(n_col) - dataframe_dict[col_name] = dataframe[:, n_col] - dataframe = pd.DataFrame(dataframe_dict) - - if isinstance(dataframe, pd.DataFrame): - # convert timestamps to current timezone - local_tz = tzlocal.get_localzone() - # make shallow copy so the original df does not get changed - dataframe_copy = dataframe.copy(deep=False) - for c in dataframe_copy.columns: - if isinstance( - dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype - ): - # convert to utc timestamp - dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None) - if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"): - # set the timezone to the client's timezone because that is - # what spark expects. - dataframe_copy[c] = dataframe_copy[c].dt.tz_localize( - str(local_tz), ambiguous="infer", nonexistent="shift_forward" - ) - dataframe = self._spark_session.createDataFrame(dataframe_copy) + dataframe = self.convert_list_to_spark_dataframe(dataframe) + elif HAS_NUMPY and isinstance(dataframe, np.ndarray): + dataframe = self.convert_numpy_to_spark_dataframe(dataframe) + elif HAS_PANDAS and isinstance(dataframe, pd.DataFrame): + dataframe = self.convert_pandas_to_spark_dataframe(dataframe) elif isinstance(dataframe, RDD): dataframe = dataframe.toDF() @@ -369,6 +317,81 @@ def convert_to_default_dataframe(self, dataframe): ) ) + def convert_list_to_spark_dataframe(self, dataframe): + if HAS_NUMPY: + return self.convert_numpy_to_spark_dataframe(np.array(dataframe)) + try: + dataframe[0][0] + except TypeError: + raise TypeError( + "Cannot convert a list that has less than two dimensions to a dataframe." + ) from None + ok = False + try: + dataframe[0][0][0] + except TypeError: + ok = True + if not ok: + raise TypeError( + "Cannot convert a list that has more than two dimensions to a dataframe." + ) from None + num_cols = len(dataframe[0]) + if HAS_PANDAS: + dataframe_dict = {} + for n_col in range(num_cols): + c = "col_" + str(n_col) + dataframe_dict[c] = [dataframe[i][n_col] for i in range(len(dataframe))] + return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict)) + # We have neither numpy nor pandas, so there is no need to transform timestamps + return self._spark_session.createDataFrame( + dataframe, ["col_" + str(n) for n in range(num_cols)] + ) + + def convert_numpy_to_spark_dataframe(self, dataframe): + if dataframe.ndim != 2: + raise TypeError( + "Cannot convert numpy array that do not have two dimensions to a dataframe. " + "The number of dimensions are: {}".format(dataframe.ndim) + ) + num_cols = dataframe.shape[1] + if HAS_PANDAS: + dataframe_dict = {} + for n_col in range(num_cols): + c = "col_" + str(n_col) + dataframe_dict[c] = dataframe[:, n_col] + return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict)) + # convert timestamps to current timezone + local_tz = tzlocal.get_localzone() + for n_col in range(num_cols): + if dataframe[:, n_col].dtype == np.dtype("datetime64[ns]"): + # set the timezone to the client's timezone because that is + # what spark expects. + dataframe[:, n_col] = dataframe[:, n_col].map( + lambda d: local_tz.fromutc(d.item().astimezone(local_tz)) + ) + return self._spark_session.createDataFrame( + dataframe.tolist(), ["col_" + str(n) for n in range(num_cols)] + ) + + def convert_pandas_to_spark_dataframe(self, dataframe): + # convert timestamps to current timezone + local_tz = tzlocal.get_localzone() + # make shallow copy so the original df does not get changed + dataframe_copy = dataframe.copy(deep=False) + for c in dataframe_copy.columns: + if isinstance( + dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype + ): + # convert to utc timestamp + dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None) + if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"): + # set the timezone to the client's timezone because that is + # what spark expects. + dataframe_copy[c] = dataframe_copy[c].dt.tz_localize( + str(local_tz), ambiguous="infer", nonexistent="shift_forward" + ) + return self._spark_session.createDataFrame(dataframe_copy) + def save_dataframe( self, feature_group, From 7280c4462fc2eb92532a2644a9a323ef3366e644 Mon Sep 17 00:00:00 2001 From: Aleksey Veresov Date: Thu, 3 Oct 2024 10:55:38 +0200 Subject: [PATCH 7/7] Address Manu's review --- python/hsfs/core/vector_server.py | 11 ++++++----- python/hsfs/engine/python.py | 3 --- python/hsfs/engine/spark.py | 19 +++++++++++++++---- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index fc261a88e..a0f2ed2ab 100755 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -23,8 +23,6 @@ from io import BytesIO from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union -import avro.io -import avro.schema import pandas as pd from hopsworks_common import client from hopsworks_common.core.constants import ( @@ -60,7 +58,9 @@ if HAS_FAST_AVRO: from fastavro import schemaless_reader -elif HAS_AVRO: +if HAS_AVRO: + import avro.io + import avro.schema from avro.io import BinaryDecoder if HAS_POLARS: @@ -1072,6 +1072,9 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]: - deserialization of complex features from the online feature store - conversion of string or int timestamps to datetime objects """ + if not HAS_AVRO: + raise ModuleNotFoundError(avro_not_installed_message) + complex_feature_schemas = { f.name: avro.io.DatumReader( avro.schema.parse( @@ -1114,8 +1117,6 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]: for (f_name, schema) in complex_feature_schemas.items() } else: - if not HAS_AVRO: - raise ModuleNotFoundError(avro_not_installed_message) _logger.debug("Fast Avro not found, using avro for deserialization.") return { f_name: ( diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 4e19ed3e6..5db04ab8e 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -101,9 +101,6 @@ if HAS_NUMPY: import numpy as np -if HAS_PYARROW: - from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING - if HAS_AIOMYSQL and HAS_SQLALCHEMY: from hsfs.core import util_sql diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index e7409f34c..3a990fe18 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -317,6 +317,14 @@ def convert_to_default_dataframe(self, dataframe): ) ) + @staticmethod + def utc_disguised_as_local(dt): + local_tz = tzlocal.get_localzone() + utc = timezone.utc + if not dt.tzinfo: + dt = dt.replace(tzinfo=utc) + return dt.astimezone(utc).replace(tzinfo=local_tz) + def convert_list_to_spark_dataframe(self, dataframe): if HAS_NUMPY: return self.convert_numpy_to_spark_dataframe(np.array(dataframe)) @@ -342,7 +350,11 @@ def convert_list_to_spark_dataframe(self, dataframe): c = "col_" + str(n_col) dataframe_dict[c] = [dataframe[i][n_col] for i in range(len(dataframe))] return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict)) - # We have neither numpy nor pandas, so there is no need to transform timestamps + for i in range(len(dataframe)): + dataframe[i] = [ + self.utc_disguised_as_local(d) if isinstance(d, datetime) else d + for d in dataframe[i] + ] return self._spark_session.createDataFrame( dataframe, ["col_" + str(n) for n in range(num_cols)] ) @@ -361,13 +373,12 @@ def convert_numpy_to_spark_dataframe(self, dataframe): dataframe_dict[c] = dataframe[:, n_col] return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict)) # convert timestamps to current timezone - local_tz = tzlocal.get_localzone() for n_col in range(num_cols): if dataframe[:, n_col].dtype == np.dtype("datetime64[ns]"): # set the timezone to the client's timezone because that is # what spark expects. - dataframe[:, n_col] = dataframe[:, n_col].map( - lambda d: local_tz.fromutc(d.item().astimezone(local_tz)) + dataframe[:, n_col] = np.array( + [self.utc_disguised_as_local(d.item()) for d in dataframe[:, n_col]] ) return self._spark_session.createDataFrame( dataframe.tolist(), ["col_" + str(n) for n in range(num_cols)]