diff --git a/locust_benchmark/create_feature_group.py b/locust_benchmark/create_feature_group.py
index b61a69a41..2ac6cf568 100644
--- a/locust_benchmark/create_feature_group.py
+++ b/locust_benchmark/create_feature_group.py
@@ -1,7 +1,6 @@
 from common.hopsworks_client import HopsworksClient
 
 if __name__ == "__main__":
-
     hopsworks_client = HopsworksClient()
     fg = hopsworks_client.get_or_create_fg()
     hopsworks_client.insert_data(fg)
diff --git a/python/hopsworks_common/core/constants.py b/python/hopsworks_common/core/constants.py
index 4ea804d4c..0e1acddc4 100644
--- a/python/hopsworks_common/core/constants.py
+++ b/python/hopsworks_common/core/constants.py
@@ -20,6 +20,13 @@
 # Avro
 HAS_FAST_AVRO: bool = importlib.util.find_spec("fastavro") is not None
 HAS_AVRO: bool = importlib.util.find_spec("avro") is not None
+avro_not_installed_message = (
+    "Avro package not found. "
+    "If you want to use avro with Hopsworks you can install the corresponding extra via "
+    '`pip install "hopsworks[avro]"`. '
+    "You can also install avro directly in your environment with `pip install fastavro` or `pip install avro`. "
+    "You will need to restart your kernel if applicable."
+)
 
 # Confluent Kafka
 HAS_CONFLUENT_KAFKA: bool = importlib.util.find_spec("confluent_kafka") is not None
@@ -55,7 +62,17 @@
 )
 
 HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None
+
+# NumPy
 HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None
+numpy_not_installed_message = (
+    "Numpy package not found. "
+    "If you want to use numpy with Hopsworks you can install the corresponding extra via "
+    '`pip install "hopsworks[numpy]"`. '
+    "You can also install numpy directly in your environment with `pip install numpy`. "
+    "You will need to restart your kernel if applicable."
+)
+
 HAS_POLARS: bool = importlib.util.find_spec("polars") is not None
 polars_not_installed_message = (
     "Polars package not found. "
diff --git a/python/hsfs/builtin_transformations.py b/python/hsfs/builtin_transformations.py
index fa8348553..1fc2ce670 100644
--- a/python/hsfs/builtin_transformations.py
+++ b/python/hsfs/builtin_transformations.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 #
 
-import numpy as np
+import math
+
 import pandas as pd
 from hsfs.hopsworks_udf import udf
 from hsfs.transformation_statistics import TransformationStatistics
@@ -49,7 +50,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
     # Unknown categories not present in training dataset are encoded as -1.
     return pd.Series(
         [
-            value_to_index.get(data, -1) if not pd.isna(data) else np.nan
+            value_to_index.get(data, -1) if not pd.isna(data) else math.nan
             for data in feature
         ]
     )
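Note: the `HAS_*` flags and `*_not_installed_message` strings above define the guard pattern that the rest of this diff applies to numpy. A minimal, self-contained sketch of the pattern (the `to_list` helper is illustrative, not part of this change):

```python
import importlib.util

# Same detection as hopsworks_common/core/constants.py: look the package up
# without importing it, so absence is cheap to detect.
HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None

if HAS_NUMPY:
    import numpy as np  # imported only when actually present

def to_list(values):
    # Take the numpy branch only when numpy exists, so the reference to
    # `np` can never be evaluated while it is undefined.
    if HAS_NUMPY and isinstance(values, np.ndarray):
        return values.tolist()
    return list(values)

print(to_list([1, 2, 3]))  # works with or without numpy installed
```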
diff --git a/python/hsfs/constructor/query.py b/python/hsfs/constructor/query.py
index b332ab1db..2fa4ae8d0 100644
--- a/python/hsfs/constructor/query.py
+++ b/python/hsfs/constructor/query.py
@@ -21,9 +21,9 @@
 from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union
 
 import humps
-import numpy as np
 import pandas as pd
 from hopsworks_common.client.exceptions import FeatureStoreException
+from hopsworks_common.core.constants import HAS_NUMPY
 from hsfs import engine, storage_connector, util
 from hsfs import feature_group as fg_mod
 from hsfs.constructor import join
@@ -34,6 +34,10 @@
 from hsfs.feature import Feature
 
 
+if HAS_NUMPY:
+    import numpy as np
+
+
 @typechecked
 class Query:
     ERROR_MESSAGE_FEATURE_AMBIGUOUS = (
diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py
index 84b6c7bda..be284f752 100644
--- a/python/hsfs/core/feature_view_engine.py
+++ b/python/hsfs/core/feature_view_engine.py
@@ -19,10 +19,10 @@
 import warnings
 from typing import Any, Dict, List, Optional, TypeVar, Union
 
-import numpy as np
 import pandas as pd
 from hopsworks_common import client
 from hopsworks_common.client.exceptions import FeatureStoreException
+from hopsworks_common.core.constants import HAS_NUMPY
 from hsfs import (
     engine,
     feature_group,
@@ -45,6 +45,10 @@
 from hsfs.training_dataset_split import TrainingDatasetSplit
 
 
+if HAS_NUMPY:
+    import numpy as np
+
+
 class FeatureViewEngine:
     ENTITY_TYPE = "featureview"
     _TRAINING_DATA_API_PATH = "trainingdatasets"
diff --git a/python/hsfs/core/kafka_engine.py b/python/hsfs/core/kafka_engine.py
index 66ebba47f..d21b6ec22 100644
--- a/python/hsfs/core/kafka_engine.py
+++ b/python/hsfs/core/kafka_engine.py
@@ -20,14 +20,17 @@
 from io import BytesIO
 from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Optional, Tuple, Union
 
-import numpy as np
 import pandas as pd
 from hopsworks_common import client
+from hopsworks_common.core.constants import HAS_NUMPY
 from hsfs.core import storage_connector_api
 from hsfs.core.constants import HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO
 from tqdm import tqdm
 
+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_CONFLUENT_KAFKA:
     from confluent_kafka import Consumer, KafkaError, Producer, TopicPartition
 
@@ -202,7 +205,7 @@ def encode_row(complex_feature_writers, writer, row):
     if isinstance(row, dict):
         for k in row.keys():
             # for avro to be able to serialize them, they need to be python data types
-            if isinstance(row[k], np.ndarray):
+            if HAS_NUMPY and isinstance(row[k], np.ndarray):
                 row[k] = row[k].tolist()
             if isinstance(row[k], pd.Timestamp):
                 row[k] = row[k].to_pydatetime()
diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py
index ccc0f39d2..a0f2ed2ab 100755
--- a/python/hsfs/core/vector_server.py
+++ b/python/hsfs/core/vector_server.py
@@ -23,14 +23,15 @@
 from io import BytesIO
 from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union
 
-import avro.io
-import avro.schema
-import numpy as np
 import pandas as pd
 from hopsworks_common import client
 from hopsworks_common.core.constants import (
+    HAS_AVRO,
     HAS_FAST_AVRO,
+    HAS_NUMPY,
     HAS_POLARS,
+    avro_not_installed_message,
+    numpy_not_installed_message,
     polars_not_installed_message,
 )
 from hsfs import (
@@ -52,9 +53,14 @@
 )
 
 
+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_FAST_AVRO:
     from fastavro import schemaless_reader
-else:
+if HAS_AVRO:
+    import avro.io
+    import avro.schema
     from avro.io import BinaryDecoder
 
 if HAS_POLARS:
@@ -807,6 +813,8 @@ def handle_feature_vector_return_type(
             return feature_vectorz
         elif return_type.lower() == "numpy" and not inference_helper:
             _logger.debug("Returning feature vector as numpy array")
+            if not HAS_NUMPY:
+                raise ModuleNotFoundError(numpy_not_installed_message)
             return np.array(feature_vectorz)
         # Only inference helper can return dict
         elif return_type.lower() == "dict" and inference_helper:
@@ -1064,6 +1072,9 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]:
           - deserialization of complex features from the online feature store
           - conversion of string or int timestamps to datetime objects
         """
+        if not HAS_AVRO:
+            raise ModuleNotFoundError(avro_not_installed_message)
+
         complex_feature_schemas = {
             f.name: avro.io.DatumReader(
                 avro.schema.parse(
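Note: with the guards above, requesting the numpy return type in an environment without numpy fails fast with `numpy_not_installed_message` instead of hitting a `NameError` on the undefined `np`. A simplified sketch of that control flow (names and message shortened; not the actual hsfs API):

```python
def handle_return_type(rows, return_type, has_numpy):
    # Mirrors handle_feature_vector_return_type above: "list" always works,
    # "numpy" only when the optional dependency is installed.
    if return_type == "list":
        return rows
    if return_type == "numpy":
        if not has_numpy:
            raise ModuleNotFoundError("Numpy package not found. `pip install numpy`.")
        import numpy as np
        return np.array(rows)
    raise ValueError(f"Unknown return type: {return_type}")

print(handle_return_type([[1, 2], [3, 4]], "list", has_numpy=False))
```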
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 96c4f9ecc..5db04ab8e 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -50,7 +50,6 @@
 
 import boto3
 import hsfs
-import numpy as np
 import pandas as pd
 import pyarrow as pa
 from botocore.response import StreamingBody
@@ -83,6 +82,7 @@
 from hsfs.core.constants import (
     HAS_AIOMYSQL,
     HAS_GREAT_EXPECTATIONS,
+    HAS_NUMPY,
     HAS_PANDAS,
     HAS_PYARROW,
     HAS_SQLALCHEMY,
@@ -98,6 +98,9 @@
 if HAS_GREAT_EXPECTATIONS:
     import great_expectations
 
+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_AIOMYSQL and HAS_SQLALCHEMY:
     from hsfs.core import util_sql
 
@@ -1464,11 +1467,14 @@ def _start_offline_materialization(offline_write_options: Dict[str, Any]) -> bool:
     def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:
         if feature_log is None and cols:
             return pd.DataFrame(columns=cols)
-        if not (
-            isinstance(feature_log, (list, np.ndarray, pd.DataFrame, pl.DataFrame))
+        if not (
+            isinstance(feature_log, (list, pd.DataFrame, pl.DataFrame))
+            or (HAS_NUMPY and isinstance(feature_log, np.ndarray))
         ):
             raise ValueError(f"Type '{type(feature_log)}' not accepted")
-        if isinstance(feature_log, list) or isinstance(feature_log, np.ndarray):
+        if isinstance(feature_log, list) or (
+            HAS_NUMPY and isinstance(feature_log, np.ndarray)
+        ):
             Engine._validate_logging_list(feature_log, cols)
             return pd.DataFrame(feature_log, columns=cols)
         else:
@@ -1479,7 +1485,9 @@ def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:
 
     @staticmethod
    def _validate_logging_list(feature_log, cols):
-        if isinstance(feature_log[0], list) or isinstance(feature_log[0], np.ndarray):
+        if isinstance(feature_log[0], list) or (
+            HAS_NUMPY and isinstance(feature_log[0], np.ndarray)
+        ):
             provided_len = len(feature_log[0])
         else:
             provided_len = 1
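Note: the acceptance check in `_convert_feature_log_to_df` reads as "accept lists and DataFrames always, ndarrays only when numpy is importable". The same predicate in isolation (polars branch omitted, `HAS_NUMPY` hard-coded for the example; assumes pandas is installed):

```python
import pandas as pd

HAS_NUMPY = False  # pretend numpy is absent for the example

def is_accepted_feature_log(feature_log) -> bool:
    # Lists and DataFrames are always accepted; ndarrays only when numpy
    # is importable, so the isinstance check on np is never reached otherwise.
    if isinstance(feature_log, (list, pd.DataFrame)):
        return True
    if HAS_NUMPY:
        import numpy as np
        return isinstance(feature_log, np.ndarray)
    return False

assert is_accepted_feature_log([[1, 2], [3, 4]])
assert is_accepted_feature_log(pd.DataFrame({"a": [1]}))
assert not is_accepted_feature_log("not tabular")
```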
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 53ea4dc0f..3a990fe18 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -31,15 +31,19 @@
 from pyspark.rdd import RDD
 from pyspark.sql import DataFrame
 
-import numpy as np
 import pandas as pd
 import tzlocal
+from hopsworks_common.core.constants import HAS_NUMPY, HAS_PANDAS
 from hsfs.constructor import query
 
 # in case importing in %%local
 from hsfs.core.vector_db_client import VectorDbClient
 
+if HAS_NUMPY:
+    import numpy as np
+
+
 try:
     import pyspark
     from pyspark import SparkFiles
@@ -258,39 +262,11 @@ def _return_dataframe_type(self, dataframe, dataframe_type):
 
     def convert_to_default_dataframe(self, dataframe):
         if isinstance(dataframe, list):
-            dataframe = np.array(dataframe)
-
-        if isinstance(dataframe, np.ndarray):
-            if dataframe.ndim != 2:
-                raise TypeError(
-                    "Cannot convert numpy array that do not have two dimensions to a dataframe. "
-                    "The number of dimensions are: {}".format(dataframe.ndim)
-                )
-            num_cols = dataframe.shape[1]
-            dataframe_dict = {}
-            for n_col in list(range(num_cols)):
-                col_name = "col_" + str(n_col)
-                dataframe_dict[col_name] = dataframe[:, n_col]
-            dataframe = pd.DataFrame(dataframe_dict)
-
-        if isinstance(dataframe, pd.DataFrame):
-            # convert timestamps to current timezone
-            local_tz = tzlocal.get_localzone()
-            # make shallow copy so the original df does not get changed
-            dataframe_copy = dataframe.copy(deep=False)
-            for c in dataframe_copy.columns:
-                if isinstance(
-                    dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
-                ):
-                    # convert to utc timestamp
-                    dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
-                if dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
-                    # set the timezone to the client's timezone because that is
-                    # what spark expects.
-                    dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
-                        str(local_tz), ambiguous="infer", nonexistent="shift_forward"
-                    )
-            dataframe = self._spark_session.createDataFrame(dataframe_copy)
+            dataframe = self.convert_list_to_spark_dataframe(dataframe)
+        elif HAS_NUMPY and isinstance(dataframe, np.ndarray):
+            dataframe = self.convert_numpy_to_spark_dataframe(dataframe)
+        elif HAS_PANDAS and isinstance(dataframe, pd.DataFrame):
+            dataframe = self.convert_pandas_to_spark_dataframe(dataframe)
         elif isinstance(dataframe, RDD):
             dataframe = dataframe.toDF()
@@ -341,6 +317,92 @@ def convert_to_default_dataframe(self, dataframe):
                 )
             )
 
+    @staticmethod
+    def utc_disguised_as_local(dt):
+        local_tz = tzlocal.get_localzone()
+        utc = timezone.utc
+        if not dt.tzinfo:
+            dt = dt.replace(tzinfo=utc)
+        return dt.astimezone(utc).replace(tzinfo=local_tz)
+
+    def convert_list_to_spark_dataframe(self, dataframe):
+        if HAS_NUMPY:
+            return self.convert_numpy_to_spark_dataframe(np.array(dataframe))
+        try:
+            dataframe[0][0]
+        except TypeError:
+            raise TypeError(
+                "Cannot convert a list that has less than two dimensions to a dataframe."
+            ) from None
+        ok = False
+        try:
+            dataframe[0][0][0]
+        except TypeError:
+            ok = True
+        if not ok:
+            raise TypeError(
+                "Cannot convert a list that has more than two dimensions to a dataframe."
+            ) from None
+        num_cols = len(dataframe[0])
+        if HAS_PANDAS:
+            dataframe_dict = {}
+            for n_col in range(num_cols):
+                c = "col_" + str(n_col)
+                dataframe_dict[c] = [dataframe[i][n_col] for i in range(len(dataframe))]
+            return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
+        for i in range(len(dataframe)):
+            dataframe[i] = [
+                self.utc_disguised_as_local(d) if isinstance(d, datetime) else d
+                for d in dataframe[i]
+            ]
+        return self._spark_session.createDataFrame(
+            dataframe, ["col_" + str(n) for n in range(num_cols)]
+        )
+
+    def convert_numpy_to_spark_dataframe(self, dataframe):
+        if dataframe.ndim != 2:
+            raise TypeError(
+                "Cannot convert numpy array that does not have two dimensions to a dataframe. "
+                "The number of dimensions is: {}".format(dataframe.ndim)
+            )
+        num_cols = dataframe.shape[1]
+        if HAS_PANDAS:
+            dataframe_dict = {}
+            for n_col in range(num_cols):
+                c = "col_" + str(n_col)
+                dataframe_dict[c] = dataframe[:, n_col]
+            return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
+        # convert timestamps to current timezone
+        for n_col in range(num_cols):
+            if dataframe[:, n_col].dtype == np.dtype("datetime64[ns]"):
+                # set the timezone to the client's timezone because that is
+                # what spark expects.
+                dataframe[:, n_col] = np.array(
+                    [self.utc_disguised_as_local(d.item()) for d in dataframe[:, n_col]]
+                )
+        return self._spark_session.createDataFrame(
+            dataframe.tolist(), ["col_" + str(n) for n in range(num_cols)]
+        )
+
+    def convert_pandas_to_spark_dataframe(self, dataframe):
+        # convert timestamps to current timezone
+        local_tz = tzlocal.get_localzone()
+        # make shallow copy so the original df does not get changed
+        dataframe_copy = dataframe.copy(deep=False)
+        for c in dataframe_copy.columns:
+            if isinstance(
+                dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
+            ):
+                # convert to utc timestamp
+                dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
+            if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
+                # set the timezone to the client's timezone because that is
+                # what spark expects.
+                dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
+                    str(local_tz), ambiguous="infer", nonexistent="shift_forward"
+                )
+        return self._spark_session.createDataFrame(dataframe_copy)
+
     def save_dataframe(
         self,
         feature_group,
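Note: `utc_disguised_as_local` reproduces for plain `datetime` objects what the pandas path does with `tz_convert(None)` followed by `tz_localize(local_tz)`: the UTC wall-clock time is re-labelled with the client's timezone, because that is what Spark expects. A standalone sketch (timezone pinned for reproducibility; the real helper uses `tzlocal.get_localzone()`):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

def utc_disguised_as_local(dt, local_tz=ZoneInfo("Europe/Stockholm")):
    # Keep the UTC wall-clock digits but tag them with the client's zone,
    # the same trick as tz_convert(None) + tz_localize(local_tz) on a Series.
    if not dt.tzinfo:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc).replace(tzinfo=local_tz)

naive_utc = datetime(2024, 6, 1, 12, 0)
print(utc_disguised_as_local(naive_utc))  # 2024-06-01 12:00:00+02:00
```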
" + "The number of dimensions are: {}".format(dataframe.ndim) + ) + num_cols = dataframe.shape[1] + if HAS_PANDAS: + dataframe_dict = {} + for n_col in range(num_cols): + c = "col_" + str(n_col) + dataframe_dict[c] = dataframe[:, n_col] + return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict)) + # convert timestamps to current timezone + for n_col in range(num_cols): + if dataframe[:, n_col].dtype == np.dtype("datetime64[ns]"): + # set the timezone to the client's timezone because that is + # what spark expects. + dataframe[:, n_col] = np.array( + [self.utc_disguised_as_local(d.item()) for d in dataframe[:, n_col]] + ) + return self._spark_session.createDataFrame( + dataframe.tolist(), ["col_" + str(n) for n in range(num_cols)] + ) + + def convert_pandas_to_spark_dataframe(self, dataframe): + # convert timestamps to current timezone + local_tz = tzlocal.get_localzone() + # make shallow copy so the original df does not get changed + dataframe_copy = dataframe.copy(deep=False) + for c in dataframe_copy.columns: + if isinstance( + dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype + ): + # convert to utc timestamp + dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None) + if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"): + # set the timezone to the client's timezone because that is + # what spark expects. + dataframe_copy[c] = dataframe_copy[c].dt.tz_localize( + str(local_tz), ambiguous="infer", nonexistent="shift_forward" + ) + return self._spark_session.createDataFrame(dataframe_copy) + def save_dataframe( self, feature_group, diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index c3407ea2f..4a1db2c57 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -22,7 +22,6 @@ import warnings from datetime import date, datetime from typing import ( - TYPE_CHECKING, Any, Dict, List, @@ -33,17 +32,12 @@ Union, ) - -if TYPE_CHECKING: - import great_expectations - import avro.schema import hsfs.expectation_suite import humps -import numpy as np import pandas as pd from hopsworks_common.client.exceptions import FeatureStoreException, RestAPIError -from hopsworks_common.core.constants import HAS_POLARS +from hopsworks_common.core.constants import HAS_NUMPY, HAS_POLARS from hsfs import ( engine, feature, @@ -104,6 +98,9 @@ if HAS_CONFLUENT_KAFKA: import confluent_kafka +if HAS_NUMPY: + import numpy as np + if HAS_POLARS: import polars as pl diff --git a/python/hsfs/feature_group_writer.py b/python/hsfs/feature_group_writer.py index 146d47bed..de63bd5a4 100644 --- a/python/hsfs/feature_group_writer.py +++ b/python/hsfs/feature_group_writer.py @@ -17,12 +17,16 @@ from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union -import numpy as np import pandas as pd +from hopsworks_common.core.constants import HAS_NUMPY from hsfs.core.job import Job from hsfs.validation_report import ValidationReport +if HAS_NUMPY: + import numpy as np + + class FeatureGroupWriter: def __init__(self, feature_group): self._feature_group = feature_group diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 2a384c961..77c48ae1a 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -21,9 +21,8 @@ from typing import Any, Dict, List, Optional, TypeVar, Union import humps -import numpy as np import pandas as pd -from hopsworks_common.core.constants import HAS_POLARS +from hopsworks_common.core.constants import HAS_NUMPY, HAS_POLARS from hsfs import ( expectation_suite, 
diff --git a/python/hsfs/feature_view.py b/python/hsfs/feature_view.py
index 7527f4de7..6dbe7a585 100644
--- a/python/hsfs/feature_view.py
+++ b/python/hsfs/feature_view.py
@@ -33,10 +33,9 @@
 )
 
 import humps
-import numpy as np
 import pandas as pd
 from hopsworks_common.client.exceptions import FeatureStoreException
-from hopsworks_common.core.constants import HAS_POLARS
+from hopsworks_common.core.constants import HAS_NUMPY, HAS_POLARS
 from hsfs import (
     feature_group,
     storage_connector,
@@ -76,6 +75,12 @@
 from hsml.model import Model
 
 
+if HAS_NUMPY:
+    import numpy as np
+
+
+_logger = logging.getLogger(__name__)
+
 TrainingDatasetDataFrameTypes = Union[
     pd.DataFrame,
     TypeVar("pyspark.sql.DataFrame"),  # noqa: F821
diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py
index 97e963ad0..503b46fe8 100644
--- a/python/hsfs/storage_connector.py
+++ b/python/hsfs/storage_connector.py
@@ -24,14 +24,16 @@
 from typing import Any, Dict, List, Optional, TypeVar, Union
 
 import humps
-import numpy as np
 import pandas as pd
 from hopsworks_common import client
-from hopsworks_common.core.constants import HAS_POLARS
+from hopsworks_common.core.constants import HAS_NUMPY, HAS_POLARS
 from hsfs import engine
 from hsfs.core import storage_connector_api
 
+if HAS_NUMPY:
+    import numpy as np
+
 if HAS_POLARS:
     import polars as pl
diff --git a/python/hsfs/training_dataset.py b/python/hsfs/training_dataset.py
index 92db6d23e..94688b692 100644
--- a/python/hsfs/training_dataset.py
+++ b/python/hsfs/training_dataset.py
@@ -19,10 +19,10 @@
 from typing import Any, Dict, List, Optional, Set, TypeVar, Union
 
 import humps
-import numpy as np
 import pandas as pd
 from hopsworks_common import client
 from hopsworks_common.client.exceptions import RestAPIError
+from hopsworks_common.core.constants import HAS_NUMPY
 from hsfs import engine, training_dataset_feature, util
 from hsfs.constructor import filter, query
 from hsfs.core import (
@@ -36,6 +36,10 @@
 from hsfs.training_dataset_split import TrainingDatasetSplit
 
 
+if HAS_NUMPY:
+    import numpy as np
+
+
 class TrainingDatasetBase:
     # NOTE: This class is exposed to users with the only purpose of providing information about a Training Dataset
     # and, therefore, it should not implement any functionality and remain with as minimal as possible
diff --git a/python/hsml/core/serving_api.py b/python/hsml/core/serving_api.py
index c17eba65c..92d947728 100644
--- a/python/hsml/core/serving_api.py
+++ b/python/hsml/core/serving_api.py
@@ -291,7 +291,9 @@ def _send_inference_request_via_grpc_protocol(
         # the channel, which will be reused in all following calls on the same deployment object.
         # The gRPC channel is freed when calling deployment.stop()
         print("Initializing gRPC channel...")
-        deployment_instance._grpc_channel = self._create_grpc_channel(deployment_instance)
+        deployment_instance._grpc_channel = self._create_grpc_channel(
+            deployment_instance
+        )
     # build an infer request
     request = InferRequest(
         infer_inputs=data,
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 6cd64077e..d655534df 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -42,7 +42,6 @@ dependencies = [
     "furl",
     "boto3",
     "pandas<2.2.0",
-    "numpy<2",
     "pyjks",
     "mock",
     "avro==1.11.3",
@@ -60,6 +59,7 @@ dependencies = [
 
 [project.optional-dependencies]
 python = [
+    "numpy<2",
     "pyarrow>=10.0",
     "confluent-kafka<=2.3.0",
     "fastavro>=1.4.11,<=1.8.4",
@@ -86,7 +86,10 @@ dev-pandas1 = [
     "sqlalchemy<=1.4.48",
 ]
 dev = ["hopsworks[dev-no-opt,great-expectations,polars]"]
-polars=["polars>=0.20.18,<=0.21.0"]
+polars=[
+    "polars>=0.20.18,<=0.21.0",
+    "pyarrow>=10.0",
+]
 
 [build-system]
 requires = ["setuptools", "wheel"]
diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index 5af468873..dfc0badfb 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -318,4 +318,3 @@ def parse_isoformat_date(da: str) -> datetime:
         sys.exit(1)
 
     sys.exit(0)
-
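Note: with `numpy<2` moved from the core dependencies to the `python` extra, a bare `pip install hopsworks` no longer pulls in numpy. A quick, illustrative way to probe which optional stacks are importable in a given environment (package list is illustrative, not exhaustive):

```python
import importlib.util

# Compare the output after `pip install hopsworks` versus
# `pip install "hopsworks[python]"` to see what the extra provides.
for pkg in ("numpy", "pyarrow", "confluent_kafka", "fastavro", "polars"):
    status = "present" if importlib.util.find_spec(pkg) else "missing"
    print(f"{pkg:16} {status}")
```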