15
15
#
16
16
from __future__ import annotations
17
17
18
+ import datetime
18
19
import hopsworks_common
19
20
import numpy
20
21
import pandas as pd
@@ -1707,21 +1708,24 @@ def test_save_online_dataframe(self, mocker, backend_fixtures):
1707
1708
== 1
1708
1709
)
1709
1710
1710
- def test_serialize_to_avro (self , mocker ):
1711
+ def test_serialize_deserialize_avro (self , mocker ):
1711
1712
# Arrange
1712
- mocker .patch ("hopsworks_common.client.get_instance" )
1713
- mocker .patch (
1714
- "hsfs.feature_group.FeatureGroup.get_complex_features" ,
1715
- return_value = ["col_1" ],
1716
- )
1717
- mocker .patch ("hsfs.feature_group.FeatureGroup._get_feature_avro_schema" )
1718
-
1719
1713
spark_engine = spark .Engine ()
1720
1714
1721
- d = {"col_0" : ["test_1" , "test_2" ], "col_1" : ["test_1" , "test_2" ]}
1722
- df = pd .DataFrame (data = d )
1715
+ now = datetime .datetime .now ()
1723
1716
1724
- spark_df = spark_engine ._spark_session .createDataFrame (df )
1717
+ fg_data = []
1718
+ fg_data .append (("ekarson" , ["GRAVITY RUSH 2" , "KING'S QUEST" ], pd .Timestamp (now .timestamp ())))
1719
+ fg_data .append (("ratmilkdrinker" , ["NBA 2K" , "CALL OF DUTY" ], pd .Timestamp (now .timestamp ())))
1720
+ pandas_df = pd .DataFrame (fg_data , columns = ["account_id" , "last_played_games" , "event_time" ])
1721
+
1722
+ df = spark_engine ._spark_session .createDataFrame (pandas_df )
1723
+
1724
+ features = [
1725
+ feature .Feature (name = "account_id" , type = "str" ),
1726
+ feature .Feature (name = "last_played_games" , type = "xx" ),
1727
+ feature .Feature (name = "event_time" , type = "timestamp" ),
1728
+ ]
1725
1729
1726
1730
fg = feature_group .FeatureGroup (
1727
1731
name = "test" ,
@@ -1730,22 +1734,30 @@ def test_serialize_to_avro(self, mocker):
1730
1734
primary_key = [],
1731
1735
partition_key = [],
1732
1736
id = 10 ,
1737
+ features = features ,
1733
1738
)
1734
- fg ._subject = {"schema" : '{"fields": [{"name": "col_0"}]}' }
1735
-
1736
- expected = pd .DataFrame (data = {"col_0" : ["test_1" , "test_2" ]})
1739
+ fg ._subject = {
1740
+ 'id' : 1025 ,
1741
+ 'subject' : 'fg_1' ,
1742
+ 'version' : 1 ,
1743
+ 'schema' : '{"type":"record","name":"fg_1","namespace":"test_featurestore.db","fields":[{"name":"account_id","type":["null","string"]},{"name":"last_played_games","type":["null",{"type":"array","items":["null","string"]}]},{"name":"event_time","type":["null",{"type":"long","logicalType":"timestamp-micros"}]}]}'
1744
+ }
1737
1745
1738
1746
# Act
1739
- result = spark_engine ._serialize_to_avro (
1747
+ serialized_df = spark_engine ._serialize_to_avro (
1740
1748
feature_group = fg ,
1741
- dataframe = spark_df ,
1749
+ dataframe = df ,
1750
+ )
1751
+
1752
+ deserialized_df = spark_engine ._deserialize_from_avro (
1753
+ feature_group = fg ,
1754
+ dataframe = serialized_df ,
1742
1755
)
1743
1756
1744
1757
# Assert
1745
- result_df = result .toPandas ()
1746
- assert list (result_df ) == list (expected )
1747
- for column in list (result_df ):
1748
- assert result_df [column ].equals (expected [column ])
1758
+ assert serialized_df .schema .json () == '{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"binary"},{"metadata":{},"name":"value","nullable":false,"type":"binary"}],"type":"struct"}'
1759
+ assert df .schema == deserialized_df .schema
1760
+ assert df .collect () == deserialized_df .collect ()
1749
1761
1750
1762
def test_get_training_data (self , mocker ):
1751
1763
# Arrange
0 commit comments