67
67
from hsfs import storage_connector as sc
68
68
from hsfs .constructor import query
69
69
from hsfs .core import (
70
- arrow_flight_client ,
71
70
dataset_api ,
72
71
feature_group_api ,
73
72
feature_view_api ,
83
82
)
84
83
from hsfs .core .constants import (
85
84
HAS_AIOMYSQL ,
86
- HAS_ARROW ,
87
85
HAS_GREAT_EXPECTATIONS ,
88
86
HAS_PANDAS ,
87
+ HAS_PYARROW ,
89
88
HAS_SQLALCHEMY ,
90
89
)
90
+ from hsfs .core .type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING
91
91
from hsfs .core .vector_db_client import VectorDbClient
92
92
from hsfs .feature_group import ExternalFeatureGroup , FeatureGroup
93
93
from hsfs .training_dataset import TrainingDataset
98
98
if HAS_GREAT_EXPECTATIONS :
99
99
import great_expectations
100
100
101
- if HAS_ARROW :
102
- from hsfs .core .type_systems import PYARROW_HOPSWORKS_DTYPE_MAPPING
103
101
if HAS_AIOMYSQL and HAS_SQLALCHEMY :
104
102
from hsfs .core import util_sql
105
103
@@ -157,6 +155,8 @@ def sql(
157
155
def is_flyingduck_query_supported(
    self, query: "query.Query", read_options: Optional[Dict[str, Any]] = None
) -> bool:
    """Report whether `query` is supported by the arrow flight client.

    This is a thin delegate to `arrow_flight_client.is_query_supported`.

    Args:
        query: The query object to check.
        read_options: Optional read options forwarded together with the
            query. When omitted (or otherwise falsy) an empty dict is
            passed on instead.

    Returns:
        Whatever `arrow_flight_client.is_query_supported` returns for the
        given query and options.
    """
    # Imported at call time, inside the method, rather than at module level.
    from hsfs.core import arrow_flight_client

    # Normalise a missing/empty options value to a fresh empty dict.
    effective_options = read_options if read_options else {}
    return arrow_flight_client.is_query_supported(query, effective_options)
161
161
162
162
def _validate_dataframe_type (self , dataframe_type : str ):
@@ -180,6 +180,8 @@ def _sql_offline(
180
180
) -> Union [pd .DataFrame , pl .DataFrame ]:
181
181
self ._validate_dataframe_type (dataframe_type )
182
182
if isinstance (sql_query , dict ) and "query_string" in sql_query :
183
+ from hsfs .core import arrow_flight_client
184
+
183
185
result_df = util .run_with_loading_animation (
184
186
"Reading data from Hopsworks, using Hopsworks Feature Query Service" ,
185
187
arrow_flight_client .get_instance ().read_query ,
@@ -342,6 +344,8 @@ def _read_hopsfs_remote(
342
344
343
345
for inode in inode_list :
344
346
if not self ._is_metadata_file (inode .path ):
347
+ from hsfs .core import arrow_flight_client
348
+
345
349
if arrow_flight_client .is_data_format_supported (
346
350
data_format , read_options
347
351
):
@@ -539,7 +543,10 @@ def profile(
539
543
or pa .types .is_list (field .type )
540
544
or pa .types .is_large_list (field .type )
541
545
or pa .types .is_struct (field .type )
542
- ) and PYARROW_HOPSWORKS_DTYPE_MAPPING [field .type ] in ["timestamp" , "date" ]:
546
+ ) and PYARROW_HOPSWORKS_DTYPE_MAPPING .get (field .type , None ) in [
547
+ "timestamp" ,
548
+ "date" ,
549
+ ]:
543
550
if HAS_POLARS and (
544
551
isinstance (df , pl .DataFrame )
545
552
or isinstance (df , pl .dataframe .frame .DataFrame )
@@ -573,15 +580,21 @@ def profile(
573
580
or pa .types .is_list (arrow_type )
574
581
or pa .types .is_large_list (arrow_type )
575
582
or pa .types .is_struct (arrow_type )
576
- or PYARROW_HOPSWORKS_DTYPE_MAPPING [ arrow_type ]
583
+ or PYARROW_HOPSWORKS_DTYPE_MAPPING . get ( arrow_type , None )
577
584
in ["timestamp" , "date" , "binary" , "string" ]
578
585
):
579
586
dataType = "String"
580
- elif PYARROW_HOPSWORKS_DTYPE_MAPPING [arrow_type ] in ["float" , "double" ]:
587
+ elif PYARROW_HOPSWORKS_DTYPE_MAPPING .get (arrow_type , None ) in [
588
+ "float" ,
589
+ "double" ,
590
+ ]:
581
591
dataType = "Fractional"
582
- elif PYARROW_HOPSWORKS_DTYPE_MAPPING [arrow_type ] in ["int" , "bigint" ]:
592
+ elif PYARROW_HOPSWORKS_DTYPE_MAPPING .get (arrow_type , None ) in [
593
+ "int" ,
594
+ "bigint" ,
595
+ ]:
583
596
dataType = "Integral"
584
- elif PYARROW_HOPSWORKS_DTYPE_MAPPING [ arrow_type ] == "boolean" :
597
+ elif PYARROW_HOPSWORKS_DTYPE_MAPPING . get ( arrow_type , None ) == "boolean" :
585
598
dataType = "Boolean"
586
599
else :
587
600
print (
@@ -1077,8 +1090,16 @@ def write_training_dataset(
1077
1090
"Currently only query based training datasets are supported by the Python engine"
1078
1091
)
1079
1092
1093
+ try :
1094
+ from hsfs .core import arrow_flight_client
1095
+
1096
+ arrow_flight_client_imported = True
1097
+ except ImportError :
1098
+ arrow_flight_client_imported = False
1099
+
1080
1100
if (
1081
- arrow_flight_client .is_query_supported (dataset , user_write_options )
1101
+ arrow_flight_client_imported
1102
+ and arrow_flight_client .is_query_supported (dataset , user_write_options )
1082
1103
and len (training_dataset .splits ) == 0
1083
1104
and feature_view_obj
1084
1105
and len (feature_view_obj .transformation_functions ) == 0
@@ -1251,7 +1272,7 @@ def _apply_transformation_function(
1251
1272
or isinstance (dataset , pl .dataframe .frame .DataFrame )
1252
1273
):
1253
1274
# Converting polars dataframe to pandas because currently we support only pandas UDF's as transformation functions.
1254
- if HAS_ARROW :
1275
+ if HAS_PYARROW :
1255
1276
dataset = dataset .to_pandas (
1256
1277
use_pyarrow_extension_array = True
1257
1278
) # Zero copy if pyarrow extension can be used.
0 commit comments