Skip to content

Commit fe1ea66

Browse files
committed
add a test with testRegexRouter router
1 parent 4df6e4b commit fe1ea66

File tree

1 file changed

+73
-0
lines changed

1 file changed

+73
-0
lines changed

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,79 @@ public void testTombstoneRecordFilter() {
234234
httpPort);
235235
}
236236

237+
238+
@ParameterizedTest
239+
@ValueSource(booleans = {true, false})
240+
public void testRegexRouter(boolean useHttp) {
241+
// 1. Define source topics and expected target tables
242+
String ordersAddTopic = "orders.add";
243+
String ordersModifyTopic = "orders.modify";
244+
String authLoginTopic = "auth.login";
245+
String tradesTopic = "trades";
246+
247+
String ordersTable = "orders";
248+
String authTable = "auth";
249+
String tradesTable = "trades"; // same as topic, no change expected
250+
251+
// 2. Create source Kafka topics
252+
connect.kafka().createTopic(ordersAddTopic, 1);
253+
connect.kafka().createTopic(ordersModifyTopic, 1);
254+
connect.kafka().createTopic(authLoginTopic, 1);
255+
connect.kafka().createTopic(tradesTopic, 1);
256+
257+
// 3. Configure the connector
258+
// Use one of the topics for base props, then override 'topics'
259+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, ordersAddTopic, useHttp);
260+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); // Simplify assertions
261+
262+
// Override the topics to subscribe to all source topics
263+
props.put("topics", String.join(",", ordersAddTopic, ordersModifyTopic, authLoginTopic, tradesTopic));
264+
265+
// Add the RegexRouter SMT configuration
266+
props.put("transforms", "RouteByPrefix");
267+
props.put("transforms.RouteByPrefix.type", "org.apache.kafka.connect.transforms.RegexRouter");
268+
// Note the double backslash needed for Java String literal escaping
269+
props.put("transforms.RouteByPrefix.regex", "(orders|auth)\\..+");
270+
props.put("transforms.RouteByPrefix.replacement", "$1");
271+
272+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
273+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
274+
275+
// 4. Define Schema and Data
276+
Schema schema = SchemaBuilder.struct().name("com.example.Data")
277+
.field("id", Schema.INT32_SCHEMA)
278+
.field("payload", Schema.STRING_SCHEMA)
279+
.build();
280+
281+
Struct structOrdersAdd = new Struct(schema).put("id", 1).put("payload", "order added");
282+
Struct structOrdersModify = new Struct(schema).put("id", 2).put("payload", "order modified");
283+
Struct structAuthLogin = new Struct(schema).put("id", 10).put("payload", "user logged in");
284+
Struct structTrades = new Struct(schema).put("id", 100).put("payload", "trade executed");
285+
286+
// 5. Produce records to source topics
287+
connect.kafka().produce(ordersAddTopic, "key1", new String(converter.fromConnectData(ordersAddTopic, schema, structOrdersAdd)));
288+
connect.kafka().produce(ordersModifyTopic, "key2", new String(converter.fromConnectData(ordersModifyTopic, schema, structOrdersModify)));
289+
connect.kafka().produce(authLoginTopic, "key3", new String(converter.fromConnectData(authLoginTopic, schema, structAuthLogin)));
290+
connect.kafka().produce(tradesTopic, "key4", new String(converter.fromConnectData(tradesTopic, schema, structTrades)));
291+
292+
// 6. Assert data in target QuestDB tables
293+
QuestDBUtils.assertSqlEventually( "\"id\",\"payload\"\r\n"
294+
+ "1,\"order added\"\r\n"
295+
+ "2,\"order modified\"\r\n",
296+
"select id, payload from " + ordersTable + " order by id", // Ensure consistent order
297+
httpPort);
298+
299+
QuestDBUtils.assertSqlEventually( "\"id\",\"payload\"\r\n"
300+
+ "10,\"user logged in\"\r\n",
301+
"select id, payload from " + authTable,
302+
httpPort);
303+
304+
QuestDBUtils.assertSqlEventually( "\"id\",\"payload\"\r\n"
305+
+ "100,\"trade executed\"\r\n",
306+
"select id, payload from " + tradesTable,
307+
httpPort);
308+
}
309+
237310
@ParameterizedTest
238311
@ValueSource(booleans = {true, false})
239312
public void testTableTemplateWithKey_schemaless(boolean useHttp) {

0 commit comments

Comments
 (0)