@@ -99,53 +99,15 @@ public List<T> writeStream(
99
99
return featureData ;
100
100
}
101
101
102
-
103
- private Object normalize (Object value , Schema targetSchema ) {
104
- if (value == null ) {
105
- return null ;
106
- }
107
- switch (targetSchema .getType ()) {
108
- case RECORD :
109
- // Create a new record with the target schema
110
- GenericRecord newRecord = new GenericData .Record (targetSchema );
111
- GenericRecord oldRecord = (GenericRecord ) value ;
112
- for (Schema .Field field : targetSchema .getFields ()) {
113
- // Recursively normalize each field based on its target schema
114
- Object fieldValue = oldRecord .get (field .name ());
115
- newRecord .put (field .name (), normalize (fieldValue , field .schema ()));
116
- }
117
- return newRecord ;
118
- case ARRAY :
119
- Collection <?> oldArray = (Collection <?>) value ;
120
- List <Object > newArray = new ArrayList <>();
121
- // Normalize each element using the array’s element type
122
- for (Object item : oldArray ) {
123
- newArray .add (normalize (item , targetSchema .getElementType ()));
124
- }
125
- return newArray ;
126
- case UNION :
127
- // For a union, pick the non-null branch if value is non-null.
128
- // You might need additional logic to pick the right branch.
129
- for (Schema s : targetSchema .getTypes ()) {
130
- if (s .getType () == Schema .Type .NULL ) {
131
- continue ;
132
- }
133
- return normalize (value , s );
134
- }
135
- throw new RuntimeException ("Cannot normalize value for union schema: " + targetSchema );
136
- default :
137
- // For primitives (or already matching types), return the value directly.
138
- return value ;
139
- }
140
- }
141
-
142
102
private GenericRecord convertPojoToGenericRecord (Object input ,
143
103
Schema featureGroupSchema ,
144
104
Schema encodedFeatureGroupSchema ,
145
105
Map <String , Schema > complexFeatureSchemas )
146
106
throws NoSuchFieldException , IllegalAccessException , FeatureStoreException , IOException {
147
107
148
108
// Generate the genericRecord without nested serialization.
109
+ // Users have the option of providing directly a GenericRecord.
110
+ // If that's the case we also expect nested structures to be generic records.
149
111
GenericRecord plainRecord ;
150
112
if (input instanceof GenericRecord ) {
151
113
plainRecord = (GenericRecord ) input ;
@@ -155,23 +117,19 @@ private GenericRecord convertPojoToGenericRecord(Object input,
155
117
156
118
// Apply nested serialization for complex features
157
119
GenericRecord encodedRecord = new GenericData .Record (encodedFeatureGroupSchema );
158
- for (Schema .Field field : encodedFeatureGroupSchema .getFields ()) {
120
+ for (Schema .Field field : encodedFeatureGroupSchema .getFields ()) {
159
121
if (complexFeatureSchemas .containsKey (field .name ())) {
160
122
Schema complexFieldSchema = complexFeatureSchemas .get (field .name ());
161
- Object fieldValue = plainRecord .get (field .name ());
162
- // Normalize the field to match the expected schema exactly.
163
- Object normalizedValue = normalize (fieldValue , complexFieldSchema );
164
-
165
123
GenericDatumWriter <Object > complexFeatureDatumWriter = new GenericDatumWriter <>(complexFieldSchema );
124
+
166
125
try (ByteArrayOutputStream complexFeatureByteArrayOutputStream = new ByteArrayOutputStream ()) {
167
126
BinaryEncoder complexFeatureBinaryEncoder =
168
127
new EncoderFactory ().binaryEncoder (complexFeatureByteArrayOutputStream , null );
169
- complexFeatureDatumWriter .write (normalizedValue , complexFeatureBinaryEncoder );
128
+ complexFeatureDatumWriter .write (plainRecord . get ( field . name ()) , complexFeatureBinaryEncoder );
170
129
complexFeatureBinaryEncoder .flush ();
171
130
172
- // Replace the field with the serialized (double-serialized) bytes.
173
- encodedRecord .put (field .name (),
174
- ByteBuffer .wrap (complexFeatureByteArrayOutputStream .toByteArray ()));
131
+ // Replace the field in the generic record with the serialized version
132
+ encodedRecord .put (field .name (), ByteBuffer .wrap (complexFeatureByteArrayOutputStream .toByteArray ()));
175
133
}
176
134
} else {
177
135
encodedRecord .put (field .name (), plainRecord .get (field .name ()));
0 commit comments