[FSTORE-1468] Make numpy optional #338

Merged
merged 8 commits into from
Oct 4, 2024
Changes from 7 commits

1 change: 0 additions & 1 deletion locust_benchmark/create_feature_group.py
@@ -1,7 +1,6 @@
from common.hopsworks_client import HopsworksClient

if __name__ == "__main__":

hopsworks_client = HopsworksClient()
fg = hopsworks_client.get_or_create_fg()
hopsworks_client.insert_data(fg)
17 changes: 17 additions & 0 deletions python/hopsworks_common/core/constants.py
@@ -20,6 +20,13 @@
# Avro
HAS_FAST_AVRO: bool = importlib.util.find_spec("fastavro") is not None
HAS_AVRO: bool = importlib.util.find_spec("avro") is not None
avro_not_installed_message = (
"Avro package not found. "
"If you want to use avro with Hopsworks you can install the corresponding extra via "
'`pip install "hopsworks[avro]"`. '
"You can also install avro directly in your environment with `pip install fastavro` or `pip install avro`. "
"You will need to restart your kernel if applicable."
)

# Confluent Kafka
HAS_CONFLUENT_KAFKA: bool = importlib.util.find_spec("confluent_kafka") is not None
@@ -55,7 +62,17 @@
)

HAS_PANDAS: bool = importlib.util.find_spec("pandas") is not None

# NumPy
HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None
numpy_not_installed_message = (
"Numpy package not found. "
"If you want to use numpy with Hopsworks you can install the corresponding extra via "
'`pip install "hopsworks[numpy]"`. '
"You can also install numpy directly in your environment with `pip install numpy`. "
"You will need to restart your kernel if applicable."
)

HAS_POLARS: bool = importlib.util.find_spec("polars") is not None
polars_not_installed_message = (
"Polars package not found. "
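For context, the new `HAS_NUMPY` flag and its not-installed message follow the same optional-dependency pattern already used for Avro and Polars: detect the package with `importlib.util.find_spec`, import it only when present, and raise a `ModuleNotFoundError` with install instructions at the point of use. A minimal, self-contained sketch of that pattern (the `to_array` helper and its exact message text are illustrative, not part of this PR):

```python
# Sketch of the optional-dependency pattern used in constants.py; the helper
# function and its error text are illustrative only.
import importlib.util

HAS_NUMPY: bool = importlib.util.find_spec("numpy") is not None
numpy_not_installed_message = (
    "Numpy package not found. Install it with `pip install numpy` or via the "
    'extra `pip install "hopsworks[numpy]"`.'
)

if HAS_NUMPY:
    import numpy as np  # imported only when the package is actually available


def to_array(values):
    # Fail with an actionable error instead of a NameError deep inside numpy-only code.
    if not HAS_NUMPY:
        raise ModuleNotFoundError(numpy_not_installed_message)
    return np.array(values)
```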
5 changes: 3 additions & 2 deletions python/hsfs/builtin_transformations.py
@@ -14,7 +14,8 @@
# limitations under the License.
#

import numpy as np
import math

import pandas as pd
from hsfs.hopsworks_udf import udf
from hsfs.transformation_statistics import TransformationStatistics
@@ -49,7 +50,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
# Unknown categories not present in training dataset are encoded as -1.
return pd.Series(
[
value_to_index.get(data, -1) if not pd.isna(data) else np.nan
value_to_index.get(data, -1) if not pd.isna(data) else math.nan
for data in feature
]
)
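The switch from `np.nan` to `math.nan` works because both are the same IEEE 754 float NaN, so the encoder's output dtype and `pd.isna` behaviour are unchanged while the module drops its numpy import. A small sketch of the resulting behaviour, with made-up mapping and data:

```python
# Sketch: math.nan is a plain Python float NaN, so it can replace np.nan in the
# encoder output without importing numpy. Example data is illustrative.
import math

import pandas as pd

value_to_index = {"a": 0, "b": 1}
feature = pd.Series(["a", None, "b", "c"])

# Unknown categories map to -1; missing values stay NaN.
encoded = pd.Series(
    [value_to_index.get(v, -1) if not pd.isna(v) else math.nan for v in feature]
)
print(encoded.tolist())  # [0.0, nan, 1.0, -1.0]
```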
6 changes: 5 additions & 1 deletion python/hsfs/constructor/query.py
@@ -21,9 +21,9 @@
from typing import Any, Dict, List, Optional, Tuple, TypeVar, Union

import humps
import numpy as np
import pandas as pd
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.core.constants import HAS_NUMPY
from hsfs import engine, storage_connector, util
from hsfs import feature_group as fg_mod
from hsfs.constructor import join
@@ -34,6 +34,10 @@
from hsfs.feature import Feature


if HAS_NUMPY:
import numpy as np


@typechecked
class Query:
ERROR_MESSAGE_FEATURE_AMBIGUOUS = (
6 changes: 5 additions & 1 deletion python/hsfs/core/feature_view_engine.py
@@ -19,10 +19,10 @@
import warnings
from typing import Any, Dict, List, Optional, TypeVar, Union

import numpy as np
import pandas as pd
from hopsworks_common import client
from hopsworks_common.client.exceptions import FeatureStoreException
from hopsworks_common.core.constants import HAS_NUMPY
from hsfs import (
engine,
feature_group,
@@ -45,6 +45,10 @@
from hsfs.training_dataset_split import TrainingDatasetSplit


if HAS_NUMPY:
import numpy as np


class FeatureViewEngine:
ENTITY_TYPE = "featureview"
_TRAINING_DATA_API_PATH = "trainingdatasets"
7 changes: 5 additions & 2 deletions python/hsfs/core/kafka_engine.py
@@ -20,14 +20,17 @@
from io import BytesIO
from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Optional, Tuple, Union

import numpy as np
import pandas as pd
from hopsworks_common import client
from hopsworks_common.core.constants import HAS_NUMPY
from hsfs.core import storage_connector_api
from hsfs.core.constants import HAS_AVRO, HAS_CONFLUENT_KAFKA, HAS_FAST_AVRO
from tqdm import tqdm


if HAS_NUMPY:
import numpy as np

if HAS_CONFLUENT_KAFKA:
from confluent_kafka import Consumer, KafkaError, Producer, TopicPartition

@@ -202,7 +205,7 @@ def encode_row(complex_feature_writers, writer, row):
if isinstance(row, dict):
for k in row.keys():
# for avro to be able to serialize them, they need to be python data types
if isinstance(row[k], np.ndarray):
if HAS_NUMPY and isinstance(row[k], np.ndarray):
row[k] = row[k].tolist()
if isinstance(row[k], pd.Timestamp):
row[k] = row[k].to_pydatetime()
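In `encode_row`, the numpy check is now guarded by `HAS_NUMPY`, so rows without numpy values can still be encoded in a numpy-free environment, while arrays are converted to plain lists when numpy is installed (Avro writers only accept built-in Python types). A standalone sketch of that normalisation step, with made-up row data and the Avro serialisation itself omitted:

```python
# Sketch of the type-normalisation loop in encode_row; row contents are illustrative.
import pandas as pd

try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False

row = {
    "embedding": np.array([0.1, 0.2]) if HAS_NUMPY else [0.1, 0.2],
    "event_time": pd.Timestamp("2024-10-04T12:00:00"),
}

for k in row.keys():
    # Guarded so the loop also runs when numpy is not installed.
    if HAS_NUMPY and isinstance(row[k], np.ndarray):
        row[k] = row[k].tolist()
    if isinstance(row[k], pd.Timestamp):
        row[k] = row[k].to_pydatetime()

print({k: type(v).__name__ for k, v in row.items()})
# {'embedding': 'list', 'event_time': 'datetime'}
```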
14 changes: 12 additions & 2 deletions python/hsfs/core/vector_server.py
@@ -25,12 +25,15 @@

import avro.io
import avro.schema
import numpy as np
import pandas as pd
from hopsworks_common import client
from hopsworks_common.core.constants import (
HAS_AVRO,
HAS_FAST_AVRO,
HAS_NUMPY,
HAS_POLARS,
avro_not_installed_message,
numpy_not_installed_message,
polars_not_installed_message,
)
from hsfs import (
@@ -52,9 +55,12 @@
)


if HAS_NUMPY:
import numpy as np

if HAS_FAST_AVRO:
from fastavro import schemaless_reader
else:
elif HAS_AVRO:
from avro.io import BinaryDecoder

if HAS_POLARS:
@@ -807,6 +813,8 @@ def handle_feature_vector_return_type(
return feature_vectorz
elif return_type.lower() == "numpy" and not inference_helper:
_logger.debug("Returning feature vector as numpy array")
if not HAS_NUMPY:
raise ModuleNotFoundError(numpy_not_installed_message)
return np.array(feature_vectorz)
# Only inference helper can return dict
elif return_type.lower() == "dict" and inference_helper:
@@ -1106,6 +1114,8 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]:
for (f_name, schema) in complex_feature_schemas.items()
}
else:
if not HAS_AVRO:
raise ModuleNotFoundError(avro_not_installed_message)
_logger.debug("Fast Avro not found, using avro for deserialization.")
return {
f_name: (
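With numpy optional, requesting a `"numpy"` return type in an environment without numpy now fails fast with the install hint instead of a `NameError`, and the Avro fallback in `build_complex_feature_decoders` gets the same treatment. A reduced sketch of the return-type guard (the function name and message text below are shortened for illustration):

```python
# Reduced sketch of the new guard: asking for "numpy" without numpy installed
# raises an actionable ModuleNotFoundError. Names and message are illustrative.
try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False

NUMPY_NOT_INSTALLED = (
    'Numpy package not found. Install it via `pip install "hopsworks[numpy]"`.'
)


def handle_return_type(feature_vector, return_type="list"):
    if return_type.lower() == "numpy":
        if not HAS_NUMPY:
            raise ModuleNotFoundError(NUMPY_NOT_INSTALLED)
        return np.array(feature_vector)
    # "list" and other return types do not need numpy at all.
    return feature_vector


print(handle_return_type([1.0, 2.0, 3.0]))  # [1.0, 2.0, 3.0]
```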
20 changes: 15 additions & 5 deletions python/hsfs/engine/python.py
@@ -50,7 +50,6 @@

import boto3
import hsfs
import numpy as np
import pandas as pd
import pyarrow as pa
from botocore.response import StreamingBody
@@ -83,6 +82,7 @@
from hsfs.core.constants import (
HAS_AIOMYSQL,
HAS_GREAT_EXPECTATIONS,
HAS_NUMPY,
HAS_PANDAS,
HAS_PYARROW,
HAS_SQLALCHEMY,
@@ -98,6 +98,12 @@
if HAS_GREAT_EXPECTATIONS:
import great_expectations

if HAS_NUMPY:
import numpy as np

if HAS_PYARROW:
from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING

if HAS_AIOMYSQL and HAS_SQLALCHEMY:
from hsfs.core import util_sql

@@ -1464,11 +1470,13 @@ def _start_offline_materialization(offline_write_options: Dict[str, Any]) -> bool:
def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:
if feature_log is None and cols:
return pd.DataFrame(columns=cols)
if not (
isinstance(feature_log, (list, np.ndarray, pd.DataFrame, pl.DataFrame))
if not isinstance(feature_log, (list, pd.DataFrame, pl.DataFrame)) and not (
HAS_NUMPY and isinstance(feature_log, np.ndarray)
):
raise ValueError(f"Type '{type(feature_log)}' not accepted")
if isinstance(feature_log, list) or isinstance(feature_log, np.ndarray):
if isinstance(feature_log, list) or (
HAS_NUMPY and isinstance(feature_log, np.ndarray)
):
Engine._validate_logging_list(feature_log, cols)
return pd.DataFrame(feature_log, columns=cols)
else:
@@ -1479,7 +1487,9 @@ def _convert_feature_log_to_df(feature_log, cols) -> pd.DataFrame:

@staticmethod
def _validate_logging_list(feature_log, cols):
if isinstance(feature_log[0], list) or isinstance(feature_log[0], np.ndarray):
if isinstance(feature_log[0], list) or (
HAS_NUMPY and isinstance(feature_log[0], np.ndarray)
):
provided_len = len(feature_log[0])
else:
provided_len = 1
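The intent of the reworked `_convert_feature_log_to_df` check is to accept lists, pandas/polars DataFrames, and, when numpy is installed, `np.ndarray`, and to reject everything else. A self-contained sketch of that validation (polars is left out and the conversion simplified to keep the example dependency-light):

```python
# Sketch of the intended validation in _convert_feature_log_to_df; polars is
# omitted and the function is simplified to keep the example self-contained.
import pandas as pd

try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False


def convert_feature_log_to_df(feature_log, cols):
    if feature_log is None and cols:
        return pd.DataFrame(columns=cols)
    accepted = isinstance(feature_log, (list, pd.DataFrame)) or (
        HAS_NUMPY and isinstance(feature_log, np.ndarray)
    )
    if not accepted:
        raise ValueError(f"Type '{type(feature_log)}' not accepted")
    if isinstance(feature_log, pd.DataFrame):
        return feature_log.copy()
    return pd.DataFrame(feature_log, columns=cols)


print(convert_feature_log_to_df([[1, 2], [3, 4]], ["a", "b"]))
```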
119 changes: 85 additions & 34 deletions python/hsfs/engine/spark.py
@@ -31,15 +31,19 @@
from pyspark.rdd import RDD
from pyspark.sql import DataFrame

import numpy as np
import pandas as pd
import tzlocal
from hopsworks_common.core.constants import HAS_NUMPY, HAS_PANDAS
from hsfs.constructor import query

# in case importing in %%local
from hsfs.core.vector_db_client import VectorDbClient


if HAS_NUMPY:
import numpy as np


try:
import pyspark
from pyspark import SparkFiles
@@ -258,39 +262,11 @@ def _return_dataframe_type(self, dataframe, dataframe_type):

def convert_to_default_dataframe(self, dataframe):
if isinstance(dataframe, list):
dataframe = np.array(dataframe)

if isinstance(dataframe, np.ndarray):
if dataframe.ndim != 2:
raise TypeError(
"Cannot convert numpy array that do not have two dimensions to a dataframe. "
"The number of dimensions are: {}".format(dataframe.ndim)
)
num_cols = dataframe.shape[1]
dataframe_dict = {}
for n_col in list(range(num_cols)):
col_name = "col_" + str(n_col)
dataframe_dict[col_name] = dataframe[:, n_col]
dataframe = pd.DataFrame(dataframe_dict)

if isinstance(dataframe, pd.DataFrame):
# convert timestamps to current timezone
local_tz = tzlocal.get_localzone()
# make shallow copy so the original df does not get changed
dataframe_copy = dataframe.copy(deep=False)
for c in dataframe_copy.columns:
if isinstance(
dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
):
# convert to utc timestamp
dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
if dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
# set the timezone to the client's timezone because that is
# what spark expects.
dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
str(local_tz), ambiguous="infer", nonexistent="shift_forward"
)
dataframe = self._spark_session.createDataFrame(dataframe_copy)
dataframe = self.convert_list_to_spark_dataframe(dataframe)
elif HAS_NUMPY and isinstance(dataframe, np.ndarray):
dataframe = self.convert_numpy_to_spark_dataframe(dataframe)
elif HAS_PANDAS and isinstance(dataframe, pd.DataFrame):
dataframe = self.convert_pandas_to_spark_dataframe(dataframe)
elif isinstance(dataframe, RDD):
dataframe = dataframe.toDF()

@@ -341,6 +317,81 @@ def convert_to_default_dataframe(self, dataframe):
)
)

def convert_list_to_spark_dataframe(self, dataframe):
if HAS_NUMPY:
return self.convert_numpy_to_spark_dataframe(np.array(dataframe))
try:
dataframe[0][0]
except TypeError:
raise TypeError(
"Cannot convert a list that has less than two dimensions to a dataframe."
) from None
ok = False
try:
dataframe[0][0][0]
except TypeError:
ok = True
if not ok:
raise TypeError(
"Cannot convert a list that has more than two dimensions to a dataframe."
) from None
num_cols = len(dataframe[0])
if HAS_PANDAS:
dataframe_dict = {}
for n_col in range(num_cols):
c = "col_" + str(n_col)
dataframe_dict[c] = [dataframe[i][n_col] for i in range(len(dataframe))]
return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
# We have neither numpy nor pandas, so there is no need to transform timestamps
return self._spark_session.createDataFrame(
dataframe, ["col_" + str(n) for n in range(num_cols)]
)

def convert_numpy_to_spark_dataframe(self, dataframe):
if dataframe.ndim != 2:
raise TypeError(
"Cannot convert numpy array that do not have two dimensions to a dataframe. "
"The number of dimensions are: {}".format(dataframe.ndim)
)
num_cols = dataframe.shape[1]
if HAS_PANDAS:
dataframe_dict = {}
for n_col in range(num_cols):
c = "col_" + str(n_col)
dataframe_dict[c] = dataframe[:, n_col]
return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
# convert timestamps to current timezone
local_tz = tzlocal.get_localzone()
for n_col in range(num_cols):
if dataframe[:, n_col].dtype == np.dtype("datetime64[ns]"):
# set the timezone to the client's timezone because that is
# what spark expects.
dataframe[:, n_col] = dataframe[:, n_col].map(
lambda d: local_tz.fromutc(d.item().astimezone(local_tz))
)
return self._spark_session.createDataFrame(
dataframe.tolist(), ["col_" + str(n) for n in range(num_cols)]
)

def convert_pandas_to_spark_dataframe(self, dataframe):
# convert timestamps to current timezone
local_tz = tzlocal.get_localzone()
# make shallow copy so the original df does not get changed
dataframe_copy = dataframe.copy(deep=False)
for c in dataframe_copy.columns:
if isinstance(
dataframe_copy[c].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
):
# convert to utc timestamp
dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
# set the timezone to the client's timezone because that is
# what spark expects.
dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
str(local_tz), ambiguous="infer", nonexistent="shift_forward"
)
return self._spark_session.createDataFrame(dataframe_copy)

def save_dataframe(
self,
feature_group,
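When neither numpy nor pandas is available, `convert_list_to_spark_dataframe` falls back to indexing the nested list to verify it is exactly two-dimensional before handing it to Spark. A pure-Python sketch of that dimensionality check (the function name is illustrative):

```python
# Pure-Python sketch of the two-dimensions check used by the list fallback:
# a valid input must be indexable two levels deep, but not three.
def check_two_dimensional(data):
    try:
        data[0][0]
    except TypeError:
        raise TypeError(
            "Cannot convert a list that has less than two dimensions to a dataframe."
        ) from None
    try:
        data[0][0][0]
    except TypeError:
        return  # exactly two dimensions, as required
    raise TypeError(
        "Cannot convert a list that has more than two dimensions to a dataframe."
    )


check_two_dimensional([[1, 2], [3, 4]])   # ok
# check_two_dimensional([1, 2, 3])        # raises: less than two dimensions
# check_two_dimensional([[[1], [2]]])     # raises: more than two dimensions
```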