
Commit 7280c44

Address Manu's review
1 parent bfbe7aa commit 7280c44

File tree: 3 files changed, +21 -12 lines changed

python/hsfs/core/vector_server.py (+6 -5)

@@ -23,8 +23,6 @@
 from io import BytesIO
 from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union

-import avro.io
-import avro.schema
 import pandas as pd
 from hopsworks_common import client
 from hopsworks_common.core.constants import (
@@ -60,7 +58,9 @@

 if HAS_FAST_AVRO:
     from fastavro import schemaless_reader
-elif HAS_AVRO:
+if HAS_AVRO:
+    import avro.io
+    import avro.schema
     from avro.io import BinaryDecoder

 if HAS_POLARS:
@@ -1072,6 +1072,9 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]:
         - deserialization of complex features from the online feature store
         - conversion of string or int timestamps to datetime objects
         """
+        if not HAS_AVRO:
+            raise ModuleNotFoundError(avro_not_installed_message)
+
         complex_feature_schemas = {
             f.name: avro.io.DatumReader(
                 avro.schema.parse(
@@ -1114,8 +1117,6 @@ def build_complex_feature_decoders(self) -> Dict[str, Callable]:
                 for (f_name, schema) in complex_feature_schemas.items()
             }
         else:
-            if not HAS_AVRO:
-                raise ModuleNotFoundError(avro_not_installed_message)
             _logger.debug("Fast Avro not found, using avro for deserialization.")
             return {
                 f_name: (
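
The net effect in vector_server.py is the usual optional-dependency pattern: import avro only when it is available, and raise at the point of use rather than at module import time. Hoisting the check above the complex_feature_schemas comprehension matters because that comprehension touches avro.io and avro.schema regardless of whether fastavro is used, so guarding only the else branch no longer suffices once the top-level imports are conditional. A minimal, self-contained sketch of the pattern, assuming only the standard library (HAS_AVRO and avro_not_installed_message mirror the names in the diff; the decoder function is illustrative, not the file's actual code):

import importlib.util

HAS_AVRO = importlib.util.find_spec("avro") is not None
avro_not_installed_message = (
    "avro is not installed; install it to decode complex features"
)

if HAS_AVRO:
    import avro.io
    import avro.schema


def build_decoder(schema_json: str):
    # Guard at the call site: a clear ModuleNotFoundError beats the
    # NameError an unguarded reference to the avro module would raise.
    if not HAS_AVRO:
        raise ModuleNotFoundError(avro_not_installed_message)
    return avro.io.DatumReader(avro.schema.parse(schema_json))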

python/hsfs/engine/python.py (-3)

@@ -101,9 +101,6 @@
 if HAS_NUMPY:
     import numpy as np

-if HAS_PYARROW:
-    from hsfs.core.type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING
-
 if HAS_AIOMYSQL and HAS_SQLALCHEMY:
     from hsfs.core import util_sql
python/hsfs/engine/spark.py (+15 -4)

@@ -317,6 +317,14 @@ def convert_to_default_dataframe(self, dataframe):
             )
         )

+    @staticmethod
+    def utc_disguised_as_local(dt):
+        local_tz = tzlocal.get_localzone()
+        utc = timezone.utc
+        if not dt.tzinfo:
+            dt = dt.replace(tzinfo=utc)
+        return dt.astimezone(utc).replace(tzinfo=local_tz)
+
     def convert_list_to_spark_dataframe(self, dataframe):
         if HAS_NUMPY:
             return self.convert_numpy_to_spark_dataframe(np.array(dataframe))
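
To make the new helper's behavior concrete, here is a self-contained sketch assuming tzlocal is installed (the example zone is illustrative): it keeps the UTC wall-clock digits of a timestamp, treating naive values as UTC, and relabels them with the client's local zone, which per the comment further down is the form Spark expects.

from datetime import datetime, timezone

import tzlocal


def utc_disguised_as_local(dt: datetime) -> datetime:
    # Keep the UTC wall-clock digits, but tag them with the local zone.
    local_tz = tzlocal.get_localzone()
    utc = timezone.utc
    if not dt.tzinfo:
        dt = dt.replace(tzinfo=utc)  # naive inputs are assumed to be UTC
    return dt.astimezone(utc).replace(tzinfo=local_tz)


# On a client in, say, Europe/Stockholm (UTC+1 in winter):
utc_disguised_as_local(datetime(2024, 1, 1, 12, 0))
# -> 2024-01-01 12:00:00+01:00  (same digits, now labelled as local time)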
@@ -342,7 +350,11 @@ def convert_list_to_spark_dataframe(self, dataframe):
                 c = "col_" + str(n_col)
                 dataframe_dict[c] = [dataframe[i][n_col] for i in range(len(dataframe))]
             return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
-        # We have neither numpy nor pandas, so there is no need to transform timestamps
+        for i in range(len(dataframe)):
+            dataframe[i] = [
+                self.utc_disguised_as_local(d) if isinstance(d, datetime) else d
+                for d in dataframe[i]
+            ]
         return self._spark_session.createDataFrame(
             dataframe, ["col_" + str(n) for n in range(num_cols)]
         )
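
This hunk also corrects the removed comment's assumption: even without numpy or pandas, datetime values in a plain list of lists still need the same UTC-as-local normalization before being handed to createDataFrame, so the list path now routes them through the shared helper.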
@@ -361,13 +373,12 @@ def convert_numpy_to_spark_dataframe(self, dataframe):
                 dataframe_dict[c] = dataframe[:, n_col]
             return self.convert_pandas_to_spark_dataframe(pd.DataFrame(dataframe_dict))
         # convert timestamps to current timezone
-        local_tz = tzlocal.get_localzone()
         for n_col in range(num_cols):
             if dataframe[:, n_col].dtype == np.dtype("datetime64[ns]"):
                 # set the timezone to the client's timezone because that is
                 # what spark expects.
-                dataframe[:, n_col] = dataframe[:, n_col].map(
-                    lambda d: local_tz.fromutc(d.item().astimezone(local_tz))
+                dataframe[:, n_col] = np.array(
+                    [self.utc_disguised_as_local(d.item()) for d in dataframe[:, n_col]]
                 )
         return self._spark_session.createDataFrame(
             dataframe.tolist(), ["col_" + str(n) for n in range(num_cols)]
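
The last hunk looks like a latent bug fix as well as a refactor: dataframe[:, n_col] is a plain numpy ndarray, and ndarrays have no .map method (that is a pandas Series method), so the removed lambda would have raised AttributeError if this branch ever ran. A small sketch of the replacement shape, reusing the helper above; microsecond precision is chosen here so that .item() returns datetime objects (values are illustrative):

import numpy as np
from datetime import datetime

col = np.array([datetime(2024, 1, 1, 12, 0)], dtype="datetime64[us]")
# col.map(...)  # AttributeError: 'numpy.ndarray' object has no attribute 'map'
converted = np.array(
    [utc_disguised_as_local(d.item()) for d in col]  # .item() -> datetime
)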
