from pyspark.rdd import RDD
from pyspark.sql import DataFrame

-import numpy as np
import pandas as pd
import tzlocal
+from hopsworks_common.core.constants import HAS_NUMPY
from hsfs.constructor import query

# in case importing in %%local
from hsfs.core.vector_db_client import VectorDbClient


+if HAS_NUMPY:
+    import numpy as np
+
+
try:
    import pyspark
    from pyspark import SparkFiles
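
For reference, HAS_NUMPY comes from hopsworks_common.core.constants. A minimal sketch of how such an optional-dependency flag is typically defined, assuming the usual probe-the-import pattern (the actual definition in hopsworks_common may differ):

    # Hypothetical sketch of an optional-dependency flag; the real HAS_NUMPY
    # is defined in hopsworks_common.core.constants and may differ.
    try:
        import numpy  # noqa: F401
        HAS_NUMPY = True
    except ImportError:
        HAS_NUMPY = False
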
@@ -258,9 +262,33 @@ def _return_dataframe_type(self, dataframe, dataframe_type):

    def convert_to_default_dataframe(self, dataframe):
        if isinstance(dataframe, list):
-            dataframe = np.array(dataframe)
-
-        if isinstance(dataframe, np.ndarray):
+            #################### TODO TODO TODO TODO TODO ####################
+            if HAS_NUMPY:
+                dataframe = np.array(dataframe)
+            else:
+                try:
+                    dataframe[0][0]
+                except TypeError:
+                    raise TypeError(
+                        "Cannot convert a list that has less than two dimensions to a dataframe."
+                    ) from None
+                ok = False
+                try:
+                    dataframe[0][0][0]
+                except TypeError:
+                    ok = True
+                if not ok:
+                    raise TypeError(
+                        "Cannot convert a list that has more than two dimensions to a dataframe."
+                    ) from None
+                num_cols = len(dataframe[0])
+                dataframe_dict = {}
+                for n_col in range(num_cols):
+                    col_name = "col_" + str(n_col)
+                    # plain lists do not support dataframe[:, n_col] slicing,
+                    # so gather each column row by row
+                    dataframe_dict[col_name] = [row[n_col] for row in dataframe]
+                dataframe = pd.DataFrame(dataframe_dict)
+
+        if HAS_NUMPY and isinstance(dataframe, np.ndarray):
            if dataframe.ndim != 2:
                raise TypeError(
                    "Cannot convert numpy array that do not have two dimensions to a dataframe. "
@@ -284,7 +312,7 @@ def convert_to_default_dataframe(self, dataframe):
            ):
                # convert to utc timestamp
                dataframe_copy[c] = dataframe_copy[c].dt.tz_convert(None)
-            if dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
+            if HAS_NUMPY and dataframe_copy[c].dtype == np.dtype("datetime64[ns]"):
                # set the timezone to the client's timezone because that is
                # what spark expects.
                dataframe_copy[c] = dataframe_copy[c].dt.tz_localize(
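
To see what the numpy-free branch produces, here is a self-contained sketch that mirrors the fallback list conversion above on a small two-dimensional list (it reimplements the column-gathering loop rather than calling hsfs itself):

    # Standalone sketch mirroring the numpy-free list conversion above.
    import pandas as pd

    rows = [[1, "a"], [2, "b"], [3, "c"]]  # two-dimensional: a list of rows
    num_cols = len(rows[0])
    frame = pd.DataFrame(
        {"col_" + str(n): [row[n] for row in rows] for n in range(num_cols)}
    )
    print(frame)
    #    col_0 col_1
    # 0      1     a
    # 1      2     b
    # 2      3     c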