27
27
28
28
import org .apache .avro .Schema ;
29
29
import org .apache .avro .SchemaValidationException ;
30
- import org .apache .avro .SchemaValidatorBuilder ;
31
30
import org .apache .avro .generic .GenericData ;
32
31
import org .apache .avro .generic .GenericDatumWriter ;
33
32
import org .apache .avro .generic .GenericRecord ;
34
33
import org .apache .avro .io .BinaryEncoder ;
35
34
import org .apache .avro .io .EncoderFactory ;
36
- import org .apache .avro .reflect .ReflectData ;
37
35
38
36
import org .apache .kafka .clients .producer .KafkaProducer ;
39
37
import org .apache .kafka .clients .producer .ProducerRecord ;
43
41
import java .io .IOException ;
44
42
import java .lang .reflect .Field ;
45
43
import java .nio .ByteBuffer ;
46
- import java .util .Arrays ;
47
- import java .util .Collections ;
44
+ import java .util .ArrayList ;
45
+ import java .util .Collection ;
48
46
import java .util .HashMap ;
49
47
import java .util .List ;
50
48
import java .util .Map ;
51
49
import java .util .Properties ;
52
- import java .util .stream .Collectors ;
53
50
54
51
public class Engine <T > extends EngineBase {
55
52
@@ -78,7 +75,8 @@ public List<T> writeStream(
78
75
complexFeatureSchemas .put (featureName .toString (),
79
76
new Schema .Parser ().parse (streamFeatureGroup .getFeatureAvroSchema (featureName .toString ())));
80
77
}
81
- Schema deserializedEncodedSchema = new Schema .Parser ().parse (streamFeatureGroup .getEncodedAvroSchema ());
78
+ Schema featureGroupSchema = new Schema .Parser ().parse (streamFeatureGroup .getAvroSchema ());
79
+ Schema encodedFeatureGroupSchema = new Schema .Parser ().parse (streamFeatureGroup .getEncodedAvroSchema ());
82
80
83
81
Properties kafkaProps = new Properties ();
84
82
kafkaProps .put ("key.serializer" , "org.apache.kafka.common.serialization.ByteArraySerializer" );
@@ -90,10 +88,8 @@ public List<T> writeStream(
90
88
91
89
try (KafkaProducer <byte [], byte []> producer = new KafkaProducer <>(kafkaProps )) {
92
90
for (Object input : featureData ) {
93
- // validate
94
- validatePojoAgainstSchema (input , new Schema .Parser ().parse (streamFeatureGroup .getAvroSchema ()));
95
-
96
- GenericRecord genericRecord = pojoToAvroRecord (input , deserializedEncodedSchema , complexFeatureSchemas );
91
+ GenericRecord genericRecord =
92
+ convertPojoToGenericRecord (input , featureGroupSchema , encodedFeatureGroupSchema , complexFeatureSchemas );
97
93
ProducerRecord <byte [], byte []> record = kafkaRecordSerializer .serialize (genericRecord );
98
94
99
95
producer .send (record );
@@ -103,62 +99,135 @@ public List<T> writeStream(
103
99
return featureData ;
104
100
}
105
101
106
- public GenericRecord pojoToAvroRecord (Object input , Schema deserializedEncodedSchema ,
107
- Map <String , Schema > complexFeatureSchemas )
108
- throws NoSuchFieldException , IOException , IllegalAccessException {
109
102
110
- // Create a new Avro record based on the given schema
111
- GenericRecord record = new GenericData .Record (deserializedEncodedSchema );
112
- // Get the fields of the POJO class and populate fields of the Avro record
113
- List <Field > fields =
114
- Arrays .stream (input .getClass ().getDeclaredFields ())
115
- .filter (f -> f .getName ().equals ("SCHEMA$" ))
116
- .collect (Collectors .toList ());
117
- if (!fields .isEmpty ()) {
118
- // it means POJO was generated from avro schema
119
- Field schemaField = input .getClass ().getDeclaredField ("SCHEMA$" );
120
- schemaField .setAccessible (true );
121
- Schema fieldSchema = (Schema ) schemaField .get (null );
122
- for (Schema .Field field : fieldSchema .getFields ()) {
123
- String fieldName = field .name ();
124
- Field pojoField = input .getClass ().getDeclaredField (fieldName );
125
- pojoField .setAccessible (true );
126
- Object fieldValue = pojoField .get (input );
127
- populateAvroRecord (record , fieldName , fieldValue , complexFeatureSchemas );
128
- }
103
+ private GenericRecord convertPojoToGenericRecord (Object input ,
104
+ Schema featureGroupSchema ,
105
+ Schema encodedFeatureGroupSchema ,
106
+ Map <String , Schema > complexFeatureSchemas )
107
+ throws NoSuchFieldException , IllegalAccessException , FeatureStoreException , IOException {
108
+
109
+ // Generate the genericRecord without nested serialization.
110
+ // Users have the option of providing directly a GenericRecord.
111
+ // If that's the case we also expect nested structures to be generic records.
112
+ GenericRecord plainRecord ;
113
+ if (input instanceof GenericRecord ) {
114
+ plainRecord = (GenericRecord ) input ;
129
115
} else {
130
- for (Field field : fields ) {
131
- String fieldName = field .getName ();
132
- Object fieldValue = field .get (input );
133
- populateAvroRecord (record , fieldName , fieldValue , complexFeatureSchemas );
116
+ plainRecord = convertPojoToGenericRecord (input , featureGroupSchema );
117
+ }
118
+
119
+ // Apply nested serialization for complex features
120
+ GenericRecord encodedRecord = new GenericData .Record (encodedFeatureGroupSchema );
121
+ for (Schema .Field field : encodedFeatureGroupSchema .getFields ()) {
122
+ if (complexFeatureSchemas .containsKey (field .name ())) {
123
+ Schema complexFieldSchema = complexFeatureSchemas .get (field .name ());
124
+ GenericDatumWriter <Object > complexFeatureDatumWriter = new GenericDatumWriter <>(complexFieldSchema );
125
+
126
+ try (ByteArrayOutputStream complexFeatureByteArrayOutputStream = new ByteArrayOutputStream ()) {
127
+ BinaryEncoder complexFeatureBinaryEncoder =
128
+ new EncoderFactory ().binaryEncoder (complexFeatureByteArrayOutputStream , null );
129
+ complexFeatureDatumWriter .write (plainRecord .get (field .name ()), complexFeatureBinaryEncoder );
130
+ complexFeatureBinaryEncoder .flush ();
131
+
132
+ // Replace the field in the generic record with the serialized version
133
+ encodedRecord .put (field .name (), ByteBuffer .wrap (complexFeatureByteArrayOutputStream .toByteArray ()));
134
+ }
135
+ } else {
136
+ encodedRecord .put (field .name (), plainRecord .get (field .name ()));
134
137
}
135
138
}
136
- return record ;
139
+
140
+ return encodedRecord ;
137
141
}
138
142
139
- private void populateAvroRecord (GenericRecord record , String fieldName , Object fieldValue ,
140
- Map <String , Schema > complexFeatureSchemas ) throws IOException {
141
- if (complexFeatureSchemas .containsKey (fieldName )) {
142
- GenericDatumWriter <Object > complexFeatureDatumWriter =
143
- new GenericDatumWriter <>(complexFeatureSchemas .get (fieldName ));
144
- ByteArrayOutputStream complexFeatureByteArrayOutputStream = new ByteArrayOutputStream ();
145
- complexFeatureByteArrayOutputStream .reset ();
146
- BinaryEncoder complexFeatureBinaryEncoder =
147
- new EncoderFactory ().binaryEncoder (complexFeatureByteArrayOutputStream , null );
148
- complexFeatureDatumWriter .write (fieldValue , complexFeatureBinaryEncoder );
149
- complexFeatureBinaryEncoder .flush ();
150
- record .put (fieldName , ByteBuffer .wrap (complexFeatureByteArrayOutputStream .toByteArray ()));
151
- complexFeatureByteArrayOutputStream .flush ();
152
- complexFeatureByteArrayOutputStream .close ();
153
- } else {
154
- record .put (fieldName , fieldValue );
143
+ private GenericRecord convertPojoToGenericRecord (Object input , Schema featureGroupSchema )
144
+ throws NoSuchFieldException , IllegalAccessException , FeatureStoreException {
145
+
146
+ // Create a new Avro record based on the given schema
147
+ GenericRecord record = new GenericData .Record (featureGroupSchema );
148
+
149
+ for (Schema .Field schemaField : featureGroupSchema .getFields ()) {
150
+ Field pojoField = input .getClass ().getDeclaredField (schemaField .name ());
151
+ pojoField .setAccessible (true );
152
+ Object pojoValue = pojoField .get (input );
153
+ record .put (schemaField .name (), convertValue (pojoValue , schemaField .schema ()));
155
154
}
155
+
156
+ return record ;
156
157
}
157
158
158
- private void validatePojoAgainstSchema (Object pojo , Schema avroSchema ) throws SchemaValidationException {
159
- Schema pojoSchema = ReflectData .get ().getSchema (pojo .getClass ());
160
- SchemaValidatorBuilder builder = new SchemaValidatorBuilder ();
161
- builder .canReadStrategy ().validateAll ().validate (avroSchema , Collections .singletonList (pojoSchema ));
159
+
160
+ private Object convertValue (Object value , Schema schema )
161
+ throws NoSuchFieldException , IllegalAccessException , FeatureStoreException {
162
+ if (value == null ) {
163
+ return null ;
164
+ }
165
+
166
+ switch (schema .getType ()) {
167
+ case RECORD :
168
+ return convertPojoToGenericRecord (value , schema ); // Recursive conversion
169
+
170
+ case ARRAY :
171
+ Schema elementType = schema .getElementType ();
172
+ if (value instanceof Collection ) {
173
+ Collection <?> collection = (Collection <?>) value ;
174
+ List <Object > avroList = new ArrayList <>();
175
+ for (Object item : collection ) {
176
+ avroList .add (convertValue (item , elementType ));
177
+ }
178
+ return avroList ;
179
+ } else if (value .getClass ().isArray ()) {
180
+ List <Object > avroList = new ArrayList <>();
181
+ for (Object item : (Object []) value ) {
182
+ avroList .add (convertValue (item , elementType ));
183
+ }
184
+ return avroList ;
185
+ }
186
+ throw new FeatureStoreException ("Unsupported array type: " + value .getClass ());
187
+
188
+ case UNION :
189
+ // Unions are tricky: Avro allows [null, "type"]
190
+ for (Schema subSchema : schema .getTypes ()) {
191
+ if (subSchema .getType () == Schema .Type .NULL ) {
192
+ continue ; // Skip null type
193
+ }
194
+ try {
195
+ return convertValue (value , subSchema );
196
+ } catch (Exception ignored ) {
197
+ // Try next type in union
198
+ }
199
+ }
200
+ throw new FeatureStoreException ("Cannot match union type for value: " + value .getClass ());
201
+
202
+ case ENUM :
203
+ return new GenericData .EnumSymbol (schema , value .toString ());
204
+
205
+ case STRING :
206
+ return value .toString ();
207
+
208
+ case INT :
209
+ case LONG :
210
+ case FLOAT :
211
+ case DOUBLE :
212
+ case BOOLEAN :
213
+ return value ; // Primitive types are directly compatible
214
+
215
+ case MAP :
216
+ if (value instanceof Map ) {
217
+ Map <String , Object > avroMap = new HashMap <>();
218
+ for (Map .Entry <?, ?> entry : ((Map <?, ?>) value ).entrySet ()) {
219
+ if (!(entry .getKey () instanceof String )) {
220
+ throw new FeatureStoreException ("Avro only supports string keys in maps." );
221
+ }
222
+ avroMap .put (entry .getKey ().toString (), convertValue (entry .getValue (), schema .getValueType ()));
223
+ }
224
+ return avroMap ;
225
+ }
226
+ throw new FeatureStoreException ("Unsupported map type: " + value .getClass ());
227
+
228
+ default :
229
+ throw new FeatureStoreException ("Unsupported Avro type: " + schema .getType ());
230
+ }
162
231
}
163
232
164
233
@ Override
0 commit comments